1. add kafka metrics collector
2. change Sample funcs from package inputs to types to avoid cycle imports 3. add runtimex.Stack to print stack detail when panic
@ -3,6 +3,8 @@ package agent
import (
// auto registry
_ "flashcat.cloud/categraf/inputs/conntrack"
_ "flashcat.cloud/categraf/inputs/cpu"
@ -12,6 +14,7 @@ import (
_ "flashcat.cloud/categraf/inputs/elasticsearch"
_ "flashcat.cloud/categraf/inputs/exec"
_ "flashcat.cloud/categraf/inputs/http_response"
_ "flashcat.cloud/categraf/inputs/kafka"
_ "flashcat.cloud/categraf/inputs/kernel"
_ "flashcat.cloud/categraf/inputs/kernel_vmstat"
_ "flashcat.cloud/categraf/inputs/kubernetes"
@ -35,7 +38,6 @@ import (
_ "flashcat.cloud/categraf/inputs/system"
_ "flashcat.cloud/categraf/inputs/tomcat"
_ "flashcat.cloud/categraf/inputs/zookeeper"
type Agent struct {
@ -12,9 +12,6 @@ import (
coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
@ -23,9 +20,11 @@ import (
logService "flashcat.cloud/categraf/logs/service"
coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
logService "flashcat.cloud/categraf/logs/service"
// LogAgent represents the data pipeline that collects, decodes,
@ -44,7 +43,7 @@ type LogAgent struct {
// NewAgent returns a new Logs LogAgent
func NewLogAgent(sources *logsconfig.LogSources, services *service.Services, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints) *LogAgent {
func NewLogAgent(sources *logsconfig.LogSources, services *logService.Services, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints) *LogAgent {
// setup the auditor
// We pass the health handle to the auditor because it's the end of the pipeline and the most
// critical part. Arguably it could also be plugged to the destination.
@ -42,18 +42,18 @@ func (a *Agent) startMetricsAgent() error {
// construct input instance
instance := creator()
inp := creator()
// set configurations for input instance
cfg.LoadConfigs(path.Join(config.Config.ConfigDir, inputFilePrefix+name), instance)
cfg.LoadConfigs(path.Join(config.Config.ConfigDir, inputFilePrefix+name), inp)
if err = instance.Init(); err != nil {
if err = inp.Init(); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
reader := NewInputReader(instance)
reader := NewInputReader(inp)
a.InputReaders[name] = reader
@ -9,6 +9,7 @@ import (
@ -69,7 +70,7 @@ func (r *InputReader) gatherOnce() {
if strings.Contains(fmt.Sprint(r), "closed channel") {
} else {
log.Println("E! gather metrics panic:", r)
log.Println("E! gather metrics panic:", r, string(runtimex.Stack(3)))
@ -92,6 +93,9 @@ func (r *InputReader) gatherOnce() {
s := samples[i].(*types.Sample)
if s == nil {
if s.Timestamp.IsZero() {
s.Timestamp = now
@ -0,0 +1,12 @@
# # collect interval
# interval = 15
# !!! uncomment [[instances]] to enable this plugin
# # interval = global.interval * interval_times
# interval_times = 1
# append some labels to metrics
labels = { cluster="cloud-n9e-kafka" }
kafka_uris = ["","",""]
@ -9,6 +9,7 @@ require (
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/docker/docker v20.10.16+incompatible
github.com/gaochao1/sw v1.0.0
github.com/go-kit/kit v0.11.0
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
@ -22,6 +23,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter v0.54.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter v0.54.0
@ -46,6 +48,7 @@ require (
github.com/stretchr/testify v1.7.4
github.com/toolkits/pkg v1.3.0
github.com/ulricqin/gosnmp v0.0.1
github.com/xdg/scram v1.0.5
go.opentelemetry.io/collector v0.54.0
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/trace v1.7.0
@ -90,7 +93,6 @@ require (
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/freedomkk-qfeng/go-fastping v0.0.0-20160109021039-d7bb493dee3e // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
@ -158,6 +160,7 @@ require (
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
@ -176,6 +179,7 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/pdata v0.54.0 // indirect
@ -199,4 +203,7 @@ require (
gotest.tools/v3 v3.2.0 // indirect
replace go.opentelemetry.io/collector => github.com/flashcatcloud/opentelemetry-collector v0.54.1-0.20220628041301-3b8dabd1bcd0
replace (
go.opentelemetry.io/collector => github.com/flashcatcloud/opentelemetry-collector v0.54.1-0.20220628041301-3b8dabd1bcd0
github.com/prometheus/client_golang => ../../flashcatcloud/client_golang
@ -582,6 +582,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e h1:IWiVY66Xy9YrDZ28qJMt1UTlh6x9UGW0aDH/o58CSnA=
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e/go.mod h1:Rq6003vCNoJNrT6ol0hMebQ3GWLWXSHrD/QcMlXt0EE=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
@ -854,6 +856,7 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sanity-io/litter v1.2.0/go.mod h1:JF6pZUFgu2Q0sBZ+HSV35P8TVPI1TTzEwyu9FXAw2W4=
github.com/schollz/progressbar/v2 v2.13.2/go.mod h1:6YZjqdthH6SCZKv2rqGryrxPtfmRB/DWZxSMfCXPyD8=
@ -945,6 +948,10 @@ github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23n
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -0,0 +1,76 @@
package inputs
import (
pp "flashcat.cloud/categraf/parser/prometheus"
dto "github.com/prometheus/client_model/go"
const capMetricChan = 1000
var parser = new(pp.Parser)
func Collect(e prometheus.Collector, slist *list.SafeList) error {
if e == nil {
return errors.New("exporter must not be nil")
metricChan := make(chan prometheus.Metric, capMetricChan)
go func() {
for metric := range metricChan {
if metric == nil {
desc := metric.Desc()
if desc.Err() != nil {
log.Println("E! got invalid metric:", desc.Name(), desc.Err())
dtoMetric := &dto.Metric{}
err := metric.Write(dtoMetric)
if err != nil {
log.Println("E! failed to write metric:", desc.String())
labels := map[string]string{}
for _, kv := range desc.ConstLabels() {
labels[*kv.Name] = *kv.Value
for _, kv := range dtoMetric.Label {
labels[*kv.Name] = *kv.Value
switch {
case dtoMetric.Counter != nil:
_ = slist.PushFront(types.NewSample(desc.Name(), *dtoMetric.Counter.Value, labels))
case dtoMetric.Gauge != nil:
_ = slist.PushFront(types.NewSample(desc.Name(), *dtoMetric.Gauge.Value, labels))
case dtoMetric.Summary != nil:
parser.HandleSummary(dtoMetric, nil, desc.Name(), slist)
case dtoMetric.Histogram != nil:
parser.HandleHistogram(dtoMetric, nil, desc.Name(), slist)
_ = slist.PushFront(types.NewSample(desc.Name(), *dtoMetric.Untyped.Value, labels))
return nil
@ -100,5 +100,5 @@ func (c *Conntrack) Gather(slist *list.SafeList) {
log.Println("E! Conntrack input failed to collect metrics. Is the conntrack kernel module loaded?")
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
@ -9,6 +9,7 @@ import (
const inputName = "cpu"
@ -95,7 +96,7 @@ func (c *CPUStats) Gather(slist *list.SafeList) {
"usage_active": 100 * (active - lastActive) / totalDelta,
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
c.lastStats = make(map[string]cpuUtil.TimesStat)
@ -8,6 +8,7 @@ import (
@ -84,7 +85,7 @@ func (s *DiskStats) Gather(slist *list.SafeList) {
"inodes_used": du.InodesUsed,
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
@ -8,6 +8,7 @@ import (
@ -81,6 +82,6 @@ func (d *DiskIO) Gather(slist *list.SafeList) {
"merged_writes": io.MergedWriteCount,
inputs.PushSamples(slist, fields, map[string]string{"name": io.Name})
types.PushSamples(slist, fields, map[string]string{"name": io.Name})
@ -17,12 +17,13 @@ import (
tlsx "flashcat.cloud/categraf/pkg/tls"
itypes "flashcat.cloud/categraf/types"
tlsx "flashcat.cloud/categraf/pkg/tls"
itypes "flashcat.cloud/categraf/types"
const inputName = "docker"
@ -167,7 +168,7 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
if ins.client == nil {
c, err := ins.getNewClient()
if err != nil {
slist.PushFront(inputs.NewSample("docker_up", 0, ins.Labels))
slist.PushFront(itypes.NewSample("docker_up", 0, ins.Labels))
log.Println("E! failed to new docker client:", err)
@ -177,12 +178,12 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
defer ins.client.Close()
if err := ins.gatherInfo(slist); err != nil {
slist.PushFront(inputs.NewSample("docker_up", 0, ins.Labels))
slist.PushFront(itypes.NewSample("docker_up", 0, ins.Labels))
log.Println("E! failed to gather docker info:", err)
slist.PushFront(inputs.NewSample("docker_up", 1, ins.Labels))
slist.PushFront(itypes.NewSample("docker_up", 1, ins.Labels))
if ins.GatherServices {
@ -346,10 +347,10 @@ func (ins *Instance) gatherContainerInspect(container types.Container, slist *li
statefields["docker_container_status_uptime"] = uptime.Seconds()
inputs.PushSamples(slist, statefields, tags, ins.Labels)
itypes.PushSamples(slist, statefields, tags, ins.Labels)
if info.State.Health != nil {
slist.PushFront(inputs.NewSample("docker_container_health_failing_streak", info.ContainerJSONBase.State.Health.FailingStreak, tags, ins.Labels))
slist.PushFront(itypes.NewSample("docker_container_health_failing_streak", info.ContainerJSONBase.State.Health.FailingStreak, tags, ins.Labels))
ins.parseContainerStats(v, slist, tags, daemonOSType)
@ -429,7 +430,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
memfields["docker_container_mem_private_working_set"] = stat.MemoryStats.PrivateWorkingSet
inputs.PushSamples(slist, memfields, tags, ins.Labels)
itypes.PushSamples(slist, memfields, tags, ins.Labels)
// cpu
@ -454,7 +455,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
cpufields["docker_container_cpu_usage_percent"] = cpuPercent
inputs.PushSamples(slist, cpufields, map[string]string{"cpu": "cpu-total"}, tags, ins.Labels)
itypes.PushSamples(slist, cpufields, map[string]string{"cpu": "cpu-total"}, tags, ins.Labels)
if choice.Contains("cpu", ins.PerDeviceInclude) && len(stat.CPUStats.CPUUsage.PercpuUsage) > 0 {
@ -466,7 +467,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
for i, percpu := range percpuusage {
map[string]string{"cpu": fmt.Sprintf("cpu%d", i)},
@ -492,7 +493,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
if choice.Contains("network", ins.PerDeviceInclude) {
inputs.PushSamples(slist, netfields, map[string]string{"network": network}, tags, ins.Labels)
itypes.PushSamples(slist, netfields, map[string]string{"network": network}, tags, ins.Labels)
if choice.Contains("network", ins.TotalInclude) {
@ -519,7 +520,7 @@ func (ins *Instance) parseContainerStats(stat *types.StatsJSON, slist *list.Safe
// totalNetworkStatMap could be empty if container is running with --net=host.
if choice.Contains("network", ins.TotalInclude) && len(totalNetworkStatMap) != 0 {
inputs.PushSamples(slist, totalNetworkStatMap, map[string]string{"network": "total"}, tags, ins.Labels)
itypes.PushSamples(slist, totalNetworkStatMap, map[string]string{"network": "total"}, tags, ins.Labels)
ins.gatherBlockIOMetrics(slist, stat, tags)
@ -535,7 +536,7 @@ func (ins *Instance) gatherBlockIOMetrics(slist *list.SafeList, stat *types.Stat
totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap {
if perDeviceBlkio {
inputs.PushSamples(slist, fields, map[string]string{"device": device}, tags, ins.Labels)
itypes.PushSamples(slist, fields, map[string]string{"device": device}, tags, ins.Labels)
if totalBlkio {
for field, value := range fields {
@ -560,7 +561,7 @@ func (ins *Instance) gatherBlockIOMetrics(slist *list.SafeList, stat *types.Stat
if totalBlkio {
inputs.PushSamples(slist, totalStatMap, map[string]string{"device": "total"}, tags, ins.Labels)
itypes.PushSamples(slist, totalStatMap, map[string]string{"device": "total"}, tags, ins.Labels)
@ -691,7 +692,7 @@ func (ins *Instance) gatherSwarmInfo(slist *list.SafeList) {
log.Println("E! Unknown replica mode")
inputs.PushSamples(slist, fields, tags, ins.Labels)
itypes.PushSamples(slist, fields, tags, ins.Labels)
@ -719,7 +720,7 @@ func (ins *Instance) gatherInfo(slist *list.SafeList) error {
"docker_memory_total": info.MemTotal,
inputs.PushSamples(slist, fields, ins.Labels)
itypes.PushSamples(slist, fields, ins.Labels)
return nil
@ -242,7 +242,7 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
// Gather node ID
if info.nodeID, err = ins.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
slist.PushFront(inputs.NewSample("up", 0, ins.Labels))
slist.PushFront(types.NewSample("up", 0, ins.Labels))
log.Println("E! failed to gather node id:", err)
@ -250,12 +250,12 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
if info.masterID, err = ins.getCatMaster(s + "/_cat/master"); err != nil {
slist.PushFront(inputs.NewSample("up", 0, ins.Labels))
slist.PushFront(types.NewSample("up", 0, ins.Labels))
log.Println("E! failed to get cat master:", err)
slist.PushFront(inputs.NewSample("up", 1, ins.Labels))
slist.PushFront(types.NewSample("up", 1, ins.Labels))
ins.serverInfo[s] = info
@ -328,7 +328,7 @@ func (ins *Instance) gatherIndicesStats(url string, slist *list.SafeList) error
// Total Shards Stats
for k, v := range indicesStats.Shards {
slist.PushFront(inputs.NewSample("indices_stats_shards_total_"+k, v, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_shards_total_"+k, v, ins.Labels))
// All Stats
@ -340,7 +340,7 @@ func (ins *Instance) gatherIndicesStats(url string, slist *list.SafeList) error
return err
for key, val := range jsonParser.Fields {
slist.PushFront(inputs.NewSample("indices_stats_"+m+"_"+key, val, map[string]string{"index_name": "_all"}, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_"+m+"_"+key, val, map[string]string{"index_name": "_all"}, ins.Labels))
@ -393,7 +393,7 @@ func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, slist
return err
for key, val := range f.Fields {
slist.PushFront(inputs.NewSample("indices_stats_"+m+"_"+key, val, indexTag, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_"+m+"_"+key, val, indexTag, ins.Labels))
@ -436,7 +436,7 @@ func (ins *Instance) gatherSingleIndexStats(name string, index indexStat, slist
for key, val := range flattened.Fields {
slist.PushFront(inputs.NewSample("indices_stats_shards_"+key, val, shardTags, ins.Labels))
slist.PushFront(types.NewSample("indices_stats_shards_"+key, val, shardTags, ins.Labels))
@ -501,7 +501,7 @@ func (ins *Instance) gatherClusterStats(url string, slist *list.SafeList) error
for key, val := range f.Fields {
slist.PushFront(inputs.NewSample("clusterstats_"+p+"_"+key, val, tags, ins.Labels))
slist.PushFront(types.NewSample("clusterstats_"+p+"_"+key, val, tags, ins.Labels))
@ -531,7 +531,7 @@ func (ins *Instance) gatherClusterHealth(url string, slist *list.SafeList) error
"cluster_health_unassigned_shards": healthStats.UnassignedShards,
inputs.PushSamples(slist, clusterFields, map[string]string{"name": healthStats.ClusterName}, ins.Labels)
types.PushSamples(slist, clusterFields, map[string]string{"name": healthStats.ClusterName}, ins.Labels)
for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
@ -544,7 +544,7 @@ func (ins *Instance) gatherClusterHealth(url string, slist *list.SafeList) error
"cluster_health_indices_status_code": mapHealthStatusToCode(health.Status),
"cluster_health_indices_unassigned_shards": health.UnassignedShards,
inputs.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, ins.Labels)
types.PushSamples(slist, indexFields, map[string]string{"index": name, "name": healthStats.ClusterName}, ins.Labels)
return nil
@ -571,7 +571,7 @@ func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
for k, v := range n.Attributes {
slist.PushFront(inputs.NewSample("node_attribute_"+k, v, tags, ins.Labels))
slist.PushFront(types.NewSample("node_attribute_"+k, v, tags, ins.Labels))
stats := map[string]interface{}{
@ -600,7 +600,7 @@ func (ins *Instance) gatherNodeStats(url string, slist *list.SafeList) error {
for key, val := range f.Fields {
slist.PushFront(inputs.NewSample(p+"_"+key, val, tags, ins.Labels))
slist.PushFront(types.NewSample(p+"_"+key, val, tags, ins.Labels))
@ -221,7 +221,7 @@ func (ins *Instance) gather(slist *list.SafeList, target string) {
defer func() {
for field, value := range fields {
slist.PushFront(inputs.NewSample(field, value, labels))
slist.PushFront(types.NewSample(field, value, labels))
@ -2,8 +2,6 @@ package inputs
import (
@ -22,52 +20,3 @@ var InputCreators = map[string]Creator{}
func Add(name string, creator Creator) {
InputCreators[name] = creator
func NewSample(metric string, value interface{}, labels ...map[string]string) *types.Sample {
floatValue, err := conv.ToFloat64(value)
if err != nil {
return nil
s := &types.Sample{
Metric: metric,
Value: floatValue,
Labels: make(map[string]string),
for i := 0; i < len(labels); i++ {
for k, v := range labels[i] {
if v == "-" {
s.Labels[k] = v
return s
func NewSamples(fields map[string]interface{}, labels ...map[string]string) []*types.Sample {
count := len(fields)
samples := make([]*types.Sample, 0, count)
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
samples = append(samples, NewSample(metric, floatValue, labels...))
return samples
func PushSamples(slist *list.SafeList, fields map[string]interface{}, labels ...map[string]string) {
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
slist.PushFront(NewSample(metric, floatValue, labels...))
@ -0,0 +1,18 @@
# kafka
kafka 监控采集插件,封装kafka-exporter(https://github.com/davidmparrott/kafka_exporter)而来
## Configuration
# # collect interval
# interval = 15
# 要监控 MySQL,首先要给出要监控的MySQL的连接地址、用户名、密码
## 监控大盘和告警规则
本 README 的同级目录,大家可以看到 dashboard.json 就是监控大盘,导入夜莺就可以使用,alerts.json 是告警规则,也是导入夜莺就可以使用。
@ -0,0 +1,860 @@
package exporter
import (
const (
namespace = "kafka"
clientID = "kafka_exporter"
var (
clusterBrokers *prometheus.Desc
topicPartitions *prometheus.Desc
topicCurrentOffset *prometheus.Desc
topicOldestOffset *prometheus.Desc
topicPartitionLeader *prometheus.Desc
topicPartitionReplicas *prometheus.Desc
topicPartitionInSyncReplicas *prometheus.Desc
topicPartitionUsesPreferredReplica *prometheus.Desc
topicUnderReplicatedPartition *prometheus.Desc
consumergroupCurrentOffset *prometheus.Desc
consumergroupCurrentOffsetSum *prometheus.Desc
consumergroupUncomittedOffsets *prometheus.Desc
consumergroupUncommittedOffsetsSum *prometheus.Desc
consumergroupUncommittedOffsetsZookeeper *prometheus.Desc
consumergroupMembers *prometheus.Desc
topicPartitionLagMillis *prometheus.Desc
lagDatapointUsedInterpolation *prometheus.Desc
lagDatapointUsedExtrapolation *prometheus.Desc
// Exporter collects Kafka stats from the given server and exports them using
// the prometheus metrics package.
type Exporter struct {
client sarama.Client
topicFilter *regexp.Regexp
groupFilter *regexp.Regexp
mu sync.Mutex
useZooKeeperLag bool
zookeeperClient *kazoo.Kazoo
nextMetadataRefresh time.Time
metadataRefreshInterval time.Duration
allowConcurrent bool
sgMutex sync.Mutex
sgWaitCh chan struct{}
sgChans []chan<- prometheus.Metric
consumerGroupFetchAll bool
consumerGroupLagTable interpolationMap
kafkaOpts Options
saramaConfig *sarama.Config
logger log.Logger
type Options struct {
Uri []string
UseSASL bool
UseSASLHandshake bool
SaslUsername string
SaslPassword string
SaslMechanism string
UseTLS bool
TlsCAFile string
TlsCertFile string
TlsKeyFile string
TlsInsecureSkipTLSVerify bool
KafkaVersion string
UseZooKeeperLag bool
UriZookeeper []string
Labels string
MetadataRefreshInterval string
AllowConcurrent bool
MaxOffsets int
PruneIntervalSeconds int
// CanReadCertAndKey returns true if the certificate and key files already exists,
// otherwise returns false. If lost one of cert and key, returns error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
certReadable := canReadFile(certPath)
keyReadable := canReadFile(keyPath)
if certReadable == false && keyReadable == false {
return false, nil
if certReadable == false {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", certPath)
if keyReadable == false {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", keyPath)
return true, nil
// If the file represented by path exists and
// readable, returns true otherwise returns false.
func canReadFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
defer f.Close()
return true
// New returns an initialized Exporter.
func New(logger log.Logger, opts Options, topicFilter, groupFilter string) (*Exporter, error) {
var zookeeperClient *kazoo.Kazoo
config := sarama.NewConfig()
config.ClientID = clientID
kafkaVersion, err := sarama.ParseKafkaVersion(opts.KafkaVersion)
if err != nil {
return nil, err
config.Version = kafkaVersion
if opts.UseSASL {
// Convert to lowercase so that SHA512 and SHA256 is still valid
opts.SaslMechanism = strings.ToLower(opts.SaslMechanism)
switch opts.SaslMechanism {
case "scram-sha512":
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
case "scram-sha256":
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
case "plain":
level.Error(logger).Log("msg", "invalid sasl mechanism. can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", "SaslMechanism", opts.SaslMechanism)
return nil, fmt.Errorf("invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", opts.SaslMechanism)
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = opts.UseSASLHandshake
if opts.SaslUsername != "" {
config.Net.SASL.User = opts.SaslUsername
if opts.SaslPassword != "" {
config.Net.SASL.Password = opts.SaslPassword
if opts.UseTLS {
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
RootCAs: x509.NewCertPool(),
InsecureSkipVerify: opts.TlsInsecureSkipTLSVerify,
if opts.TlsCAFile != "" {
if ca, err := ioutil.ReadFile(opts.TlsCAFile); err == nil {
} else {
level.Error(logger).Log("msg", "unable to open TlsCAFile", "TlsCAFile", opts.TlsCAFile)
return nil, fmt.Errorf("UseTLS is true but unable to open TlsCAFile: %s", opts.TlsCAFile)
canReadCertAndKey, err := CanReadCertAndKey(opts.TlsCertFile, opts.TlsKeyFile)
if err != nil {
level.Error(logger).Log("msg", "Error attempting to read TlsCertFile or TlsKeyFile", "err", err.Error())
return nil, err
if canReadCertAndKey {
cert, err := tls.LoadX509KeyPair(opts.TlsCertFile, opts.TlsKeyFile)
if err == nil {
config.Net.TLS.Config.Certificates = []tls.Certificate{cert}
} else {
level.Error(logger).Log("msg", "Error attempting to load X509KeyPair", "err", err.Error())
return nil, err
if opts.UseZooKeeperLag {
zookeeperClient, err = kazoo.NewKazoo(opts.UriZookeeper, nil)
interval, err := time.ParseDuration(opts.MetadataRefreshInterval)
if err != nil {
level.Error(logger).Log("msg", "Error parsing metadata refresh interval", "err", err.Error())
return nil, err
config.Metadata.RefreshFrequency = interval
client, err := sarama.NewClient(opts.Uri, config)
if err != nil {
level.Error(logger).Log("msg", "Error initiating kafka client: %s", "err", err.Error())
return nil, err
level.Debug(logger).Log("msg", "Done with kafka client initialization")
// Init our exporter.
newExporter := &Exporter{
client: client,
topicFilter: regexp.MustCompile(topicFilter),
groupFilter: regexp.MustCompile(groupFilter),
useZooKeeperLag: opts.UseZooKeeperLag,
zookeeperClient: zookeeperClient,
nextMetadataRefresh: time.Now(),
metadataRefreshInterval: interval,
allowConcurrent: opts.AllowConcurrent,
sgMutex: sync.Mutex{},
sgWaitCh: nil,
sgChans: []chan<- prometheus.Metric{},
consumerGroupFetchAll: config.Version.IsAtLeast(sarama.V2_0_0_0),
consumerGroupLagTable: interpolationMap{mu: sync.Mutex{}},
kafkaOpts: opts,
saramaConfig: config,
logger: logger,
level.Debug(logger).Log("msg", "Initializing metrics")
return newExporter, nil
func (e *Exporter) fetchOffsetVersion() int16 {
version := e.client.Config().Version
if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) {
return 4
} else if version.IsAtLeast(sarama.V0_10_2_0) {
return 2
} else if version.IsAtLeast(sarama.V0_8_2_2) {
return 1
return 0
// Describe describes all the metrics ever exported by the Kafka exporter. It
// implements prometheus.Collector.
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- clusterBrokers
ch <- topicCurrentOffset
ch <- topicOldestOffset
ch <- topicPartitions
ch <- topicPartitionLeader
ch <- topicPartitionReplicas
ch <- topicPartitionInSyncReplicas
ch <- topicPartitionUsesPreferredReplica
ch <- topicUnderReplicatedPartition
ch <- consumergroupCurrentOffset
ch <- consumergroupCurrentOffsetSum
ch <- consumergroupUncomittedOffsets
ch <- consumergroupUncommittedOffsetsZookeeper
ch <- consumergroupUncommittedOffsetsSum
ch <- topicPartitionLagMillis
ch <- lagDatapointUsedInterpolation
ch <- lagDatapointUsedExtrapolation
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
if e.allowConcurrent {
// Locking to avoid race add
e.sgChans = append(e.sgChans, ch)
// Safe to compare length since we own the Lock
if len(e.sgChans) == 1 {
e.sgWaitCh = make(chan struct{})
go e.collectChans(e.sgWaitCh)
} else {
level.Info(e.logger).Log("msg", "concurrent calls detected, waiting for first to finish")
// Put in another variable to ensure not overwriting it in another Collect once we wait
waiter := e.sgWaitCh
// Released lock, we have insurance that our chan will be part of the collectChan slice
// collectChan finished
// Collect fetches the stats from configured Kafka location and delivers them
// as Prometheus metrics. It implements prometheus.Collector.
func (e *Exporter) collectChans(quit chan struct{}) {
original := make(chan prometheus.Metric)
container := make([]prometheus.Metric, 0, 100)
go func() {
for metric := range original {
container = append(container, metric)
// Lock to avoid modification on the channel slice
for _, ch := range e.sgChans {
for _, metric := range container {
ch <- metric
// Reset the slice
e.sgChans = e.sgChans[:0]
// Notify remaining waiting Collect they can return
// Release the lock so Collect can append to the slice again
func (e *Exporter) collect(ch chan<- prometheus.Metric) {
var wg = sync.WaitGroup{}
ch <- prometheus.MustNewConstMetric(
clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
now := time.Now()
if now.After(e.nextMetadataRefresh) {
level.Info(e.logger).Log("msg", "Refreshing client metadata")
if err := e.client.RefreshMetadata(); err != nil {
level.Error(e.logger).Log("msg", "Error refreshing topics. Using cached topic data", "err", err.Error())
e.nextMetadataRefresh = now.Add(e.metadataRefreshInterval)
topics, err := e.client.Topics()
if err != nil {
level.Error(e.logger).Log("msg", "Error getting topics: %s. Skipping metric generation", "err", err.Error())
level.Info(e.logger).Log("msg", "Generating topic metrics")
for _, topic := range topics {
topic := topic
go func() {
defer wg.Done()
e.metricsForTopic(topic, ch)
level.Debug(e.logger).Log("msg", "waiting for topic metric generation to complete")
level.Info(e.logger).Log("msg", "Generating consumergroup metrics")
if len(e.client.Brokers()) > 0 {
for _, broker := range e.client.Brokers() {
broker := broker
go func() {
defer wg.Done()
e.metricsForConsumerGroup(broker, ch)
level.Debug(e.logger).Log("msg", "waiting for consumergroup metric generation to complete")
} else {
level.Error(e.logger).Log("msg", "No brokers found. Unable to generate topic metrics")
level.Info(e.logger).Log("msg", "Calculating consumergroup lag")
go func() {
defer wg.Done()
level.Debug(e.logger).Log("msg", "waiting for consumergroup lag estimation metric generation to complete")
func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
level.Debug(e.logger).Log("msg", "Fetching topic metrics", "topic", topic)
if e.topicFilter.MatchString(topic) {
partitions, err := e.client.Partitions(topic)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting partitions for topic", "topic", topic, "err", err.Error())
ch <- prometheus.MustNewConstMetric(
topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
offset := make(map[int32]int64, len(partitions))
for _, partition := range partitions {
broker, err := e.client.Leader(topic, partition)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting leader for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
currentOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
offset[partition] = currentOffset
ch <- prometheus.MustNewConstMetric(
topicCurrentOffset, prometheus.GaugeValue, float64(currentOffset), topic, strconv.FormatInt(int64(partition), 10),
oldestOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetOldest)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting oldest offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicOldestOffset, prometheus.GaugeValue, float64(oldestOffset), topic, strconv.FormatInt(int64(partition), 10),
replicas, err := e.client.Replicas(topic, partition)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting replicas for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionReplicas, prometheus.GaugeValue, float64(len(replicas)), topic, strconv.FormatInt(int64(partition), 10),
inSyncReplicas, err := e.client.InSyncReplicas(topic, partition)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting in-sync replicas for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionInSyncReplicas, prometheus.GaugeValue, float64(len(inSyncReplicas)), topic, strconv.FormatInt(int64(partition), 10),
if broker != nil && replicas != nil && len(replicas) > 0 && broker.ID() == replicas[0] {
ch <- prometheus.MustNewConstMetric(
topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
if replicas != nil && inSyncReplicas != nil && len(inSyncReplicas) < len(replicas) {
ch <- prometheus.MustNewConstMetric(
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
} else {
ch <- prometheus.MustNewConstMetric(
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
if e.useZooKeeperLag {
ConsumerGroups, err := e.zookeeperClient.Consumergroups()
if err != nil {
level.Error(e.logger).Log("msg", "Error getting consumergroups from ZooKeeper", "err", err.Error())
for _, group := range ConsumerGroups {
offset, _ := group.FetchOffset(topic, partition)
if offset > 0 {
consumerGroupLag := currentOffset - offset
ch <- prometheus.MustNewConstMetric(
consumergroupUncommittedOffsetsZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
func (e *Exporter) metricsForConsumerGroup(broker *sarama.Broker, ch chan<- prometheus.Metric) {
level.Debug(e.logger).Log("msg", "Fetching consumer group metrics for broker", "broker", broker.ID())
if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
level.Error(e.logger).Log("msg", "Error connecting to broker", "broker", broker.ID(), "err", err.Error())
defer broker.Close()
level.Debug(e.logger).Log("msg", "listing consumergroups for broker", "broker", broker.ID())
groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
if err != nil {
level.Error(e.logger).Log("msg", "Error listing consumergroups for broker", "broker", broker.ID(), "err", err.Error())
groupIds := make([]string, 0)
for groupId := range groups.Groups {
if e.groupFilter.MatchString(groupId) {
groupIds = append(groupIds, groupId)
level.Debug(e.logger).Log("msg", "describing consumergroups for broker", "broker", broker.ID())
describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})
if err != nil {
level.Error(e.logger).Log("msg", "Error from broker.DescribeGroups()", "err", err.Error())
for _, group := range describeGroups.Groups {
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: e.fetchOffsetVersion()}
if !e.consumerGroupFetchAll {
//TODO: currently this will never add partitions to the request since the only place insertions to the table are done is further down in this method
for topic, partitions := range e.consumerGroupLagTable.iMap[group.GroupId] {
for partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
ch <- prometheus.MustNewConstMetric(
consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
level.Debug(e.logger).Log("msg", "fetching offsets for broker/group", "broker", broker.ID(), "group", group.GroupId)
if offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest); err != nil {
level.Error(e.logger).Log("msg", "Error fetching offset for consumergroup", "group", group.GroupId, "err", err.Error())
} else {
for topic, partitions := range offsetFetchResponse.Blocks {
if !e.topicFilter.MatchString(topic) {
// If the topic is not consumed by that consumer group, skip it
topicConsumed := false
for _, offsetFetchResponseBlock := range partitions {
// Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group
if offsetFetchResponseBlock.Offset != -1 {
topicConsumed = true
if topicConsumed {
var currentOffsetSum int64
var lagSum int64
for partition, offsetFetchResponseBlock := range partitions {
kerr := offsetFetchResponseBlock.Err
if kerr != sarama.ErrNoError {
level.Error(e.logger).Log("msg", "Error in response block for topic/partition", "topic", topic, "partition", partition, "err", kerr.Error())
currentOffset := offsetFetchResponseBlock.Offset
currentOffsetSum += currentOffset
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
// Get and insert the next offset to be produced into the interpolation map
nextOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting next offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
e.consumerGroupLagTable.createOrUpdate(group.GroupId, topic, partition, nextOffset)
// If the topic is consumed by that consumer group, but no offset associated with the partition
// forcing lag to -1 to be able to alert on that
var lag int64
if currentOffset == -1 {
lag = -1
} else {
lag = nextOffset - currentOffset
lagSum += lag
ch <- prometheus.MustNewConstMetric(
consumergroupUncomittedOffsets, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
ch <- prometheus.MustNewConstMetric(
consumergroupUncommittedOffsetsSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
func (e *Exporter) metricsForLag(ch chan<- prometheus.Metric) {
admin, err := sarama.NewClusterAdminFromClient(e.client)
if err != nil {
level.Error(e.logger).Log("msg", "Error creating cluster admin", "err", err.Error())
if admin == nil {
level.Error(e.logger).Log("msg", "Failed to create cluster admin")
// Iterate over all consumergroup/topic/partitions
for group, topics := range e.consumerGroupLagTable.iMap {
for topic, partitionMap := range topics {
var partitionKeys []int32
// Collect partitions to create ListConsumerGroupOffsets request
for key := range partitionMap {
partitionKeys = append(partitionKeys, key)
// response.Blocks is a map of topic to partition to offset
response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
topic: partitionKeys,
if err != nil {
level.Error(e.logger).Log("msg", "Error listing offsets for", "group", group, "err", err.Error())
if response == nil {
level.Error(e.logger).Log("msg", "Got nil response from ListConsumerGroupOffsets for group", "group", group)
for partition, offsets := range partitionMap {
if len(offsets) < 2 {
level.Debug(e.logger).Log("msg", "Insufficient data for lag calculation for group: continuing", "group", group)
if latestConsumedOffset, ok := response.Blocks[topic][partition]; ok {
Sort offset keys so we know if we have an offset to use as a left bound in our calculation
If latestConsumedOffset < smallestMappedOffset then extrapolate
Else Find two offsets that bound latestConsumedOffset
var producedOffsets []int64
for offsetKey := range offsets {
producedOffsets = append(producedOffsets, offsetKey)
sort.Slice(producedOffsets, func(i, j int) bool { return producedOffsets[i] < producedOffsets[j] })
if latestConsumedOffset.Offset < producedOffsets[0] {
level.Debug(e.logger).Log("msg", "estimating lag for group/topic/partition", "group", group, "topic", topic, "partition", partition, "method", "extrapolation")
// Because we do not have data points that bound the latestConsumedOffset we must use extrapolation
highestOffset := producedOffsets[len(producedOffsets)-1]
lowestOffset := producedOffsets[0]
px := float64(offsets[highestOffset].UnixNano()/1000000) -
lagMillis := float64(time.Now().UnixNano()/1000000) - px
level.Debug(e.logger).Log("msg", "estimated lag for group/topic/partition (in ms)", "group", group, "topic", topic, "partition", partition, "lag", lagMillis)
ch <- prometheus.MustNewConstMetric(lagDatapointUsedExtrapolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
} else {
level.Debug(e.logger).Log("msg", "estimating lag for group/topic/partition", "group", group, "topic", topic, "partition", partition, "method", "interpolation")
nextHigherOffset := getNextHigherOffset(producedOffsets, latestConsumedOffset.Offset)
nextLowerOffset := getNextLowerOffset(producedOffsets, latestConsumedOffset.Offset)
px := float64(offsets[nextHigherOffset].UnixNano()/1000000) -
lagMillis := float64(time.Now().UnixNano()/1000000) - px
level.Debug(e.logger).Log("msg", "estimated lag for group/topic/partition (in ms)", "group", group, "topic", topic, "partition", partition, "lag", lagMillis)
ch <- prometheus.MustNewConstMetric(lagDatapointUsedInterpolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
} else {
level.Error(e.logger).Log("msg", "Could not get latest latest consumed offset", "group", group, "topic", topic, "partition", partition)
func getNextHigherOffset(offsets []int64, k int64) int64 {
index := len(offsets) - 1
max := offsets[index]
for max >= k && index > 0 {
if offsets[index-1] < k {
return max
max = offsets[index]
return max
func getNextLowerOffset(offsets []int64, k int64) int64 {
index := 0
min := offsets[index]
for min <= k && index < len(offsets)-1 {
if offsets[index+1] > k {
return min
min = offsets[index]
return min
//Run iMap.Prune() on an interval (default 30 seconds). A new client is created
//to avoid an issue where the client may be closed before Prune attempts to
//use it.
func (e *Exporter) RunPruner(quit chan struct{}) {
ticker := time.NewTicker(time.Duration(e.kafkaOpts.PruneIntervalSeconds) * time.Second)
for {
select {
case <-ticker.C:
client, err := sarama.NewClient(e.kafkaOpts.Uri, e.saramaConfig)
if err != nil {
level.Error(e.logger).Log("msg", "Error initializing kafka client for RunPruner", "err", err.Error())
e.consumerGroupLagTable.Prune(e.logger, client, e.kafkaOpts.MaxOffsets)
case <-quit:
func (e *Exporter) Close() {
func (e *Exporter) initializeMetrics() {
labels := make(map[string]string)
// Protect against empty labels
if e.kafkaOpts.Labels != "" {
for _, label := range strings.Split(e.kafkaOpts.Labels, ",") {
splitLabels := strings.Split(label, "=")
if len(splitLabels) >= 2 {
labels[splitLabels[0]] = splitLabels[1]
clusterBrokers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "brokers"),
"Number of Brokers in the Kafka Cluster.",
nil, labels,
topicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partitions"),
"Number of partitions for this Topic",
[]string{"topic"}, labels,
topicCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
"Current Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
topicOldestOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
"Oldest Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
topicPartitionLeader = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader"),
"Leader Broker ID of this Topic/Partition",
[]string{"topic", "partition"}, labels,
topicPartitionReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
"Number of Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
topicPartitionInSyncReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
topicPartitionUsesPreferredReplica = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
"1 if Topic/Partition is using the Preferred Broker",
[]string{"topic", "partition"}, labels,
topicUnderReplicatedPartition = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
"1 if Topic/Partition is under Replicated",
[]string{"topic", "partition"}, labels,
consumergroupCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
consumergroupCurrentOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset_sum"),
"Current Offset of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
consumergroupUncomittedOffsets = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "uncommitted_offsets"),
"Current Approximate count of uncommitted offsets for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
consumergroupUncommittedOffsetsZookeeper = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroupzookeeper", "uncommitted_offsets_zookeeper"),
"Current Approximate count of uncommitted offsets(zookeeper) for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
consumergroupUncommittedOffsetsSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "uncommitted_offsets_sum"),
"Current Approximate count of uncommitted offsets for a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
consumergroupMembers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "members"),
"Amount of members in a consumer group",
[]string{"consumergroup"}, labels,
topicPartitionLagMillis = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumer_lag", "millis"),
"Current approximation of consumer lag for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"},
lagDatapointUsedInterpolation = prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "interpolation"),
"Indicates that a consumer group lag estimation used interpolation",
[]string{"consumergroup", "topic", "partition"},
lagDatapointUsedExtrapolation = prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "extrapolation"),
"Indicates that a consumer group lag estimation used extrapolation",
[]string{"consumergroup", "topic", "partition"},
@ -0,0 +1,115 @@
package exporter
import (
type interpolationMap struct {
iMap map[string]map[string]map[int32]map[int64]time.Time
mu sync.Mutex
// Prune removes any entries from the Interpolation map that are not returned by the
// ClusterAdmin. An example would be when a consumer group or topic has been deleted
// from the cluster, the Interpolation map may still have cached offsets. Any partition
// that contains more offset entries than maxNumberOfOffsets will have the oldest
// offsets pruned
func (i *interpolationMap) Prune(logger log.Logger, client sarama.Client, maxOffsets int) {
level.Debug(logger).Log("msg", "pruning iMap data", "maxOffsets", maxOffsets)
if i.iMap == nil {
level.Info(logger).Log("msg", "Interpolation map is nil, nothing to prune")
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
level.Error(logger).Log("msg", "Error creating cluster admin", "err", err.Error())
if admin == nil {
level.Error(logger).Log("msg", "Failed to create cluster admin")
defer admin.Close()
groupsMap, err := admin.ListConsumerGroups()
groupKeys := make([]string, len(groupsMap))
for group, _ := range groupsMap {
groupKeys = append(groupKeys, group)
topicsMap, err := admin.ListTopics()
topicKeys := make([]string, len(topicsMap))
for topic, _ := range topicsMap {
topicKeys = append(topicKeys, topic)
level.Debug(logger).Log("msg", "iMap locked for pruning")
start := time.Now()
for group, _ := range i.iMap {
if !contains(groupKeys, group) {
delete(i.iMap, group)
for topic, partitions := range i.iMap[group] {
if !contains(topicKeys, topic) {
delete(i.iMap[group], topic)
for partition, offsets := range partitions {
if len(offsets) > maxOffsets {
offsetKeys := make([]int64, len(offsets))
for offset, _ := range offsets {
offsetKeys = append(offsetKeys, offset)
sort.Slice(offsetKeys, func(i, j int) bool { return offsetKeys[i] < offsetKeys[j] })
offsetKeys = offsetKeys[0 : len(offsetKeys)-maxOffsets]
level.Debug(logger).Log("msg", "pruning offsets", "count", len(offsetKeys), "group", group, "topic", topic, "partition", partition)
for _, offsetToRemove := range offsetKeys {
delete(i.iMap[group][topic][partition], offsetToRemove)
level.Debug(logger).Log("msg", "pruning complete", "duration", time.Since(start).String())
// Lazily create the interpolation map as we see new group/topic/partition/offset
func (i *interpolationMap) createOrUpdate(group, topic string, partition int32, offset int64) {
if i.iMap == nil {
i.iMap = make(map[string]map[string]map[int32]map[int64]time.Time)
if fetchedGroup, ok := i.iMap[group]; ok {
if fetchedTopic, ok := fetchedGroup[topic]; ok {
if fetchedPartition, ok := fetchedTopic[partition]; ok {
fetchedPartition[offset] = time.Now()
} else {
fetchedTopic[partition] = make(map[int64]time.Time)
} else {
fetchedGroup[topic] = make(map[int32]map[int64]time.Time)
} else {
i.iMap[group] = make(map[string]map[int32]map[int64]time.Time)
func contains(keys []string, v string) bool {
for _, k := range keys {
if k == v {
return true
return false
@ -0,0 +1,36 @@
package exporter
import (
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
type XDGSCRAMClient struct {
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
x.ClientConversation = x.Client.NewConversation()
return nil
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
@ -0,0 +1,260 @@
package tpl
import (
klog "github.com/go-kit/log"
const inputName = "kafka"
type Kafka struct {
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
func init() {
inputs.Add(inputName, func() inputs.Input {
return &Kafka{}
func (r *Kafka) Prefix() string {
return ""
func (r *Kafka) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
return nil
func (r *Kafka) Drop() {
for _, i := range r.Instances {
if i == nil {
if i.e != nil {
func (r *Kafka) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
}(slist, ins)
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
LogLevel string `toml:"log_level"`
// Address (host:port) of Kafka server.
KafkaURIs []string `toml:"kafka_uris,omitempty"`
// Connect using SASL/PLAIN
UseSASL bool `toml:"use_sasl,omitempty"`
// Only set this to false if using a non-Kafka SASL proxy
UseSASLHandshake *bool `toml:"use_sasl_handshake,omitempty"`
// SASL user name
SASLUsername string `toml:"sasl_username,omitempty"`
// SASL user password
SASLPassword string `toml:"sasl_password,omitempty"`
// The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism
SASLMechanism string `toml:"sasl_mechanism,omitempty"`
// Connect using TLS
UseTLS bool `toml:"use_tls,omitempty"`
// The optional certificate authority file for TLS client authentication
CAFile string `toml:"ca_file,omitempty"`
// The optional certificate file for TLS client authentication
CertFile string `toml:"cert_file,omitempty"`
// The optional key file for TLS client authentication
KeyFile string `toml:"key_file,omitempty"`
// If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
InsecureSkipVerify bool `toml:"insecure_skip_verify,omitempty"`
// Kafka broker version
KafkaVersion string `toml:"kafka_version,omitempty"`
// if you need to use a group from zookeeper
UseZooKeeperLag bool `toml:"use_zookeeper_lag,omitempty"`
// Address array (hosts) of zookeeper server.
ZookeeperURIs []string `toml:"zookeeper_uris,omitempty"`
// Metadata refresh interval
MetadataRefreshInterval string `toml:"metadata_refresh_interval,omitempty"`
// If true, all scrapes will trigger kafka operations otherwise, they will share results. WARN: This should be disabled on large clusters
AllowConcurrent *bool `toml:"allow_concurrency,omitempty"`
// Maximum number of offsets to store in the interpolation table for a partition
MaxOffsets int `toml:"max_offsets,omitempty"`
// How frequently should the interpolation table be pruned, in seconds
PruneIntervalSeconds int `toml:"prune_interval_seconds,omitempty"`
// Regex filter for topics to be monitored
TopicsFilter string `toml:"topics_filter_regex,omitempty"`
// Regex filter for consumer groups to be monitored
GroupFilter string `toml:"groups_filter_regex,omitempty"`
l klog.Logger `toml:"-"`
e *exporter.Exporter `toml:"-"`
func (ins *Instance) Init() error {
if len(ins.KafkaURIs) == 0 || ins.KafkaURIs[0] == "" {
return fmt.Errorf("kafka_uris must be specified")
if ins.UseTLS && (ins.CertFile == "" || ins.KeyFile == "") {
return fmt.Errorf("tls is enabled but key pair was not provided")
if ins.UseSASL && (ins.SASLPassword == "" || ins.SASLUsername == "") {
return fmt.Errorf("SASL is enabled but username or password was not provided")
if ins.UseZooKeeperLag && (len(ins.ZookeeperURIs) == 0 || ins.ZookeeperURIs[0] == "") {
return fmt.Errorf("zookeeper lag is enabled but no zookeeper uri was provided")
// default options
if ins.UseSASLHandshake == nil {
flag := true
ins.UseSASLHandshake = &flag
if len(ins.KafkaVersion) == 0 {
ins.KafkaVersion = sarama.V2_0_0_0.String()
if len(ins.MetadataRefreshInterval) == 0 {
ins.MetadataRefreshInterval = "1s"
if ins.AllowConcurrent == nil {
flag := true
ins.AllowConcurrent = &flag
if ins.MaxOffsets <= 0 {
ins.MaxOffsets = 1000
if ins.PruneIntervalSeconds <= 0 {
ins.PruneIntervalSeconds = 30
if len(ins.TopicsFilter) == 0 {
ins.TopicsFilter = ".*"
if len(ins.GroupFilter) == 0 {
ins.GroupFilter = ".*"
options := exporter.Options{
Uri: ins.KafkaURIs,
UseSASL: ins.UseSASL,
UseSASLHandshake: *ins.UseSASLHandshake,
SaslUsername: ins.SASLUsername,
SaslPassword: string(ins.SASLPassword),
SaslMechanism: ins.SASLMechanism,
UseTLS: ins.UseTLS,
TlsCAFile: ins.CAFile,
TlsCertFile: ins.CertFile,
TlsKeyFile: ins.KeyFile,
TlsInsecureSkipTLSVerify: ins.InsecureSkipVerify,
KafkaVersion: ins.KafkaVersion,
UseZooKeeperLag: ins.UseZooKeeperLag,
UriZookeeper: ins.ZookeeperURIs,
MetadataRefreshInterval: ins.MetadataRefreshInterval,
AllowConcurrent: *ins.AllowConcurrent,
MaxOffsets: ins.MaxOffsets,
PruneIntervalSeconds: ins.PruneIntervalSeconds,
encLabels := []string{}
for k, v := range ins.Labels {
encLabels = append(encLabels, fmt.Sprintf("%s=%s", k, v))
options.Labels = strings.Join(encLabels, ",")
ins.l = level.NewFilter(klog.NewLogfmtLogger(klog.NewSyncWriter(os.Stderr)), levelFilter(ins.LogLevel))
e, err := exporter.New(ins.l, options, ins.TopicsFilter, ins.GroupFilter)
if err != nil {
return fmt.Errorf("could not instantiate kafka lag exporter: %w", err)
ins.e = e
return nil
func (ins *Instance) gatherOnce(slist *list.SafeList) {
err := inputs.Collect(ins.e, slist)
if err != nil {
log.Println("E! failed to collect metrics:", err)
func levelFilter(l string) level.Option {
switch l {
case "debug":
return level.AllowDebug()
case "info":
return level.AllowInfo()
case "warn":
return level.AllowWarn()
case "error":
return level.AllowError()
return level.AllowAll()
@ -117,7 +117,7 @@ func (s *KernelStats) Gather(slist *list.SafeList) {
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
func (s *KernelStats) getProcStat() ([]byte, error) {
@ -73,7 +73,7 @@ func (s *KernelVmstat) Gather(slist *list.SafeList) {
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
func (s *KernelVmstat) getProcVmstat() ([]byte, error) {
@ -146,11 +146,11 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
err := ins.LoadJSON(urlpath, summaryMetrics)
if err != nil {
log.Println("E! failed to load", urlpath, "error:", err)
slist.PushFront(inputs.NewSample("kubelet_up", 0, ins.Labels))
slist.PushFront(types.NewSample("kubelet_up", 0, ins.Labels))
slist.PushFront(inputs.NewSample("kubelet_up", 1, ins.Labels))
slist.PushFront(types.NewSample("kubelet_up", 1, ins.Labels))
podInfos, err := ins.gatherPodInfo(ins.URL)
if err != nil {
@ -207,7 +207,7 @@ func (ins *Instance) buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []M
fields["pod_container_logsfs_available_bytes"] = container.LogsFS.AvailableBytes
fields["pod_container_logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes
fields["pod_container_logsfs_used_bytes"] = container.LogsFS.UsedBytes
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -226,7 +226,7 @@ func (ins *Instance) buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []M
fields["pod_volume_available_bytes"] = volume.AvailableBytes
fields["pod_volume_capacity_bytes"] = volume.CapacityBytes
fields["pod_volume_used_bytes"] = volume.UsedBytes
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -244,7 +244,7 @@ func (ins *Instance) buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []M
fields["pod_network_rx_errors"] = pod.Network.RXErrors
fields["pod_network_tx_bytes"] = pod.Network.TXBytes
fields["pod_network_tx_errors"] = pod.Network.TXErrors
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -269,7 +269,7 @@ func (ins *Instance) buildSystemContainerMetrics(summaryMetrics *SummaryMetrics,
fields["system_container_logsfs_available_bytes"] = container.LogsFS.AvailableBytes
fields["system_container_logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -297,7 +297,7 @@ func (ins *Instance) buildNodeMetrics(summaryMetrics *SummaryMetrics, slist *lis
fields["node_runtime_image_fs_capacity_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.CapacityBytes
fields["node_runtime_image_fs_used_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.UsedBytes
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
func (ins *Instance) gatherPodInfo(baseURL string) ([]Metadata, error) {
@ -67,7 +67,7 @@ func (s *SysctlFS) Gather(slist *list.SafeList) {
log.Println("E! failed to gather file-nr:", err)
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
func (s *SysctlFS) gatherOne(name string, fields map[string]interface{}) error {
@ -7,6 +7,7 @@ import (
@ -109,5 +110,5 @@ func (s *MemStats) Gather(slist *list.SafeList) {
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
@ -6,8 +6,8 @@ import (
@ -66,11 +66,11 @@ func (m *MySQL) gatherBinlog(slist *list.SafeList, ins *Instance, db *sql.DB, gl
tags := tagx.Copy(globalTags)
slist.PushFront(inputs.NewSample("binlog_size_bytes", size, tags))
slist.PushFront(inputs.NewSample("binlog_file_count", count, tags))
slist.PushFront(types.NewSample("binlog_size_bytes", size, tags))
slist.PushFront(types.NewSample("binlog_file_count", count, tags))
value, err := strconv.ParseFloat(strings.Split(filename, ".")[1], 64)
if err == nil {
slist.PushFront(inputs.NewSample("binlog_file_number", value, tags))
slist.PushFront(types.NewSample("binlog_file_number", value, tags))
@ -8,9 +8,9 @@ import (
@ -93,10 +93,10 @@ func (m *MySQL) parseRow(row map[string]string, query QueryConfig, slist *list.S
if query.FieldToAppend == "" {
slist.PushFront(inputs.NewSample(query.Mesurement+"_"+column, value, labels))
slist.PushFront(types.NewSample(query.Mesurement+"_"+column, value, labels))
} else {
suffix := cleanName(row[query.FieldToAppend])
slist.PushFront(inputs.NewSample(query.Mesurement+"_"+suffix+"_"+column, value, labels))
slist.PushFront(types.NewSample(query.Mesurement+"_"+suffix+"_"+column, value, labels))
@ -7,8 +7,8 @@ import (
@ -43,19 +43,19 @@ func (m *MySQL) gatherEngineInnodbStatus(slist *list.SafeList, ins *Instance, db
if err != nil {
slist.PushFront(inputs.NewSample("engine_innodb_queries_inside_innodb", value, tags))
slist.PushFront(types.NewSample("engine_innodb_queries_inside_innodb", value, tags))
value, err = strconv.ParseFloat(data[2], 64)
if err != nil {
slist.PushFront(inputs.NewSample("engine_innodb_queries_in_queue", value, tags))
slist.PushFront(types.NewSample("engine_innodb_queries_in_queue", value, tags))
} else if data := rViews.FindStringSubmatch(line); data != nil {
value, err := strconv.ParseFloat(data[1], 64)
if err != nil {
slist.PushFront(inputs.NewSample("engine_innodb_read_views_open_inside_innodb", value, tags))
slist.PushFront(types.NewSample("engine_innodb_read_views_open_inside_innodb", value, tags))
@ -3,8 +3,8 @@ package mysql
import (
@ -22,14 +22,14 @@ func (m *MySQL) gatherEngineInnodbStatusCompute(slist *list.SafeList, ins *Insta
pageUtil = pageUsed / cache["innodb_buffer_pool_pages_total"] * 100
slist.PushFront(inputs.NewSample("global_status_buffer_pool_bytes", byteUsed, tags, map[string]string{"state": "used"}))
slist.PushFront(inputs.NewSample("global_status_buffer_pool_bytes", byteData, tags, map[string]string{"state": "data"}))
slist.PushFront(inputs.NewSample("global_status_buffer_pool_bytes", byteFree, tags, map[string]string{"state": "free"}))
slist.PushFront(inputs.NewSample("global_status_buffer_pool_bytes", byteTotal, tags, map[string]string{"state": "total"}))
slist.PushFront(inputs.NewSample("global_status_buffer_pool_bytes", byteDirty, tags, map[string]string{"state": "dirty"}))
slist.PushFront(inputs.NewSample("global_status_buffer_pool_pages_utilization", pageUtil, tags))
slist.PushFront(types.NewSample("global_status_buffer_pool_bytes", byteUsed, tags, map[string]string{"state": "used"}))
slist.PushFront(types.NewSample("global_status_buffer_pool_bytes", byteData, tags, map[string]string{"state": "data"}))
slist.PushFront(types.NewSample("global_status_buffer_pool_bytes", byteFree, tags, map[string]string{"state": "free"}))
slist.PushFront(types.NewSample("global_status_buffer_pool_bytes", byteTotal, tags, map[string]string{"state": "total"}))
slist.PushFront(types.NewSample("global_status_buffer_pool_bytes", byteDirty, tags, map[string]string{"state": "dirty"}))
slist.PushFront(types.NewSample("global_status_buffer_pool_pages_utilization", pageUtil, tags))
if ins.ExtraInnodbMetrics {
slist.PushFront(inputs.NewSample("global_status_buffer_pool_pages", pageUsed, tags, map[string]string{"state": "used"}))
slist.PushFront(types.NewSample("global_status_buffer_pool_pages", pageUsed, tags, map[string]string{"state": "used"}))
@ -8,8 +8,8 @@ import (
@ -62,42 +62,42 @@ func (m *MySQL) gatherGlobalStatus(slist *list.SafeList, ins *Instance, db *sql.
match := globalStatusRE.FindStringSubmatch(key)
if match == nil {
slist.PushFront(inputs.NewSample("global_status_"+key, floatVal, tags))
slist.PushFront(types.NewSample("global_status_"+key, floatVal, tags))
switch match[1] {
case "com":
// Total number of executed MySQL commands.
slist.PushFront(inputs.NewSample("global_status_commands_total", floatVal, tags, map[string]string{"command": match[2]}))
slist.PushFront(types.NewSample("global_status_commands_total", floatVal, tags, map[string]string{"command": match[2]}))
case "handler":
// Total number of executed MySQL handlers.
slist.PushFront(inputs.NewSample("global_status_handlers_total", floatVal, tags, map[string]string{"handler": match[2]}))
slist.PushFront(types.NewSample("global_status_handlers_total", floatVal, tags, map[string]string{"handler": match[2]}))
case "connection_errors":
// Total number of MySQL connection errors.
slist.PushFront(inputs.NewSample("global_status_connection_errors_total", floatVal, tags, map[string]string{"error": match[2]}))
slist.PushFront(types.NewSample("global_status_connection_errors_total", floatVal, tags, map[string]string{"error": match[2]}))
case "innodb_buffer_pool_pages":
switch match[2] {
case "data", "free", "misc", "old", "total", "dirty":
// Innodb buffer pool pages by state.
slist.PushFront(inputs.NewSample("global_status_buffer_pool_pages", floatVal, tags, map[string]string{"state": match[2]}))
slist.PushFront(types.NewSample("global_status_buffer_pool_pages", floatVal, tags, map[string]string{"state": match[2]}))
// Innodb buffer pool page state changes.
slist.PushFront(inputs.NewSample("global_status_buffer_pool_page_changes_total", floatVal, tags, map[string]string{"operation": match[2]}))
slist.PushFront(types.NewSample("global_status_buffer_pool_page_changes_total", floatVal, tags, map[string]string{"operation": match[2]}))
case "innodb_rows":
// Total number of MySQL InnoDB row operations.
slist.PushFront(inputs.NewSample("global_status_innodb_row_ops_total", floatVal, tags, map[string]string{"operation": match[2]}))
slist.PushFront(types.NewSample("global_status_innodb_row_ops_total", floatVal, tags, map[string]string{"operation": match[2]}))
case "performance_schema":
// Total number of MySQL instrumentations that could not be loaded or created due to memory constraints.
slist.PushFront(inputs.NewSample("global_status_performance_schema_lost_total", floatVal, tags, map[string]string{"instrumentation": match[2]}))
slist.PushFront(types.NewSample("global_status_performance_schema_lost_total", floatVal, tags, map[string]string{"instrumentation": match[2]}))
// mysql_galera_variables_info metric.
if textItems["wsrep_local_state_uuid"] != "" {
slist.PushFront(inputs.NewSample("galera_status_info", 1, tags, map[string]string{
slist.PushFront(types.NewSample("galera_status_info", 1, tags, map[string]string{
"wsrep_local_state_uuid": textItems["wsrep_local_state_uuid"],
"wsrep_cluster_state_uuid": textItems["wsrep_cluster_state_uuid"],
"wsrep_provider_version": textItems["wsrep_provider_version"],
@ -134,7 +134,7 @@ func (m *MySQL) gatherGlobalStatus(slist *list.SafeList, ins *Instance, db *sql.
if evsParsingSuccess {
for _, v := range evsMap {
slist.PushFront(inputs.NewSample("galera_evs_repl_latency_"+v.name, v.value, tags))
slist.PushFront(types.NewSample("galera_evs_repl_latency_"+v.name, v.value, tags))
@ -7,8 +7,8 @@ import (
@ -59,12 +59,12 @@ func (m *MySQL) gatherGlobalVariables(slist *list.SafeList, ins *Instance, db *s
slist.PushFront(inputs.NewSample("global_variables_"+key, floatVal, tags))
slist.PushFront(types.NewSample("global_variables_"+key, floatVal, tags))
slist.PushFront(inputs.NewSample("version_info", 1, tags, map[string]string{
slist.PushFront(types.NewSample("version_info", 1, tags, map[string]string{
"version": textItems["version"],
"innodb_version": textItems["innodb_version"],
"version_comment": textItems["version_comment"],
@ -73,14 +73,14 @@ func (m *MySQL) gatherGlobalVariables(slist *list.SafeList, ins *Instance, db *s
// mysql_galera_variables_info metric.
// PXC/Galera variables information.
if textItems["wsrep_cluster_name"] != "" {
slist.PushFront(inputs.NewSample("galera_variables_info", 1, tags, map[string]string{
slist.PushFront(types.NewSample("galera_variables_info", 1, tags, map[string]string{
"wsrep_cluster_name": textItems["wsrep_cluster_name"],
// mysql_galera_gcache_size_bytes metric.
if textItems["wsrep_provider_options"] != "" {
slist.PushFront(inputs.NewSample("galera_gcache_size_bytes", parseWsrepProviderOptions(textItems["wsrep_provider_options"]), tags))
slist.PushFront(types.NewSample("galera_gcache_size_bytes", parseWsrepProviderOptions(textItems["wsrep_provider_options"]), tags))
if textItems["transaction_isolation"] != "" || textItems["tx_isolation"] != "" {
@ -89,7 +89,7 @@ func (m *MySQL) gatherGlobalVariables(slist *list.SafeList, ins *Instance, db *s
level = textItems["tx_isolation"]
slist.PushFront(inputs.NewSample("transaction_isolation", 1, tags, map[string]string{"level": level}))
slist.PushFront(types.NewSample("transaction_isolation", 1, tags, map[string]string{"level": level}))
@ -215,12 +215,12 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("scrape_use_seconds", use, tags))
slist.PushFront(types.NewSample("scrape_use_seconds", use, tags))
db, err := sql.Open("mysql", ins.dsn)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to open mysql:", err)
@ -232,12 +232,12 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
if err = db.Ping(); err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to ping mysql:", err)
slist.PushFront(inputs.NewSample("up", 1, tags))
slist.PushFront(types.NewSample("up", 1, tags))
cache := make(map[string]float64)
@ -5,8 +5,8 @@ import (
@ -131,7 +131,7 @@ func (m *MySQL) gatherProcesslistByState(slist *list.SafeList, ins *Instance, db
for s, c := range stateCounts {
slist.PushFront(inputs.NewSample("processlist_processes_by_state", c, labels, map[string]string{"state": s}))
slist.PushFront(types.NewSample("processlist_processes_by_state", c, labels, map[string]string{"state": s}))
@ -4,8 +4,8 @@ import (
@ -34,6 +34,6 @@ func (m *MySQL) gatherProcesslistByUser(slist *list.SafeList, ins *Instance, db
slist.PushFront(inputs.NewSample("processlist_processes_by_user", connections, labels, map[string]string{"user": user}))
slist.PushFront(types.NewSample("processlist_processes_by_user", connections, labels, map[string]string{"user": user}))
@ -4,8 +4,8 @@ import (
@ -34,6 +34,6 @@ func (m *MySQL) gatherSchemaSize(slist *list.SafeList, ins *Instance, db *sql.DB
slist.PushFront(inputs.NewSample("schema_size_bytes", size, labels, map[string]string{"schema": schema}))
slist.PushFront(types.NewSample("schema_size_bytes", size, labels, map[string]string{"schema": schema}))
@ -6,7 +6,7 @@ import (
@ -88,7 +88,7 @@ func (m *MySQL) gatherSlaveStatus(slist *list.SafeList, ins *Instance, db *sql.D
if value, ok := parseStatus(*scanArgs[i].(*sql.RawBytes)); ok {
slist.PushFront(inputs.NewSample("slave_status_"+key, value, globalTags, map[string]string{
slist.PushFront(types.NewSample("slave_status_"+key, value, globalTags, map[string]string{
"master_host": masterHost,
"master_uuid": masterUUID,
"channel_name": channelName,
@ -4,8 +4,8 @@ import (
@ -42,7 +42,7 @@ func (m *MySQL) gatherTableSize(slist *list.SafeList, ins *Instance, db *sql.DB,
slist.PushFront(inputs.NewSample("table_size_index_bytes", indexSize, labels, map[string]string{"schema": schema, "table": table}))
slist.PushFront(inputs.NewSample("table_size_data_bytes", dataSize, labels, map[string]string{"schema": schema, "table": table}))
slist.PushFront(types.NewSample("table_size_index_bytes", indexSize, labels, map[string]string{"schema": schema, "table": table}))
slist.PushFront(types.NewSample("table_size_data_bytes", dataSize, labels, map[string]string{"schema": schema, "table": table}))
@ -9,6 +9,7 @@ import (
@ -113,6 +114,6 @@ func (s *NetIOStats) Gather(slist *list.SafeList) {
"drop_out": io.Dropout,
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
@ -168,7 +168,7 @@ func (ins *Instance) gather(slist *list.SafeList, target string) {
defer func() {
for field, value := range fields {
slist.PushFront(inputs.NewSample(field, value, labels))
slist.PushFront(types.NewSample(field, value, labels))
@ -7,6 +7,7 @@ import (
@ -76,5 +77,5 @@ func (s *NetStats) Gather(slist *list.SafeList) {
"udp_socket": counts["UDP"],
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
@ -247,7 +247,7 @@ func (ins *Instance) gather(slist *list.SafeList, target string) {
"fall": server.Fall,
inputs.PushSamples(slist, fields, tags, labels)
types.PushSamples(slist, fields, tags, labels)
@ -58,7 +58,7 @@ func (n *NTPStat) Gather(slist *list.SafeList) {
duration := ((serverReciveTime.UnixNano() - orgTime.UnixNano()) + (serverTransmitTime.UnixNano() - dstTime.UnixNano())) / 2
delta := duration / 1e6 // convert to ms
slist.PushFront(inputs.NewSample("offset_ms", delta))
slist.PushFront(types.NewSample("offset_ms", delta))
@ -9,6 +9,7 @@ import (
@ -63,16 +64,16 @@ func (s *GPUStats) Gather(slist *list.SafeList) {
// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("scrape_use_seconds", use))
slist.PushFront(types.NewSample("scrape_use_seconds", use))
currentTable, err := scrape(s.qFields, s.NvidiaSmiCommand)
if err != nil {
slist.PushFront(inputs.NewSample("scraper_up", 0))
slist.PushFront(types.NewSample("scraper_up", 0))
slist.PushFront(inputs.NewSample("scraper_up", 1))
slist.PushFront(types.NewSample("scraper_up", 1))
for _, currentRow := range currentTable.rows {
uuid := strings.TrimPrefix(strings.ToLower(currentRow.qFieldToCells[uuidQField].rawValue), "gpu-")
@ -82,7 +83,7 @@ func (s *GPUStats) Gather(slist *list.SafeList) {
vBiosVersion := currentRow.qFieldToCells[vBiosVersionQField].rawValue
driverVersion := currentRow.qFieldToCells[driverVersionQField].rawValue
slist.PushFront(inputs.NewSample("gpu_info", 1, map[string]string{
slist.PushFront(types.NewSample("gpu_info", 1, map[string]string{
"uuid": uuid,
"name": name,
"driver_model_current": driverModelCurrent,
@ -102,7 +103,7 @@ func (s *GPUStats) Gather(slist *list.SafeList) {
slist.PushFront(inputs.NewSample(metricInfo.metricName, num, map[string]string{"uuid": uuid}))
slist.PushFront(types.NewSample(metricInfo.metricName, num, map[string]string{"uuid": uuid}))
@ -35,10 +35,10 @@ func (o *Oracle) parseRow(row map[string]string, metricConf MetricConfig, slist
if metricConf.FieldToAppend == "" {
slist.PushFront(inputs.NewSample(metricConf.Mesurement+"_"+column, value, labels))
slist.PushFront(types.NewSample(metricConf.Mesurement+"_"+column, value, labels))
} else {
suffix := cleanName(row[metricConf.FieldToAppend])
slist.PushFront(inputs.NewSample(metricConf.Mesurement+"_"+suffix+"_"+column, value, labels))
slist.PushFront(types.NewSample(metricConf.Mesurement+"_"+suffix+"_"+column, value, labels))
@ -128,16 +128,16 @@ func (o *Oracle) gatherOnce(slist *list.SafeList, ins Instance) {
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("scrape_use_seconds", use, tags))
slist.PushFront(types.NewSample("scrape_use_seconds", use, tags))
db := o.dbconnpool[ins.Address]
if err := db.Ping(); err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to ping oracle:", ins.Address, "error:", err)
} else {
slist.PushFront(inputs.NewSample("up", 1, tags))
slist.PushFront(types.NewSample("up", 1, tags))
waitMetrics := new(sync.WaitGroup)
@ -244,10 +244,10 @@ func (ins *Instance) parseRow(row map[string]string, metricConf MetricConfig, sl
if metricConf.FieldToAppend == "" {
slist.PushFront(inputs.NewSample(metricConf.Mesurement+"_"+column, value, labels))
slist.PushFront(types.NewSample(metricConf.Mesurement+"_"+column, value, labels))
} else {
suffix := cleanName(row[metricConf.FieldToAppend])
slist.PushFront(inputs.NewSample(metricConf.Mesurement+"_"+suffix+"_"+column, value, labels))
slist.PushFront(types.NewSample(metricConf.Mesurement+"_"+suffix+"_"+column, value, labels))
@ -160,7 +160,7 @@ func (ins *Instance) gather(slist *list.SafeList, target string) {
defer func() {
for field, value := range fields {
slist.PushFront(inputs.NewSample(field, value, labels))
slist.PushFront(types.NewSample(field, value, labels))
@ -17,6 +17,7 @@ import (
@ -70,7 +71,7 @@ func (p *Processes) Gather(slist *list.SafeList) {
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
// Gets empty fields of metrics based on the OS
@ -35,16 +35,16 @@ func (ins *Instance) gatherCPU(slist *list.SafeList, procs map[PID]Process, tags
if err == nil {
if solarisMode {
value += v / float64(runtime.NumCPU())
slist.PushFront(inputs.NewSample("cpu_usage", v/float64(runtime.NumCPU()), map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("cpu_usage", v/float64(runtime.NumCPU()), map[string]string{"pid": fmt.Sprint(pid)}, tags))
} else {
value += v
slist.PushFront(inputs.NewSample("cpu_usage", v, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("cpu_usage", v, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("cpu_usage_total", value, tags))
slist.PushFront(types.NewSample("cpu_usage_total", value, tags))
@ -139,11 +139,11 @@ func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {
if err != nil {
log.Println("E! procstat: failed to lookup pids, search string:", ins.searchString, "error:", err)
slist.PushFront(inputs.NewSample("lookup_count", 0, tags))
slist.PushFront(types.NewSample("lookup_count", 0, tags))
slist.PushFront(inputs.NewSample("lookup_count", len(pids), tags))
slist.PushFront(types.NewSample("lookup_count", len(pids), tags))
if len(pids) == 0 {
@ -210,13 +210,13 @@ func (ins *Instance) gatherThreads(slist *list.SafeList, procs map[PID]Process,
if err == nil {
val += v
if ins.GatherPerPid {
slist.PushFront(inputs.NewSample("num_threads", val, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("num_threads", val, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("num_threads_total", val, tags))
slist.PushFront(types.NewSample("num_threads_total", val, tags))
@ -227,13 +227,13 @@ func (ins *Instance) gatherFD(slist *list.SafeList, procs map[PID]Process, tags
if err == nil {
val += v
if ins.GatherPerPid {
slist.PushFront(inputs.NewSample("num_fds", val, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("num_fds", val, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("num_fds_total", val, tags))
slist.PushFront(types.NewSample("num_fds_total", val, tags))
@ -253,19 +253,19 @@ func (ins *Instance) gatherIO(slist *list.SafeList, procs map[PID]Process, tags
readBytes += io.ReadBytes
writeBytes += io.WriteBytes
if ins.GatherPerPid {
slist.PushFront(inputs.NewSample("read_count", readCount, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(inputs.NewSample("write_count", writeCount, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(inputs.NewSample("read_bytes", readBytes, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(inputs.NewSample("write_bytes", writeBytes, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("read_count", readCount, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("write_count", writeCount, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("read_bytes", readBytes, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("write_bytes", writeBytes, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("read_count_total", readCount, tags))
slist.PushFront(inputs.NewSample("write_count_total", writeCount, tags))
slist.PushFront(inputs.NewSample("read_bytes_total", readBytes, tags))
slist.PushFront(inputs.NewSample("write_bytes_total", writeBytes, tags))
slist.PushFront(types.NewSample("read_count_total", readCount, tags))
slist.PushFront(types.NewSample("write_count_total", writeCount, tags))
slist.PushFront(types.NewSample("read_bytes_total", readBytes, tags))
slist.PushFront(types.NewSample("write_bytes_total", writeBytes, tags))
@ -276,7 +276,7 @@ func (ins *Instance) gatherUptime(slist *list.SafeList, procs map[PID]Process, t
v, err := procs[pid].CreateTime() // returns epoch in ms
if err == nil {
if ins.GatherPerPid {
slist.PushFront(inputs.NewSample("uptime", value, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("uptime", value, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if value == -1 {
value = v
@ -290,7 +290,7 @@ func (ins *Instance) gatherUptime(slist *list.SafeList, procs map[PID]Process, t
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("uptime_minimum", value, tags))
slist.PushFront(types.NewSample("uptime_minimum", value, tags))
@ -301,16 +301,16 @@ func (ins *Instance) gatherCPU(slist *list.SafeList, procs map[PID]Process, tags
if err == nil {
if solarisMode {
value += v / float64(runtime.NumCPU())
slist.PushFront(inputs.NewSample("cpu_usage", v/float64(runtime.NumCPU()), map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("cpu_usage", v/float64(runtime.NumCPU()), map[string]string{"pid": fmt.Sprint(pid)}, tags))
} else {
value += v
slist.PushFront(inputs.NewSample("cpu_usage", v, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("cpu_usage", v, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("cpu_usage_total", value, tags))
slist.PushFront(types.NewSample("cpu_usage_total", value, tags))
@ -321,13 +321,13 @@ func (ins *Instance) gatherMem(slist *list.SafeList, procs map[PID]Process, tags
if err == nil {
value += v
if ins.GatherPerPid {
slist.PushFront(inputs.NewSample("mem_usage", v, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("mem_usage", v, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("mem_usage_total", value, tags))
slist.PushFront(types.NewSample("mem_usage_total", value, tags))
@ -339,8 +339,8 @@ func (ins *Instance) gatherLimit(slist *list.SafeList, procs map[PID]Process, ta
for _, rlim := range rlims {
if rlim.Resource == process.RLIMIT_NOFILE {
if ins.GatherPerPid {
slist.PushFront(inputs.NewSample("rlimit_num_fds_soft", rlim.Soft, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(inputs.NewSample("rlimit_num_fds_hard", rlim.Hard, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("rlimit_num_fds_soft", rlim.Soft, map[string]string{"pid": fmt.Sprint(pid)}, tags))
slist.PushFront(types.NewSample("rlimit_num_fds_hard", rlim.Hard, map[string]string{"pid": fmt.Sprint(pid)}, tags))
if softMin == 0 {
@ -362,8 +362,8 @@ func (ins *Instance) gatherLimit(slist *list.SafeList, procs map[PID]Process, ta
if ins.GatherTotal {
slist.PushFront(inputs.NewSample("rlimit_num_fds_soft_minimum", softMin, tags))
slist.PushFront(inputs.NewSample("rlimit_num_fds_hard_minimum", hardMin, tags))
slist.PushFront(types.NewSample("rlimit_num_fds_soft_minimum", softMin, tags))
slist.PushFront(types.NewSample("rlimit_num_fds_hard_minimum", hardMin, tags))
@ -237,13 +237,13 @@ func (p *Prometheus) gatherUrl(urlwg *sync.WaitGroup, slist *list.SafeList, ins
res, err := ins.client.Do(req)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, labels))
slist.PushFront(types.NewSample("up", 0, labels))
log.Println("E! failed to query url:", u.String(), "error:", err)
if res.StatusCode != http.StatusOK {
slist.PushFront(inputs.NewSample("up", 0, labels))
slist.PushFront(types.NewSample("up", 0, labels))
log.Println("E! failed to query url:", u.String(), "status code:", res.StatusCode)
@ -252,12 +252,12 @@ func (p *Prometheus) gatherUrl(urlwg *sync.WaitGroup, slist *list.SafeList, ins
body, err := io.ReadAll(res.Body)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, labels))
slist.PushFront(types.NewSample("up", 0, labels))
log.Println("E! failed to read response body, error:", err)
slist.PushFront(inputs.NewSample("up", 1, labels))
slist.PushFront(types.NewSample("up", 1, labels))
parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter)
if err = parser.Parse(body, slist); err != nil {
@ -390,7 +390,7 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) {
// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("scrape_use_seconds", use, tags, ins.Labels))
slist.PushFront(types.NewSample("scrape_use_seconds", use, tags, ins.Labels))
var wg sync.WaitGroup
@ -509,7 +509,7 @@ func gatherOverview(ins *Instance, slist *list.SafeList) {
"overview_return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
func gatherExchanges(ins *Instance, slist *list.SafeList) {
@ -546,7 +546,7 @@ func gatherExchanges(ins *Instance, slist *list.SafeList) {
"exchange_messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
@ -604,7 +604,7 @@ func gatherFederationLinks(ins *Instance, slist *list.SafeList) {
"federation_messages_return_unroutable": link.LocalChannel.MessageStats.ReturnUnroutable,
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -734,7 +734,7 @@ func gatherNodes(ins *Instance, slist *list.SafeList) {
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -818,6 +818,6 @@ func gatherQueues(ins *Instance, slist *list.SafeList) {
"queue_messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate,
inputs.PushSamples(slist, fields, tags, ins.Labels)
types.PushSamples(slist, fields, tags, ins.Labels)
@ -133,18 +133,18 @@ func (r *Redis) gatherOnce(slist *list.SafeList, ins *Instance) {
// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("scrape_use_seconds", use, tags))
slist.PushFront(types.NewSample("scrape_use_seconds", use, tags))
// ping
err := ins.client.Ping(context.Background()).Err()
slist.PushFront(inputs.NewSample("ping_use_seconds", time.Since(begun).Seconds(), tags))
slist.PushFront(types.NewSample("ping_use_seconds", time.Since(begun).Seconds(), tags))
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to ping redis:", ins.Address, "error:", err)
} else {
slist.PushFront(inputs.NewSample("up", 1, tags))
slist.PushFront(types.NewSample("up", 1, tags))
r.gatherInfoAll(slist, ins, tags)
@ -170,7 +170,7 @@ func (r *Redis) gatherCommandValues(slist *list.SafeList, ins *Instance, tags ma
for k, v := range fields {
slist.PushFront(inputs.NewSample("exec_result_"+k, v, tags))
slist.PushFront(types.NewSample("exec_result_"+k, v, tags))
@ -293,7 +293,7 @@ func (r *Redis) gatherInfoAll(slist *list.SafeList, ins *Instance, tags map[stri
fields["keyspace_hitrate"] = keyspaceHitrate
for k, v := range fields {
slist.PushFront(inputs.NewSample(k, v, tags))
slist.PushFront(types.NewSample(k, v, tags))
@ -324,7 +324,7 @@ func gatherKeyspaceLine(
for k, v := range fields {
slist.PushFront(inputs.NewSample("keyspace_"+k, v, tags))
slist.PushFront(types.NewSample("keyspace_"+k, v, tags))
@ -373,7 +373,7 @@ func gatherCommandstateLine(
for k, v := range fields {
slist.PushFront(inputs.NewSample("cmdstat_"+k, v, tags))
slist.PushFront(types.NewSample("cmdstat_"+k, v, tags))
@ -419,6 +419,6 @@ func gatherReplicationLine(
for k, v := range fields {
slist.PushFront(inputs.NewSample("replication_"+k, v, tags))
slist.PushFront(types.NewSample("replication_"+k, v, tags))
@ -11,6 +11,7 @@ import (
@ -201,7 +202,7 @@ func (ins *Instance) custstat(wg *sync.WaitGroup, ip string, slist *list.SafeLis
defer func() {
if r := recover(); r != nil {
log.Println("E! recovered in custstat, ip:", ip, "oid:", cust.OID, "error:", r)
log.Println("E! recovered in custstat, ip:", ip, "oid:", cust.OID, "error:", r, "stack:", runtimex.Stack(3))
@ -213,7 +214,7 @@ func (ins *Instance) custstat(wg *sync.WaitGroup, ip string, slist *list.SafeLis
if len(snmpPDUs) > 0 && err == nil {
value, err = conv.ToFloat64(snmpPDUs[0].Value)
if err == nil {
slist.PushFront(inputs.NewSample(cust.Metric, value, cust.Tags, ins.Labels))
slist.PushFront(types.NewSample(cust.Metric, value, cust.Tags, ins.Labels))
} else {
log.Println("E! failed to convert to float64, ip:", ip, "oid:", cust.OID, "value:", snmpPDUs[0].Value)
@ -243,7 +244,7 @@ func (ins *Instance) gatherMemMetrics(ips []string, slist *list.SafeList) {
if utilPercent == -1 {
slist.PushFront(inputs.NewSample("mem_util", utilPercent, map[string]string{ins.parent.SwitchIdLabel: ip}, ins.Labels))
slist.PushFront(types.NewSample("mem_util", utilPercent, map[string]string{ins.parent.SwitchIdLabel: ip}, ins.Labels))
@ -282,7 +283,7 @@ func (ins *Instance) gatherCpuMetrics(ips []string, slist *list.SafeList) {
if utilPercent == -1 {
slist.PushFront(inputs.NewSample("cpu_util", utilPercent, map[string]string{ins.parent.SwitchIdLabel: ip}, ins.Labels))
slist.PushFront(types.NewSample("cpu_util", utilPercent, map[string]string{ins.parent.SwitchIdLabel: ip}, ins.Labels))
@ -354,10 +355,10 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
if ins.GatherOperStatus {
slist.PushFront(inputs.NewSample("if_oper_status", ifStat.IfOperStatus, tags))
slist.PushFront(types.NewSample("if_oper_status", ifStat.IfOperStatus, tags))
slist.PushFront(inputs.NewSample("if_speed", ifStat.IfSpeed, tags))
slist.PushFront(types.NewSample("if_speed", ifStat.IfSpeed, tags))
if lastIfStatList := ins.lastifmap.Get(ip); lastIfStatList != nil {
for _, lastifStat := range lastIfStatList {
@ -373,18 +374,18 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
IfHCOutOctets := 8 * (float64(ifStat.IfHCOutOctets) - float64(lastifStat.IfHCOutOctets)) / float64(interval)
if limitCheck(IfHCInOctets, speedlimit) {
slist.PushFront(inputs.NewSample("if_in", IfHCInOctets, tags))
slist.PushFront(types.NewSample("if_in", IfHCInOctets, tags))
if ifStat.IfSpeed > 0 {
slist.PushFront(inputs.NewSample("if_in_speed_percent", 100*IfHCInOctets/float64(ifStat.IfSpeed), tags))
slist.PushFront(types.NewSample("if_in_speed_percent", 100*IfHCInOctets/float64(ifStat.IfSpeed), tags))
} else {
log.Println("W! if_in out of range, current:", ifStat.IfHCInOctets, "lasttime:", lastifStat.IfHCInOctets, "tags:", tags)
if limitCheck(IfHCOutOctets, speedlimit) {
slist.PushFront(inputs.NewSample("if_out", IfHCOutOctets, tags))
slist.PushFront(types.NewSample("if_out", IfHCOutOctets, tags))
if ifStat.IfSpeed > 0 {
slist.PushFront(inputs.NewSample("if_out_speed_percent", 100*IfHCOutOctets/float64(ifStat.IfSpeed), tags))
slist.PushFront(types.NewSample("if_out_speed_percent", 100*IfHCOutOctets/float64(ifStat.IfSpeed), tags))
} else {
log.Println("W! if_out out of range, current:", ifStat.IfHCOutOctets, "lasttime:", lastifStat.IfHCOutOctets, "tags:", tags)
@ -403,13 +404,13 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
IfHCOutBroadcastPkts := (float64(ifStat.IfHCOutBroadcastPkts) - float64(lastifStat.IfHCOutBroadcastPkts)) / float64(interval)
if limitCheck(IfHCInBroadcastPkts, ins.BroadcastPktLimit) {
slist.PushFront(inputs.NewSample("if_in_broadcast_pkt", IfHCInBroadcastPkts, tags))
slist.PushFront(types.NewSample("if_in_broadcast_pkt", IfHCInBroadcastPkts, tags))
} else {
log.Println("W! if_in_broadcast_pkt out of range, current:", ifStat.IfHCInBroadcastPkts, "lasttime:", lastifStat.IfHCInBroadcastPkts, "tags:", tags)
if limitCheck(IfHCOutBroadcastPkts, ins.BroadcastPktLimit) {
slist.PushFront(inputs.NewSample("if_out_broadcast_pkt", IfHCOutBroadcastPkts, tags))
slist.PushFront(types.NewSample("if_out_broadcast_pkt", IfHCOutBroadcastPkts, tags))
} else {
log.Println("W! if_out_broadcast_pkt out of range, current:", ifStat.IfHCOutBroadcastPkts, "lasttime:", lastifStat.IfHCOutBroadcastPkts, "tags:", tags)
@ -428,13 +429,13 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
IfHCOutMulticastPkts := (float64(ifStat.IfHCOutMulticastPkts) - float64(lastifStat.IfHCOutMulticastPkts)) / float64(interval)
if limitCheck(IfHCInMulticastPkts, ins.MulticastPktLimit) {
slist.PushFront(inputs.NewSample("if_in_multicast_pkt", IfHCInMulticastPkts, tags))
slist.PushFront(types.NewSample("if_in_multicast_pkt", IfHCInMulticastPkts, tags))
} else {
log.Println("W! if_in_multicast_pkt out of range, current:", ifStat.IfHCInMulticastPkts, "lasttime:", lastifStat.IfHCInMulticastPkts, "tags:", tags)
if limitCheck(IfHCOutMulticastPkts, ins.MulticastPktLimit) {
slist.PushFront(inputs.NewSample("if_out_multicast_pkt", IfHCOutMulticastPkts, tags))
slist.PushFront(types.NewSample("if_out_multicast_pkt", IfHCOutMulticastPkts, tags))
} else {
log.Println("W! if_out_multicast_pkt out of range, current:", ifStat.IfHCOutMulticastPkts, "lasttime:", lastifStat.IfHCOutMulticastPkts, "tags:", tags)
@ -453,13 +454,13 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
IfOutDiscards := (float64(ifStat.IfOutDiscards) - float64(lastifStat.IfOutDiscards)) / float64(interval)
if limitCheck(IfInDiscards, ins.DiscardsPktLimit) {
slist.PushFront(inputs.NewSample("if_in_discards", IfInDiscards, tags))
slist.PushFront(types.NewSample("if_in_discards", IfInDiscards, tags))
} else {
log.Println("W! if_in_discards out of range, current:", ifStat.IfInDiscards, "lasttime:", lastifStat.IfInDiscards, "tags:", tags)
if limitCheck(IfOutDiscards, ins.DiscardsPktLimit) {
slist.PushFront(inputs.NewSample("if_out_discards", IfOutDiscards, tags))
slist.PushFront(types.NewSample("if_out_discards", IfOutDiscards, tags))
} else {
log.Println("W! if_out_discards out of range, current:", ifStat.IfOutDiscards, "lasttime:", lastifStat.IfOutDiscards, "tags:", tags)
@ -478,13 +479,13 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
IfOutErrors := (float64(ifStat.IfOutErrors) - float64(lastifStat.IfOutErrors)) / float64(interval)
if limitCheck(IfInErrors, ins.ErrorsPktLimit) {
slist.PushFront(inputs.NewSample("if_in_errors", IfInErrors, tags))
slist.PushFront(types.NewSample("if_in_errors", IfInErrors, tags))
} else {
log.Println("W! if_in_errors out of range, current:", ifStat.IfInErrors, "lasttime:", lastifStat.IfInErrors, "tags:", tags)
if limitCheck(IfOutErrors, ins.ErrorsPktLimit) {
slist.PushFront(inputs.NewSample("if_out_errors", IfOutErrors, tags))
slist.PushFront(types.NewSample("if_out_errors", IfOutErrors, tags))
} else {
log.Println("W! if_out_errors out of range, current:", ifStat.IfOutErrors, "lasttime:", lastifStat.IfOutErrors, "tags:", tags)
@ -500,7 +501,7 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
interval := ifStat.TS - lastifStat.TS
IfInUnknownProtos := (float64(ifStat.IfInUnknownProtos) - float64(lastifStat.IfInUnknownProtos)) / float64(interval)
if limitCheck(IfInUnknownProtos, ins.UnknownProtosPktLimit) {
slist.PushFront(inputs.NewSample("if_in_unknown_protos", IfInUnknownProtos, tags))
slist.PushFront(types.NewSample("if_in_unknown_protos", IfInUnknownProtos, tags))
} else {
log.Println("W! if_in_unknown_protos out of range, current:", ifStat.IfInUnknownProtos, "lasttime:", lastifStat.IfInUnknownProtos, "tags:", tags)
@ -516,7 +517,7 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
interval := ifStat.TS - lastifStat.TS
IfOutQLen := (float64(ifStat.IfOutQLen) - float64(lastifStat.IfOutQLen)) / float64(interval)
if limitCheck(IfOutQLen, ins.OutQlenPktLimit) {
slist.PushFront(inputs.NewSample("if_out_qlen", IfOutQLen, tags))
slist.PushFront(types.NewSample("if_out_qlen", IfOutQLen, tags))
} else {
log.Println("W! if_out_qlen out of range, current:", ifStat.IfOutQLen, "lasttime:", lastifStat.IfOutQLen, "tags:", tags)
@ -535,13 +536,13 @@ func (ins *Instance) gatherFlowMetrics(ips []string, slist *list.SafeList) {
IfHCOutUcastPkts := (float64(ifStat.IfHCOutUcastPkts) - float64(lastifStat.IfHCOutUcastPkts)) / float64(interval)
if limitCheck(IfHCInUcastPkts, ins.PktLimit) {
slist.PushFront(inputs.NewSample("if_in_pkts", IfHCInUcastPkts, tags))
slist.PushFront(types.NewSample("if_in_pkts", IfHCInUcastPkts, tags))
} else {
log.Println("W! if_in_pkts out of range, current:", ifStat.IfHCInUcastPkts, "lasttime:", lastifStat.IfHCInUcastPkts, "tags:", tags)
if limitCheck(IfHCOutUcastPkts, ins.PktLimit) {
slist.PushFront(inputs.NewSample("if_out_pkts", IfHCOutUcastPkts, tags))
slist.PushFront(types.NewSample("if_out_pkts", IfHCOutUcastPkts, tags))
} else {
log.Println("W! if_out_pkts out of range, current:", ifStat.IfHCOutUcastPkts, "lasttime:", lastifStat.IfHCOutUcastPkts, "tags:", tags)
@ -619,7 +620,7 @@ func (ins *Instance) gatherPing(ips []string, slist *list.SafeList) []string {
if ins.GatherPingMetrics {
slist.PushFront(inputs.NewSample("ping_up", val, map[string]string{ins.parent.SwitchIdLabel: ip}, ins.Labels))
slist.PushFront(types.NewSample("ping_up", val, map[string]string{ins.parent.SwitchIdLabel: ip}, ins.Labels))
@ -7,6 +7,7 @@ import (
@ -77,5 +78,5 @@ func (s *SystemStats) Gather(slist *list.SafeList) {
inputs.PushSamples(slist, fields)
types.PushSamples(slist, fields)
@ -196,19 +196,19 @@ func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("scrape_use_seconds", use, tags))
slist.PushFront(types.NewSample("scrape_use_seconds", use, tags))
// url cannot connect? up = 0
resp, err := ins.client.Do(ins.request)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to query tomcat url:", err)
if resp.StatusCode != http.StatusOK {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! received HTTP status code:", resp.StatusCode, "expected: 200")
@ -217,16 +217,16 @@ func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
var status TomcatStatus
if err := xml.NewDecoder(resp.Body).Decode(&status); err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to decode response body:", err)
slist.PushFront(inputs.NewSample("up", 1, tags))
slist.PushFront(types.NewSample("up", 1, tags))
slist.PushFront(inputs.NewSample("jvm_memory_free", status.TomcatJvm.JvmMemory.Free, tags))
slist.PushFront(inputs.NewSample("jvm_memory_total", status.TomcatJvm.JvmMemory.Total, tags))
slist.PushFront(inputs.NewSample("jvm_memory_max", status.TomcatJvm.JvmMemory.Max, tags))
slist.PushFront(types.NewSample("jvm_memory_free", status.TomcatJvm.JvmMemory.Free, tags))
slist.PushFront(types.NewSample("jvm_memory_total", status.TomcatJvm.JvmMemory.Total, tags))
slist.PushFront(types.NewSample("jvm_memory_max", status.TomcatJvm.JvmMemory.Max, tags))
// add tomcat_jvm_memorypool measurements
for _, mp := range status.TomcatJvm.JvmMemoryPools {
@ -242,7 +242,7 @@ func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
"jvm_memorypool_used": mp.UsageUsed,
inputs.PushSamples(slist, tcmpFields, tags, tcmpTags)
types.PushSamples(slist, tcmpFields, tags, tcmpTags)
// add tomcat_connector measurements
@ -268,6 +268,6 @@ func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
"connector_bytes_sent": c.RequestInfo.BytesSent,
inputs.PushSamples(slist, tccFields, tags, tccTags)
types.PushSamples(slist, tccFields, tags, tccTags)
@ -140,13 +140,13 @@ func (ins *Instance) gatherOneHost(wg *sync.WaitGroup, slist *list.SafeList, zkH
// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(inputs.NewSample("zk_scrape_use_seconds", use, tags))
slist.PushFront(types.NewSample("zk_scrape_use_seconds", use, tags))
// zk_up
mntrConn, err := ins.ZkConnect(zkHost)
if err != nil {
slist.PushFront(inputs.NewSample("zk_up", 0, tags))
slist.PushFront(types.NewSample("zk_up", 0, tags))
log.Println("E! failed to connect zookeeper:", zkHost, "error:", err)
@ -157,7 +157,7 @@ func (ins *Instance) gatherOneHost(wg *sync.WaitGroup, slist *list.SafeList, zkH
// zk_ruok
ruokConn, err := ins.ZkConnect(zkHost)
if err != nil {
slist.PushFront(inputs.NewSample("zk_ruok", 0, tags))
slist.PushFront(types.NewSample("zk_ruok", 0, tags))
log.Println("E! failed to connect zookeeper:", zkHost, "error:", err)
@ -174,16 +174,16 @@ func (ins *Instance) gatherMntrResult(conn net.Conn, slist *list.SafeList, globa
// 'mntr' command isn't allowed in zk config, log as warning
if strings.Contains(lines[0], cmdNotExecutedSffx) {
slist.PushFront(inputs.NewSample("zk_up", 0, globalTags))
slist.PushFront(types.NewSample("zk_up", 0, globalTags))
log.Printf(commandNotAllowedTmpl, "mntr", conn.RemoteAddr().String())
slist.PushFront(inputs.NewSample("zk_up", 1, globalTags))
slist.PushFront(types.NewSample("zk_up", 1, globalTags))
// skip instance if it in a leader only state and doesnt serving client requests
if lines[0] == instanceNotServingMessage {
slist.PushFront(inputs.NewSample("zk_server_leader", 1, globalTags))
slist.PushFront(types.NewSample("zk_server_leader", 1, globalTags))
@ -204,17 +204,17 @@ func (ins *Instance) gatherMntrResult(conn net.Conn, slist *list.SafeList, globa
switch key {
case "zk_server_state":
if value == "leader" {
slist.PushFront(inputs.NewSample("zk_server_leader", 1, globalTags))
slist.PushFront(types.NewSample("zk_server_leader", 1, globalTags))
} else {
slist.PushFront(inputs.NewSample("zk_server_leader", 0, globalTags))
slist.PushFront(types.NewSample("zk_server_leader", 0, globalTags))
case "zk_version":
version := versionRE.ReplaceAllString(value, "$1")
slist.PushFront(inputs.NewSample("zk_version", 1, globalTags, map[string]string{"version": version}))
slist.PushFront(types.NewSample("zk_version", 1, globalTags, map[string]string{"version": version}))
case "zk_peer_state":
slist.PushFront(inputs.NewSample("zk_peer_state", 1, globalTags, map[string]string{"state": value}))
slist.PushFront(types.NewSample("zk_peer_state", 1, globalTags, map[string]string{"state": value}))
var k string
@ -226,9 +226,9 @@ func (ins *Instance) gatherMntrResult(conn net.Conn, slist *list.SafeList, globa
k = metricNameReplacer.Replace(key)
if strings.Contains(k, "{") {
labels := parseLabels(k)
slist.PushFront(inputs.NewSample(k, value, globalTags, labels))
slist.PushFront(types.NewSample(k, value, globalTags, labels))
} else {
slist.PushFront(inputs.NewSample(k, value, globalTags))
slist.PushFront(types.NewSample(k, value, globalTags))
@ -237,12 +237,12 @@ func (ins *Instance) gatherMntrResult(conn net.Conn, slist *list.SafeList, globa
func (ins *Instance) gatherRuokResult(conn net.Conn, slist *list.SafeList, globalTags map[string]string) {
res := sendZookeeperCmd(conn, "ruok")
if res == "imok" {
slist.PushFront(inputs.NewSample("zk_ruok", 1, globalTags))
slist.PushFront(types.NewSample("zk_ruok", 1, globalTags))
} else {
if strings.Contains(res, cmdNotExecutedSffx) {
log.Printf(commandNotAllowedTmpl, "ruok", conn.RemoteAddr().String())
slist.PushFront(inputs.NewSample("zk_ruok", 0, globalTags))
slist.PushFront(types.NewSample("zk_ruok", 0, globalTags))
@ -9,13 +9,14 @@ import (
dto "github.com/prometheus/client_model/go"
dto "github.com/prometheus/client_model/go"
type Parser struct {
@ -79,12 +80,12 @@ func (p *Parser) Parse(buf []byte, slist *list.SafeList) error {
tags := p.makeLabels(m)
if mf.GetType() == dto.MetricType_SUMMARY {
p.handleSummary(m, tags, metricName, slist)
p.HandleSummary(m, tags, metricName, slist)
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
p.handleHistogram(m, tags, metricName, slist)
p.HandleHistogram(m, tags, metricName, slist)
} else {
fields := getNameAndValue(m, metricName)
inputs.PushSamples(slist, fields, tags)
types.PushSamples(slist, fields, tags)
@ -92,24 +93,24 @@ func (p *Parser) Parse(buf []byte, slist *list.SafeList) error {
return nil
func (p *Parser) handleSummary(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "count"), float64(m.GetSummary().GetSampleCount()), tags))
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "sum"), m.GetSummary().GetSampleSum(), tags))
func (p *Parser) HandleSummary(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "count"), float64(m.GetSummary().GetSampleCount()), tags))
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "sum"), m.GetSummary().GetSampleSum(), tags))
for _, q := range m.GetSummary().Quantile {
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName), q.GetValue(), tags, map[string]string{"quantile": fmt.Sprint(q.GetQuantile())}))
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName), q.GetValue(), tags, map[string]string{"quantile": fmt.Sprint(q.GetQuantile())}))
func (p *Parser) handleHistogram(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "count"), float64(m.GetHistogram().GetSampleCount()), tags))
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "sum"), m.GetHistogram().GetSampleSum(), tags))
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "bucket"), float64(m.GetHistogram().GetSampleCount()), tags, map[string]string{"le": "+Inf"}))
func (p *Parser) HandleHistogram(m *dto.Metric, tags map[string]string, metricName string, slist *list.SafeList) {
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "count"), float64(m.GetHistogram().GetSampleCount()), tags))
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "sum"), m.GetHistogram().GetSampleSum(), tags))
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "bucket"), float64(m.GetHistogram().GetSampleCount()), tags, map[string]string{"le": "+Inf"}))
for _, b := range m.GetHistogram().Bucket {
le := fmt.Sprint(b.GetUpperBound())
value := float64(b.GetCumulativeCount())
slist.PushFront(inputs.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "bucket"), value, tags, map[string]string{"le": le}))
slist.PushFront(types.NewSample(prom.BuildMetric(p.NamePrefix, metricName, "bucket"), value, tags, map[string]string{"le": le}))
@ -0,0 +1,76 @@
package runtimex
import (
var (
dunno = []byte("???")
centerDot = []byte("·")
dot = []byte(".")
slash = []byte("/")
// stack returns a nicely formatted stack frame, skipping skip frames.
func Stack(skip int) []byte {
buf := new(bytes.Buffer) // the returned data
// As we loop, we open files and read them. These variables record the currently
// loaded file.
var lines [][]byte
var lastFile string
for i := skip; ; i++ { // Skip the expected number of frames
pc, file, line, ok := runtime.Caller(i)
if !ok {
// Print this much at least. If we can't find the source, it won't show.
fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc)
if file != lastFile {
data, err := ioutil.ReadFile(file)
if err != nil {
lines = bytes.Split(data, []byte{'\n'})
lastFile = file
fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line))
return buf.Bytes()
// source returns a space-trimmed slice of the n'th line.
func source(lines [][]byte, n int) []byte {
n-- // in stack trace, lines are 1-indexed but our array is 0-indexed
if n < 0 || n >= len(lines) {
return dunno
return bytes.TrimSpace(lines[n])
// function returns, if possible, the name of the function containing the PC.
func function(pc uintptr) []byte {
fn := runtime.FuncForPC(pc)
if fn == nil {
return dunno
name := []byte(fn.Name())
// The name includes the path name to the package, which is unnecessary
// since the file name is already included. Plus, it has center dots.
// That is, we see
// runtime/debug.*T·ptrmethod
// and want
// *T.ptrmethod
// Also the package path might contains dot (e.g. code.google.com/...),
// so first eliminate the path prefix
if lastslash := bytes.LastIndex(name, slash); lastslash >= 0 {
name = name[lastslash+1:]
if period := bytes.Index(name, dot); period >= 0 {
name = name[period+1:]
name = bytes.Replace(name, centerDot, dot, -1)
return name
@ -0,0 +1,67 @@
package types
import (
type Sample struct {
Metric string `json:"metric"`
Timestamp time.Time `json:"timestamp"`
Value float64 `json:"value"`
Labels map[string]string `json:"labels"`
func NewSample(metric string, value interface{}, labels ...map[string]string) *Sample {
floatValue, err := conv.ToFloat64(value)
if err != nil {
log.Printf("E! can not convert value type %v to float: %v\n", reflect.TypeOf(value), err)
return nil
s := &Sample{
Metric: metric,
Value: floatValue,
Labels: make(map[string]string),
for i := 0; i < len(labels); i++ {
for k, v := range labels[i] {
if v == "-" {
s.Labels[k] = v
return s
func NewSamples(fields map[string]interface{}, labels ...map[string]string) []*Sample {
count := len(fields)
samples := make([]*Sample, 0, count)
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
samples = append(samples, NewSample(metric, floatValue, labels...))
return samples
func PushSamples(slist *list.SafeList, fields map[string]interface{}, labels ...map[string]string) {
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
slist.PushFront(NewSample(metric, floatValue, labels...))
@ -1,10 +0,0 @@
package types
import "time"
type Sample struct {
Metric string `json:"metric"`
Timestamp time.Time `json:"timestamp"`
Value float64 `json:"value"`
Labels map[string]string `json:"labels"`
