remove local telegraf plugins.inputs (#563)

This commit is contained in:
yubo 2021-01-29 23:49:48 +08:00 committed by GitHub
parent 3df2536bb6
commit 3754e0cbe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 85 additions and 5492 deletions

1
go.mod
View File

@ -14,7 +14,6 @@ require (
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3
github.com/go-sql-driver/mysql v1.5.0
github.com/google/go-github/v32 v32.1.0
github.com/google/uuid v1.1.2-0.20190416172445-c2e93f3ae59f
github.com/gorilla/mux v1.7.3
github.com/hashicorp/golang-lru v0.5.4

View File

@ -5,9 +5,10 @@ import (
"time"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/monapi/plugins/github/github"
"github.com/didi/nightingale/src/modules/monapi/plugins"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/github"
)
func init() {
@ -64,10 +65,15 @@ func (p *GitHubRule) TelegrafInput() (telegraf.Input, error) {
return nil, err
}
return &github.GitHub{
input := &github.GitHub{
Repositories: p.Repositories,
AccessToken: p.AccessToken,
EnterpriseBaseURL: p.EnterpriseBaseURL,
HTTPTimeout: time.Second * time.Duration(p.HTTPTimeout),
}, nil
}
if err := plugins.SetValue(&input.HTTPTimeout.Duration, time.Second*time.Duration(p.HTTPTimeout)); err != nil {
return nil, err
}
return input, nil
}

View File

@ -1,64 +0,0 @@
# GitHub Input Plugin
Gather repository information from [GitHub][] hosted repositories.
**Note:** Telegraf also contains the [webhook][] input which can be used as an
alternative method for collecting repository information.
### Configuration
```toml
[[inputs.github]]
## List of repositories to monitor
repositories = [
"influxdata/telegraf",
"influxdata/influxdb"
]
## Github API access token. Unauthenticated requests are limited to 60 per hour.
# access_token = ""
## Github API enterprise url. Github Enterprise accounts must specify their base url.
# enterprise_base_url = ""
## Timeout for HTTP requests.
# http_timeout = "5s"
```
### Metrics
- github_repository
- tags:
- name - The repository name
- owner - The owner of the repository
- language - The primary language of the repository
- license - The license set for the repository
- fields:
- forks (int)
- open_issues (int)
- networks (int)
- size (int)
- subscribers (int)
- stars (int)
- watchers (int)
When the [internal][] input is enabled:
+ internal_github
- tags:
- access_token - An obfuscated reference to the configured access token or "Unauthenticated"
- fields:
- limit - How many requests you are limited to (per hour)
- remaining - How many requests you have remaining (per hour)
- blocks - How many requests have been blocked due to rate limit
### Example Output
```
github_repository,language=Go,license=MIT\ License,name=telegraf,owner=influxdata forks=2679i,networks=2679i,open_issues=794i,size=23263i,stars=7091i,subscribers=316i,watchers=7091i 1563901372000000000
internal_github,access_token=Unauthenticated rate_limit_remaining=59i,rate_limit_limit=60i,rate_limit_blocks=0i 1552653551000000000
```
[GitHub]: https://www.github.com
[internal]: /plugins/inputs/internal
[webhook]: /plugins/inputs/webhooks/github

View File

@ -1,200 +0,0 @@
package github
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/google/go-github/v32/github"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
"golang.org/x/oauth2"
)
// GitHub - plugin main structure
type GitHub struct {
Repositories []string `toml:"repositories"`
AccessToken string `toml:"access_token"`
EnterpriseBaseURL string `toml:"enterprise_base_url"`
HTTPTimeout time.Duration `toml:"http_timeout"`
githubClient *github.Client
obfuscatedToken string
RateLimit selfstat.Stat
RateLimitErrors selfstat.Stat
RateRemaining selfstat.Stat
}
const sampleConfig = `
## List of repositories to monitor.
repositories = [
"influxdata/telegraf",
"influxdata/influxdb"
]
## Github API access token. Unauthenticated requests are limited to 60 per hour.
# access_token = ""
## Github API enterprise url. Github Enterprise accounts must specify their base url.
# enterprise_base_url = ""
## Timeout for HTTP requests.
# http_timeout = "5s"
`
// SampleConfig returns sample configuration for this plugin.
func (g *GitHub) SampleConfig() string {
return sampleConfig
}
// Description returns the plugin description.
func (g *GitHub) Description() string {
return "Gather repository information from GitHub hosted repositories."
}
// Create GitHub Client
func (g *GitHub) createGitHubClient(ctx context.Context) (*github.Client, error) {
httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: g.HTTPTimeout,
}
g.obfuscatedToken = "Unauthenticated"
if g.AccessToken != "" {
tokenSource := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: g.AccessToken},
)
oauthClient := oauth2.NewClient(ctx, tokenSource)
_ = context.WithValue(ctx, oauth2.HTTPClient, oauthClient)
g.obfuscatedToken = g.AccessToken[0:4] + "..." + g.AccessToken[len(g.AccessToken)-3:]
return g.newGithubClient(oauthClient)
}
return g.newGithubClient(httpClient)
}
func (g *GitHub) newGithubClient(httpClient *http.Client) (*github.Client, error) {
if g.EnterpriseBaseURL != "" {
return github.NewEnterpriseClient(g.EnterpriseBaseURL, "", httpClient)
}
return github.NewClient(httpClient), nil
}
// Gather GitHub Metrics
func (g *GitHub) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()
if g.githubClient == nil {
githubClient, err := g.createGitHubClient(ctx)
if err != nil {
return err
}
g.githubClient = githubClient
tokenTags := map[string]string{
"access_token": g.obfuscatedToken,
}
g.RateLimitErrors = selfstat.Register("github", "rate_limit_blocks", tokenTags)
g.RateLimit = selfstat.Register("github", "rate_limit_limit", tokenTags)
g.RateRemaining = selfstat.Register("github", "rate_limit_remaining", tokenTags)
}
var wg sync.WaitGroup
wg.Add(len(g.Repositories))
for _, repository := range g.Repositories {
go func(repositoryName string, acc telegraf.Accumulator) {
defer wg.Done()
owner, repository, err := splitRepositoryName(repositoryName)
if err != nil {
acc.AddError(err)
return
}
repositoryInfo, response, err := g.githubClient.Repositories.Get(ctx, owner, repository)
if _, ok := err.(*github.RateLimitError); ok {
g.RateLimitErrors.Incr(1)
}
if err != nil {
acc.AddError(err)
return
}
g.RateLimit.Set(int64(response.Rate.Limit))
g.RateRemaining.Set(int64(response.Rate.Remaining))
now := time.Now()
tags := getTags(repositoryInfo)
fields := getFields(repositoryInfo)
acc.AddFields("github_repository", fields, tags, now)
}(repository, acc)
}
wg.Wait()
return nil
}
func splitRepositoryName(repositoryName string) (string, string, error) {
splits := strings.SplitN(repositoryName, "/", 2)
if len(splits) != 2 {
return "", "", fmt.Errorf("%v is not of format 'owner/repository'", repositoryName)
}
return splits[0], splits[1], nil
}
func getLicense(rI *github.Repository) string {
if licenseName := rI.GetLicense().GetName(); licenseName != "" {
return licenseName
}
return "None"
}
func getTags(repositoryInfo *github.Repository) map[string]string {
return map[string]string{
"owner": repositoryInfo.GetOwner().GetLogin(),
"name": repositoryInfo.GetName(),
"language": repositoryInfo.GetLanguage(),
"license": getLicense(repositoryInfo),
}
}
func getFields(repositoryInfo *github.Repository) map[string]interface{} {
return map[string]interface{}{
"stars": repositoryInfo.GetStargazersCount(),
"subscribers": repositoryInfo.GetSubscribersCount(),
"watchers": repositoryInfo.GetWatchersCount(),
"networks": repositoryInfo.GetNetworkCount(),
"forks": repositoryInfo.GetForksCount(),
"open_issues": repositoryInfo.GetOpenIssuesCount(),
"size": repositoryInfo.GetSize(),
}
}
/*
func init() {
inputs.Add("github", func() telegraf.Input {
return &GitHub{
HTTPTimeout: internal.Duration{Duration: time.Second * 5},
}
})
}
*/

View File

@ -1,140 +0,0 @@
package github
import (
"net/http"
"reflect"
"testing"
gh "github.com/google/go-github/v32/github"
"github.com/stretchr/testify/require"
)
func TestNewGithubClient(t *testing.T) {
httpClient := &http.Client{}
g := &GitHub{}
client, err := g.newGithubClient(httpClient)
require.NoError(t, err)
require.Contains(t, client.BaseURL.String(), "api.github.com")
g.EnterpriseBaseURL = "api.example.com/"
enterpriseClient, err := g.newGithubClient(httpClient)
require.NoError(t, err)
require.Contains(t, enterpriseClient.BaseURL.String(), "api.example.com")
}
func TestSplitRepositoryNameWithWorkingExample(t *testing.T) {
var validRepositoryNames = []struct {
fullName string
owner string
repository string
}{
{"influxdata/telegraf", "influxdata", "telegraf"},
{"influxdata/influxdb", "influxdata", "influxdb"},
{"rawkode/saltstack-dotfiles", "rawkode", "saltstack-dotfiles"},
}
for _, tt := range validRepositoryNames {
t.Run(tt.fullName, func(t *testing.T) {
owner, repository, _ := splitRepositoryName(tt.fullName)
require.Equal(t, tt.owner, owner)
require.Equal(t, tt.repository, repository)
})
}
}
func TestSplitRepositoryNameWithNoSlash(t *testing.T) {
var invalidRepositoryNames = []string{
"influxdata-influxdb",
}
for _, tt := range invalidRepositoryNames {
t.Run(tt, func(t *testing.T) {
_, _, err := splitRepositoryName(tt)
require.Error(t, err)
})
}
}
func TestGetLicenseWhenExists(t *testing.T) {
licenseName := "MIT"
license := gh.License{Name: &licenseName}
repository := gh.Repository{License: &license}
getLicenseReturn := getLicense(&repository)
require.Equal(t, "MIT", getLicenseReturn)
}
func TestGetLicenseWhenMissing(t *testing.T) {
repository := gh.Repository{}
getLicenseReturn := getLicense(&repository)
require.Equal(t, "None", getLicenseReturn)
}
func TestGetTags(t *testing.T) {
licenseName := "MIT"
license := gh.License{Name: &licenseName}
ownerName := "influxdata"
owner := gh.User{Login: &ownerName}
fullName := "influxdata/influxdb"
repositoryName := "influxdb"
language := "Go"
repository := gh.Repository{
FullName: &fullName,
Name: &repositoryName,
License: &license,
Owner: &owner,
Language: &language,
}
getTagsReturn := getTags(&repository)
correctTagsReturn := map[string]string{
"owner": ownerName,
"name": repositoryName,
"language": language,
"license": licenseName,
}
require.Equal(t, true, reflect.DeepEqual(getTagsReturn, correctTagsReturn))
}
func TestGetFields(t *testing.T) {
stars := 1
forks := 2
openIssues := 3
size := 4
subscribers := 5
watchers := 6
repository := gh.Repository{
StargazersCount: &stars,
ForksCount: &forks,
OpenIssuesCount: &openIssues,
Size: &size,
NetworkCount: &forks,
SubscribersCount: &subscribers,
WatchersCount: &watchers,
}
getFieldsReturn := getFields(&repository)
correctFieldReturn := make(map[string]interface{})
correctFieldReturn["stars"] = 1
correctFieldReturn["forks"] = 2
correctFieldReturn["networks"] = 2
correctFieldReturn["open_issues"] = 3
correctFieldReturn["size"] = 4
correctFieldReturn["subscribers"] = 5
correctFieldReturn["watchers"] = 6
require.Equal(t, true, reflect.DeepEqual(getFieldsReturn, correctFieldReturn))
}

View File

@ -2,12 +2,14 @@ package mongodb
import (
"fmt"
"reflect"
"unsafe"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/monapi/plugins"
"github.com/didi/nightingale/src/modules/monapi/plugins/mongodb/mongodb"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/mongodb"
)
func init() {
@ -66,14 +68,19 @@ func (p *MongodbRule) TelegrafInput() (telegraf.Input, error) {
return nil, err
}
return &mongodb.MongoDB{
input := &mongodb.MongoDB{
Servers: p.Servers,
Mongos: make(map[string]*mongodb.Server),
GatherClusterStatus: p.GatherClusterStatus,
GatherPerdbStats: p.GatherPerdbStats,
GatherColStats: p.GatherColStats,
ColStatsDbs: p.ColStatsDbs,
Log: plugins.GetLogger(),
ClientConfig: p.ClientConfig.TlsClientConfig(),
}, nil
}
rv := reflect.Indirect(reflect.ValueOf(input)).FieldByName("mongos")
ptr := (*map[string]*mongodb.Server)(unsafe.Pointer(rv.UnsafeAddr()))
*ptr = make(map[string]*mongodb.Server)
return input, nil
}

View File

@ -1,275 +0,0 @@
# MongoDB Input Plugin
### Configuration:
```toml
[[inputs.mongodb]]
## An array of URLs of the form:
## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
servers = ["mongodb://127.0.0.1:27017"]
## When true, collect cluster status.
## Note that the query that counts jumbo chunks triggers a COLLSCAN, which
## may have an impact on performance.
# gather_cluster_status = true
## When true, collect per database stats
# gather_perdb_stats = false
## When true, collect per collection stats
# gather_col_stats = false
## List of db where collections stats are collected
## If empty, all db are concerned
# col_stats_dbs = ["local"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
#### Permissions:
If your MongoDB instance has access control enabled you will need to connect
as a user with sufficient rights.
With MongoDB 3.4 and higher, the `clusterMonitor` role can be used. In
version 3.2 you may also need these additional permissions:
```
> db.grantRolesToUser("user", [{role: "read", actions: "find", db: "local"}])
```
If the user is missing required privileges you may see an error in the
Telegraf logs similar to:
```
Error in input [mongodb]: not authorized on admin to execute command { serverStatus: 1, recordStats: 0 }
```
Some permission related errors are logged at debug level, you can check these
messages by setting `debug = true` in the agent section of the configuration or
by running Telegraf with the `--debug` argument.
### Metrics:
- mongodb
- tags:
- hostname
- node_type
- rs_name
- fields:
- active_reads (integer)
- active_writes (integer)
- aggregate_command_failed (integer)
- aggregate_command_total (integer)
- assert_msg (integer)
- assert_regular (integer)
- assert_rollovers (integer)
- assert_user (integer)
- assert_warning (integer)
- available_reads (integer)
- available_writes (integer)
- commands (integer)
- connections_available (integer)
- connections_current (integer)
- connections_total_created (integer)
- count_command_failed (integer)
- count_command_total (integer)
- cursor_no_timeout_count (integer)
- cursor_pinned_count (integer)
- cursor_timed_out_count (integer)
- cursor_total_count (integer)
- delete_command_failed (integer)
- delete_command_total (integer)
- deletes (integer)
- distinct_command_failed (integer)
- distinct_command_total (integer)
- document_deleted (integer)
- document_inserted (integer)
- document_returned (integer)
- document_updated (integer)
- find_and_modify_command_failed (integer)
- find_and_modify_command_total (integer)
- find_command_failed (integer)
- find_command_total (integer)
- flushes (integer)
- flushes_total_time_ns (integer)
- get_more_command_failed (integer)
- get_more_command_total (integer)
- getmores (integer)
- insert_command_failed (integer)
- insert_command_total (integer)
- inserts (integer)
- jumbo_chunks (integer)
- latency_commands_count (integer)
- latency_commands (integer)
- latency_reads_count (integer)
- latency_reads (integer)
- latency_writes_count (integer)
- latency_writes (integer)
- member_status (string)
- net_in_bytes_count (integer)
- net_out_bytes_count (integer)
- open_connections (integer)
- operation_scan_and_order (integer)
- operation_write_conflicts (integer)
- page_faults (integer)
- percent_cache_dirty (float)
- percent_cache_used (float)
- queries (integer)
- queued_reads (integer)
- queued_writes (integer)
- repl_apply_batches_num (integer)
- repl_apply_batches_total_millis (integer)
- repl_apply_ops (integer)
- repl_buffer_count (integer)
- repl_buffer_size_bytes (integer)
- repl_commands (integer)
- repl_deletes (integer)
- repl_executor_pool_in_progress_count (integer)
- repl_executor_queues_network_in_progress (integer)
- repl_executor_queues_sleepers (integer)
- repl_executor_unsignaled_events (integer)
- repl_getmores (integer)
- repl_inserts (integer)
- repl_lag (integer)
- repl_network_bytes (integer)
- repl_network_getmores_num (integer)
- repl_network_getmores_total_millis (integer)
- repl_network_ops (integer)
- repl_queries (integer)
- repl_updates (integer)
- repl_oplog_window_sec (integer)
- repl_state (integer)
- resident_megabytes (integer)
- state (string)
- storage_freelist_search_bucket_exhausted (integer)
- storage_freelist_search_requests (integer)
- storage_freelist_search_scanned (integer)
- tcmalloc_central_cache_free_bytes (integer)
- tcmalloc_current_allocated_bytes (integer)
- tcmalloc_current_total_thread_cache_bytes (integer)
- tcmalloc_heap_size (integer)
- tcmalloc_max_total_thread_cache_bytes (integer)
- tcmalloc_pageheap_commit_count (integer)
- tcmalloc_pageheap_committed_bytes (integer)
- tcmalloc_pageheap_decommit_count (integer)
- tcmalloc_pageheap_free_bytes (integer)
- tcmalloc_pageheap_reserve_count (integer)
- tcmalloc_pageheap_scavenge_count (integer)
- tcmalloc_pageheap_total_commit_bytes (integer)
- tcmalloc_pageheap_total_decommit_bytes (integer)
- tcmalloc_pageheap_total_reserve_bytes (integer)
- tcmalloc_pageheap_unmapped_bytes (integer)
- tcmalloc_spinlock_total_delay_ns (integer)
- tcmalloc_thread_cache_free_bytes (integer)
- tcmalloc_total_free_bytes (integer)
- tcmalloc_transfer_cache_free_bytes (integer)
- total_available (integer)
- total_created (integer)
- total_docs_scanned (integer)
- total_in_use (integer)
- total_keys_scanned (integer)
- total_refreshing (integer)
- total_tickets_reads (integer)
- total_tickets_writes (integer)
- ttl_deletes (integer)
- ttl_passes (integer)
- update_command_failed (integer)
- update_command_total (integer)
- updates (integer)
- uptime_ns (integer)
- version (string)
- vsize_megabytes (integer)
- wtcache_app_threads_page_read_count (integer)
- wtcache_app_threads_page_read_time (integer)
- wtcache_app_threads_page_write_count (integer)
- wtcache_bytes_read_into (integer)
- wtcache_bytes_written_from (integer)
- wtcache_pages_read_into (integer)
- wtcache_pages_requested_from (integer)
- wtcache_current_bytes (integer)
- wtcache_max_bytes_configured (integer)
- wtcache_internal_pages_evicted (integer)
- wtcache_modified_pages_evicted (integer)
- wtcache_unmodified_pages_evicted (integer)
- wtcache_pages_evicted_by_app_thread (integer)
- wtcache_pages_queued_for_eviction (integer)
- wtcache_server_evicting_pages (integer)
- wtcache_tracked_dirty_bytes (integer)
- wtcache_worker_thread_evictingpages (integer)
- commands_per_sec (integer, deprecated in 1.10; use `commands`))
- cursor_no_timeout (integer, opened/sec, deprecated in 1.10; use `cursor_no_timeout_count`))
- cursor_pinned (integer, opened/sec, deprecated in 1.10; use `cursor_pinned_count`))
- cursor_timed_out (integer, opened/sec, deprecated in 1.10; use `cursor_timed_out_count`))
- cursor_total (integer, opened/sec, deprecated in 1.10; use `cursor_total_count`))
- deletes_per_sec (integer, deprecated in 1.10; use `deletes`))
- flushes_per_sec (integer, deprecated in 1.10; use `flushes`))
- getmores_per_sec (integer, deprecated in 1.10; use `getmores`))
- inserts_per_sec (integer, deprecated in 1.10; use `inserts`))
- net_in_bytes (integer, bytes/sec, deprecated in 1.10; use `net_out_bytes_count`))
- net_out_bytes (integer, bytes/sec, deprecated in 1.10; use `net_out_bytes_count`))
- queries_per_sec (integer, deprecated in 1.10; use `queries`))
- repl_commands_per_sec (integer, deprecated in 1.10; use `repl_commands`))
- repl_deletes_per_sec (integer, deprecated in 1.10; use `repl_deletes`)
- repl_getmores_per_sec (integer, deprecated in 1.10; use `repl_getmores`)
- repl_inserts_per_sec (integer, deprecated in 1.10; use `repl_inserts`))
- repl_queries_per_sec (integer, deprecated in 1.10; use `repl_queries`))
- repl_updates_per_sec (integer, deprecated in 1.10; use `repl_updates`))
- ttl_deletes_per_sec (integer, deprecated in 1.10; use `ttl_deletes`))
- ttl_passes_per_sec (integer, deprecated in 1.10; use `ttl_passes`))
- updates_per_sec (integer, deprecated in 1.10; use `updates`))
+ mongodb_db_stats
- tags:
- db_name
- hostname
- fields:
- avg_obj_size (float)
- collections (integer)
- data_size (integer)
- index_size (integer)
- indexes (integer)
- num_extents (integer)
- objects (integer)
- ok (integer)
- storage_size (integer)
- type (string)
- mongodb_col_stats
- tags:
- hostname
- collection
- db_name
- fields:
- size (integer)
- avg_obj_size (integer)
- storage_size (integer)
- total_index_size (integer)
- ok (integer)
- count (integer)
- type (string)
- mongodb_shard_stats
- tags:
- hostname
- fields:
- in_use (integer)
- available (integer)
- created (integer)
- refreshing (integer)
### Example Output:
```
mongodb,hostname=127.0.0.1:27017 active_reads=3i,active_writes=0i,aggregate_command_failed=0i,aggregate_command_total=87210i,assert_msg=0i,assert_regular=0i,assert_rollovers=0i,assert_user=0i,assert_warning=0i,available_reads=125i,available_writes=128i,commands=218126i,commands_per_sec=1876i,connections_available=838853i,connections_current=7i,connections_total_created=8i,count_command_failed=0i,count_command_total=7i,cursor_no_timeout=0i,cursor_no_timeout_count=0i,cursor_pinned=0i,cursor_pinned_count=0i,cursor_timed_out=0i,cursor_timed_out_count=0i,cursor_total=0i,cursor_total_count=0i,delete_command_failed=0i,delete_command_total=0i,deletes=0i,deletes_per_sec=0i,distinct_command_failed=0i,distinct_command_total=87190i,document_deleted=0i,document_inserted=0i,document_returned=7i,document_updated=43595i,find_and_modify_command_failed=0i,find_and_modify_command_total=43595i,find_command_failed=0i,find_command_total=348819i,flushes=1i,flushes_per_sec=0i,flushes_total_time_ns=5000000i,get_more_command_failed=0i,get_more_command_total=0i,getmores=7i,getmores_per_sec=1i,insert_command_failed=0i,insert_command_total=0i,inserts=0i,inserts_per_sec=0i,jumbo_chunks=0i,latency_commands=44179i,latency_commands_count=122i,latency_reads=36662189i,latency_reads_count=523229i,latency_writes=6768713i,latency_writes_count=87190i,net_in_bytes=837378i,net_in_bytes_count=97692502i,net_out_bytes=690836i,net_out_bytes_count=75377383i,open_connections=7i,operation_scan_and_order=87193i,operation_write_conflicts=7i,page_faults=0i,percent_cache_dirty=0.9,percent_cache_used=1,queries=348816i,queries_per_sec=2988i,queued_reads=0i,queued_writes=0i,resident_megabytes=77i,storage_freelist_search_bucket_exhausted=0i,storage_freelist_search_requests=0i,storage_freelist_search_scanned=0i,tcmalloc_central_cache_free_bytes=280136i,tcmalloc_current_allocated_bytes=77677288i,tcmalloc_current_total_thread_cache_bytes=1222608i,tcmalloc_heap_size=142659584i,tcmalloc_max_total_thread_cache_bytes=260046848i,tcmalloc_pageheap_commit_count=1898i,tcmalloc_pageheap_committed_bytes=130084864i,tcmalloc_pageheap_decommit_count=889i,tcmalloc_pageheap_free_bytes=50610176i,tcmalloc_pageheap_reserve_count=50i,tcmalloc_pageheap_scavenge_count=884i,tcmalloc_pageheap_total_commit_bytes=13021937664i,tcmalloc_pageheap_total_decommit_bytes=12891852800i,tcmalloc_pageheap_total_reserve_bytes=142659584i,tcmalloc_pageheap_unmapped_bytes=12574720i,tcmalloc_spinlock_total_delay_ns=9767500i,tcmalloc_thread_cache_free_bytes=1222608i,tcmalloc_total_free_bytes=1797400i,tcmalloc_transfer_cache_free_bytes=294656i,total_available=0i,total_created=0i,total_docs_scanned=43595i,total_in_use=0i,total_keys_scanned=130805i,total_refreshing=0i,total_tickets_reads=128i,total_tickets_writes=128i,ttl_deletes=0i,ttl_deletes_per_sec=0i,ttl_passes=0i,ttl_passes_per_sec=0i,update_command_failed=0i,update_command_total=43595i,updates=43595i,updates_per_sec=372i,uptime_ns=60023000000i,version="3.6.17",vsize_megabytes=1048i,wtcache_app_threads_page_read_count=108i,wtcache_app_threads_page_read_time=25995i,wtcache_app_threads_page_write_count=0i,wtcache_bytes_read_into=2487250i,wtcache_bytes_written_from=74i,wtcache_current_bytes=5014530i,wtcache_internal_pages_evicted=0i,wtcache_max_bytes_configured=505413632i,wtcache_modified_pages_evicted=0i,wtcache_pages_evicted_by_app_thread=0i,wtcache_pages_queued_for_eviction=0i,wtcache_pages_read_into=139i,wtcache_pages_requested_from=699135i,wtcache_server_evicting_pages=0i,wtcache_tracked_dirty_bytes=4797426i,wtcache_unmodified_pages_evicted=0i,wtcache_worker_thread_evictingpages=0i 1586379818000000000
mongodb,hostname=127.0.0.1:27017,node_type=SEC,rs_name=rs0 active_reads=1i,active_writes=0i,aggregate_command_failed=0i,aggregate_command_total=1i,assert_msg=0i,assert_regular=0i,assert_rollovers=0i,assert_user=79i,assert_warning=0i,available_reads=127i,available_writes=128i,commands=1121855i,commands_per_sec=10i,connections_available=51183i,connections_current=17i,connections_total_created=557i,count_command_failed=0i,count_command_total=46307i,cursor_no_timeout=0i,cursor_no_timeout_count=0i,cursor_pinned=0i,cursor_pinned_count=0i,cursor_timed_out=0i,cursor_timed_out_count=28i,cursor_total=0i,cursor_total_count=0i,delete_command_failed=0i,delete_command_total=0i,deletes=0i,deletes_per_sec=0i,distinct_command_failed=0i,distinct_command_total=0i,document_deleted=0i,document_inserted=0i,document_returned=2248129i,document_updated=0i,find_and_modify_command_failed=0i,find_and_modify_command_total=0i,find_command_failed=2i,find_command_total=8764i,flushes=7850i,flushes_per_sec=0i,flushes_total_time_ns=4535446000000i,get_more_command_failed=0i,get_more_command_total=1993i,getmores=2018i,getmores_per_sec=0i,insert_command_failed=0i,insert_command_total=0i,inserts=0i,inserts_per_sec=0i,jumbo_chunks=0i,latency_commands=112011949i,latency_commands_count=1072472i,latency_reads=1877142443i,latency_reads_count=57086i,latency_writes=0i,latency_writes_count=0i,member_status="SEC",net_in_bytes=1212i,net_in_bytes_count=263928689i,net_out_bytes=41051i,net_out_bytes_count=2475389483i,open_connections=17i,operation_scan_and_order=34i,operation_write_conflicts=0i,page_faults=317i,percent_cache_dirty=1.6,percent_cache_used=73,queries=8764i,queries_per_sec=0i,queued_reads=0i,queued_writes=0i,repl_apply_batches_num=17839419i,repl_apply_batches_total_millis=399929i,repl_apply_ops=23355263i,repl_buffer_count=0i,repl_buffer_size_bytes=0i,repl_commands=11i,repl_commands_per_sec=0i,repl_deletes=440608i,repl_deletes_per_sec=0i,repl_executor_pool_in_progress_count=0i,repl_executor_queues_network_in_progress=0i,repl_executor_queues_sleepers=4i,repl_executor_unsignaled_events=0i,repl_getmores=0i,repl_getmores_per_sec=0i,repl_inserts=1875729i,repl_inserts_per_sec=0i,repl_lag=0i,repl_network_bytes=39122199371i,repl_network_getmores_num=34908797i,repl_network_getmores_total_millis=434805356i,repl_network_ops=23199086i,repl_oplog_window_sec=619292i,repl_queries=0i,repl_queries_per_sec=0i,repl_updates=21034729i,repl_updates_per_sec=38i,repl_state=2,resident_megabytes=6721i,state="SECONDARY",storage_freelist_search_bucket_exhausted=0i,storage_freelist_search_requests=0i,storage_freelist_search_scanned=0i,tcmalloc_central_cache_free_bytes=358512400i,tcmalloc_current_allocated_bytes=5427379424i,tcmalloc_current_total_thread_cache_bytes=70349552i,tcmalloc_heap_size=10199310336i,tcmalloc_max_total_thread_cache_bytes=1073741824i,tcmalloc_pageheap_commit_count=790819i,tcmalloc_pageheap_committed_bytes=7064821760i,tcmalloc_pageheap_decommit_count=533347i,tcmalloc_pageheap_free_bytes=1207816192i,tcmalloc_pageheap_reserve_count=7706i,tcmalloc_pageheap_scavenge_count=426235i,tcmalloc_pageheap_total_commit_bytes=116127649792i,tcmalloc_pageheap_total_decommit_bytes=109062828032i,tcmalloc_pageheap_total_reserve_bytes=10199310336i,tcmalloc_pageheap_unmapped_bytes=3134488576i,tcmalloc_spinlock_total_delay_ns=2518474348i,tcmalloc_thread_cache_free_bytes=70349552i,tcmalloc_total_free_bytes=429626144i,tcmalloc_transfer_cache_free_bytes=764192i,total_available=0i,total_created=0i,total_docs_scanned=735004782i,total_in_use=0i,total_keys_scanned=6188216i,total_refreshing=0i,total_tickets_reads=128i,total_tickets_writes=128i,ttl_deletes=0i,ttl_deletes_per_sec=0i,ttl_passes=7892i,ttl_passes_per_sec=0i,update_command_failed=0i,update_command_total=0i,updates=0i,updates_per_sec=0i,uptime_ns=473590288000000i,version="3.6.17",vsize_megabytes=11136i,wtcache_app_threads_page_read_count=11467625i,wtcache_app_threads_page_read_time=1700336840i,wtcache_app_threads_page_write_count=13268184i,wtcache_bytes_read_into=348022587843i,wtcache_bytes_written_from=322571702254i,wtcache_current_bytes=5509459274i,wtcache_internal_pages_evicted=109108i,wtcache_max_bytes_configured=7547650048i,wtcache_modified_pages_evicted=911196i,wtcache_pages_evicted_by_app_thread=17366i,wtcache_pages_queued_for_eviction=16572754i,wtcache_pages_read_into=11689764i,wtcache_pages_requested_from=499825861i,wtcache_server_evicting_pages=0i,wtcache_tracked_dirty_bytes=117487510i,wtcache_unmodified_pages_evicted=11058458i,wtcache_worker_thread_evictingpages=11907226i 1586379707000000000
mongodb_db_stats,db_name=admin,hostname=127.0.0.1:27017 avg_obj_size=241,collections=2i,data_size=723i,index_size=49152i,indexes=3i,num_extents=0i,objects=3i,ok=1i,storage_size=53248i,type="db_stat" 1547159491000000000
mongodb_db_stats,db_name=local,hostname=127.0.0.1:27017 avg_obj_size=813.9705882352941,collections=6i,data_size=55350i,index_size=102400i,indexes=5i,num_extents=0i,objects=68i,ok=1i,storage_size=204800i,type="db_stat" 1547159491000000000
mongodb_col_stats,collection=foo,db_name=local,hostname=127.0.0.1:27017 size=375005928i,avg_obj_size=5494,type="col_stat",storage_size=249307136i,total_index_size=2138112i,ok=1i,count=68251i 1547159491000000000
mongodb_shard_stats,hostname=127.0.0.1:27017,in_use=3i,available=3i,created=4i,refreshing=0i 1522799074000000000
```

View File

@ -1,16 +0,0 @@
version: '3'
services:
mongodb:
image: mongo
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
depends_on:
- mongodb
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@ -1,9 +0,0 @@
[agent]
interval="1s"
flush_interval="3s"
[[inputs.mongodb]]
servers = ["mongodb://mongodb:27017"]
[[outputs.file]]
files = ["stdout"]

View File

@ -1,200 +0,0 @@
package mongodb
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"gopkg.in/mgo.v2"
)
type MongoDB struct {
Servers []string
Ssl Ssl
Mongos map[string]*Server
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
ColStatsDbs []string
tlsint.ClientConfig
Log telegraf.Logger
}
type Ssl struct {
Enabled bool
CaCerts []string `toml:"cacerts"`
}
var sampleConfig = `
## An array of URLs of the form:
## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
servers = ["mongodb://127.0.0.1:27017"]
## When true, collect cluster status
## Note that the query that counts jumbo chunks triggers a COLLSCAN, which
## may have an impact on performance.
# gather_cluster_status = true
## When true, collect per database stats
# gather_perdb_stats = false
## When true, collect per collection stats
# gather_col_stats = false
## List of db where collections stats are collected
## If empty, all db are concerned
# col_stats_dbs = ["local"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (m *MongoDB) SampleConfig() string {
return sampleConfig
}
func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}
var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}
var wg sync.WaitGroup
for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
m.Log.Warnf("Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
}
u, err := url.Parse(serv)
if err != nil {
m.Log.Errorf("Unable to parse address %q: %s", serv, err.Error())
continue
}
if u.Host == "" {
m.Log.Errorf("Unable to parse address %q", serv)
continue
}
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
err := m.gatherServer(srv, acc)
if err != nil {
m.Log.Errorf("Error in plugin: %v", err)
}
}(m.getMongoServer(u))
}
wg.Wait()
return nil
}
func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.Mongos[url.Host]; !ok {
m.Mongos[url.Host] = &Server{
Log: m.Log,
Url: url,
}
}
return m.Mongos[url.Host]
}
func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("unable to parse URL %q: %s", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = 5 * time.Second
var tlsConfig *tls.Config
if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{}
if len(m.Ssl.CaCerts) > 0 {
roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
tlsConfig.InsecureSkipVerify = true
}
} else {
tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
}
// If configured to use TLS, add a dial function
if tlsConfig != nil {
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
if err != nil {
fmt.Printf("error in Dial, %s\n", err.Error())
}
return conn, err
}
}
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %s", err.Error())
}
server.Session = sess
}
return server.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.ColStatsDbs)
}
/*
func init() {
inputs.Add("mongodb", func() telegraf.Input {
return &MongoDB{
Mongos: make(map[string]*Server),
GatherClusterStatus: true,
GatherPerdbStats: false,
GatherColStats: false,
ColStatsDbs: []string{"local"},
}
})
}
*/

View File

@ -1,412 +0,0 @@
package mongodb
import (
"fmt"
"reflect"
"strconv"
"github.com/influxdata/telegraf"
)
type MongodbData struct {
StatLine *StatLine
Fields map[string]interface{}
Tags map[string]string
DbData []DbData
ColData []ColData
ShardHostData []DbData
}
type DbData struct {
Name string
Fields map[string]interface{}
}
type ColData struct {
Name string
DbName string
Fields map[string]interface{}
}
func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
return &MongodbData{
StatLine: statLine,
Tags: tags,
Fields: make(map[string]interface{}),
DbData: []DbData{},
}
}
var DefaultStats = map[string]string{
"uptime_ns": "UptimeNanos",
"inserts": "InsertCnt",
"inserts_per_sec": "Insert",
"queries": "QueryCnt",
"queries_per_sec": "Query",
"updates": "UpdateCnt",
"updates_per_sec": "Update",
"deletes": "DeleteCnt",
"deletes_per_sec": "Delete",
"getmores": "GetMoreCnt",
"getmores_per_sec": "GetMore",
"commands": "CommandCnt",
"commands_per_sec": "Command",
"flushes": "FlushesCnt",
"flushes_per_sec": "Flushes",
"flushes_total_time_ns": "FlushesTotalTime",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"available_reads": "AvailableReaders",
"available_writes": "AvailableWriters",
"total_tickets_reads": "TotalTicketsReaders",
"total_tickets_writes": "TotalTicketsWriters",
"net_in_bytes_count": "NetInCnt",
"net_in_bytes": "NetIn",
"net_out_bytes_count": "NetOutCnt",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
"ttl_deletes": "DeletedDocumentsCnt",
"ttl_deletes_per_sec": "DeletedDocuments",
"ttl_passes": "PassesCnt",
"ttl_passes_per_sec": "Passes",
"cursor_timed_out": "TimedOutC",
"cursor_timed_out_count": "TimedOutCCnt",
"cursor_no_timeout": "NoTimeoutC",
"cursor_no_timeout_count": "NoTimeoutCCnt",
"cursor_pinned": "PinnedC",
"cursor_pinned_count": "PinnedCCnt",
"cursor_total": "TotalC",
"cursor_total_count": "TotalCCnt",
"document_deleted": "DeletedD",
"document_inserted": "InsertedD",
"document_returned": "ReturnedD",
"document_updated": "UpdatedD",
"connections_current": "CurrentC",
"connections_available": "AvailableC",
"connections_total_created": "TotalCreatedC",
"operation_scan_and_order": "ScanAndOrderOp",
"operation_write_conflicts": "WriteConflictsOp",
"total_keys_scanned": "TotalKeysScanned",
"total_docs_scanned": "TotalObjectsScanned",
}
var DefaultAssertsStats = map[string]string{
"assert_regular": "Regular",
"assert_warning": "Warning",
"assert_msg": "Msg",
"assert_user": "User",
"assert_rollovers": "Rollovers",
}
var DefaultCommandsStats = map[string]string{
"aggregate_command_total": "AggregateCommandTotal",
"aggregate_command_failed": "AggregateCommandFailed",
"count_command_total": "CountCommandTotal",
"count_command_failed": "CountCommandFailed",
"delete_command_total": "DeleteCommandTotal",
"delete_command_failed": "DeleteCommandFailed",
"distinct_command_total": "DistinctCommandTotal",
"distinct_command_failed": "DistinctCommandFailed",
"find_command_total": "FindCommandTotal",
"find_command_failed": "FindCommandFailed",
"find_and_modify_command_total": "FindAndModifyCommandTotal",
"find_and_modify_command_failed": "FindAndModifyCommandFailed",
"get_more_command_total": "GetMoreCommandTotal",
"get_more_command_failed": "GetMoreCommandFailed",
"insert_command_total": "InsertCommandTotal",
"insert_command_failed": "InsertCommandFailed",
"update_command_total": "UpdateCommandTotal",
"update_command_failed": "UpdateCommandFailed",
}
var DefaultLatencyStats = map[string]string{
"latency_writes_count": "WriteOpsCnt",
"latency_writes": "WriteLatency",
"latency_reads_count": "ReadOpsCnt",
"latency_reads": "ReadLatency",
"latency_commands_count": "CommandOpsCnt",
"latency_commands": "CommandLatency",
}
var DefaultReplStats = map[string]string{
"repl_inserts": "InsertRCnt",
"repl_inserts_per_sec": "InsertR",
"repl_queries": "QueryRCnt",
"repl_queries_per_sec": "QueryR",
"repl_updates": "UpdateRCnt",
"repl_updates_per_sec": "UpdateR",
"repl_deletes": "DeleteRCnt",
"repl_deletes_per_sec": "DeleteR",
"repl_getmores": "GetMoreRCnt",
"repl_getmores_per_sec": "GetMoreR",
"repl_commands": "CommandRCnt",
"repl_commands_per_sec": "CommandR",
"member_status": "NodeType",
"state": "NodeState",
"repl_state": "NodeStateInt",
"repl_lag": "ReplLag",
"repl_network_bytes": "ReplNetworkBytes",
"repl_network_getmores_num": "ReplNetworkGetmoresNum",
"repl_network_getmores_total_millis": "ReplNetworkGetmoresTotalMillis",
"repl_network_ops": "ReplNetworkOps",
"repl_buffer_count": "ReplBufferCount",
"repl_buffer_size_bytes": "ReplBufferSizeBytes",
"repl_apply_batches_num": "ReplApplyBatchesNum",
"repl_apply_batches_total_millis": "ReplApplyBatchesTotalMillis",
"repl_apply_ops": "ReplApplyOps",
"repl_executor_pool_in_progress_count": "ReplExecutorPoolInProgressCount",
"repl_executor_queues_network_in_progress": "ReplExecutorQueuesNetworkInProgress",
"repl_executor_queues_sleepers": "ReplExecutorQueuesSleepers",
"repl_executor_unsignaled_events": "ReplExecutorUnsignaledEvents",
}
var DefaultClusterStats = map[string]string{
"jumbo_chunks": "JumboChunksCount",
}
var DefaultShardStats = map[string]string{
"total_in_use": "TotalInUse",
"total_available": "TotalAvailable",
"total_created": "TotalCreated",
"total_refreshing": "TotalRefreshing",
}
var ShardHostStats = map[string]string{
"in_use": "InUse",
"available": "Available",
"created": "Created",
"refreshing": "Refreshing",
}
var MmapStats = map[string]string{
"mapped_megabytes": "Mapped",
"non-mapped_megabytes": "NonMapped",
"page_faults": "FaultsCnt",
"page_faults_per_sec": "Faults",
}
var WiredTigerStats = map[string]string{
"percent_cache_dirty": "CacheDirtyPercent",
"percent_cache_used": "CacheUsedPercent",
}
var WiredTigerExtStats = map[string]string{
"wtcache_tracked_dirty_bytes": "TrackedDirtyBytes",
"wtcache_current_bytes": "CurrentCachedBytes",
"wtcache_max_bytes_configured": "MaxBytesConfigured",
"wtcache_app_threads_page_read_count": "AppThreadsPageReadCount",
"wtcache_app_threads_page_read_time": "AppThreadsPageReadTime",
"wtcache_app_threads_page_write_count": "AppThreadsPageWriteCount",
"wtcache_bytes_written_from": "BytesWrittenFrom",
"wtcache_bytes_read_into": "BytesReadInto",
"wtcache_pages_evicted_by_app_thread": "PagesEvictedByAppThread",
"wtcache_pages_queued_for_eviction": "PagesQueuedForEviction",
"wtcache_pages_read_into": "PagesReadIntoCache",
"wtcache_pages_written_from": "PagesWrittenFromCache",
"wtcache_pages_requested_from": "PagesRequestedFromCache",
"wtcache_server_evicting_pages": "ServerEvictingPages",
"wtcache_worker_thread_evictingpages": "WorkerThreadEvictingPages",
"wtcache_internal_pages_evicted": "InternalPagesEvicted",
"wtcache_modified_pages_evicted": "ModifiedPagesEvicted",
"wtcache_unmodified_pages_evicted": "UnmodifiedPagesEvicted",
}
var DefaultTCMallocStats = map[string]string{
"tcmalloc_current_allocated_bytes": "TCMallocCurrentAllocatedBytes",
"tcmalloc_heap_size": "TCMallocHeapSize",
"tcmalloc_central_cache_free_bytes": "TCMallocCentralCacheFreeBytes",
"tcmalloc_current_total_thread_cache_bytes": "TCMallocCurrentTotalThreadCacheBytes",
"tcmalloc_max_total_thread_cache_bytes": "TCMallocMaxTotalThreadCacheBytes",
"tcmalloc_total_free_bytes": "TCMallocTotalFreeBytes",
"tcmalloc_transfer_cache_free_bytes": "TCMallocTransferCacheFreeBytes",
"tcmalloc_thread_cache_free_bytes": "TCMallocThreadCacheFreeBytes",
"tcmalloc_spinlock_total_delay_ns": "TCMallocSpinLockTotalDelayNanos",
"tcmalloc_pageheap_free_bytes": "TCMallocPageheapFreeBytes",
"tcmalloc_pageheap_unmapped_bytes": "TCMallocPageheapUnmappedBytes",
"tcmalloc_pageheap_committed_bytes": "TCMallocPageheapComittedBytes",
"tcmalloc_pageheap_scavenge_count": "TCMallocPageheapScavengeCount",
"tcmalloc_pageheap_commit_count": "TCMallocPageheapCommitCount",
"tcmalloc_pageheap_total_commit_bytes": "TCMallocPageheapTotalCommitBytes",
"tcmalloc_pageheap_decommit_count": "TCMallocPageheapDecommitCount",
"tcmalloc_pageheap_total_decommit_bytes": "TCMallocPageheapTotalDecommitBytes",
"tcmalloc_pageheap_reserve_count": "TCMallocPageheapReserveCount",
"tcmalloc_pageheap_total_reserve_bytes": "TCMallocPageheapTotalReserveBytes",
}
var DefaultStorageStats = map[string]string{
"storage_freelist_search_bucket_exhausted": "StorageFreelistSearchBucketExhausted",
"storage_freelist_search_requests": "StorageFreelistSearchRequests",
"storage_freelist_search_scanned": "StorageFreelistSearchScanned",
}
var DbDataStats = map[string]string{
"collections": "Collections",
"objects": "Objects",
"avg_obj_size": "AvgObjSize",
"data_size": "DataSize",
"storage_size": "StorageSize",
"num_extents": "NumExtents",
"indexes": "Indexes",
"index_size": "IndexSize",
"ok": "Ok",
}
var ColDataStats = map[string]string{
"count": "Count",
"size": "Size",
"avg_obj_size": "AvgObjSize",
"storage_size": "StorageSize",
"total_index_size": "TotalIndexSize",
"ok": "Ok",
}
func (d *MongodbData) AddDbStats() {
for _, dbstat := range d.StatLine.DbStatsLines {
dbStatLine := reflect.ValueOf(&dbstat).Elem()
newDbData := &DbData{
Name: dbstat.Name,
Fields: make(map[string]interface{}),
}
newDbData.Fields["type"] = "db_stat"
for key, value := range DbDataStats {
val := dbStatLine.FieldByName(value).Interface()
newDbData.Fields[key] = val
}
d.DbData = append(d.DbData, *newDbData)
}
}
func (d *MongodbData) AddColStats() {
for _, colstat := range d.StatLine.ColStatsLines {
colStatLine := reflect.ValueOf(&colstat).Elem()
newColData := &ColData{
Name: colstat.Name,
DbName: colstat.DbName,
Fields: make(map[string]interface{}),
}
newColData.Fields["type"] = "col_stat"
for key, value := range ColDataStats {
val := colStatLine.FieldByName(value).Interface()
newColData.Fields[key] = val
}
d.ColData = append(d.ColData, *newColData)
}
}
func (d *MongodbData) AddShardHostStats() {
for host, hostStat := range d.StatLine.ShardHostStatsLines {
hostStatLine := reflect.ValueOf(&hostStat).Elem()
newDbData := &DbData{
Name: host,
Fields: make(map[string]interface{}),
}
newDbData.Fields["type"] = "shard_host_stat"
for k, v := range ShardHostStats {
val := hostStatLine.FieldByName(v).Interface()
newDbData.Fields[k] = val
}
d.ShardHostData = append(d.ShardHostData, *newDbData)
}
}
func (d *MongodbData) AddDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(statLine, DefaultStats)
if d.StatLine.NodeType != "" {
d.addStat(statLine, DefaultReplStats)
d.Tags["node_type"] = d.StatLine.NodeType
}
if d.StatLine.ReadLatency > 0 {
d.addStat(statLine, DefaultLatencyStats)
}
if d.StatLine.ReplSetName != "" {
d.Tags["rs_name"] = d.StatLine.ReplSetName
}
if d.StatLine.OplogStats != nil {
d.add("repl_oplog_window_sec", d.StatLine.OplogStats.TimeDiff)
}
if d.StatLine.Version != "" {
d.add("version", d.StatLine.Version)
}
d.addStat(statLine, DefaultAssertsStats)
d.addStat(statLine, DefaultClusterStats)
d.addStat(statLine, DefaultCommandsStats)
d.addStat(statLine, DefaultShardStats)
d.addStat(statLine, DefaultStorageStats)
d.addStat(statLine, DefaultTCMallocStats)
if d.StatLine.StorageEngine == "mmapv1" || d.StatLine.StorageEngine == "rocksdb" {
d.addStat(statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(key, floatVal)
}
d.addStat(statLine, WiredTigerExtStats)
d.add("page_faults", d.StatLine.FaultsCnt)
}
}
func (d *MongodbData) addStat(statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(key, val)
}
}
func (d *MongodbData) add(key string, val interface{}) {
d.Fields[key] = val
}
func (d *MongodbData) flush(acc telegraf.Accumulator) {
acc.AddFields(
"mongodb",
d.Fields,
d.Tags,
d.StatLine.Time,
)
d.Fields = make(map[string]interface{})
for _, db := range d.DbData {
d.Tags["db_name"] = db.Name
acc.AddFields(
"mongodb_db_stats",
db.Fields,
d.Tags,
d.StatLine.Time,
)
db.Fields = make(map[string]interface{})
}
for _, col := range d.ColData {
d.Tags["collection"] = col.Name
d.Tags["db_name"] = col.DbName
acc.AddFields(
"mongodb_col_stats",
col.Fields,
d.Tags,
d.StatLine.Time,
)
col.Fields = make(map[string]interface{})
}
for _, host := range d.ShardHostData {
d.Tags["hostname"] = host.Name
acc.AddFields(
"mongodb_shard_stats",
host.Fields,
d.Tags,
d.StatLine.Time,
)
host.Fields = make(map[string]interface{})
}
}

View File

@ -1,487 +0,0 @@
package mongodb
import (
"sort"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
var tags = make(map[string]string)
func TestAddNonReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
UptimeNanos: 0,
Insert: 0,
Query: 0,
Update: 0,
UpdateCnt: 0,
Delete: 0,
GetMore: 0,
Command: 0,
Flushes: 0,
FlushesCnt: 0,
Virtual: 0,
Resident: 0,
QueuedReaders: 0,
QueuedWriters: 0,
ActiveReaders: 0,
ActiveWriters: 0,
AvailableReaders: 0,
AvailableWriters: 0,
TotalTicketsReaders: 0,
TotalTicketsWriters: 0,
NetIn: 0,
NetOut: 0,
NumConnections: 0,
Passes: 0,
DeletedDocuments: 0,
TimedOutC: 0,
NoTimeoutC: 0,
PinnedC: 0,
TotalC: 0,
DeletedD: 0,
InsertedD: 0,
ReturnedD: 0,
UpdatedD: 0,
CurrentC: 0,
AvailableC: 0,
TotalCreatedC: 0,
ScanAndOrderOp: 0,
WriteConflictsOp: 0,
TotalKeysScanned: 0,
TotalObjectsScanned: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultStats {
assert.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key)
}
}
func TestAddReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "mmapv1",
Mapped: 0,
NonMapped: 0,
Faults: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range MmapStats {
assert.True(t, acc.HasInt64Field("mongodb", key), key)
}
}
func TestAddWiredTigerStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "wiredTiger",
CacheDirtyPercent: 0,
CacheUsedPercent: 0,
TrackedDirtyBytes: 0,
CurrentCachedBytes: 0,
MaxBytesConfigured: 0,
AppThreadsPageReadCount: 0,
AppThreadsPageReadTime: 0,
AppThreadsPageWriteCount: 0,
BytesWrittenFrom: 0,
BytesReadInto: 0,
PagesEvictedByAppThread: 0,
PagesQueuedForEviction: 0,
PagesWrittenFromCache: 1247,
ServerEvictingPages: 0,
WorkerThreadEvictingPages: 0,
FaultsCnt: 204,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range WiredTigerStats {
assert.True(t, acc.HasFloatField("mongodb", key), key)
}
for key := range WiredTigerExtStats {
assert.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key)
}
assert.True(t, acc.HasInt64Field("mongodb", "page_faults"))
}
func TestAddShardStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
TotalInUse: 0,
TotalAvailable: 0,
TotalCreated: 0,
TotalRefreshing: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultShardStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddLatencyStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
CommandOpsCnt: 73,
CommandLatency: 364,
ReadOpsCnt: 113,
ReadLatency: 201,
WriteOpsCnt: 7,
WriteLatency: 55,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultLatencyStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddAssertsStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
Regular: 3,
Warning: 9,
Msg: 2,
User: 34,
Rollovers: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultAssertsStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddCommandsStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
AggregateCommandTotal: 12,
AggregateCommandFailed: 2,
CountCommandTotal: 18,
CountCommandFailed: 5,
DeleteCommandTotal: 73,
DeleteCommandFailed: 364,
DistinctCommandTotal: 87,
DistinctCommandFailed: 19,
FindCommandTotal: 113,
FindCommandFailed: 201,
FindAndModifyCommandTotal: 7,
FindAndModifyCommandFailed: 55,
GetMoreCommandTotal: 4,
GetMoreCommandFailed: 55,
InsertCommandTotal: 34,
InsertCommandFailed: 65,
UpdateCommandTotal: 23,
UpdateCommandFailed: 6,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultCommandsStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddTCMallocStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
TCMallocCurrentAllocatedBytes: 5877253096,
TCMallocHeapSize: 8067108864,
TCMallocPageheapFreeBytes: 1054994432,
TCMallocPageheapUnmappedBytes: 677859328,
TCMallocMaxTotalThreadCacheBytes: 1073741824,
TCMallocCurrentTotalThreadCacheBytes: 80405312,
TCMallocTotalFreeBytes: 457002008,
TCMallocCentralCacheFreeBytes: 375131800,
TCMallocTransferCacheFreeBytes: 1464896,
TCMallocThreadCacheFreeBytes: 80405312,
TCMallocPageheapComittedBytes: 7389249536,
TCMallocPageheapScavengeCount: 396394,
TCMallocPageheapCommitCount: 641765,
TCMallocPageheapTotalCommitBytes: 102248751104,
TCMallocPageheapDecommitCount: 396394,
TCMallocPageheapTotalDecommitBytes: 94859501568,
TCMallocPageheapReserveCount: 6179,
TCMallocPageheapTotalReserveBytes: 8067108864,
TCMallocSpinLockTotalDelayNanos: 2344453860,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultTCMallocStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddStorageStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageFreelistSearchBucketExhausted: 0,
StorageFreelistSearchRequests: 0,
StorageFreelistSearchScanned: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultStorageStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddShardHostStats(t *testing.T) {
expectedHosts := []string{"hostA", "hostB"}
hostStatLines := map[string]ShardHostStatLine{}
for _, host := range expectedHosts {
hostStatLines[host] = ShardHostStatLine{
InUse: 0,
Available: 0,
Created: 0,
Refreshing: 0,
}
}
d := NewMongodbData(
&StatLine{
ShardHostStatsLines: hostStatLines,
},
map[string]string{}, // Use empty tags, so we don't break existing tests
)
var acc testutil.Accumulator
d.AddShardHostStats()
d.flush(&acc)
var hostsFound []string
for host := range hostStatLines {
for key := range ShardHostStats {
assert.True(t, acc.HasInt64Field("mongodb_shard_stats", key))
}
assert.True(t, acc.HasTag("mongodb_shard_stats", "hostname"))
hostsFound = append(hostsFound, host)
}
sort.Strings(hostsFound)
sort.Strings(expectedHosts)
assert.Equal(t, hostsFound, expectedHosts)
}
func TestStateTag(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
NodeType: "PRI",
NodeState: "PRIMARY",
ReplSetName: "rs1",
Version: "3.6.17",
},
tags,
)
stateTags := make(map[string]string)
stateTags["node_type"] = "PRI"
stateTags["rs_name"] = "rs1"
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
fields := map[string]interface{}{
"active_reads": int64(0),
"active_writes": int64(0),
"aggregate_command_failed": int64(0),
"aggregate_command_total": int64(0),
"assert_msg": int64(0),
"assert_regular": int64(0),
"assert_rollovers": int64(0),
"assert_user": int64(0),
"assert_warning": int64(0),
"available_reads": int64(0),
"available_writes": int64(0),
"commands": int64(0),
"commands_per_sec": int64(0),
"connections_available": int64(0),
"connections_current": int64(0),
"connections_total_created": int64(0),
"count_command_failed": int64(0),
"count_command_total": int64(0),
"cursor_no_timeout": int64(0),
"cursor_no_timeout_count": int64(0),
"cursor_pinned": int64(0),
"cursor_pinned_count": int64(0),
"cursor_timed_out": int64(0),
"cursor_timed_out_count": int64(0),
"cursor_total": int64(0),
"cursor_total_count": int64(0),
"delete_command_failed": int64(0),
"delete_command_total": int64(0),
"deletes": int64(0),
"deletes_per_sec": int64(0),
"distinct_command_failed": int64(0),
"distinct_command_total": int64(0),
"document_deleted": int64(0),
"document_inserted": int64(0),
"document_returned": int64(0),
"document_updated": int64(0),
"find_and_modify_command_failed": int64(0),
"find_and_modify_command_total": int64(0),
"find_command_failed": int64(0),
"find_command_total": int64(0),
"flushes": int64(0),
"flushes_per_sec": int64(0),
"flushes_total_time_ns": int64(0),
"get_more_command_failed": int64(0),
"get_more_command_total": int64(0),
"getmores": int64(0),
"getmores_per_sec": int64(0),
"insert_command_failed": int64(0),
"insert_command_total": int64(0),
"inserts": int64(0),
"inserts_per_sec": int64(0),
"jumbo_chunks": int64(0),
"member_status": "PRI",
"net_in_bytes": int64(0),
"net_in_bytes_count": int64(0),
"net_out_bytes": int64(0),
"net_out_bytes_count": int64(0),
"open_connections": int64(0),
"operation_scan_and_order": int64(0),
"operation_write_conflicts": int64(0),
"queries": int64(0),
"queries_per_sec": int64(0),
"queued_reads": int64(0),
"queued_writes": int64(0),
"repl_apply_batches_num": int64(0),
"repl_apply_batches_total_millis": int64(0),
"repl_apply_ops": int64(0),
"repl_buffer_count": int64(0),
"repl_buffer_size_bytes": int64(0),
"repl_commands": int64(0),
"repl_commands_per_sec": int64(0),
"repl_deletes": int64(0),
"repl_deletes_per_sec": int64(0),
"repl_executor_pool_in_progress_count": int64(0),
"repl_executor_queues_network_in_progress": int64(0),
"repl_executor_queues_sleepers": int64(0),
"repl_executor_unsignaled_events": int64(0),
"repl_getmores": int64(0),
"repl_getmores_per_sec": int64(0),
"repl_inserts": int64(0),
"repl_inserts_per_sec": int64(0),
"repl_lag": int64(0),
"repl_network_bytes": int64(0),
"repl_network_getmores_num": int64(0),
"repl_network_getmores_total_millis": int64(0),
"repl_network_ops": int64(0),
"repl_queries": int64(0),
"repl_queries_per_sec": int64(0),
"repl_updates": int64(0),
"repl_updates_per_sec": int64(0),
"repl_state": int64(0),
"resident_megabytes": int64(0),
"state": "PRIMARY",
"storage_freelist_search_bucket_exhausted": int64(0),
"storage_freelist_search_requests": int64(0),
"storage_freelist_search_scanned": int64(0),
"tcmalloc_central_cache_free_bytes": int64(0),
"tcmalloc_current_allocated_bytes": int64(0),
"tcmalloc_current_total_thread_cache_bytes": int64(0),
"tcmalloc_heap_size": int64(0),
"tcmalloc_max_total_thread_cache_bytes": int64(0),
"tcmalloc_pageheap_commit_count": int64(0),
"tcmalloc_pageheap_committed_bytes": int64(0),
"tcmalloc_pageheap_decommit_count": int64(0),
"tcmalloc_pageheap_free_bytes": int64(0),
"tcmalloc_pageheap_reserve_count": int64(0),
"tcmalloc_pageheap_scavenge_count": int64(0),
"tcmalloc_pageheap_total_commit_bytes": int64(0),
"tcmalloc_pageheap_total_decommit_bytes": int64(0),
"tcmalloc_pageheap_total_reserve_bytes": int64(0),
"tcmalloc_pageheap_unmapped_bytes": int64(0),
"tcmalloc_spinlock_total_delay_ns": int64(0),
"tcmalloc_thread_cache_free_bytes": int64(0),
"tcmalloc_total_free_bytes": int64(0),
"tcmalloc_transfer_cache_free_bytes": int64(0),
"total_available": int64(0),
"total_created": int64(0),
"total_docs_scanned": int64(0),
"total_in_use": int64(0),
"total_keys_scanned": int64(0),
"total_refreshing": int64(0),
"total_tickets_reads": int64(0),
"total_tickets_writes": int64(0),
"ttl_deletes": int64(0),
"ttl_deletes_per_sec": int64(0),
"ttl_passes": int64(0),
"ttl_passes_per_sec": int64(0),
"update_command_failed": int64(0),
"update_command_total": int64(0),
"updates": int64(0),
"updates_per_sec": int64(0),
"uptime_ns": int64(0),
"version": "3.6.17",
"vsize_megabytes": int64(0),
}
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
}

View File

@ -1,299 +0,0 @@
package mongodb
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/influxdata/telegraf"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
type Server struct {
Url *url.URL
Session *mgo.Session
lastResult *MongoStatus
Log telegraf.Logger
}
func (s *Server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["hostname"] = s.Url.Host
return tags
}
type oplogEntry struct {
Timestamp bson.MongoTimestamp `bson:"ts"`
}
func IsAuthorization(err error) bool {
return strings.Contains(err.Error(), "not authorized")
}
func (s *Server) authLog(err error) {
if IsAuthorization(err) {
s.Log.Debug(err.Error())
} else {
s.Log.Error(err.Error())
}
}
func (s *Server) gatherServerStatus() (*ServerStatus, error) {
serverStatus := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "serverStatus",
Value: 1,
},
{
Name: "recordStats",
Value: 0,
},
}, serverStatus)
if err != nil {
return nil, err
}
return serverStatus, nil
}
func (s *Server) gatherReplSetStatus() (*ReplSetStatus, error) {
replSetStatus := &ReplSetStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "replSetGetStatus",
Value: 1,
},
}, replSetStatus)
if err != nil {
return nil, err
}
return replSetStatus, nil
}
func (s *Server) gatherClusterStatus() (*ClusterStatus, error) {
chunkCount, err := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
if err != nil {
return nil, err
}
return &ClusterStatus{
JumboChunksCount: int64(chunkCount),
}, nil
}
func (s *Server) gatherShardConnPoolStats() (*ShardStats, error) {
shardStats := &ShardStats{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "shardConnPoolStats",
Value: 1,
},
}, &shardStats)
if err != nil {
return nil, err
}
return shardStats, nil
}
func (s *Server) gatherDBStats(name string) (*Db, error) {
stats := &DbStatsData{}
err := s.Session.DB(name).Run(bson.D{
{
Name: "dbStats",
Value: 1,
},
}, stats)
if err != nil {
return nil, err
}
return &Db{
Name: name,
DbStatsData: stats,
}, nil
}
func (s *Server) getOplogReplLag(collection string) (*OplogStats, error) {
query := bson.M{"ts": bson.M{"$exists": true}}
var first oplogEntry
err := s.Session.DB("local").C(collection).Find(query).Sort("$natural").Limit(1).One(&first)
if err != nil {
return nil, err
}
var last oplogEntry
err = s.Session.DB("local").C(collection).Find(query).Sort("-$natural").Limit(1).One(&last)
if err != nil {
return nil, err
}
firstTime := time.Unix(int64(first.Timestamp>>32), 0)
lastTime := time.Unix(int64(last.Timestamp>>32), 0)
stats := &OplogStats{
TimeDiff: int64(lastTime.Sub(firstTime).Seconds()),
}
return stats, nil
}
// The "oplog.rs" collection is stored on all replica set members.
//
// The "oplog.$main" collection is created on the master node of a
// master-slave replicated deployment. As of MongoDB 3.2, master-slave
// replication has been deprecated.
func (s *Server) gatherOplogStats() (*OplogStats, error) {
stats, err := s.getOplogReplLag("oplog.rs")
if err == nil {
return stats, nil
}
return s.getOplogReplLag("oplog.$main")
}
func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) {
names, err := s.Session.DatabaseNames()
if err != nil {
return nil, err
}
results := &ColStats{}
for _, dbName := range names {
if stringInSlice(dbName, colStatsDbs) || len(colStatsDbs) == 0 {
var colls []string
colls, err = s.Session.DB(dbName).CollectionNames()
if err != nil {
s.Log.Errorf("Error getting collection names: %s", err.Error())
continue
}
for _, colName := range colls {
colStatLine := &ColStatsData{}
err = s.Session.DB(dbName).Run(bson.D{
{
Name: "collStats",
Value: colName,
},
}, colStatLine)
if err != nil {
s.authLog(fmt.Errorf("error getting col stats from %q: %v", colName, err))
continue
}
collection := &Collection{
Name: colName,
DbName: dbName,
ColStatsData: colStatLine,
}
results.Collections = append(results.Collections, *collection)
}
}
}
return results, nil
}
func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus bool, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
serverStatus, err := s.gatherServerStatus()
if err != nil {
return err
}
// Get replica set status, an error indicates that the server is not a
// member of a replica set.
replSetStatus, err := s.gatherReplSetStatus()
if err != nil {
s.Log.Debugf("Unable to gather replica set status: %s", err.Error())
}
// Gather the oplog if we are a member of a replica set. Non-replica set
// members do not have the oplog collections.
var oplogStats *OplogStats
if replSetStatus != nil {
oplogStats, err = s.gatherOplogStats()
if err != nil {
s.authLog(fmt.Errorf("Unable to get oplog stats: %v", err))
}
}
var clusterStatus *ClusterStatus
if gatherClusterStatus {
status, err := s.gatherClusterStatus()
if err != nil {
s.Log.Debugf("Unable to gather cluster status: %s", err.Error())
}
clusterStatus = status
}
shardStats, err := s.gatherShardConnPoolStats()
if err != nil {
s.authLog(fmt.Errorf("unable to gather shard connection pool stats: %s", err.Error()))
}
var collectionStats *ColStats
if gatherColStats {
stats, err := s.gatherCollectionStats(colStatsDbs)
if err != nil {
return err
}
collectionStats = stats
}
dbStats := &DbStats{}
if gatherDbStats {
names, err := s.Session.DatabaseNames()
if err != nil {
return err
}
for _, name := range names {
db, err := s.gatherDBStats(name)
if err != nil {
s.Log.Debugf("Error getting db stats from %q: %s", name, err.Error())
}
dbStats.Dbs = append(dbStats.Dbs, *db)
}
}
result := &MongoStatus{
ServerStatus: serverStatus,
ReplSetStatus: replSetStatus,
ClusterStatus: clusterStatus,
DbStats: dbStats,
ColStats: collectionStats,
ShardStats: shardStats,
OplogStats: oplogStats,
}
result.SampleTime = time.Now()
if s.lastResult != nil && result != nil {
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
durationInSeconds := int64(duration.Seconds())
if durationInSeconds == 0 {
durationInSeconds = 1
}
data := NewMongodbData(
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
s.getDefaultTags(),
)
data.AddDefaultStats()
data.AddDbStats()
data.AddColStats()
data.AddShardHostStats()
data.flush(acc)
}
s.lastResult = result
return nil
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}

View File

@ -1,41 +0,0 @@
// +build integration
package mongodb
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetDefaultTags(t *testing.T) {
var tagTests = []struct {
in string
out string
}{
{"hostname", server.Url.Host},
}
defaultTags := server.getDefaultTags()
for _, tt := range tagTests {
if defaultTags[tt.in] != tt.out {
t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
}
}
}
func TestAddDefaultStats(t *testing.T) {
var acc testutil.Accumulator
err := server.gatherData(&acc, false)
require.NoError(t, err)
// need to call this twice so it can perform the diff
err = server.gatherData(&acc, false)
require.NoError(t, err)
for key := range DefaultStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}

View File

@ -1,71 +0,0 @@
// +build integration
package mongodb
import (
"log"
"math/rand"
"net/url"
"os"
"testing"
"time"
"gopkg.in/mgo.v2"
)
var connect_url string
var server *Server
func init() {
connect_url = os.Getenv("MONGODB_URL")
if connect_url == "" {
connect_url = "127.0.0.1:27017"
server = &Server{Url: &url.URL{Host: connect_url}}
} else {
full_url, err := url.Parse(connect_url)
if err != nil {
log.Fatalf("Unable to parse URL (%s), %s\n", full_url, err.Error())
}
server = &Server{Url: full_url}
}
}
func testSetup(m *testing.M) {
var err error
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
log.Fatalf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = 5 * time.Second
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
log.Fatalf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
server.Session, _ = mgo.Dial(server.Url.Host)
if err != nil {
log.Fatalln(err.Error())
}
}
func testTeardown(m *testing.M) {
server.Session.Close()
}
func TestMain(m *testing.M) {
// seed randomness for use with tests
rand.Seed(time.Now().UTC().UnixNano())
testSetup(m)
res := m.Run()
testTeardown(m)
os.Exit(res)
}

File diff suppressed because it is too large Load Diff

View File

@ -1,205 +0,0 @@
package mongodb
import (
"testing"
//"time"
//"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestLatencyStats(t *testing.T) {
sl := NewStatLine(
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
},
},
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 0,
Latency: 0,
},
Writes: &LatencyStats{
Ops: 0,
Latency: 0,
},
Commands: &LatencyStats{
Ops: 0,
Latency: 0,
},
},
},
},
"foo",
true,
60,
)
assert.Equal(t, sl.CommandLatency, int64(0))
assert.Equal(t, sl.ReadLatency, int64(0))
assert.Equal(t, sl.WriteLatency, int64(0))
assert.Equal(t, sl.CommandOpsCnt, int64(0))
assert.Equal(t, sl.ReadOpsCnt, int64(0))
assert.Equal(t, sl.WriteOpsCnt, int64(0))
}
func TestLatencyStatsDiffZero(t *testing.T) {
sl := NewStatLine(
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 0,
Latency: 0,
},
Writes: &LatencyStats{
Ops: 0,
Latency: 0,
},
Commands: &LatencyStats{
Ops: 0,
Latency: 0,
},
},
},
},
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 0,
Latency: 0,
},
Writes: &LatencyStats{
Ops: 0,
Latency: 0,
},
Commands: &LatencyStats{
Ops: 0,
Latency: 0,
},
},
},
},
"foo",
true,
60,
)
assert.Equal(t, sl.CommandLatency, int64(0))
assert.Equal(t, sl.ReadLatency, int64(0))
assert.Equal(t, sl.WriteLatency, int64(0))
assert.Equal(t, sl.CommandOpsCnt, int64(0))
assert.Equal(t, sl.ReadOpsCnt, int64(0))
assert.Equal(t, sl.WriteOpsCnt, int64(0))
}
func TestLatencyStatsDiff(t *testing.T) {
sl := NewStatLine(
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 4189041956,
Latency: 2255922322753,
},
Writes: &LatencyStats{
Ops: 1691019457,
Latency: 494478256915,
},
Commands: &LatencyStats{
Ops: 1019150402,
Latency: 59177710371,
},
},
},
},
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 4189049884,
Latency: 2255946760057,
},
Writes: &LatencyStats{
Ops: 1691021287,
Latency: 494479456987,
},
Commands: &LatencyStats{
Ops: 1019152861,
Latency: 59177981552,
},
},
},
},
"foo",
true,
60,
)
assert.Equal(t, sl.CommandLatency, int64(59177981552))
assert.Equal(t, sl.ReadLatency, int64(2255946760057))
assert.Equal(t, sl.WriteLatency, int64(494479456987))
assert.Equal(t, sl.CommandOpsCnt, int64(1019152861))
assert.Equal(t, sl.ReadOpsCnt, int64(4189049884))
assert.Equal(t, sl.WriteOpsCnt, int64(1691021287))
}

View File

@ -0,0 +1,20 @@
package mongodb
import (
"testing"
"time"
"github.com/didi/nightingale/src/modules/monapi/plugins"
)
func TestCollect(t *testing.T) {
input := plugins.PluginTest(t, &MongodbRule{
Servers: []string{"mongodb://root:root@127.0.0.1:27017"},
GatherClusterStatus: true,
GatherPerdbStats: true,
GatherColStats: true,
})
time.Sleep(time.Second)
plugins.PluginInputTest(t, input)
}

View File

@ -6,9 +6,9 @@ import (
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/monapi/plugins"
"github.com/didi/nightingale/src/modules/monapi/plugins/prometheus/prometheus"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/prometheus"
)
func init() {
@ -74,7 +74,7 @@ func (p *PrometheusRule) TelegrafInput() (telegraf.Input, error) {
return nil, err
}
return &prometheus.Prometheus{
input := &prometheus.Prometheus{
URLs: p.URLs,
URLTag: "target",
// KubernetesServices: p.KubernetesServices,
@ -86,9 +86,15 @@ func (p *PrometheusRule) TelegrafInput() (telegraf.Input, error) {
// BearerTokenString: p.BearerTokenString,
// Username: p.Username,
// Password: p.Password,
ResponseTimeout: time.Second * time.Duration(p.ResponseTimeout),
MetricVersion: 2,
Log: plugins.GetLogger(),
ClientConfig: p.ClientConfig.TlsClientConfig(),
}, nil
// ResponseTimeout: time.Second * time.Duration(p.ResponseTimeout),
MetricVersion: 2,
Log: plugins.GetLogger(),
ClientConfig: p.ClientConfig.TlsClientConfig(),
}
if err := plugins.SetValue(&input.ResponseTimeout.Duration,
time.Second*time.Duration(p.ResponseTimeout)); err != nil {
return nil, err
}
return input, nil
}

View File

@ -1,171 +0,0 @@
# Prometheus Input Plugin
The prometheus input plugin gathers metrics from HTTP servers exposing metrics
in Prometheus format.
### Configuration:
```toml
# Read metrics from one or many prometheus clients
[[inputs.prometheus]]
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"
## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to `https` & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
# label selector to target pods which have the label
# kubernetes_label_selector = "env=dev,app=nginx"
# field selector to target pods
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"
## HTTP Basic Authentication username and password. ('bearer_token' and
## 'bearer_token_string' take priority)
# username = ""
# password = ""
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
`urls` can contain a unix socket as well. If a different path is required (default is `/metrics` for both http[s] and unix) for a unix socket, add `path` as a query parameter as follows: `unix:///var/run/prometheus.sock?path=/custom/metrics`
#### Kubernetes Service Discovery
URLs listed in the `kubernetes_services` parameter will be expanded
by looking up all A records assigned to the hostname as described in
[Kubernetes DNS service discovery](https://kubernetes.io/docs/concepts/services-networking/service/#dns).
This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).
#### Kubernetes scraping
Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig
file to determine where to monitor.
Currently the following annotation are supported:
* `prometheus.io/scrape` Enable scraping for this pod.
* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http')
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics')
* `prometheus.io/port` Used to override the port. (default 9102)
Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping.
#### Bearer Token
If set, the file specified by the `bearer_token` parameter will be read on
each interval and its contents will be appended to the Bearer string in the
Authorization header.
### Usage for Caddy HTTP server
If you want to monitor Caddy, you need to use Caddy with its Prometheus plugin:
* Download Caddy+Prometheus plugin [here](https://caddyserver.com/download/linux/amd64?plugins=http.prometheus)
* Add the `prometheus` directive in your `CaddyFile`
* Restart Caddy
* Configure Telegraf to fetch metrics on it:
```toml
[[inputs.prometheus]]
# ## An array of urls to scrape metrics from.
urls = ["http://localhost:9180/metrics"]
```
> This is the default URL where Caddy Prometheus plugin will send data.
> For more details, please read the [Caddy Prometheus documentation](https://github.com/miekg/caddy-prometheus/blob/master/README.md).
### Metrics:
Measurement names are based on the Metric Family and tags are created for each
label. The value is added to a field named based on the metric type.
All metrics receive the `url` tag indicating the related URL specified in the
Telegraf configuration. If using Kubernetes service discovery the `address`
tag is also added indicating the discovered ip address.
### Example Output:
**Source**
```
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 7.4545e-05
go_gc_duration_seconds{quantile="0.25"} 7.6999e-05
go_gc_duration_seconds{quantile="0.5"} 0.000277935
go_gc_duration_seconds{quantile="0.75"} 0.000706591
go_gc_duration_seconds{quantile="1"} 0.000706591
go_gc_duration_seconds_sum 0.00113607
go_gc_duration_seconds_count 4
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
# HELP cpu_usage_user Telegraf collected metric
# TYPE cpu_usage_user gauge
cpu_usage_user{cpu="cpu0"} 1.4112903225816156
cpu_usage_user{cpu="cpu1"} 0.702106318955865
cpu_usage_user{cpu="cpu2"} 2.0161290322588776
cpu_usage_user{cpu="cpu3"} 1.5045135406226022
```
**Output**
```
go_gc_duration_seconds,url=http://example.org:9273/metrics 1=0.001336611,count=14,sum=0.004527551,0=0.000057965,0.25=0.000083812,0.5=0.000286537,0.75=0.000365303 1505776733000000000
go_goroutines,url=http://example.org:9273/metrics gauge=21 1505776695000000000
cpu_usage_user,cpu=cpu0,url=http://example.org:9273/metrics gauge=1.513622603430151 1505776751000000000
cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641773 1505776751000000000
cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000
cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000
```
**Output (when metric_version = 2)**
```
prometheus,quantile=1,url=http://example.org:9273/metrics go_gc_duration_seconds=0.005574303 1556075100000000000
prometheus,quantile=0.75,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0001046 1556075100000000000
prometheus,quantile=0.5,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000719 1556075100000000000
prometheus,quantile=0.25,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000579 1556075100000000000
prometheus,quantile=0,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000349 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_gc_duration_seconds_count=324,go_gc_duration_seconds_sum=0.091340353 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_goroutines=15 1556075100000000000
prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.513622603430151 1505776751000000000
prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000
prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000
prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000
```

View File

@ -1,237 +0,0 @@
package prometheus
import (
"context"
"log"
"net"
"net/url"
"sync"
"time"
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
)
type payload struct {
eventype string
pod *corev1.Pod
}
// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfig string) (*k8s.Client, error) {
// data, err := ioutil.ReadFile(kubeconfigPath)
// if err != nil {
// return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err)
// }
// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal([]byte(kubeconfig), &config); err != nil {
return nil, err
}
return k8s.NewClient(&config)
}
func (p *Prometheus) start(ctx context.Context) error {
client, err := k8s.NewInClusterClient()
if err != nil {
// u, err := user.Current()
// if err != nil {
// return fmt.Errorf("Failed to get current user - %v", err)
// }
// configLocation := filepath.Join(u.HomeDir, ".kube/config")
// if p.KubeConfig != "" {
// configLocation = p.KubeConfig
// }
client, err = loadClient(p.KubeConfigContent)
if err != nil {
return err
}
}
p.wg = sync.WaitGroup{}
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
err := p.watch(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
}
}
}
}()
return nil
}
// An edge case exists if a pod goes offline at the same time a new pod is created
// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
// pod, causing errors in the logs. This is only true if the pod going offline is not
// directed to do so by K8s.
func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
selectors := podSelector(p)
pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, p.PodNamespace, &corev1.Pod{}, selectors...)
if err != nil {
return err
}
defer watcher.Close()
for {
select {
case <-ctx.Done():
return nil
default:
pod = &corev1.Pod{}
// An error here means we need to reconnect the watcher.
eventType, err := watcher.Next(pod)
if err != nil {
return err
}
// If the pod is not "ready", there will be no ip associated with it.
if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" ||
!podReady(pod.Status.GetContainerStatuses()) {
continue
}
switch eventType {
case k8s.EventAdded:
registerPod(pod, p)
case k8s.EventModified:
// To avoid multiple actions for each event, unregister on the first event
// in the delete sequence, when the containers are still "ready".
if pod.Metadata.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} else {
registerPod(pod, p)
}
}
}
}
}
func podReady(statuss []*corev1.ContainerStatus) bool {
if len(statuss) == 0 {
return false
}
for _, cs := range statuss {
if !cs.GetReady() {
return false
}
}
return true
}
func podSelector(p *Prometheus) []k8s.Option {
options := []k8s.Option{}
if len(p.KubernetesLabelSelector) > 0 {
options = append(options, k8s.QueryParam("labelSelector", p.KubernetesLabelSelector))
}
if len(p.KubernetesFieldSelector) > 0 {
options = append(options, k8s.QueryParam("fieldSelector", p.KubernetesFieldSelector))
}
return options
}
func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil {
p.kubernetesPods = map[string]URLAndAddress{}
}
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
}
log.Printf("D! [inputs.prometheus] will scrape metrics from %q", *targetURL)
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
if tags == nil {
tags = map[string]string{}
}
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %q: %s", *targetURL, err.Error())
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods[podURL.String()] = URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags,
}
p.lock.Unlock()
}
func getScrapeURL(pod *corev1.Pod) *string {
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
return nil
}
scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"]
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]
if scheme == "" {
scheme = "http"
}
if port == "" {
port = "9102"
}
if path == "" {
path = "/metrics"
}
u := &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(ip, port),
Path: path,
}
x := u.String()
return &x
}
func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod)
if url == nil {
return
}
log.Printf("D! [inputs.prometheus] registered a delete request for %q in namespace %q",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.kubernetesPods[*url]; ok {
delete(p.kubernetesPods, *url)
log.Printf("D! [inputs.prometheus] will stop scraping for %q", *url)
}
}

View File

@ -1,155 +0,0 @@
package prometheus
import (
"github.com/ericchiang/k8s"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)
func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}
func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}
func TestAddPod(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultipleDuplicatePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
p.Status.PodIP = str("127.0.0.2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.kubernetesPods))
}
func TestPodSelector(t *testing.T) {
cases := []struct {
expected []k8s.Option
labelselector string
fieldselector string
}{
{
expected: []k8s.Option{
k8s.QueryParam("labelSelector", "key1=val1,key2=val2,key3"),
k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"),
},
labelselector: "key1=val1,key2=val2,key3",
fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com",
},
{
expected: []k8s.Option{
k8s.QueryParam("labelSelector", "key1"),
k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"),
},
labelselector: "key1",
fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com",
},
{
expected: []k8s.Option{
k8s.QueryParam("labelSelector", "key1"),
k8s.QueryParam("fieldSelector", "somefield"),
},
labelselector: "key1",
fieldselector: "somefield",
},
}
for _, c := range cases {
prom := &Prometheus{
Log: testutil.Logger{},
KubernetesLabelSelector: c.labelselector,
KubernetesFieldSelector: c.fieldselector,
}
output := podSelector(prom)
assert.Equal(t, len(output), len(c.expected))
}
}
func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}
func str(x string) *string {
return &x
}

View File

@ -1,320 +0,0 @@
package prometheus
// Parser inspired from
// https://github.com/prometheus/prom2json/blob/master/main.go
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"mime"
"net/http"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// Parse returns a slice of Metrics from a text representation of a
// metrics
func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}
// make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds
now := time.Now()
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), now)
metrics = append(metrics, telegrafMetrics...)
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), now)
metrics = append(metrics, telegrafMetrics...)
} else {
// standard metric
// reading fields
fields := getNameAndValueV2(m, metricName)
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
metric, err := metric.New("prometheus", tags, fields, t, valueType(mf.GetType()))
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
}
return metrics, err
}
// Get Quantiles for summary metric & Buckets for histogram
func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum())
met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}
for _, q := range m.GetSummary().Quantile {
newTags := tags
fields = make(map[string]interface{})
newTags["quantile"] = fmt.Sprint(q.GetQuantile())
fields[metricName] = float64(q.GetValue())
quantileMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, quantileMetric)
}
}
return metrics
}
// Get Buckets from histogram metric
func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum())
met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}
for _, b := range m.GetHistogram().Bucket {
newTags := tags
fields = make(map[string]interface{})
newTags["le"] = fmt.Sprint(b.GetUpperBound())
fields[metricName+"_bucket"] = float64(b.GetCumulativeCount())
histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, histogramMetric)
}
}
return metrics
}
// Parse returns a slice of Metrics from a text representation of a
// metrics
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}
// make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds
now := time.Now()
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
// reading fields
var fields map[string]interface{}
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType()))
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
return metrics, err
}
func valueType(mt dto.MetricType) telegraf.ValueType {
switch mt {
case dto.MetricType_COUNTER:
return telegraf.Counter
case dto.MetricType_GAUGE:
return telegraf.Gauge
case dto.MetricType_SUMMARY:
return telegraf.Summary
case dto.MetricType_HISTOGRAM:
return telegraf.Histogram
default:
return telegraf.Untyped
}
}
// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, q := range m.GetSummary().Quantile {
if !math.IsNaN(q.GetValue()) {
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
}
}
return fields
}
// Get Buckets from histogram metric
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
}
return fields
}
// Get labels from metric
func makeLabels(m *dto.Metric) map[string]string {
result := map[string]string{}
for _, lp := range m.Label {
result[lp.GetName()] = lp.GetValue()
}
return result
}
// Get name and value from metric
func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["gauge"] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue())
}
}
return fields
}
// Get name and value from metric
func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields[metricName] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields[metricName] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields[metricName] = float64(m.GetUntyped().GetValue())
}
}
return fields
}

View File

@ -1,167 +0,0 @@
package prometheus
import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
`
const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
`
const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
`
const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
`
const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
# TYPE apiserver_request_latencies histogram
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
`
const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.013534896000000001
go_gc_duration_seconds{quantile="0.25"} 0.02469263
go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005
go_gc_duration_seconds{quantile="0.75"} 0.03840335
go_gc_duration_seconds{quantile="1"} 0.049956604
go_gc_duration_seconds_sum 1970.341293002
go_gc_duration_seconds_count 65952
# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
# TYPE apiserver_request_latencies histogram
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
`
const prometheusMulti = `
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
const prometheusMultiSomeInvalid = `
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4 , usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
func TestParseValidPrometheus(t *testing.T) {
// Gauge value
metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"gauge": float64(1),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"osVersion": "CentOS Linux 7 (Core)",
"cadvisorRevision": "",
"cadvisorVersion": "",
"dockerVersion": "1.8.2",
"kernelVersion": "3.10.0-229.20.1.el7.x86_64",
}, metrics[0].Tags())
// Counter value
metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"counter": float64(0),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Summary data
//SetDefaultTags(map[string]string{})
metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"0.5": 552048.506,
"0.9": 5.876804288e+06,
"0.99": 5.876804288e+06,
"count": 9.0,
"sum": 1.8909097205e+07,
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
// histogram data
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"500000": 2000.0,
"count": 2025.0,
"sum": 1.02726334e+08,
"250000": 1997.0,
"2e+06": 2012.0,
"4e+06": 2017.0,
"8e+06": 2024.0,
"+Inf": 2025.0,
"125000": 1994.0,
"1e+06": 2005.0,
}, metrics[0].Fields())
assert.Equal(t,
map[string]string{"verb": "POST", "resource": "bindings"},
metrics[0].Tags())
}

View File

@ -1,398 +0,0 @@
package prometheus
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
)
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
type Prometheus struct {
// An array of urls to scrape metrics from.
URLs []string `toml:"urls"`
// An array of Kubernetes services to scrape metrics from.
KubernetesServices []string
// Content of kubernetes config file
KubeConfigContent string
// Label Selector/s for Kubernetes
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`
// Field Selector/s for Kubernetes
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
// Basic authentication credentials
Username string `toml:"username"`
Password string `toml:"password"`
ResponseTimeout time.Duration `toml:"response_timeout"`
MetricVersion int `toml:"metric_version"`
URLTag string `toml:"url_tag"`
tls.ClientConfig
Log telegraf.Logger
client *http.Client
// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
lock sync.Mutex
kubernetesPods map[string]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
}
var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1
## Url tag name (tag containing scrapped url. optional, default is "url")
# url_tag = "scrapeUrl"
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"
## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to 'https' & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
# label selector to target pods which have the label
# kubernetes_label_selector = "env=dev,app=nginx"
# field selector to target pods
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"
## HTTP Basic Authentication username and password. ('bearer_token' and
## 'bearer_token_string' take priority)
# username = ""
# password = ""
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (p *Prometheus) SampleConfig() string {
return sampleConfig
}
func (p *Prometheus) Description() string {
return "Read metrics from one or many prometheus clients"
}
func (p *Prometheus) Init() error {
if p.MetricVersion != 2 {
p.Log.Warnf("Use of deprecated configuration: 'metric_version = 1'; please update to 'metric_version = 2'")
}
return nil
}
var ErrProtocolError = errors.New("prometheus protocol error")
func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
host := address
if u.Port() != "" {
host = address + ":" + u.Port()
}
reconstructedURL := &url.URL{
Scheme: u.Scheme,
Opaque: u.Opaque,
User: u.User,
Path: u.Path,
RawPath: u.RawPath,
ForceQuery: u.ForceQuery,
RawQuery: u.RawQuery,
Fragment: u.Fragment,
Host: host,
}
return reconstructedURL
}
type URLAndAddress struct {
OriginalURL *url.URL
URL *url.URL
Address string
Tags map[string]string
}
func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs := make(map[string]URLAndAddress, 0)
for _, u := range p.URLs {
URL, err := url.Parse(u)
if err != nil {
p.Log.Errorf("Could not parse %q, skipping it. Error: %s", u, err.Error())
continue
}
allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL}
}
p.lock.Lock()
defer p.lock.Unlock()
// loop through all pods scraped via the prometheus annotation on the pods
for k, v := range p.kubernetesPods {
allURLs[k] = v
}
for _, service := range p.KubernetesServices {
URL, err := url.Parse(service)
if err != nil {
return nil, err
}
resolvedAddresses, err := net.LookupHost(URL.Hostname())
if err != nil {
p.Log.Errorf("Could not resolve %q, skipping it. Error: %s", URL.Host, err.Error())
continue
}
for _, resolved := range resolvedAddresses {
serviceURL := p.AddressToURL(URL, resolved)
allURLs[serviceURL.String()] = URLAndAddress{
URL: serviceURL,
Address: resolved,
OriginalURL: URL,
}
}
}
return allURLs, nil
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHTTPClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup
allURLs, err := p.GetAllURLs()
if err != nil {
return err
}
for _, URL := range allURLs {
wg.Add(1)
go func(serviceURL URLAndAddress) {
defer wg.Done()
acc.AddError(p.gatherURL(serviceURL, acc))
}(URL)
}
wg.Wait()
return nil
}
func (p *Prometheus) createHTTPClient() (*http.Client, error) {
tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout,
}
return client, nil
}
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error {
var req *http.Request
var err error
var uClient *http.Client
var metrics []telegraf.Metric
if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if path == "" {
path = "/metrics"
}
addr := "http://localhost" + path
req, err = http.NewRequest("GET", addr, nil)
if err != nil {
return fmt.Errorf("unable to create new request '%s': %s", addr, err)
}
// ignore error because it's been handled before getting here
tlsCfg, _ := p.ClientConfig.TLSConfig()
uClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
Dial: func(network, addr string) (net.Conn, error) {
c, err := net.Dial("unix", u.URL.Path)
return c, err
},
},
Timeout: p.ResponseTimeout,
}
} else {
if u.URL.Path == "" {
u.URL.Path = "/metrics"
}
req, err = http.NewRequest("GET", u.URL.String(), nil)
if err != nil {
return fmt.Errorf("unable to create new request '%s': %s", u.URL.String(), err)
}
}
req.Header.Add("Accept", acceptHeader)
if p.BearerToken != "" {
token, err := ioutil.ReadFile(p.BearerToken)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+string(token))
} else if p.BearerTokenString != "" {
req.Header.Set("Authorization", "Bearer "+p.BearerTokenString)
} else if p.Username != "" || p.Password != "" {
req.SetBasicAuth(p.Username, p.Password)
}
var resp *http.Response
if u.URL.Scheme != "unix" {
resp, err = p.client.Do(req)
} else {
resp, err = uClient.Do(req)
}
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading body: %s", err)
}
if p.MetricVersion == 2 {
metrics, err = ParseV2(body, resp.Header)
} else {
metrics, err = Parse(body, resp.Header)
}
if err != nil {
return fmt.Errorf("error reading metrics for %s: %s",
u.URL, err)
}
for _, metric := range metrics {
tags := metric.Tags()
// strip user and password from URL
u.OriginalURL.User = nil
if p.URLTag != "" {
tags[p.URLTag] = u.OriginalURL.String()
}
if u.Address != "" {
tags["address"] = u.Address
}
for k, v := range u.Tags {
tags[k] = v
}
switch metric.Type() {
case telegraf.Counter:
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Gauge:
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Summary:
acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Histogram:
acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time())
default:
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}
}
return nil
}
// Start will start the Kubernetes scraping if enabled in the configuration
func (p *Prometheus) Start(a telegraf.Accumulator) error {
if p.MonitorPods {
var ctx context.Context
ctx, p.cancel = context.WithCancel(context.Background())
return p.start(ctx)
}
return nil
}
func (p *Prometheus) Stop() {
if p.MonitorPods {
p.cancel()
}
p.wg.Wait()
}
/*
func init() {
inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{
ResponseTimeout: internal.Duration{Duration: time.Second * 3},
kubernetesPods: map[string]URLAndAddress{},
URLTag: "url",
}
})
}
*/

View File

@ -1,236 +0,0 @@
package prometheus
import (
"fmt"
"math"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
# HELP test_metric An untyped metric with a timestamp
# TYPE test_metric untyped
test_metric{label="value"} 1.0 1490802350000
`
const sampleSummaryTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
`
const sampleGaugeTextFormat = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15 1490802350000
`
func TestPrometheusGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.False(t, acc.HasTag("test_metric", "address"))
assert.True(t, acc.TagValue("test_metric", "url") == ts.URL+"/metrics")
}
func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
URLTag: "url",
}
u, _ := url.Parse(ts.URL)
tsAddress := u.Hostname()
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.True(t, acc.TagValue("test_metric", "address") == tsAddress)
assert.True(t, acc.TagValue("test_metric", "url") == ts.URL)
}
func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
KubernetesServices: []string{"http://random.telegraf.local:88/metrics"},
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
}
func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleSummaryTextFormat)
}))
defer ts.Close()
p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.TagSetValue("prometheus", "quantile") == "0")
assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_sum"))
assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_count"))
assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")
}
func TestSummaryMayContainNaN(t *testing.T) {
const data = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} NaN
go_gc_duration_seconds{quantile="1"} NaN
go_gc_duration_seconds_sum 42.0
go_gc_duration_seconds_count 42
`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, data)
}))
defer ts.Close()
p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "",
MetricVersion: 2,
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{
"quantile": "0",
},
map[string]interface{}{
"go_gc_duration_seconds": math.NaN(),
},
time.Unix(0, 0),
telegraf.Summary,
),
testutil.MustMetric(
"prometheus",
map[string]string{
"quantile": "1",
},
map[string]interface{}{
"go_gc_duration_seconds": math.NaN(),
},
time.Unix(0, 0),
telegraf.Summary,
),
testutil.MustMetric(
"prometheus",
map[string]string{},
map[string]interface{}{
"go_gc_duration_seconds_sum": 42.0,
"go_gc_duration_seconds_count": 42.0,
},
time.Unix(0, 0),
telegraf.Summary,
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime(), testutil.SortMetrics())
}
func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleGaugeTextFormat)
}))
defer ts.Close()
p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("prometheus", "go_goroutines"))
assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")
assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0)))
}

View File

@ -2,6 +2,7 @@ package plugins
import (
"fmt"
"reflect"
"testing"
"github.com/didi/nightingale/src/common/dataobj"
@ -48,15 +49,21 @@ type telegrafPlugin interface {
TelegrafInput() (telegraf.Input, error)
}
func PluginTest(t *testing.T, plugin telegrafPlugin) {
metrics := []*dataobj.MetricValue{}
func PluginTest(t *testing.T, plugin telegrafPlugin) telegraf.Input {
input, err := plugin.TelegrafInput()
if err != nil {
t.Error(err)
}
acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "github-test", Metrics: &metrics})
PluginInputTest(t, input)
return input
}
func PluginInputTest(t *testing.T, input telegraf.Input) {
metrics := []*dataobj.MetricValue{}
acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "plugin-test", Metrics: &metrics})
if err != nil {
t.Error(err)
}
@ -69,3 +76,23 @@ func PluginTest(t *testing.T, plugin telegrafPlugin) {
t.Logf("%d %s %s %f", k, v.CounterType, v.PK(), v.Value)
}
}
func SetValue(in interface{}, value interface{}, fields ...string) error {
rv := reflect.Indirect(reflect.ValueOf(in))
for _, field := range fields {
if !rv.IsValid() {
return fmt.Errorf("invalid argument")
}
if rv.Kind() != reflect.Struct {
return fmt.Errorf("invalid argument, must be a struct")
}
rv = reflect.Indirect(rv.FieldByName(field))
}
if !rv.IsValid() || !rv.CanSet() {
return fmt.Errorf("invalid argument IsValid %v CanSet %v", rv.IsValid(), rv.CanSet())
}
rv.Set(reflect.Indirect(reflect.ValueOf(value)))
return nil
}