add accumulator for prober & generate default plugin config (#560)

* add accumulator for prober & generate default plugin config

* add prometheus plugin

* add prober plugin test util
This commit is contained in:
yubo 2021-01-29 08:26:28 +08:00 committed by GitHub
parent 2d4e6bb8da
commit fbf4544849
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 2231 additions and 191 deletions

6
go.mod
View File

@ -8,7 +8,9 @@ require (
github.com/codegangsta/negroni v1.0.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe
github.com/ericchiang/k8s v1.2.0
github.com/garyburd/redigo v1.6.2
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3
github.com/go-sql-driver/mysql v1.5.0
@ -22,9 +24,12 @@ require (
github.com/m3db/m3 v0.15.17
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mojocn/base64Captcha v1.3.1
github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad
github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0
@ -37,7 +42,6 @@ require (
go.uber.org/automaxprocs v1.3.0 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/text v0.3.3
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/ldap.v3 v3.1.0

19
go.sum
View File

@ -403,13 +403,6 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
@ -422,7 +415,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
@ -1318,8 +1310,6 @@ google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24 h1:IGPykv426z7LZSVPlaPufOyphngM4at5uZ7x5alaFvE=
google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@ -1338,15 +1328,6 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk=

View File

@ -46,7 +46,7 @@ func convPort(module, listen, portType string) int {
}
port, err := strconv.Atoi(strings.Split(listen, splitChar)[1])
if err != nil {
fmt.Printf("%s.%s invalid", module, portType)
fmt.Printf("address: %s.%s invalid", module, portType)
os.Exit(1)
}
@ -101,7 +101,7 @@ func getMod(modKey string) Module {
mod, has := mods[modKey]
if !has {
fmt.Printf("module(%s) configuration section not found", modKey)
fmt.Printf("address: module(%s) configuration section not found", modKey)
os.Exit(1)
}

View File

@ -1 +0,0 @@
prometheus/

View File

@ -7,6 +7,7 @@ import (
_ "github.com/didi/nightingale/src/modules/monapi/plugins/github"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/prometheus"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/redis"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/nginx"

View File

@ -0,0 +1,13 @@
package github
import (
"testing"
"github.com/didi/nightingale/src/modules/monapi/plugins"
)
func TestCollect(t *testing.T) {
plugins.PluginTest(t, &GitHubRule{
Repositories: []string{"didi/nightingale"},
})
}

View File

@ -0,0 +1,94 @@
package prometheus
import (
"fmt"
"time"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/monapi/plugins"
"github.com/didi/nightingale/src/modules/monapi/plugins/prometheus/prometheus"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
)
func init() {
collector.CollectorRegister(NewPrometheusCollector()) // for monapi
i18n.DictRegister(langDict)
}
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"URLs": "网址",
"An array of urls to scrape metrics from": "采集数据的网址",
"URL Tag": "网址标签",
"Url tag name (tag containing scrapped url. optional, default is \"url\")": "url 标签名称,默认值 \"url\"",
"An array of Kubernetes services to scrape metrics from": "采集kube服务的地址",
"Kubernetes config file contenct to create client from": "kube config 文件内容用来连接kube服务",
"Use bearer token for authorization. ('bearer_token' takes priority)": "用户的Bearer令牌优先级高于 username/password",
"HTTP Basic Authentication username": "HTTP认证用户名",
"HTTP Basic Authentication password": "HTTP认证密码",
"RESP Timeout": "请求超时时间",
"Specify timeout duration for slower prometheus clients": "k8s请求超时时间, 单位: 秒",
},
}
)
type PrometheusCollector struct {
*collector.BaseCollector
}
func NewPrometheusCollector() *PrometheusCollector {
return &PrometheusCollector{BaseCollector: collector.NewBaseCollector(
"prometheus",
collector.RemoteCategory,
func() collector.TelegrafPlugin { return &PrometheusRule{} },
)}
}
type PrometheusRule struct {
URLs []string `label:"URLs" json:"urls,required" description:"An array of urls to scrape metrics from" example:"http://my-service-exporter:8080/metrics"`
// URLTag string `label:"URL Tag" json:"url_tag" description:"Url tag name (tag containing scrapped url. optional, default is \"url\")" example:"scrapeUrl"`
// KubernetesServices []string `label:"Kube Services" json:"kubernetes_services" description:"An array of Kubernetes services to scrape metrics from" example:"http://my-service-dns.my-namespace:9100/metrics"`
// KubeConfigContent string `label:"Kube Conf" json:"kube_config_content" format:"file" description:"Kubernetes config file contenct to create client from"`
// MonitorPods bool `label:"Monitor Pods" json:"monitor_kubernetes_pods" description:"Scrape Kubernetes pods for the following prometheus annotations:<br />- prometheus.io/scrape: Enable scraping for this pod<br />- prometheus.io/scheme: If the metrics endpoint is secured then you will need to<br /> set this to 'https' & most likely set the tls config.<br />- prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.<br />- prometheus.io/port: If port is not 9102 use this annotation"`
// PodNamespace string `label:"Pod Namespace" json:"monitor_kubernetes_pods_namespace" description:"Restricts Kubernetes monitoring to a single namespace" example:"default"`
// KubernetesLabelSelector string `label:"Kube Label Selector" json:"kubernetes_label_selector" description:"label selector to target pods which have the label" example:"env=dev,app=nginx"`
// KubernetesFieldSelector string `label:"Kube Field Selector" json:"kubernetes_field_selector" description:"field selector to target pods<br />eg. To scrape pods on a specific node" example:"spec.nodeName=$HOSTNAME"`
// BearerTokenString string `label:"Bearer Token" json:"bearer_token_string" format:"file" description:"Use bearer token for authorization. ('bearer_token' takes priority)"`
// Username string `label:"Username" json:"username" description:"HTTP Basic Authentication username"`
// Password string `label:"Password" json:"password" format:"password" description:"HTTP Basic Authentication password"`
ResponseTimeout int `label:"RESP Timeout" json:"response_timeout" default:"3" description:"Specify timeout duration for slower prometheus clients"`
plugins.ClientConfig
}
func (p *PrometheusRule) Validate() error {
if len(p.URLs) == 0 || p.URLs[0] == "" {
return fmt.Errorf(" prometheus.rule unable to get urls")
}
return nil
}
func (p *PrometheusRule) TelegrafInput() (telegraf.Input, error) {
if err := p.Validate(); err != nil {
return nil, err
}
return &prometheus.Prometheus{
URLs: p.URLs,
URLTag: "target",
// KubernetesServices: p.KubernetesServices,
// KubeConfigContent: p.KubeConfigContent,
// MonitorPods: p.MonitorPods,
// PodNamespace: p.PodNamespace,
// KubernetesLabelSelector: p.KubernetesLabelSelector,
// KubernetesFieldSelector: p.KubernetesFieldSelector,
// BearerTokenString: p.BearerTokenString,
// Username: p.Username,
// Password: p.Password,
ResponseTimeout: time.Second * time.Duration(p.ResponseTimeout),
MetricVersion: 2,
Log: plugins.GetLogger(),
ClientConfig: p.ClientConfig.TlsClientConfig(),
}, nil
}

View File

@ -0,0 +1,171 @@
# Prometheus Input Plugin
The prometheus input plugin gathers metrics from HTTP servers exposing metrics
in Prometheus format.
### Configuration:
```toml
# Read metrics from one or many prometheus clients
[[inputs.prometheus]]
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"
## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to `https` & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
# label selector to target pods which have the label
# kubernetes_label_selector = "env=dev,app=nginx"
# field selector to target pods
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"
## HTTP Basic Authentication username and password. ('bearer_token' and
## 'bearer_token_string' take priority)
# username = ""
# password = ""
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
`urls` can contain a unix socket as well. If a different path is required (default is `/metrics` for both http[s] and unix) for a unix socket, add `path` as a query parameter as follows: `unix:///var/run/prometheus.sock?path=/custom/metrics`
#### Kubernetes Service Discovery
URLs listed in the `kubernetes_services` parameter will be expanded
by looking up all A records assigned to the hostname as described in
[Kubernetes DNS service discovery](https://kubernetes.io/docs/concepts/services-networking/service/#dns).
This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).
#### Kubernetes scraping
Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig
file to determine where to monitor.
Currently the following annotation are supported:
* `prometheus.io/scrape` Enable scraping for this pod.
* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http')
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics')
* `prometheus.io/port` Used to override the port. (default 9102)
Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping.
#### Bearer Token
If set, the file specified by the `bearer_token` parameter will be read on
each interval and its contents will be appended to the Bearer string in the
Authorization header.
### Usage for Caddy HTTP server
If you want to monitor Caddy, you need to use Caddy with its Prometheus plugin:
* Download Caddy+Prometheus plugin [here](https://caddyserver.com/download/linux/amd64?plugins=http.prometheus)
* Add the `prometheus` directive in your `CaddyFile`
* Restart Caddy
* Configure Telegraf to fetch metrics on it:
```toml
[[inputs.prometheus]]
# ## An array of urls to scrape metrics from.
urls = ["http://localhost:9180/metrics"]
```
> This is the default URL where Caddy Prometheus plugin will send data.
> For more details, please read the [Caddy Prometheus documentation](https://github.com/miekg/caddy-prometheus/blob/master/README.md).
### Metrics:
Measurement names are based on the Metric Family and tags are created for each
label. The value is added to a field named based on the metric type.
All metrics receive the `url` tag indicating the related URL specified in the
Telegraf configuration. If using Kubernetes service discovery the `address`
tag is also added indicating the discovered ip address.
### Example Output:
**Source**
```
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 7.4545e-05
go_gc_duration_seconds{quantile="0.25"} 7.6999e-05
go_gc_duration_seconds{quantile="0.5"} 0.000277935
go_gc_duration_seconds{quantile="0.75"} 0.000706591
go_gc_duration_seconds{quantile="1"} 0.000706591
go_gc_duration_seconds_sum 0.00113607
go_gc_duration_seconds_count 4
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
# HELP cpu_usage_user Telegraf collected metric
# TYPE cpu_usage_user gauge
cpu_usage_user{cpu="cpu0"} 1.4112903225816156
cpu_usage_user{cpu="cpu1"} 0.702106318955865
cpu_usage_user{cpu="cpu2"} 2.0161290322588776
cpu_usage_user{cpu="cpu3"} 1.5045135406226022
```
**Output**
```
go_gc_duration_seconds,url=http://example.org:9273/metrics 1=0.001336611,count=14,sum=0.004527551,0=0.000057965,0.25=0.000083812,0.5=0.000286537,0.75=0.000365303 1505776733000000000
go_goroutines,url=http://example.org:9273/metrics gauge=21 1505776695000000000
cpu_usage_user,cpu=cpu0,url=http://example.org:9273/metrics gauge=1.513622603430151 1505776751000000000
cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641773 1505776751000000000
cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000
cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000
```
**Output (when metric_version = 2)**
```
prometheus,quantile=1,url=http://example.org:9273/metrics go_gc_duration_seconds=0.005574303 1556075100000000000
prometheus,quantile=0.75,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0001046 1556075100000000000
prometheus,quantile=0.5,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000719 1556075100000000000
prometheus,quantile=0.25,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000579 1556075100000000000
prometheus,quantile=0,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000349 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_gc_duration_seconds_count=324,go_gc_duration_seconds_sum=0.091340353 1556075100000000000
prometheus,url=http://example.org:9273/metrics go_goroutines=15 1556075100000000000
prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.513622603430151 1505776751000000000
prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000
prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000
prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000
```

View File

@ -0,0 +1,237 @@
package prometheus
import (
"context"
"log"
"net"
"net/url"
"sync"
"time"
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
)
type payload struct {
eventype string
pod *corev1.Pod
}
// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfig string) (*k8s.Client, error) {
// data, err := ioutil.ReadFile(kubeconfigPath)
// if err != nil {
// return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err)
// }
// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal([]byte(kubeconfig), &config); err != nil {
return nil, err
}
return k8s.NewClient(&config)
}
func (p *Prometheus) start(ctx context.Context) error {
client, err := k8s.NewInClusterClient()
if err != nil {
// u, err := user.Current()
// if err != nil {
// return fmt.Errorf("Failed to get current user - %v", err)
// }
// configLocation := filepath.Join(u.HomeDir, ".kube/config")
// if p.KubeConfig != "" {
// configLocation = p.KubeConfig
// }
client, err = loadClient(p.KubeConfigContent)
if err != nil {
return err
}
}
p.wg = sync.WaitGroup{}
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
err := p.watch(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
}
}
}
}()
return nil
}
// An edge case exists if a pod goes offline at the same time a new pod is created
// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
// pod, causing errors in the logs. This is only true if the pod going offline is not
// directed to do so by K8s.
func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
selectors := podSelector(p)
pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, p.PodNamespace, &corev1.Pod{}, selectors...)
if err != nil {
return err
}
defer watcher.Close()
for {
select {
case <-ctx.Done():
return nil
default:
pod = &corev1.Pod{}
// An error here means we need to reconnect the watcher.
eventType, err := watcher.Next(pod)
if err != nil {
return err
}
// If the pod is not "ready", there will be no ip associated with it.
if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" ||
!podReady(pod.Status.GetContainerStatuses()) {
continue
}
switch eventType {
case k8s.EventAdded:
registerPod(pod, p)
case k8s.EventModified:
// To avoid multiple actions for each event, unregister on the first event
// in the delete sequence, when the containers are still "ready".
if pod.Metadata.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} else {
registerPod(pod, p)
}
}
}
}
}
func podReady(statuss []*corev1.ContainerStatus) bool {
if len(statuss) == 0 {
return false
}
for _, cs := range statuss {
if !cs.GetReady() {
return false
}
}
return true
}
func podSelector(p *Prometheus) []k8s.Option {
options := []k8s.Option{}
if len(p.KubernetesLabelSelector) > 0 {
options = append(options, k8s.QueryParam("labelSelector", p.KubernetesLabelSelector))
}
if len(p.KubernetesFieldSelector) > 0 {
options = append(options, k8s.QueryParam("fieldSelector", p.KubernetesFieldSelector))
}
return options
}
func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil {
p.kubernetesPods = map[string]URLAndAddress{}
}
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
}
log.Printf("D! [inputs.prometheus] will scrape metrics from %q", *targetURL)
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
if tags == nil {
tags = map[string]string{}
}
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %q: %s", *targetURL, err.Error())
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods[podURL.String()] = URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags,
}
p.lock.Unlock()
}
func getScrapeURL(pod *corev1.Pod) *string {
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
return nil
}
scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"]
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]
if scheme == "" {
scheme = "http"
}
if port == "" {
port = "9102"
}
if path == "" {
path = "/metrics"
}
u := &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(ip, port),
Path: path,
}
x := u.String()
return &x
}
func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod)
if url == nil {
return
}
log.Printf("D! [inputs.prometheus] registered a delete request for %q in namespace %q",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.kubernetesPods[*url]; ok {
delete(p.kubernetesPods, *url)
log.Printf("D! [inputs.prometheus] will stop scraping for %q", *url)
}
}

View File

@ -0,0 +1,155 @@
package prometheus
import (
"github.com/ericchiang/k8s"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)
func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}
func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}
func TestAddPod(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultipleDuplicatePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
p.Status.PodIP = str("127.0.0.2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.kubernetesPods))
}
func TestPodSelector(t *testing.T) {
cases := []struct {
expected []k8s.Option
labelselector string
fieldselector string
}{
{
expected: []k8s.Option{
k8s.QueryParam("labelSelector", "key1=val1,key2=val2,key3"),
k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"),
},
labelselector: "key1=val1,key2=val2,key3",
fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com",
},
{
expected: []k8s.Option{
k8s.QueryParam("labelSelector", "key1"),
k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"),
},
labelselector: "key1",
fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com",
},
{
expected: []k8s.Option{
k8s.QueryParam("labelSelector", "key1"),
k8s.QueryParam("fieldSelector", "somefield"),
},
labelselector: "key1",
fieldselector: "somefield",
},
}
for _, c := range cases {
prom := &Prometheus{
Log: testutil.Logger{},
KubernetesLabelSelector: c.labelselector,
KubernetesFieldSelector: c.fieldselector,
}
output := podSelector(prom)
assert.Equal(t, len(output), len(c.expected))
}
}
func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}
func str(x string) *string {
return &x
}

View File

@ -0,0 +1,320 @@
package prometheus
// Parser inspired from
// https://github.com/prometheus/prom2json/blob/master/main.go
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"mime"
"net/http"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// Parse returns a slice of Metrics from a text representation of a
// metrics
func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}
// make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds
now := time.Now()
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), now)
metrics = append(metrics, telegrafMetrics...)
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), now)
metrics = append(metrics, telegrafMetrics...)
} else {
// standard metric
// reading fields
fields := getNameAndValueV2(m, metricName)
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
metric, err := metric.New("prometheus", tags, fields, t, valueType(mf.GetType()))
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
}
return metrics, err
}
// Get Quantiles for summary metric & Buckets for histogram
func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum())
met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}
for _, q := range m.GetSummary().Quantile {
newTags := tags
fields = make(map[string]interface{})
newTags["quantile"] = fmt.Sprint(q.GetQuantile())
fields[metricName] = float64(q.GetValue())
quantileMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, quantileMetric)
}
}
return metrics
}
// Get Buckets from histogram metric
func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric {
var metrics []telegraf.Metric
fields := make(map[string]interface{})
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum())
met, err := metric.New("prometheus", tags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, met)
}
for _, b := range m.GetHistogram().Bucket {
newTags := tags
fields = make(map[string]interface{})
newTags["le"] = fmt.Sprint(b.GetUpperBound())
fields[metricName+"_bucket"] = float64(b.GetCumulativeCount())
histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType))
if err == nil {
metrics = append(metrics, histogramMetric)
}
}
return metrics
}
// Parse returns a slice of Metrics from a text representation of a
// metrics
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}
// make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds
now := time.Now()
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
// reading fields
var fields map[string]interface{}
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = now
}
metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType()))
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
return metrics, err
}
func valueType(mt dto.MetricType) telegraf.ValueType {
switch mt {
case dto.MetricType_COUNTER:
return telegraf.Counter
case dto.MetricType_GAUGE:
return telegraf.Gauge
case dto.MetricType_SUMMARY:
return telegraf.Summary
case dto.MetricType_HISTOGRAM:
return telegraf.Histogram
default:
return telegraf.Untyped
}
}
// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, q := range m.GetSummary().Quantile {
if !math.IsNaN(q.GetValue()) {
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
}
}
return fields
}
// Get Buckets from histogram metric
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
}
return fields
}
// Get labels from metric
func makeLabels(m *dto.Metric) map[string]string {
result := map[string]string{}
for _, lp := range m.Label {
result[lp.GetName()] = lp.GetValue()
}
return result
}
// Get name and value from metric
func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["gauge"] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue())
}
}
return fields
}
// Get name and value from metric
func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields[metricName] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields[metricName] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields[metricName] = float64(m.GetUntyped().GetValue())
}
}
return fields
}

View File

@ -0,0 +1,167 @@
package prometheus
import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
`
const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
`
const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
`
const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
`
const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
# TYPE apiserver_request_latencies histogram
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
`
const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.013534896000000001
go_gc_duration_seconds{quantile="0.25"} 0.02469263
go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005
go_gc_duration_seconds{quantile="0.75"} 0.03840335
go_gc_duration_seconds{quantile="1"} 0.049956604
go_gc_duration_seconds_sum 1970.341293002
go_gc_duration_seconds_count 65952
# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
# TYPE apiserver_request_latencies histogram
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
`
const prometheusMulti = `
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
const prometheusMultiSomeInvalid = `
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4 , usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
func TestParseValidPrometheus(t *testing.T) {
// Gauge value
metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"gauge": float64(1),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"osVersion": "CentOS Linux 7 (Core)",
"cadvisorRevision": "",
"cadvisorVersion": "",
"dockerVersion": "1.8.2",
"kernelVersion": "3.10.0-229.20.1.el7.x86_64",
}, metrics[0].Tags())
// Counter value
metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"counter": float64(0),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Summary data
//SetDefaultTags(map[string]string{})
metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"0.5": 552048.506,
"0.9": 5.876804288e+06,
"0.99": 5.876804288e+06,
"count": 9.0,
"sum": 1.8909097205e+07,
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
// histogram data
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"500000": 2000.0,
"count": 2025.0,
"sum": 1.02726334e+08,
"250000": 1997.0,
"2e+06": 2012.0,
"4e+06": 2017.0,
"8e+06": 2024.0,
"+Inf": 2025.0,
"125000": 1994.0,
"1e+06": 2005.0,
}, metrics[0].Fields())
assert.Equal(t,
map[string]string{"verb": "POST", "resource": "bindings"},
metrics[0].Tags())
}

View File

@ -0,0 +1,398 @@
package prometheus
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
)
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
type Prometheus struct {
// An array of urls to scrape metrics from.
URLs []string `toml:"urls"`
// An array of Kubernetes services to scrape metrics from.
KubernetesServices []string
// Content of kubernetes config file
KubeConfigContent string
// Label Selector/s for Kubernetes
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`
// Field Selector/s for Kubernetes
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
// Basic authentication credentials
Username string `toml:"username"`
Password string `toml:"password"`
ResponseTimeout time.Duration `toml:"response_timeout"`
MetricVersion int `toml:"metric_version"`
URLTag string `toml:"url_tag"`
tls.ClientConfig
Log telegraf.Logger
client *http.Client
// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
lock sync.Mutex
kubernetesPods map[string]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
}
var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1
## Url tag name (tag containing scrapped url. optional, default is "url")
# url_tag = "scrapeUrl"
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"
## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to 'https' & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
# label selector to target pods which have the label
# kubernetes_label_selector = "env=dev,app=nginx"
# field selector to target pods
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"
## HTTP Basic Authentication username and password. ('bearer_token' and
## 'bearer_token_string' take priority)
# username = ""
# password = ""
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (p *Prometheus) SampleConfig() string {
return sampleConfig
}
func (p *Prometheus) Description() string {
return "Read metrics from one or many prometheus clients"
}
func (p *Prometheus) Init() error {
if p.MetricVersion != 2 {
p.Log.Warnf("Use of deprecated configuration: 'metric_version = 1'; please update to 'metric_version = 2'")
}
return nil
}
var ErrProtocolError = errors.New("prometheus protocol error")
func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
host := address
if u.Port() != "" {
host = address + ":" + u.Port()
}
reconstructedURL := &url.URL{
Scheme: u.Scheme,
Opaque: u.Opaque,
User: u.User,
Path: u.Path,
RawPath: u.RawPath,
ForceQuery: u.ForceQuery,
RawQuery: u.RawQuery,
Fragment: u.Fragment,
Host: host,
}
return reconstructedURL
}
type URLAndAddress struct {
OriginalURL *url.URL
URL *url.URL
Address string
Tags map[string]string
}
func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs := make(map[string]URLAndAddress, 0)
for _, u := range p.URLs {
URL, err := url.Parse(u)
if err != nil {
p.Log.Errorf("Could not parse %q, skipping it. Error: %s", u, err.Error())
continue
}
allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL}
}
p.lock.Lock()
defer p.lock.Unlock()
// loop through all pods scraped via the prometheus annotation on the pods
for k, v := range p.kubernetesPods {
allURLs[k] = v
}
for _, service := range p.KubernetesServices {
URL, err := url.Parse(service)
if err != nil {
return nil, err
}
resolvedAddresses, err := net.LookupHost(URL.Hostname())
if err != nil {
p.Log.Errorf("Could not resolve %q, skipping it. Error: %s", URL.Host, err.Error())
continue
}
for _, resolved := range resolvedAddresses {
serviceURL := p.AddressToURL(URL, resolved)
allURLs[serviceURL.String()] = URLAndAddress{
URL: serviceURL,
Address: resolved,
OriginalURL: URL,
}
}
}
return allURLs, nil
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHTTPClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup
allURLs, err := p.GetAllURLs()
if err != nil {
return err
}
for _, URL := range allURLs {
wg.Add(1)
go func(serviceURL URLAndAddress) {
defer wg.Done()
acc.AddError(p.gatherURL(serviceURL, acc))
}(URL)
}
wg.Wait()
return nil
}
func (p *Prometheus) createHTTPClient() (*http.Client, error) {
tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout,
}
return client, nil
}
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error {
var req *http.Request
var err error
var uClient *http.Client
var metrics []telegraf.Metric
if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if path == "" {
path = "/metrics"
}
addr := "http://localhost" + path
req, err = http.NewRequest("GET", addr, nil)
if err != nil {
return fmt.Errorf("unable to create new request '%s': %s", addr, err)
}
// ignore error because it's been handled before getting here
tlsCfg, _ := p.ClientConfig.TLSConfig()
uClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
Dial: func(network, addr string) (net.Conn, error) {
c, err := net.Dial("unix", u.URL.Path)
return c, err
},
},
Timeout: p.ResponseTimeout,
}
} else {
if u.URL.Path == "" {
u.URL.Path = "/metrics"
}
req, err = http.NewRequest("GET", u.URL.String(), nil)
if err != nil {
return fmt.Errorf("unable to create new request '%s': %s", u.URL.String(), err)
}
}
req.Header.Add("Accept", acceptHeader)
if p.BearerToken != "" {
token, err := ioutil.ReadFile(p.BearerToken)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+string(token))
} else if p.BearerTokenString != "" {
req.Header.Set("Authorization", "Bearer "+p.BearerTokenString)
} else if p.Username != "" || p.Password != "" {
req.SetBasicAuth(p.Username, p.Password)
}
var resp *http.Response
if u.URL.Scheme != "unix" {
resp, err = p.client.Do(req)
} else {
resp, err = uClient.Do(req)
}
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading body: %s", err)
}
if p.MetricVersion == 2 {
metrics, err = ParseV2(body, resp.Header)
} else {
metrics, err = Parse(body, resp.Header)
}
if err != nil {
return fmt.Errorf("error reading metrics for %s: %s",
u.URL, err)
}
for _, metric := range metrics {
tags := metric.Tags()
// strip user and password from URL
u.OriginalURL.User = nil
if p.URLTag != "" {
tags[p.URLTag] = u.OriginalURL.String()
}
if u.Address != "" {
tags["address"] = u.Address
}
for k, v := range u.Tags {
tags[k] = v
}
switch metric.Type() {
case telegraf.Counter:
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Gauge:
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Summary:
acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Histogram:
acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time())
default:
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}
}
return nil
}
// Start will start the Kubernetes scraping if enabled in the configuration
func (p *Prometheus) Start(a telegraf.Accumulator) error {
if p.MonitorPods {
var ctx context.Context
ctx, p.cancel = context.WithCancel(context.Background())
return p.start(ctx)
}
return nil
}
func (p *Prometheus) Stop() {
if p.MonitorPods {
p.cancel()
}
p.wg.Wait()
}
/*
func init() {
inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{
ResponseTimeout: internal.Duration{Duration: time.Second * 3},
kubernetesPods: map[string]URLAndAddress{},
URLTag: "url",
}
})
}
*/

View File

@ -0,0 +1,236 @@
package prometheus
import (
"fmt"
"math"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
# HELP test_metric An untyped metric with a timestamp
# TYPE test_metric untyped
test_metric{label="value"} 1.0 1490802350000
`
const sampleSummaryTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
`
const sampleGaugeTextFormat = `
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15 1490802350000
`
func TestPrometheusGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
URLTag: "url",
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.False(t, acc.HasTag("test_metric", "address"))
assert.True(t, acc.TagValue("test_metric", "url") == ts.URL+"/metrics")
}
func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
KubernetesServices: []string{ts.URL},
URLTag: "url",
}
u, _ := url.Parse(ts.URL)
tsAddress := u.Hostname()
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.True(t, acc.TagValue("test_metric", "address") == tsAddress)
assert.True(t, acc.TagValue("test_metric", "url") == ts.URL)
}
func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()
p := &Prometheus{
Log: testutil.Logger{},
URLs: []string{ts.URL},
KubernetesServices: []string{"http://random.telegraf.local:88/metrics"},
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
}
func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleSummaryTextFormat)
}))
defer ts.Close()
p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.TagSetValue("prometheus", "quantile") == "0")
assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_sum"))
assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_count"))
assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")
}
func TestSummaryMayContainNaN(t *testing.T) {
const data = `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} NaN
go_gc_duration_seconds{quantile="1"} NaN
go_gc_duration_seconds_sum 42.0
go_gc_duration_seconds_count 42
`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, data)
}))
defer ts.Close()
p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "",
MetricVersion: 2,
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{
"quantile": "0",
},
map[string]interface{}{
"go_gc_duration_seconds": math.NaN(),
},
time.Unix(0, 0),
telegraf.Summary,
),
testutil.MustMetric(
"prometheus",
map[string]string{
"quantile": "1",
},
map[string]interface{}{
"go_gc_duration_seconds": math.NaN(),
},
time.Unix(0, 0),
telegraf.Summary,
),
testutil.MustMetric(
"prometheus",
map[string]string{},
map[string]interface{}{
"go_gc_duration_seconds_sum": 42.0,
"go_gc_duration_seconds_count": 42.0,
},
time.Unix(0, 0),
telegraf.Summary,
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime(), testutil.SortMetrics())
}
func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleGaugeTextFormat)
}))
defer ts.Close()
p := &Prometheus{
URLs: []string{ts.URL},
URLTag: "url",
MetricVersion: 2,
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
assert.True(t, acc.HasFloatField("prometheus", "go_goroutines"))
assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")
assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0)))
}

View File

@ -0,0 +1,56 @@
package prometheus
import (
"context"
"fmt"
"net/http"
"testing"
"time"
"github.com/didi/nightingale/src/modules/monapi/plugins"
)
const sampleTextFormat = `# HELP test_metric An untyped metric with a timestamp
# TYPE test_metric untyped
test_metric{label="value"} 1.0 1490802350000
# HELP helo_stats_test_timer helo_stats_test_timer summary
# TYPE helo_stats_test_timer summary
helo_stats_test_timer{region="bj",zone="test_1",quantile="0.5"} 0.501462767
helo_stats_test_timer{region="bj",zone="test_1",quantile="0.75"} 0.751876572
helo_stats_test_timer{region="bj",zone="test_1",quantile="0.95"} 0.978413628
helo_stats_test_timer{region="bj",zone="test_1",quantile="0.99"} 0.989530661
helo_stats_test_timer{region="bj",zone="test_1",quantile="0.999"} 0.989530661
helo_stats_test_timer_sum{region="bj",zone="test_1"} 39.169514066999994
helo_stats_test_timer_count{region="bj",zone="test_1"} 74
# HELP helo_stats_test_histogram helo_stats_test_histogram histogram
# TYPE helo_stats_test_histogram histogram
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0"} 0
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.05"} 0
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.1"} 2
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.25"} 13
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.5"} 24
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="1"} 56
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="3"} 56
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="6"} 56
helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="+Inf"} 56
helo_stats_test_histogram_sum{region="bj",zone="test_1"} 40.45
helo_stats_test_histogram_count{region="bj",zone="test_1"} 56
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15 1490802350000
`
func TestCollect(t *testing.T) {
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, sampleTextFormat) })
server := &http.Server{Addr: ":18080"}
go func() {
server.ListenAndServe()
}()
defer server.Shutdown(context.Background())
time.Sleep(time.Millisecond * 100)
plugins.PluginTest(t, &PrometheusRule{
URLs: []string{"http://localhost:18080/metrics"},
})
}

View File

@ -2,7 +2,11 @@ package plugins
import (
"fmt"
"testing"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/prober/manager"
"github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger"
)
@ -39,3 +43,29 @@ func (l *Logger) Infof(format string, args ...interface{}) {
func (l *Logger) Info(args ...interface{}) {
logger.LogDepth(logger.INFO, 1, fmt.Sprint(args...))
}
type telegrafPlugin interface {
TelegrafInput() (telegraf.Input, error)
}
func PluginTest(t *testing.T, plugin telegrafPlugin) {
metrics := []*dataobj.MetricValue{}
input, err := plugin.TelegrafInput()
if err != nil {
t.Error(err)
}
acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "github-test", Metrics: &metrics})
if err != nil {
t.Error(err)
}
if err = input.Gather(acc); err != nil {
t.Error(err)
}
for k, v := range metrics {
t.Logf("%d %s %s %f", k, v.CounterType, v.PK(), v.Value)
}
}

View File

@ -16,6 +16,7 @@ import (
var (
pluginConfigs map[string]*PluginConfig
defaultPluginConfigContent = []byte("mode: whitelist # whitelist(default),all")
)
const (
@ -84,8 +85,15 @@ func InitPluginsConfig(cf *ConfYaml) {
b, err = ioutil.ReadFile(file)
}
if err != nil {
logger.Debugf("readfile %s err %s", plugin, err)
file = filepath.Join(cf.PluginsConfig, plugin+".local.yml")
err = ioutil.WriteFile(file, defaultPluginConfigContent, 0644)
if err != nil {
logger.Warningf("create plugin config %s err %s", file, err)
continue
} else {
logger.Infof("create plugin config %s", file)
b = defaultPluginConfigContent
}
}
if err := yaml.Unmarshal(b, &c); err != nil {

View File

@ -0,0 +1,258 @@
package manager
import (
"fmt"
"strings"
"sync"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger"
)
type AccumulatorOptions struct {
Name string
Tags map[string]string
Metrics *[]*dataobj.MetricValue
}
func (p *AccumulatorOptions) Validate() error {
if p.Name == "" {
return fmt.Errorf("unable to get Name")
}
if p.Metrics == nil {
return fmt.Errorf("unable to get metrics")
}
return nil
}
// NewAccumulator return telegraf.Accumulator
func NewAccumulator(opt AccumulatorOptions) (telegraf.Accumulator, error) {
if err := opt.Validate(); err != nil {
return nil, err
}
return &accumulator{
name: opt.Name,
tags: opt.Tags,
metrics: opt.Metrics,
precision: time.Second,
}, nil
}
type accumulator struct {
sync.RWMutex
name string
tags map[string]string
precision time.Duration
metrics *[]*dataobj.MetricValue
}
func (p *accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Untyped, t...)
}
func (p *accumulator) AddGauge(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Gauge, t...)
}
func (p *accumulator) AddCounter(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Counter, t...)
}
func (p *accumulator) AddSummary(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Summary, t...)
}
func (p *accumulator) AddHistogram(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Histogram, t...)
}
func (p *accumulator) AddMetric(m telegraf.Metric) {
m.SetTime(m.Time().Round(p.precision))
if metrics := p.makeMetric(m); m != nil {
p.pushMetrics(metrics)
}
}
func (p *accumulator) SetPrecision(precision time.Duration) {
p.precision = precision
}
// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (p *accumulator) AddError(err error) {
if err == nil {
return
}
logger.Debugf("accumulator %s Error: %s", p.name, err)
}
func (p *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
return nil
}
func (p *accumulator) addFields(
measurement string,
tags map[string]string,
fields map[string]interface{},
tp telegraf.ValueType,
t ...time.Time,
) {
m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp)
if err != nil {
return
}
if metrics := p.makeMetric(m); m != nil {
p.pushMetrics(metrics)
}
}
func (p *accumulator) getTime(t []time.Time) time.Time {
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}
return timestamp.Round(p.precision)
}
// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/
func (p *accumulator) makeMetric(metric telegraf.Metric) []*dataobj.MetricValue {
tags := map[string]string{}
for _, v := range metric.TagList() {
tags[v.Key] = v.Value
}
for k, v := range p.tags {
tags[k] = v
}
switch metric.Type() {
case telegraf.Counter:
return makeCounter(metric, tags)
case telegraf.Summary, telegraf.Histogram:
logger.Debugf("unsupported type summary, histogram, skip")
return nil
// return makeSummary(metric, tags)
default:
return makeGauge(metric, tags)
}
}
func makeSummary(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue {
name := metric.Name()
ts := metric.Time().Unix()
fields := metric.Fields()
ms := make([]*dataobj.MetricValue, 0, len(fields))
for k, v := range fields {
f, ok := v.(float64)
if !ok {
continue
}
countType := "GAUGE"
if strings.HasSuffix(k, "_count") ||
strings.HasSuffix(k, "_sum") {
countType = "COUNTER"
}
ms = append(ms, &dataobj.MetricValue{
Metric: name + "_" + k,
CounterType: countType,
Timestamp: ts,
TagsMap: tags,
Value: f,
ValueUntyped: f,
})
}
return ms
}
func makeCounter(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue {
name := metric.Name()
ts := metric.Time().Unix()
fields := metric.Fields()
ms := make([]*dataobj.MetricValue, 0, len(fields))
for k, v := range fields {
f, ok := v.(float64)
if !ok {
continue
}
ms = append(ms, &dataobj.MetricValue{
Metric: name + "_" + k,
CounterType: "COUNTER",
Timestamp: ts,
TagsMap: tags,
Value: f,
ValueUntyped: f,
})
}
return ms
}
func makeGauge(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue {
name := metric.Name()
ts := metric.Time().Unix()
fields := metric.Fields()
ms := make([]*dataobj.MetricValue, 0, len(fields))
for k, v := range fields {
f, ok := v.(float64)
if !ok {
continue
}
ms = append(ms, &dataobj.MetricValue{
Metric: name + "_" + k,
CounterType: "GAUGE",
Timestamp: ts,
TagsMap: tags,
Value: f,
ValueUntyped: f,
})
}
return ms
}
func (p *accumulator) pushMetrics(metrics []*dataobj.MetricValue) {
p.Lock()
defer p.Unlock()
*p.metrics = append(*p.metrics, metrics...)
}

View File

@ -1,9 +1,9 @@
package manager
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/models"
@ -13,14 +13,14 @@ import (
"github.com/toolkits/pkg/logger"
)
// not thread-safe
type collectRule struct {
sync.RWMutex
telegraf.Input
*models.CollectRule
input telegraf.Input
acc telegraf.Accumulator
metrics *[]*dataobj.MetricValue
tags map[string]string
precision time.Duration
metrics []*dataobj.MetricValue
lastAt int64
updatedAt int64
}
@ -41,43 +41,76 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
return nil, err
}
metrics := []*dataobj.MetricValue{}
acc, err := NewAccumulator(AccumulatorOptions{
Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id),
Tags: tags,
Metrics: &metrics})
if err != nil {
return nil, err
}
return &collectRule{
Input: input,
CollectRule: rule,
input: input,
acc: acc,
metrics: &metrics,
tags: tags,
metrics: []*dataobj.MetricValue{},
precision: time.Second,
updatedAt: rule.UpdatedAt,
}, nil
}
func (p *collectRule) reset() {
p.Lock()
defer p.Unlock()
*p.metrics = (*p.metrics)[:0]
}
func (p *collectRule) Metrics() []*dataobj.MetricValue {
p.RLock()
defer p.RUnlock()
return *p.metrics
}
// prepareMetrics
func (p *collectRule) prepareMetrics() error {
if len(p.metrics) == 0 {
return nil
func (p *collectRule) prepareMetrics() (metrics []*dataobj.MetricValue, err error) {
p.RLock()
defer p.RUnlock()
if len(*p.metrics) == 0 {
return
}
ts := p.metrics[0].Timestamp
metrics = *p.metrics
ts := metrics[0].Timestamp
nid := strconv.FormatInt(p.Nid, 10)
pluginConfig, ok := config.GetPluginConfig(p.PluginName())
if !ok {
return nil
return
}
if pluginConfig.Mode == config.PluginModeWhitelist && len(pluginConfig.Metrics) == 0 {
return
}
vars := map[string]*dataobj.MetricValue{}
for _, v := range p.metrics {
for _, v := range metrics {
logger.Debugf("get v[%s] %f", v.Metric, v.Value)
vars[v.Metric] = v
}
p.metrics = p.metrics[:0]
metrics = metrics[:0]
for _, metric := range pluginConfig.ExprMetrics {
f, err := metric.Calc(vars)
if err != nil {
logger.Debugf("calc err %s", err)
continue
}
p.metrics = append(p.metrics, &dataobj.MetricValue{
metrics = append(metrics, &dataobj.MetricValue{
Nid: nid,
Metric: metric.Name,
Timestamp: ts,
@ -91,7 +124,7 @@ func (p *collectRule) prepareMetrics() error {
for k, v := range vars {
if metric, ok := pluginConfig.Metrics[k]; ok {
p.metrics = append(p.metrics, &dataobj.MetricValue{
metrics = append(metrics, &dataobj.MetricValue{
Nid: nid,
Metric: k,
Timestamp: ts,
@ -105,7 +138,7 @@ func (p *collectRule) prepareMetrics() error {
if pluginConfig.Mode == config.PluginModeWhitelist {
continue
}
p.metrics = append(p.metrics, &dataobj.MetricValue{
metrics = append(metrics, &dataobj.MetricValue{
Nid: nid,
Metric: k,
Timestamp: ts,
@ -115,13 +148,15 @@ func (p *collectRule) prepareMetrics() error {
Value: v.Value,
ValueUntyped: v.ValueUntyped,
})
}
}
return nil
return
}
func (p *collectRule) update(rule *models.CollectRule) error {
p.Lock()
defer p.Unlock()
if p.updatedAt == rule.UpdatedAt {
return nil
}
@ -139,145 +174,18 @@ func (p *collectRule) update(rule *models.CollectRule) error {
return err
}
p.Input = input
acc, err := NewAccumulator(AccumulatorOptions{
Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id),
Tags: tags,
Metrics: p.metrics})
if err != nil {
return err
}
p.input = input
p.CollectRule = rule
p.tags = tags
p.acc = acc
p.UpdatedAt = rule.UpdatedAt
return nil
}
// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/
func (p *collectRule) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue {
tags := map[string]string{}
for _, v := range metric.TagList() {
tags[v.Key] = v.Value
}
for k, v := range p.tags {
tags[k] = v
}
name := metric.Name()
ts := metric.Time().Unix()
fields := metric.Fields()
ms := make([]*dataobj.MetricValue, 0, len(fields))
for k, v := range fields {
f, ok := v.(float64)
if !ok {
continue
}
ms = append(ms, &dataobj.MetricValue{
Metric: name + "_" + k,
Timestamp: ts,
TagsMap: tags,
Value: f,
ValueUntyped: f,
})
}
return ms
}
func (p *collectRule) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Untyped, t...)
}
func (p *collectRule) AddGauge(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Gauge, t...)
}
func (p *collectRule) AddCounter(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Counter, t...)
}
func (p *collectRule) AddSummary(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Summary, t...)
}
func (p *collectRule) AddHistogram(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
p.addFields(measurement, tags, fields, telegraf.Histogram, t...)
}
func (p *collectRule) AddMetric(m telegraf.Metric) {
m.SetTime(m.Time().Round(p.precision))
if metrics := p.MakeMetric(m); m != nil {
p.pushMetrics(metrics)
}
}
func (p *collectRule) pushMetrics(metrics []*dataobj.MetricValue) {
p.Lock()
defer p.Unlock()
p.metrics = append(p.metrics, metrics...)
}
func (p *collectRule) addFields(
measurement string,
tags map[string]string,
fields map[string]interface{},
tp telegraf.ValueType,
t ...time.Time,
) {
m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp)
if err != nil {
return
}
if metrics := p.MakeMetric(m); m != nil {
p.pushMetrics(metrics)
}
}
// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (p *collectRule) AddError(err error) {
if err == nil {
return
}
logger.Debugf("collectRule %s.%s(%d) Error: %s", p.CollectType, p.Name, p.Id, err)
}
func (p *collectRule) SetPrecision(precision time.Duration) {
p.precision = precision
}
func (p *collectRule) getTime(t []time.Time) time.Time {
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}
return timestamp.Round(p.precision)
}
func (p *collectRule) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
return nil
}

View File

@ -3,6 +3,7 @@ package manager
import (
"container/heap"
"context"
"fmt"
"log"
"time"
@ -167,7 +168,7 @@ func (p *worker) loop(id int) {
return
case rule := <-p.collectRuleCh:
if err := p.do(rule); err != nil {
log.Printf("work[%d].do err %s", id, err)
logger.Debugf("work[%d].do %s", id, err)
}
}
}
@ -175,19 +176,22 @@ func (p *worker) loop(id int) {
}
func (p *worker) do(rule *collectRule) error {
rule.metrics = rule.metrics[:0]
rule.reset()
// telegraf
err := rule.Input.Gather(rule)
if len(rule.metrics) == 0 {
return err
err := rule.input.Gather(rule.acc)
if err != nil {
return fmt.Errorf("gather %s", err)
}
// eval expression metrics
rule.prepareMetrics()
metrics, err := rule.prepareMetrics()
if err != nil {
return fmt.Errorf("prepareMetrics %s", err)
}
// send
core.Push(rule.metrics)
// push to transfer
core.Push(metrics)
return err
}