diff --git a/go.mod b/go.mod index 531631b5..365dbfdb 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,9 @@ require ( github.com/codegangsta/negroni v1.0.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe + github.com/ericchiang/k8s v1.2.0 github.com/garyburd/redigo v1.6.2 + github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/gin-contrib/pprof v1.3.0 github.com/gin-gonic/gin v1.6.3 github.com/go-sql-driver/mysql v1.5.0 @@ -22,9 +24,12 @@ require ( github.com/m3db/m3 v0.15.17 github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-sqlite3 v1.14.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/mojocn/base64Captcha v1.3.1 github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect + github.com/prometheus/client_model v0.2.0 + github.com/prometheus/common v0.9.1 github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 @@ -37,7 +42,6 @@ require ( go.uber.org/automaxprocs v1.3.0 // indirect golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d golang.org/x/text v0.3.3 - google.golang.org/protobuf v1.25.0 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/ldap.v3 v3.1.0 diff --git a/go.sum b/go.sum index b6163a23..26d755b9 100644 --- a/go.sum +++ b/go.sum @@ -403,13 +403,6 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= @@ -422,7 +415,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= @@ -1318,8 +1310,6 @@ google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24 h1:IGPykv426z7LZSVPlaPufOyphngM4at5uZ7x5alaFvE= google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1338,15 +1328,6 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk= diff --git a/src/common/address/address.go b/src/common/address/address.go index 8d250818..73606c73 100644 --- a/src/common/address/address.go +++ b/src/common/address/address.go @@ -46,7 +46,7 @@ func convPort(module, listen, portType string) int { } port, err := strconv.Atoi(strings.Split(listen, splitChar)[1]) if err != nil { - fmt.Printf("%s.%s invalid", module, portType) + fmt.Printf("address: %s.%s invalid", module, portType) os.Exit(1) } @@ -101,7 +101,7 @@ func getMod(modKey string) Module { mod, has := mods[modKey] if !has { - fmt.Printf("module(%s) configuration section not found", modKey) + fmt.Printf("address: module(%s) configuration section not found", modKey) os.Exit(1) } diff --git a/src/modules/monapi/plugins/.gitignore b/src/modules/monapi/plugins/.gitignore deleted file mode 100644 index 6dd5e90c..00000000 --- a/src/modules/monapi/plugins/.gitignore +++ /dev/null @@ -1 +0,0 @@ -prometheus/ diff --git a/src/modules/monapi/plugins/all/all.go b/src/modules/monapi/plugins/all/all.go index b3f1bca7..c52709ee 100644 --- a/src/modules/monapi/plugins/all/all.go +++ b/src/modules/monapi/plugins/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/didi/nightingale/src/modules/monapi/plugins/github" _ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb" _ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql" + _ "github.com/didi/nightingale/src/modules/monapi/plugins/prometheus" _ "github.com/didi/nightingale/src/modules/monapi/plugins/redis" _ "github.com/didi/nightingale/src/modules/monapi/plugins/nginx" diff --git a/src/modules/monapi/plugins/github/github_test.go b/src/modules/monapi/plugins/github/github_test.go new file mode 100644 index 00000000..0a04a64e --- /dev/null +++ b/src/modules/monapi/plugins/github/github_test.go @@ -0,0 +1,13 @@ +package github + +import ( + "testing" + + "github.com/didi/nightingale/src/modules/monapi/plugins" +) + +func TestCollect(t *testing.T) { + plugins.PluginTest(t, &GitHubRule{ + Repositories: []string{"didi/nightingale"}, + }) +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus.go b/src/modules/monapi/plugins/prometheus/prometheus.go new file mode 100644 index 00000000..75526f55 --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus.go @@ -0,0 +1,94 @@ +package prometheus + +import ( + "fmt" + "time" + + "github.com/didi/nightingale/src/modules/monapi/collector" + "github.com/didi/nightingale/src/modules/monapi/plugins" + "github.com/didi/nightingale/src/modules/monapi/plugins/prometheus/prometheus" + "github.com/didi/nightingale/src/toolkits/i18n" + "github.com/influxdata/telegraf" +) + +func init() { + collector.CollectorRegister(NewPrometheusCollector()) // for monapi + i18n.DictRegister(langDict) +} + +var ( + langDict = map[string]map[string]string{ + "zh": map[string]string{ + "URLs": "网址", + "An array of urls to scrape metrics from": "采集数据的网址", + "URL Tag": "网址标签", + "Url tag name (tag containing scrapped url. optional, default is \"url\")": "url 标签名称,默认值 \"url\"", + "An array of Kubernetes services to scrape metrics from": "采集kube服务的地址", + "Kubernetes config file contenct to create client from": "kube config 文件内容,用来连接kube服务", + "Use bearer token for authorization. ('bearer_token' takes priority)": "用户的Bearer令牌,优先级高于 username/password", + "HTTP Basic Authentication username": "HTTP认证用户名", + "HTTP Basic Authentication password": "HTTP认证密码", + "RESP Timeout": "请求超时时间", + "Specify timeout duration for slower prometheus clients": "k8s请求超时时间, 单位: 秒", + }, + } +) + +type PrometheusCollector struct { + *collector.BaseCollector +} + +func NewPrometheusCollector() *PrometheusCollector { + return &PrometheusCollector{BaseCollector: collector.NewBaseCollector( + "prometheus", + collector.RemoteCategory, + func() collector.TelegrafPlugin { return &PrometheusRule{} }, + )} +} + +type PrometheusRule struct { + URLs []string `label:"URLs" json:"urls,required" description:"An array of urls to scrape metrics from" example:"http://my-service-exporter:8080/metrics"` + // URLTag string `label:"URL Tag" json:"url_tag" description:"Url tag name (tag containing scrapped url. optional, default is \"url\")" example:"scrapeUrl"` + // KubernetesServices []string `label:"Kube Services" json:"kubernetes_services" description:"An array of Kubernetes services to scrape metrics from" example:"http://my-service-dns.my-namespace:9100/metrics"` + // KubeConfigContent string `label:"Kube Conf" json:"kube_config_content" format:"file" description:"Kubernetes config file contenct to create client from"` + // MonitorPods bool `label:"Monitor Pods" json:"monitor_kubernetes_pods" description:"Scrape Kubernetes pods for the following prometheus annotations:
- prometheus.io/scrape: Enable scraping for this pod
- prometheus.io/scheme: If the metrics endpoint is secured then you will need to
set this to 'https' & most likely set the tls config.
- prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
- prometheus.io/port: If port is not 9102 use this annotation"` + // PodNamespace string `label:"Pod Namespace" json:"monitor_kubernetes_pods_namespace" description:"Restricts Kubernetes monitoring to a single namespace" example:"default"` + // KubernetesLabelSelector string `label:"Kube Label Selector" json:"kubernetes_label_selector" description:"label selector to target pods which have the label" example:"env=dev,app=nginx"` + // KubernetesFieldSelector string `label:"Kube Field Selector" json:"kubernetes_field_selector" description:"field selector to target pods
eg. To scrape pods on a specific node" example:"spec.nodeName=$HOSTNAME"` + // BearerTokenString string `label:"Bearer Token" json:"bearer_token_string" format:"file" description:"Use bearer token for authorization. ('bearer_token' takes priority)"` + // Username string `label:"Username" json:"username" description:"HTTP Basic Authentication username"` + // Password string `label:"Password" json:"password" format:"password" description:"HTTP Basic Authentication password"` + ResponseTimeout int `label:"RESP Timeout" json:"response_timeout" default:"3" description:"Specify timeout duration for slower prometheus clients"` + plugins.ClientConfig +} + +func (p *PrometheusRule) Validate() error { + if len(p.URLs) == 0 || p.URLs[0] == "" { + return fmt.Errorf(" prometheus.rule unable to get urls") + } + return nil +} + +func (p *PrometheusRule) TelegrafInput() (telegraf.Input, error) { + if err := p.Validate(); err != nil { + return nil, err + } + + return &prometheus.Prometheus{ + URLs: p.URLs, + URLTag: "target", + // KubernetesServices: p.KubernetesServices, + // KubeConfigContent: p.KubeConfigContent, + // MonitorPods: p.MonitorPods, + // PodNamespace: p.PodNamespace, + // KubernetesLabelSelector: p.KubernetesLabelSelector, + // KubernetesFieldSelector: p.KubernetesFieldSelector, + // BearerTokenString: p.BearerTokenString, + // Username: p.Username, + // Password: p.Password, + ResponseTimeout: time.Second * time.Duration(p.ResponseTimeout), + MetricVersion: 2, + Log: plugins.GetLogger(), + ClientConfig: p.ClientConfig.TlsClientConfig(), + }, nil +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/README.md b/src/modules/monapi/plugins/prometheus/prometheus/README.md new file mode 100644 index 00000000..e9dd119c --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/README.md @@ -0,0 +1,171 @@ +# Prometheus Input Plugin + +The prometheus input plugin gathers metrics from HTTP servers exposing metrics +in Prometheus format. + +### Configuration: + +```toml +# Read metrics from one or many prometheus clients +[[inputs.prometheus]] + ## An array of urls to scrape metrics from. + urls = ["http://localhost:9100/metrics"] + + ## Metric version controls the mapping from Prometheus metrics into + ## Telegraf metrics. When using the prometheus_client output, use the same + ## value in both plugins to ensure metrics are round-tripped without + ## modification. + ## + ## example: metric_version = 1; deprecated in 1.13 + ## metric_version = 2; recommended version + # metric_version = 1 + + ## An array of Kubernetes services to scrape metrics from. + # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to `https` & most likely set the tls config. + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation + # monitor_kubernetes_pods = true + ## Restricts Kubernetes monitoring to a single namespace + ## ex: monitor_kubernetes_pods_namespace = "default" + # monitor_kubernetes_pods_namespace = "" + # label selector to target pods which have the label + # kubernetes_label_selector = "env=dev,app=nginx" + # field selector to target pods + # eg. To scrape pods on a specific node + # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" + + ## Use bearer token for authorization. ('bearer_token' takes priority) + # bearer_token = "/path/to/bearer/token" + ## OR + # bearer_token_string = "abc_123" + + ## HTTP Basic Authentication username and password. ('bearer_token' and + ## 'bearer_token_string' take priority) + # username = "" + # password = "" + + ## Specify timeout duration for slower prometheus clients (default is 3s) + # response_timeout = "3s" + + ## Optional TLS Config + # tls_ca = /path/to/cafile + # tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` + +`urls` can contain a unix socket as well. If a different path is required (default is `/metrics` for both http[s] and unix) for a unix socket, add `path` as a query parameter as follows: `unix:///var/run/prometheus.sock?path=/custom/metrics` + +#### Kubernetes Service Discovery + +URLs listed in the `kubernetes_services` parameter will be expanded +by looking up all A records assigned to the hostname as described in +[Kubernetes DNS service discovery](https://kubernetes.io/docs/concepts/services-networking/service/#dns). + +This method can be used to locate all +[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). + +#### Kubernetes scraping + +Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes +pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig +file to determine where to monitor. +Currently the following annotation are supported: + +* `prometheus.io/scrape` Enable scraping for this pod. +* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http') +* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics') +* `prometheus.io/port` Used to override the port. (default 9102) + +Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping. + +#### Bearer Token + +If set, the file specified by the `bearer_token` parameter will be read on +each interval and its contents will be appended to the Bearer string in the +Authorization header. + +### Usage for Caddy HTTP server + +If you want to monitor Caddy, you need to use Caddy with its Prometheus plugin: + +* Download Caddy+Prometheus plugin [here](https://caddyserver.com/download/linux/amd64?plugins=http.prometheus) +* Add the `prometheus` directive in your `CaddyFile` +* Restart Caddy +* Configure Telegraf to fetch metrics on it: + +```toml +[[inputs.prometheus]] +# ## An array of urls to scrape metrics from. + urls = ["http://localhost:9180/metrics"] +``` + +> This is the default URL where Caddy Prometheus plugin will send data. +> For more details, please read the [Caddy Prometheus documentation](https://github.com/miekg/caddy-prometheus/blob/master/README.md). + +### Metrics: + +Measurement names are based on the Metric Family and tags are created for each +label. The value is added to a field named based on the metric type. + +All metrics receive the `url` tag indicating the related URL specified in the +Telegraf configuration. If using Kubernetes service discovery the `address` +tag is also added indicating the discovered ip address. + +### Example Output: + +**Source** +``` +# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 7.4545e-05 +go_gc_duration_seconds{quantile="0.25"} 7.6999e-05 +go_gc_duration_seconds{quantile="0.5"} 0.000277935 +go_gc_duration_seconds{quantile="0.75"} 0.000706591 +go_gc_duration_seconds{quantile="1"} 0.000706591 +go_gc_duration_seconds_sum 0.00113607 +go_gc_duration_seconds_count 4 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 15 +# HELP cpu_usage_user Telegraf collected metric +# TYPE cpu_usage_user gauge +cpu_usage_user{cpu="cpu0"} 1.4112903225816156 +cpu_usage_user{cpu="cpu1"} 0.702106318955865 +cpu_usage_user{cpu="cpu2"} 2.0161290322588776 +cpu_usage_user{cpu="cpu3"} 1.5045135406226022 +``` + +**Output** +``` +go_gc_duration_seconds,url=http://example.org:9273/metrics 1=0.001336611,count=14,sum=0.004527551,0=0.000057965,0.25=0.000083812,0.5=0.000286537,0.75=0.000365303 1505776733000000000 +go_goroutines,url=http://example.org:9273/metrics gauge=21 1505776695000000000 +cpu_usage_user,cpu=cpu0,url=http://example.org:9273/metrics gauge=1.513622603430151 1505776751000000000 +cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641773 1505776751000000000 +cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000 +cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000 +``` + +**Output (when metric_version = 2)** +``` +prometheus,quantile=1,url=http://example.org:9273/metrics go_gc_duration_seconds=0.005574303 1556075100000000000 +prometheus,quantile=0.75,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0001046 1556075100000000000 +prometheus,quantile=0.5,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000719 1556075100000000000 +prometheus,quantile=0.25,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000579 1556075100000000000 +prometheus,quantile=0,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000349 1556075100000000000 +prometheus,url=http://example.org:9273/metrics go_gc_duration_seconds_count=324,go_gc_duration_seconds_sum=0.091340353 1556075100000000000 +prometheus,url=http://example.org:9273/metrics go_goroutines=15 1556075100000000000 +prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.513622603430151 1505776751000000000 +prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000 +prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000 +prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000 +``` diff --git a/src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go b/src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go new file mode 100644 index 00000000..93f6c7e4 --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go @@ -0,0 +1,237 @@ +package prometheus + +import ( + "context" + "log" + "net" + "net/url" + "sync" + "time" + + "github.com/ericchiang/k8s" + corev1 "github.com/ericchiang/k8s/apis/core/v1" + "github.com/ghodss/yaml" +) + +type payload struct { + eventype string + pod *corev1.Pod +} + +// loadClient parses a kubeconfig from a file and returns a Kubernetes +// client. It does not support extensions or client auth providers. +func loadClient(kubeconfig string) (*k8s.Client, error) { + // data, err := ioutil.ReadFile(kubeconfigPath) + // if err != nil { + // return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err) + // } + + // Unmarshal YAML into a Kubernetes config object. + var config k8s.Config + if err := yaml.Unmarshal([]byte(kubeconfig), &config); err != nil { + return nil, err + } + return k8s.NewClient(&config) +} + +func (p *Prometheus) start(ctx context.Context) error { + client, err := k8s.NewInClusterClient() + if err != nil { + // u, err := user.Current() + // if err != nil { + // return fmt.Errorf("Failed to get current user - %v", err) + // } + + // configLocation := filepath.Join(u.HomeDir, ".kube/config") + // if p.KubeConfig != "" { + // configLocation = p.KubeConfig + // } + client, err = loadClient(p.KubeConfigContent) + if err != nil { + return err + } + } + + p.wg = sync.WaitGroup{} + + p.wg.Add(1) + go func() { + defer p.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + err := p.watch(ctx, client) + if err != nil { + p.Log.Errorf("Unable to watch resources: %s", err.Error()) + } + } + } + }() + + return nil +} + +// An edge case exists if a pod goes offline at the same time a new pod is created +// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape +// pod, causing errors in the logs. This is only true if the pod going offline is not +// directed to do so by K8s. +func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { + + selectors := podSelector(p) + + pod := &corev1.Pod{} + watcher, err := client.Watch(ctx, p.PodNamespace, &corev1.Pod{}, selectors...) + if err != nil { + return err + } + defer watcher.Close() + + for { + select { + case <-ctx.Done(): + return nil + default: + pod = &corev1.Pod{} + // An error here means we need to reconnect the watcher. + eventType, err := watcher.Next(pod) + if err != nil { + return err + } + + // If the pod is not "ready", there will be no ip associated with it. + if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" || + !podReady(pod.Status.GetContainerStatuses()) { + continue + } + + switch eventType { + case k8s.EventAdded: + registerPod(pod, p) + case k8s.EventModified: + // To avoid multiple actions for each event, unregister on the first event + // in the delete sequence, when the containers are still "ready". + if pod.Metadata.GetDeletionTimestamp() != nil { + unregisterPod(pod, p) + } else { + registerPod(pod, p) + } + } + } + } +} + +func podReady(statuss []*corev1.ContainerStatus) bool { + if len(statuss) == 0 { + return false + } + for _, cs := range statuss { + if !cs.GetReady() { + return false + } + } + return true +} + +func podSelector(p *Prometheus) []k8s.Option { + options := []k8s.Option{} + + if len(p.KubernetesLabelSelector) > 0 { + options = append(options, k8s.QueryParam("labelSelector", p.KubernetesLabelSelector)) + } + + if len(p.KubernetesFieldSelector) > 0 { + options = append(options, k8s.QueryParam("fieldSelector", p.KubernetesFieldSelector)) + } + + return options + +} + +func registerPod(pod *corev1.Pod, p *Prometheus) { + if p.kubernetesPods == nil { + p.kubernetesPods = map[string]URLAndAddress{} + } + targetURL := getScrapeURL(pod) + if targetURL == nil { + return + } + + log.Printf("D! [inputs.prometheus] will scrape metrics from %q", *targetURL) + // add annotation as metrics tags + tags := pod.GetMetadata().GetAnnotations() + if tags == nil { + tags = map[string]string{} + } + tags["pod_name"] = pod.GetMetadata().GetName() + tags["namespace"] = pod.GetMetadata().GetNamespace() + // add labels as metrics tags + for k, v := range pod.GetMetadata().GetLabels() { + tags[k] = v + } + URL, err := url.Parse(*targetURL) + if err != nil { + log.Printf("E! [inputs.prometheus] could not parse URL %q: %s", *targetURL, err.Error()) + return + } + podURL := p.AddressToURL(URL, URL.Hostname()) + p.lock.Lock() + p.kubernetesPods[podURL.String()] = URLAndAddress{ + URL: podURL, + Address: URL.Hostname(), + OriginalURL: URL, + Tags: tags, + } + p.lock.Unlock() +} + +func getScrapeURL(pod *corev1.Pod) *string { + ip := pod.Status.GetPodIP() + if ip == "" { + // return as if scrape was disabled, we will be notified again once the pod + // has an IP + return nil + } + + scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"] + path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] + port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] + + if scheme == "" { + scheme = "http" + } + if port == "" { + port = "9102" + } + if path == "" { + path = "/metrics" + } + + u := &url.URL{ + Scheme: scheme, + Host: net.JoinHostPort(ip, port), + Path: path, + } + + x := u.String() + + return &x +} + +func unregisterPod(pod *corev1.Pod, p *Prometheus) { + url := getScrapeURL(pod) + if url == nil { + return + } + + log.Printf("D! [inputs.prometheus] registered a delete request for %q in namespace %q", + pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) + + p.lock.Lock() + defer p.lock.Unlock() + if _, ok := p.kubernetesPods[*url]; ok { + delete(p.kubernetesPods, *url) + log.Printf("D! [inputs.prometheus] will stop scraping for %q", *url) + } +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go b/src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go new file mode 100644 index 00000000..8568ac94 --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go @@ -0,0 +1,155 @@ +package prometheus + +import ( + "github.com/ericchiang/k8s" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + + v1 "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" +) + +func TestScrapeURLNoAnnotations(t *testing.T) { + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.GetMetadata().Annotations = map[string]string{} + url := getScrapeURL(p) + assert.Nil(t, url) +} + +func TestScrapeURLAnnotationsNoScrape(t *testing.T) { + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.Metadata.Name = str("myPod") + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"} + url := getScrapeURL(p) + assert.Nil(t, url) +} + +func TestScrapeURLAnnotations(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) +} + +func TestScrapeURLAnnotationsCustomPort(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) +} + +func TestScrapeURLAnnotationsCustomPath(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestAddPod(t *testing.T) { + prom := &Prometheus{Log: testutil.Logger{}} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + assert.Equal(t, 1, len(prom.kubernetesPods)) +} + +func TestAddMultipleDuplicatePods(t *testing.T) { + prom := &Prometheus{Log: testutil.Logger{}} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + p.Metadata.Name = str("Pod2") + registerPod(p, prom) + assert.Equal(t, 1, len(prom.kubernetesPods)) +} + +func TestAddMultiplePods(t *testing.T) { + prom := &Prometheus{Log: testutil.Logger{}} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + p.Metadata.Name = str("Pod2") + p.Status.PodIP = str("127.0.0.2") + registerPod(p, prom) + assert.Equal(t, 2, len(prom.kubernetesPods)) +} + +func TestDeletePods(t *testing.T) { + prom := &Prometheus{Log: testutil.Logger{}} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + unregisterPod(p, prom) + assert.Equal(t, 0, len(prom.kubernetesPods)) +} + +func TestPodSelector(t *testing.T) { + + cases := []struct { + expected []k8s.Option + labelselector string + fieldselector string + }{ + { + expected: []k8s.Option{ + k8s.QueryParam("labelSelector", "key1=val1,key2=val2,key3"), + k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"), + }, + labelselector: "key1=val1,key2=val2,key3", + fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com", + }, + { + expected: []k8s.Option{ + k8s.QueryParam("labelSelector", "key1"), + k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"), + }, + labelselector: "key1", + fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com", + }, + { + expected: []k8s.Option{ + k8s.QueryParam("labelSelector", "key1"), + k8s.QueryParam("fieldSelector", "somefield"), + }, + labelselector: "key1", + fieldselector: "somefield", + }, + } + + for _, c := range cases { + prom := &Prometheus{ + Log: testutil.Logger{}, + KubernetesLabelSelector: c.labelselector, + KubernetesFieldSelector: c.fieldselector, + } + + output := podSelector(prom) + + assert.Equal(t, len(output), len(c.expected)) + } +} + +func pod() *v1.Pod { + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}} + p.Status.PodIP = str("127.0.0.1") + p.Metadata.Name = str("myPod") + p.Metadata.Namespace = str("default") + return p +} + +func str(x string) *string { + return &x +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/parser.go b/src/modules/monapi/plugins/prometheus/prometheus/parser.go new file mode 100644 index 00000000..0726c877 --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/parser.go @@ -0,0 +1,320 @@ +package prometheus + +// Parser inspired from +// https://github.com/prometheus/prom2json/blob/master/main.go + +import ( + "bufio" + "bytes" + "fmt" + "io" + "math" + "mime" + "net/http" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/matttproud/golang_protobuf_extensions/pbutil" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +// Parse returns a slice of Metrics from a text representation of a +// metrics +func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) { + var metrics []telegraf.Metric + var parser expfmt.TextParser + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + // Read raw data + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + + mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) + // Prepare output + metricFamilies := make(map[string]*dto.MetricFamily) + + if err == nil && mediatype == "application/vnd.google.protobuf" && + params["encoding"] == "delimited" && + params["proto"] == "io.prometheus.client.MetricFamily" { + for { + mf := &dto.MetricFamily{} + if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { + if ierr == io.EOF { + break + } + return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) + } + metricFamilies[mf.GetName()] = mf + } + } else { + metricFamilies, err = parser.TextToMetricFamilies(reader) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %s", err) + } + } + + // make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds + now := time.Now() + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := makeLabels(m) + + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), now) + metrics = append(metrics, telegrafMetrics...) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // histogram metric + telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), now) + metrics = append(metrics, telegrafMetrics...) + } else { + // standard metric + // reading fields + fields := getNameAndValueV2(m, metricName) + // converting to telegraf metric + if len(fields) > 0 { + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) + } else { + t = now + } + metric, err := metric.New("prometheus", tags, fields, t, valueType(mf.GetType())) + if err == nil { + metrics = append(metrics, metric) + } + } + } + } + } + + return metrics, err +} + +// Get Quantiles for summary metric & Buckets for histogram +func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { + var metrics []telegraf.Metric + fields := make(map[string]interface{}) + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) + } else { + t = now + } + fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) + fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum()) + met, err := metric.New("prometheus", tags, fields, t, valueType(metricType)) + if err == nil { + metrics = append(metrics, met) + } + + for _, q := range m.GetSummary().Quantile { + newTags := tags + fields = make(map[string]interface{}) + + newTags["quantile"] = fmt.Sprint(q.GetQuantile()) + fields[metricName] = float64(q.GetValue()) + + quantileMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType)) + if err == nil { + metrics = append(metrics, quantileMetric) + } + } + return metrics +} + +// Get Buckets from histogram metric +func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { + var metrics []telegraf.Metric + fields := make(map[string]interface{}) + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) + } else { + t = now + } + fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) + fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum()) + + met, err := metric.New("prometheus", tags, fields, t, valueType(metricType)) + if err == nil { + metrics = append(metrics, met) + } + + for _, b := range m.GetHistogram().Bucket { + newTags := tags + fields = make(map[string]interface{}) + newTags["le"] = fmt.Sprint(b.GetUpperBound()) + fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) + + histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType)) + if err == nil { + metrics = append(metrics, histogramMetric) + } + } + return metrics +} + +// Parse returns a slice of Metrics from a text representation of a +// metrics +func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { + var metrics []telegraf.Metric + var parser expfmt.TextParser + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + // Read raw data + buffer := bytes.NewBuffer(buf) + reader := bufio.NewReader(buffer) + + mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) + // Prepare output + metricFamilies := make(map[string]*dto.MetricFamily) + + if err == nil && mediatype == "application/vnd.google.protobuf" && + params["encoding"] == "delimited" && + params["proto"] == "io.prometheus.client.MetricFamily" { + for { + mf := &dto.MetricFamily{} + if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { + if ierr == io.EOF { + break + } + return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) + } + metricFamilies[mf.GetName()] = mf + } + } else { + metricFamilies, err = parser.TextToMetricFamilies(reader) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %s", err) + } + } + + // make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds + now := time.Now() + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := makeLabels(m) + // reading fields + var fields map[string]interface{} + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + fields = makeQuantiles(m) + fields["count"] = float64(m.GetSummary().GetSampleCount()) + fields["sum"] = float64(m.GetSummary().GetSampleSum()) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // histogram metric + fields = makeBuckets(m) + fields["count"] = float64(m.GetHistogram().GetSampleCount()) + fields["sum"] = float64(m.GetHistogram().GetSampleSum()) + + } else { + // standard metric + fields = getNameAndValue(m) + } + // converting to telegraf metric + if len(fields) > 0 { + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) + } else { + t = now + } + metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType())) + if err == nil { + metrics = append(metrics, metric) + } + } + } + } + + return metrics, err +} + +func valueType(mt dto.MetricType) telegraf.ValueType { + switch mt { + case dto.MetricType_COUNTER: + return telegraf.Counter + case dto.MetricType_GAUGE: + return telegraf.Gauge + case dto.MetricType_SUMMARY: + return telegraf.Summary + case dto.MetricType_HISTOGRAM: + return telegraf.Histogram + default: + return telegraf.Untyped + } +} + +// Get Quantiles from summary metric +func makeQuantiles(m *dto.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + for _, q := range m.GetSummary().Quantile { + if !math.IsNaN(q.GetValue()) { + fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue()) + } + } + return fields +} + +// Get Buckets from histogram metric +func makeBuckets(m *dto.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + for _, b := range m.GetHistogram().Bucket { + fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) + } + return fields +} + +// Get labels from metric +func makeLabels(m *dto.Metric) map[string]string { + result := map[string]string{} + for _, lp := range m.Label { + result[lp.GetName()] = lp.GetValue() + } + return result +} + +// Get name and value from metric +func getNameAndValue(m *dto.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + if m.Gauge != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields["gauge"] = float64(m.GetGauge().GetValue()) + } + } else if m.Counter != nil { + if !math.IsNaN(m.GetCounter().GetValue()) { + fields["counter"] = float64(m.GetCounter().GetValue()) + } + } else if m.Untyped != nil { + if !math.IsNaN(m.GetUntyped().GetValue()) { + fields["value"] = float64(m.GetUntyped().GetValue()) + } + } + return fields +} + +// Get name and value from metric +func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} { + fields := make(map[string]interface{}) + if m.Gauge != nil { + if !math.IsNaN(m.GetGauge().GetValue()) { + fields[metricName] = float64(m.GetGauge().GetValue()) + } + } else if m.Counter != nil { + if !math.IsNaN(m.GetCounter().GetValue()) { + fields[metricName] = float64(m.GetCounter().GetValue()) + } + } else if m.Untyped != nil { + if !math.IsNaN(m.GetUntyped().GetValue()) { + fields[metricName] = float64(m.GetUntyped().GetValue()) + } + } + return fields +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/parser_test.go b/src/modules/monapi/plugins/prometheus/prometheus/parser_test.go new file mode 100644 index 00000000..7b2bfeca --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/parser_test.go @@ -0,0 +1,167 @@ +package prometheus + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + +const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. +# TYPE cadvisor_version_info gauge +cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 +` + +const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +# TYPE get_token_fail_count counter +get_token_fail_count 0 +` + +const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +` + +const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. +# TYPE http_request_duration_microseconds summary +http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 +http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 +http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 +http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 +http_request_duration_microseconds_count{handler="prometheus"} 9 +` + +const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. +# TYPE apiserver_request_latencies histogram +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 +apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 +apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 +` + +const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. +# TYPE cadvisor_version_info gauge +cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 +# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.013534896000000001 +go_gc_duration_seconds{quantile="0.25"} 0.02469263 +go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005 +go_gc_duration_seconds{quantile="0.75"} 0.03840335 +go_gc_duration_seconds{quantile="1"} 0.049956604 +go_gc_duration_seconds_sum 1970.341293002 +go_gc_duration_seconds_count 65952 +# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. +# TYPE http_request_duration_microseconds summary +http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 +http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 +http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 +http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 +http_request_duration_microseconds_count{handler="prometheus"} 9 +# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source +# TYPE get_token_fail_count counter +get_token_fail_count 0 +# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. +# TYPE apiserver_request_latencies histogram +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 +apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 +apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 +apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 +` + +const prometheusMulti = ` +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +` + +const prometheusMultiSomeInvalid = ` +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu4 , usage_idle=99,usage_busy=1 +cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +` + +func TestParseValidPrometheus(t *testing.T) { + // Gauge value + metrics, err := Parse([]byte(validUniqueGauge), http.Header{}) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "gauge": float64(1), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "osVersion": "CentOS Linux 7 (Core)", + "cadvisorRevision": "", + "cadvisorVersion": "", + "dockerVersion": "1.8.2", + "kernelVersion": "3.10.0-229.20.1.el7.x86_64", + }, metrics[0].Tags()) + + // Counter value + metrics, err = Parse([]byte(validUniqueCounter), http.Header{}) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "get_token_fail_count", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "counter": float64(0), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + // Summary data + //SetDefaultTags(map[string]string{}) + metrics, err = Parse([]byte(validUniqueSummary), http.Header{}) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "0.5": 552048.506, + "0.9": 5.876804288e+06, + "0.99": 5.876804288e+06, + "count": 9.0, + "sum": 1.8909097205e+07, + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) + + // histogram data + metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "500000": 2000.0, + "count": 2025.0, + "sum": 1.02726334e+08, + "250000": 1997.0, + "2e+06": 2012.0, + "4e+06": 2017.0, + "8e+06": 2024.0, + "+Inf": 2025.0, + "125000": 1994.0, + "1e+06": 2005.0, + }, metrics[0].Fields()) + assert.Equal(t, + map[string]string{"verb": "POST", "resource": "bindings"}, + metrics[0].Tags()) + +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/prometheus.go b/src/modules/monapi/plugins/prometheus/prometheus/prometheus.go new file mode 100644 index 00000000..5226b4bd --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/prometheus.go @@ -0,0 +1,398 @@ +package prometheus + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/common/tls" +) + +const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1` + +type Prometheus struct { + // An array of urls to scrape metrics from. + URLs []string `toml:"urls"` + + // An array of Kubernetes services to scrape metrics from. + KubernetesServices []string + + // Content of kubernetes config file + KubeConfigContent string + + // Label Selector/s for Kubernetes + KubernetesLabelSelector string `toml:"kubernetes_label_selector"` + + // Field Selector/s for Kubernetes + KubernetesFieldSelector string `toml:"kubernetes_field_selector"` + + // Bearer Token authorization file path + BearerToken string `toml:"bearer_token"` + BearerTokenString string `toml:"bearer_token_string"` + + // Basic authentication credentials + Username string `toml:"username"` + Password string `toml:"password"` + + ResponseTimeout time.Duration `toml:"response_timeout"` + + MetricVersion int `toml:"metric_version"` + + URLTag string `toml:"url_tag"` + + tls.ClientConfig + + Log telegraf.Logger + + client *http.Client + + // Should we scrape Kubernetes services for prometheus annotations + MonitorPods bool `toml:"monitor_kubernetes_pods"` + PodNamespace string `toml:"monitor_kubernetes_pods_namespace"` + lock sync.Mutex + kubernetesPods map[string]URLAndAddress + cancel context.CancelFunc + wg sync.WaitGroup +} + +var sampleConfig = ` + ## An array of urls to scrape metrics from. + urls = ["http://localhost:9100/metrics"] + + ## Metric version controls the mapping from Prometheus metrics into + ## Telegraf metrics. When using the prometheus_client output, use the same + ## value in both plugins to ensure metrics are round-tripped without + ## modification. + ## + ## example: metric_version = 1; deprecated in 1.13 + ## metric_version = 2; recommended version + # metric_version = 1 + + ## Url tag name (tag containing scrapped url. optional, default is "url") + # url_tag = "scrapeUrl" + + ## An array of Kubernetes services to scrape metrics from. + # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to 'https' & most likely set the tls config. + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation + # monitor_kubernetes_pods = true + ## Restricts Kubernetes monitoring to a single namespace + ## ex: monitor_kubernetes_pods_namespace = "default" + # monitor_kubernetes_pods_namespace = "" + # label selector to target pods which have the label + # kubernetes_label_selector = "env=dev,app=nginx" + # field selector to target pods + # eg. To scrape pods on a specific node + # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" + + ## Use bearer token for authorization. ('bearer_token' takes priority) + # bearer_token = "/path/to/bearer/token" + ## OR + # bearer_token_string = "abc_123" + + ## HTTP Basic Authentication username and password. ('bearer_token' and + ## 'bearer_token_string' take priority) + # username = "" + # password = "" + + ## Specify timeout duration for slower prometheus clients (default is 3s) + # response_timeout = "3s" + + ## Optional TLS Config + # tls_ca = /path/to/cafile + # tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +func (p *Prometheus) SampleConfig() string { + return sampleConfig +} + +func (p *Prometheus) Description() string { + return "Read metrics from one or many prometheus clients" +} + +func (p *Prometheus) Init() error { + if p.MetricVersion != 2 { + p.Log.Warnf("Use of deprecated configuration: 'metric_version = 1'; please update to 'metric_version = 2'") + } + + return nil +} + +var ErrProtocolError = errors.New("prometheus protocol error") + +func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL { + host := address + if u.Port() != "" { + host = address + ":" + u.Port() + } + reconstructedURL := &url.URL{ + Scheme: u.Scheme, + Opaque: u.Opaque, + User: u.User, + Path: u.Path, + RawPath: u.RawPath, + ForceQuery: u.ForceQuery, + RawQuery: u.RawQuery, + Fragment: u.Fragment, + Host: host, + } + return reconstructedURL +} + +type URLAndAddress struct { + OriginalURL *url.URL + URL *url.URL + Address string + Tags map[string]string +} + +func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { + allURLs := make(map[string]URLAndAddress, 0) + for _, u := range p.URLs { + URL, err := url.Parse(u) + if err != nil { + p.Log.Errorf("Could not parse %q, skipping it. Error: %s", u, err.Error()) + continue + } + allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL} + } + + p.lock.Lock() + defer p.lock.Unlock() + // loop through all pods scraped via the prometheus annotation on the pods + for k, v := range p.kubernetesPods { + allURLs[k] = v + } + + for _, service := range p.KubernetesServices { + URL, err := url.Parse(service) + if err != nil { + return nil, err + } + + resolvedAddresses, err := net.LookupHost(URL.Hostname()) + if err != nil { + p.Log.Errorf("Could not resolve %q, skipping it. Error: %s", URL.Host, err.Error()) + continue + } + for _, resolved := range resolvedAddresses { + serviceURL := p.AddressToURL(URL, resolved) + allURLs[serviceURL.String()] = URLAndAddress{ + URL: serviceURL, + Address: resolved, + OriginalURL: URL, + } + } + } + return allURLs, nil +} + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (p *Prometheus) Gather(acc telegraf.Accumulator) error { + if p.client == nil { + client, err := p.createHTTPClient() + if err != nil { + return err + } + p.client = client + } + + var wg sync.WaitGroup + + allURLs, err := p.GetAllURLs() + if err != nil { + return err + } + for _, URL := range allURLs { + wg.Add(1) + go func(serviceURL URLAndAddress) { + defer wg.Done() + acc.AddError(p.gatherURL(serviceURL, acc)) + }(URL) + } + + wg.Wait() + + return nil +} + +func (p *Prometheus) createHTTPClient() (*http.Client, error) { + tlsCfg, err := p.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + DisableKeepAlives: true, + }, + Timeout: p.ResponseTimeout, + } + + return client, nil +} + +func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error { + var req *http.Request + var err error + var uClient *http.Client + var metrics []telegraf.Metric + if u.URL.Scheme == "unix" { + path := u.URL.Query().Get("path") + if path == "" { + path = "/metrics" + } + addr := "http://localhost" + path + req, err = http.NewRequest("GET", addr, nil) + if err != nil { + return fmt.Errorf("unable to create new request '%s': %s", addr, err) + } + + // ignore error because it's been handled before getting here + tlsCfg, _ := p.ClientConfig.TLSConfig() + uClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + DisableKeepAlives: true, + Dial: func(network, addr string) (net.Conn, error) { + c, err := net.Dial("unix", u.URL.Path) + return c, err + }, + }, + Timeout: p.ResponseTimeout, + } + } else { + if u.URL.Path == "" { + u.URL.Path = "/metrics" + } + req, err = http.NewRequest("GET", u.URL.String(), nil) + if err != nil { + return fmt.Errorf("unable to create new request '%s': %s", u.URL.String(), err) + } + } + + req.Header.Add("Accept", acceptHeader) + + if p.BearerToken != "" { + token, err := ioutil.ReadFile(p.BearerToken) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+string(token)) + } else if p.BearerTokenString != "" { + req.Header.Set("Authorization", "Bearer "+p.BearerTokenString) + } else if p.Username != "" || p.Password != "" { + req.SetBasicAuth(p.Username, p.Password) + } + + var resp *http.Response + if u.URL.Scheme != "unix" { + resp, err = p.client.Do(req) + } else { + resp, err = uClient.Do(req) + } + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status) + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading body: %s", err) + } + + if p.MetricVersion == 2 { + metrics, err = ParseV2(body, resp.Header) + } else { + metrics, err = Parse(body, resp.Header) + } + + if err != nil { + return fmt.Errorf("error reading metrics for %s: %s", + u.URL, err) + } + + for _, metric := range metrics { + tags := metric.Tags() + // strip user and password from URL + u.OriginalURL.User = nil + if p.URLTag != "" { + tags[p.URLTag] = u.OriginalURL.String() + } + if u.Address != "" { + tags["address"] = u.Address + } + for k, v := range u.Tags { + tags[k] = v + } + + switch metric.Type() { + case telegraf.Counter: + acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time()) + case telegraf.Gauge: + acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time()) + case telegraf.Summary: + acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time()) + case telegraf.Histogram: + acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time()) + default: + acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) + } + } + + return nil +} + +// Start will start the Kubernetes scraping if enabled in the configuration +func (p *Prometheus) Start(a telegraf.Accumulator) error { + if p.MonitorPods { + var ctx context.Context + ctx, p.cancel = context.WithCancel(context.Background()) + return p.start(ctx) + } + return nil +} + +func (p *Prometheus) Stop() { + if p.MonitorPods { + p.cancel() + } + p.wg.Wait() +} + +/* +func init() { + inputs.Add("prometheus", func() telegraf.Input { + return &Prometheus{ + ResponseTimeout: internal.Duration{Duration: time.Second * 3}, + kubernetesPods: map[string]URLAndAddress{}, + URLTag: "url", + } + }) +} +*/ diff --git a/src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go b/src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go new file mode 100644 index 00000000..d33cba27 --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go @@ -0,0 +1,236 @@ +package prometheus + +import ( + "fmt" + "math" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.00010425500000000001 +go_gc_duration_seconds{quantile="0.25"} 0.000139108 +go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002 +go_gc_duration_seconds{quantile="0.75"} 0.000331463 +go_gc_duration_seconds{quantile="1"} 0.000667154 +go_gc_duration_seconds_sum 0.0018183950000000002 +go_gc_duration_seconds_count 7 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 15 +# HELP test_metric An untyped metric with a timestamp +# TYPE test_metric untyped +test_metric{label="value"} 1.0 1490802350000 +` +const sampleSummaryTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.00010425500000000001 +go_gc_duration_seconds{quantile="0.25"} 0.000139108 +go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002 +go_gc_duration_seconds{quantile="0.75"} 0.000331463 +go_gc_duration_seconds{quantile="1"} 0.000667154 +go_gc_duration_seconds_sum 0.0018183950000000002 +go_gc_duration_seconds_count 7 +` +const sampleGaugeTextFormat = ` +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 15 1490802350000 +` + +func TestPrometheusGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + URLs: []string{ts.URL}, + URLTag: "url", + } + + var acc testutil.Accumulator + + err := acc.GatherError(p.Gather) + require.NoError(t, err) + + assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) + assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) + assert.True(t, acc.HasFloatField("test_metric", "value")) + assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) + assert.False(t, acc.HasTag("test_metric", "address")) + assert.True(t, acc.TagValue("test_metric", "url") == ts.URL+"/metrics") +} + +func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + KubernetesServices: []string{ts.URL}, + URLTag: "url", + } + u, _ := url.Parse(ts.URL) + tsAddress := u.Hostname() + + var acc testutil.Accumulator + + err := acc.GatherError(p.Gather) + require.NoError(t, err) + + assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) + assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) + assert.True(t, acc.HasFloatField("test_metric", "value")) + assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) + assert.True(t, acc.TagValue("test_metric", "address") == tsAddress) + assert.True(t, acc.TagValue("test_metric", "url") == ts.URL) +} + +func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + Log: testutil.Logger{}, + URLs: []string{ts.URL}, + KubernetesServices: []string{"http://random.telegraf.local:88/metrics"}, + } + + var acc testutil.Accumulator + + err := acc.GatherError(p.Gather) + require.NoError(t, err) + + assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) + assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) + assert.True(t, acc.HasFloatField("test_metric", "value")) + assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) +} + +func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleSummaryTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + URLs: []string{ts.URL}, + URLTag: "url", + MetricVersion: 2, + } + + var acc testutil.Accumulator + + err := acc.GatherError(p.Gather) + require.NoError(t, err) + + assert.True(t, acc.TagSetValue("prometheus", "quantile") == "0") + assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_sum")) + assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_count")) + assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics") + +} + +func TestSummaryMayContainNaN(t *testing.T) { + const data = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} NaN +go_gc_duration_seconds{quantile="1"} NaN +go_gc_duration_seconds_sum 42.0 +go_gc_duration_seconds_count 42 +` + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, data) + })) + defer ts.Close() + + p := &Prometheus{ + URLs: []string{ts.URL}, + URLTag: "", + MetricVersion: 2, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "quantile": "0", + }, + map[string]interface{}{ + "go_gc_duration_seconds": math.NaN(), + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "quantile": "1", + }, + map[string]interface{}{ + "go_gc_duration_seconds": math.NaN(), + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "go_gc_duration_seconds_sum": 42.0, + "go_gc_duration_seconds_count": 42.0, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleGaugeTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + URLs: []string{ts.URL}, + URLTag: "url", + MetricVersion: 2, + } + + var acc testutil.Accumulator + + err := acc.GatherError(p.Gather) + require.NoError(t, err) + + assert.True(t, acc.HasFloatField("prometheus", "go_goroutines")) + assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics") + assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0))) +} diff --git a/src/modules/monapi/plugins/prometheus/prometheus_test.go b/src/modules/monapi/plugins/prometheus/prometheus_test.go new file mode 100644 index 00000000..90f395b3 --- /dev/null +++ b/src/modules/monapi/plugins/prometheus/prometheus_test.go @@ -0,0 +1,56 @@ +package prometheus + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/didi/nightingale/src/modules/monapi/plugins" +) + +const sampleTextFormat = `# HELP test_metric An untyped metric with a timestamp +# TYPE test_metric untyped +test_metric{label="value"} 1.0 1490802350000 +# HELP helo_stats_test_timer helo_stats_test_timer summary +# TYPE helo_stats_test_timer summary +helo_stats_test_timer{region="bj",zone="test_1",quantile="0.5"} 0.501462767 +helo_stats_test_timer{region="bj",zone="test_1",quantile="0.75"} 0.751876572 +helo_stats_test_timer{region="bj",zone="test_1",quantile="0.95"} 0.978413628 +helo_stats_test_timer{region="bj",zone="test_1",quantile="0.99"} 0.989530661 +helo_stats_test_timer{region="bj",zone="test_1",quantile="0.999"} 0.989530661 +helo_stats_test_timer_sum{region="bj",zone="test_1"} 39.169514066999994 +helo_stats_test_timer_count{region="bj",zone="test_1"} 74 +# HELP helo_stats_test_histogram helo_stats_test_histogram histogram +# TYPE helo_stats_test_histogram histogram +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0"} 0 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.05"} 0 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.1"} 2 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.25"} 13 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.5"} 24 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="1"} 56 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="3"} 56 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="6"} 56 +helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="+Inf"} 56 +helo_stats_test_histogram_sum{region="bj",zone="test_1"} 40.45 +helo_stats_test_histogram_count{region="bj",zone="test_1"} 56 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 15 1490802350000 +` + +func TestCollect(t *testing.T) { + http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, sampleTextFormat) }) + server := &http.Server{Addr: ":18080"} + go func() { + server.ListenAndServe() + }() + defer server.Shutdown(context.Background()) + + time.Sleep(time.Millisecond * 100) + + plugins.PluginTest(t, &PrometheusRule{ + URLs: []string{"http://localhost:18080/metrics"}, + }) +} diff --git a/src/modules/monapi/plugins/util.go b/src/modules/monapi/plugins/util.go index 267f065f..ac9cfee0 100644 --- a/src/modules/monapi/plugins/util.go +++ b/src/modules/monapi/plugins/util.go @@ -2,7 +2,11 @@ package plugins import ( "fmt" + "testing" + "github.com/didi/nightingale/src/common/dataobj" + "github.com/didi/nightingale/src/modules/prober/manager" + "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" ) @@ -39,3 +43,29 @@ func (l *Logger) Infof(format string, args ...interface{}) { func (l *Logger) Info(args ...interface{}) { logger.LogDepth(logger.INFO, 1, fmt.Sprint(args...)) } + +type telegrafPlugin interface { + TelegrafInput() (telegraf.Input, error) +} + +func PluginTest(t *testing.T, plugin telegrafPlugin) { + metrics := []*dataobj.MetricValue{} + + input, err := plugin.TelegrafInput() + if err != nil { + t.Error(err) + } + + acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "github-test", Metrics: &metrics}) + if err != nil { + t.Error(err) + } + + if err = input.Gather(acc); err != nil { + t.Error(err) + } + + for k, v := range metrics { + t.Logf("%d %s %s %f", k, v.CounterType, v.PK(), v.Value) + } +} diff --git a/src/modules/prober/config/plugin.go b/src/modules/prober/config/plugin.go index 056dedfa..8fd57cf8 100644 --- a/src/modules/prober/config/plugin.go +++ b/src/modules/prober/config/plugin.go @@ -15,7 +15,8 @@ import ( ) var ( - pluginConfigs map[string]*PluginConfig + pluginConfigs map[string]*PluginConfig + defaultPluginConfigContent = []byte("mode: whitelist # whitelist(default),all") ) const ( @@ -84,8 +85,15 @@ func InitPluginsConfig(cf *ConfYaml) { b, err = ioutil.ReadFile(file) } if err != nil { - logger.Debugf("readfile %s err %s", plugin, err) - continue + file = filepath.Join(cf.PluginsConfig, plugin+".local.yml") + err = ioutil.WriteFile(file, defaultPluginConfigContent, 0644) + if err != nil { + logger.Warningf("create plugin config %s err %s", file, err) + continue + } else { + logger.Infof("create plugin config %s", file) + b = defaultPluginConfigContent + } } if err := yaml.Unmarshal(b, &c); err != nil { diff --git a/src/modules/prober/manager/accumulator.go b/src/modules/prober/manager/accumulator.go new file mode 100644 index 00000000..46e8b9ed --- /dev/null +++ b/src/modules/prober/manager/accumulator.go @@ -0,0 +1,258 @@ +package manager + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/didi/nightingale/src/common/dataobj" + "github.com/influxdata/telegraf" + "github.com/toolkits/pkg/logger" +) + +type AccumulatorOptions struct { + Name string + Tags map[string]string + Metrics *[]*dataobj.MetricValue +} + +func (p *AccumulatorOptions) Validate() error { + if p.Name == "" { + return fmt.Errorf("unable to get Name") + } + if p.Metrics == nil { + return fmt.Errorf("unable to get metrics") + } + + return nil +} + +// NewAccumulator return telegraf.Accumulator +func NewAccumulator(opt AccumulatorOptions) (telegraf.Accumulator, error) { + if err := opt.Validate(); err != nil { + return nil, err + } + + return &accumulator{ + name: opt.Name, + tags: opt.Tags, + metrics: opt.Metrics, + precision: time.Second, + }, nil +} + +type accumulator struct { + sync.RWMutex + name string + tags map[string]string + precision time.Duration + metrics *[]*dataobj.MetricValue +} + +func (p *accumulator) AddFields( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Untyped, t...) +} + +func (p *accumulator) AddGauge( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Gauge, t...) +} + +func (p *accumulator) AddCounter( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Counter, t...) +} + +func (p *accumulator) AddSummary( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Summary, t...) +} + +func (p *accumulator) AddHistogram( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Histogram, t...) +} + +func (p *accumulator) AddMetric(m telegraf.Metric) { + m.SetTime(m.Time().Round(p.precision)) + if metrics := p.makeMetric(m); m != nil { + p.pushMetrics(metrics) + } +} + +func (p *accumulator) SetPrecision(precision time.Duration) { + p.precision = precision +} + +// AddError passes a runtime error to the accumulator. +// The error will be tagged with the plugin name and written to the log. +func (p *accumulator) AddError(err error) { + if err == nil { + return + } + logger.Debugf("accumulator %s Error: %s", p.name, err) +} + +func (p *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { + return nil +} + +func (p *accumulator) addFields( + measurement string, + tags map[string]string, + fields map[string]interface{}, + tp telegraf.ValueType, + t ...time.Time, +) { + m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp) + if err != nil { + return + } + if metrics := p.makeMetric(m); m != nil { + p.pushMetrics(metrics) + } +} + +func (p *accumulator) getTime(t []time.Time) time.Time { + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } + return timestamp.Round(p.precision) +} + +// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/ +func (p *accumulator) makeMetric(metric telegraf.Metric) []*dataobj.MetricValue { + tags := map[string]string{} + for _, v := range metric.TagList() { + tags[v.Key] = v.Value + } + + for k, v := range p.tags { + tags[k] = v + } + + switch metric.Type() { + case telegraf.Counter: + return makeCounter(metric, tags) + case telegraf.Summary, telegraf.Histogram: + logger.Debugf("unsupported type summary, histogram, skip") + return nil + // return makeSummary(metric, tags) + default: + return makeGauge(metric, tags) + } + +} + +func makeSummary(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue { + name := metric.Name() + ts := metric.Time().Unix() + fields := metric.Fields() + ms := make([]*dataobj.MetricValue, 0, len(fields)) + + for k, v := range fields { + f, ok := v.(float64) + if !ok { + continue + } + + countType := "GAUGE" + if strings.HasSuffix(k, "_count") || + strings.HasSuffix(k, "_sum") { + countType = "COUNTER" + } + + ms = append(ms, &dataobj.MetricValue{ + Metric: name + "_" + k, + CounterType: countType, + Timestamp: ts, + TagsMap: tags, + Value: f, + ValueUntyped: f, + }) + + } + return ms +} + +func makeCounter(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue { + name := metric.Name() + ts := metric.Time().Unix() + fields := metric.Fields() + ms := make([]*dataobj.MetricValue, 0, len(fields)) + + for k, v := range fields { + f, ok := v.(float64) + if !ok { + continue + } + + ms = append(ms, &dataobj.MetricValue{ + Metric: name + "_" + k, + CounterType: "COUNTER", + Timestamp: ts, + TagsMap: tags, + Value: f, + ValueUntyped: f, + }) + } + + return ms +} + +func makeGauge(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue { + name := metric.Name() + ts := metric.Time().Unix() + fields := metric.Fields() + ms := make([]*dataobj.MetricValue, 0, len(fields)) + + for k, v := range fields { + f, ok := v.(float64) + if !ok { + continue + } + + ms = append(ms, &dataobj.MetricValue{ + Metric: name + "_" + k, + CounterType: "GAUGE", + Timestamp: ts, + TagsMap: tags, + Value: f, + ValueUntyped: f, + }) + } + + return ms + +} + +func (p *accumulator) pushMetrics(metrics []*dataobj.MetricValue) { + p.Lock() + defer p.Unlock() + *p.metrics = append(*p.metrics, metrics...) +} diff --git a/src/modules/prober/manager/collectrule.go b/src/modules/prober/manager/collectrule.go index c0c6581c..95353b55 100644 --- a/src/modules/prober/manager/collectrule.go +++ b/src/modules/prober/manager/collectrule.go @@ -1,9 +1,9 @@ package manager import ( + "fmt" "strconv" "sync" - "time" "github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/models" @@ -13,14 +13,14 @@ import ( "github.com/toolkits/pkg/logger" ) -// not thread-safe type collectRule struct { sync.RWMutex - telegraf.Input *models.CollectRule + + input telegraf.Input + acc telegraf.Accumulator + metrics *[]*dataobj.MetricValue tags map[string]string - precision time.Duration - metrics []*dataobj.MetricValue lastAt int64 updatedAt int64 } @@ -41,43 +41,76 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) { return nil, err } + metrics := []*dataobj.MetricValue{} + + acc, err := NewAccumulator(AccumulatorOptions{ + Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id), + Tags: tags, + Metrics: &metrics}) + if err != nil { + return nil, err + } + return &collectRule{ - Input: input, CollectRule: rule, + input: input, + acc: acc, + metrics: &metrics, tags: tags, - metrics: []*dataobj.MetricValue{}, - precision: time.Second, updatedAt: rule.UpdatedAt, }, nil } +func (p *collectRule) reset() { + p.Lock() + defer p.Unlock() + + *p.metrics = (*p.metrics)[:0] +} + +func (p *collectRule) Metrics() []*dataobj.MetricValue { + p.RLock() + defer p.RUnlock() + + return *p.metrics +} + // prepareMetrics -func (p *collectRule) prepareMetrics() error { - if len(p.metrics) == 0 { - return nil +func (p *collectRule) prepareMetrics() (metrics []*dataobj.MetricValue, err error) { + p.RLock() + defer p.RUnlock() + + if len(*p.metrics) == 0 { + return } - ts := p.metrics[0].Timestamp + + metrics = *p.metrics + ts := metrics[0].Timestamp nid := strconv.FormatInt(p.Nid, 10) pluginConfig, ok := config.GetPluginConfig(p.PluginName()) if !ok { - return nil + return + } + + if pluginConfig.Mode == config.PluginModeWhitelist && len(pluginConfig.Metrics) == 0 { + return } vars := map[string]*dataobj.MetricValue{} - for _, v := range p.metrics { + for _, v := range metrics { logger.Debugf("get v[%s] %f", v.Metric, v.Value) vars[v.Metric] = v } - p.metrics = p.metrics[:0] + metrics = metrics[:0] for _, metric := range pluginConfig.ExprMetrics { f, err := metric.Calc(vars) if err != nil { logger.Debugf("calc err %s", err) continue } - p.metrics = append(p.metrics, &dataobj.MetricValue{ + metrics = append(metrics, &dataobj.MetricValue{ Nid: nid, Metric: metric.Name, Timestamp: ts, @@ -91,7 +124,7 @@ func (p *collectRule) prepareMetrics() error { for k, v := range vars { if metric, ok := pluginConfig.Metrics[k]; ok { - p.metrics = append(p.metrics, &dataobj.MetricValue{ + metrics = append(metrics, &dataobj.MetricValue{ Nid: nid, Metric: k, Timestamp: ts, @@ -105,7 +138,7 @@ func (p *collectRule) prepareMetrics() error { if pluginConfig.Mode == config.PluginModeWhitelist { continue } - p.metrics = append(p.metrics, &dataobj.MetricValue{ + metrics = append(metrics, &dataobj.MetricValue{ Nid: nid, Metric: k, Timestamp: ts, @@ -115,13 +148,15 @@ func (p *collectRule) prepareMetrics() error { Value: v.Value, ValueUntyped: v.ValueUntyped, }) - } } - return nil + return } func (p *collectRule) update(rule *models.CollectRule) error { + p.Lock() + defer p.Unlock() + if p.updatedAt == rule.UpdatedAt { return nil } @@ -139,145 +174,18 @@ func (p *collectRule) update(rule *models.CollectRule) error { return err } - p.Input = input + acc, err := NewAccumulator(AccumulatorOptions{ + Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id), + Tags: tags, + Metrics: p.metrics}) + if err != nil { + return err + } + + p.input = input p.CollectRule = rule - p.tags = tags + p.acc = acc p.UpdatedAt = rule.UpdatedAt return nil } - -// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/ -func (p *collectRule) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { - tags := map[string]string{} - for _, v := range metric.TagList() { - tags[v.Key] = v.Value - } - - for k, v := range p.tags { - tags[k] = v - } - - name := metric.Name() - ts := metric.Time().Unix() - - fields := metric.Fields() - ms := make([]*dataobj.MetricValue, 0, len(fields)) - for k, v := range fields { - f, ok := v.(float64) - if !ok { - continue - } - - ms = append(ms, &dataobj.MetricValue{ - Metric: name + "_" + k, - Timestamp: ts, - TagsMap: tags, - Value: f, - ValueUntyped: f, - }) - } - - return ms -} - -func (p *collectRule) AddFields( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Untyped, t...) -} - -func (p *collectRule) AddGauge( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Gauge, t...) -} - -func (p *collectRule) AddCounter( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Counter, t...) -} - -func (p *collectRule) AddSummary( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Summary, t...) -} - -func (p *collectRule) AddHistogram( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Histogram, t...) -} - -func (p *collectRule) AddMetric(m telegraf.Metric) { - m.SetTime(m.Time().Round(p.precision)) - if metrics := p.MakeMetric(m); m != nil { - p.pushMetrics(metrics) - } -} - -func (p *collectRule) pushMetrics(metrics []*dataobj.MetricValue) { - p.Lock() - defer p.Unlock() - p.metrics = append(p.metrics, metrics...) -} - -func (p *collectRule) addFields( - measurement string, - tags map[string]string, - fields map[string]interface{}, - tp telegraf.ValueType, - t ...time.Time, -) { - m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp) - if err != nil { - return - } - if metrics := p.MakeMetric(m); m != nil { - p.pushMetrics(metrics) - } -} - -// AddError passes a runtime error to the accumulator. -// The error will be tagged with the plugin name and written to the log. -func (p *collectRule) AddError(err error) { - if err == nil { - return - } - logger.Debugf("collectRule %s.%s(%d) Error: %s", p.CollectType, p.Name, p.Id, err) -} - -func (p *collectRule) SetPrecision(precision time.Duration) { - p.precision = precision -} - -func (p *collectRule) getTime(t []time.Time) time.Time { - var timestamp time.Time - if len(t) > 0 { - timestamp = t[0] - } else { - timestamp = time.Now() - } - return timestamp.Round(p.precision) -} - -func (p *collectRule) WithTracking(maxTracked int) telegraf.TrackingAccumulator { - return nil -} diff --git a/src/modules/prober/manager/manager.go b/src/modules/prober/manager/manager.go index 465fd8b4..7fe9a2b8 100644 --- a/src/modules/prober/manager/manager.go +++ b/src/modules/prober/manager/manager.go @@ -3,6 +3,7 @@ package manager import ( "container/heap" "context" + "fmt" "log" "time" @@ -167,7 +168,7 @@ func (p *worker) loop(id int) { return case rule := <-p.collectRuleCh: if err := p.do(rule); err != nil { - log.Printf("work[%d].do err %s", id, err) + logger.Debugf("work[%d].do %s", id, err) } } } @@ -175,19 +176,22 @@ func (p *worker) loop(id int) { } func (p *worker) do(rule *collectRule) error { - rule.metrics = rule.metrics[:0] + rule.reset() // telegraf - err := rule.Input.Gather(rule) - if len(rule.metrics) == 0 { - return err + err := rule.input.Gather(rule.acc) + if err != nil { + return fmt.Errorf("gather %s", err) } // eval expression metrics - rule.prepareMetrics() + metrics, err := rule.prepareMetrics() + if err != nil { + return fmt.Errorf("prepareMetrics %s", err) + } - // send - core.Push(rule.metrics) + // push to transfer + core.Push(metrics) return err }