From 7161c1ac4ede324d12df4650ca805d87a47315d5 Mon Sep 17 00:00:00 2001 From: yubo Date: Mon, 11 Jan 2021 22:08:27 +0800 Subject: [PATCH] adjust some file, variable name for prober module (#503) * move pulgins_config.go to config dir * add mongodb, redis yml --- etc/plugins/mysql.yml | 10 +- src/models/mon_collect_rule.go | 8 +- src/modules/prober/cache/cache.go | 2 +- .../cache/{collect_rule.go => collectrule.go} | 0 .../plugins_config.go => config/plugin.go} | 105 ++++++++------- src/modules/prober/expr/expr.go | 7 +- .../{rule_entity.go => collectrule.go} | 126 +++++++++--------- src/modules/prober/manager/manager.go | 72 +++++----- 8 files changed, 164 insertions(+), 166 deletions(-) rename src/modules/prober/cache/{collect_rule.go => collectrule.go} (100%) rename src/modules/prober/{cache/plugins_config.go => config/plugin.go} (52%) rename src/modules/prober/manager/{rule_entity.go => collectrule.go} (62%) diff --git a/etc/plugins/mysql.yml b/etc/plugins/mysql.yml index 030a0a70..afa2298e 100644 --- a/etc/plugins/mysql.yml +++ b/etc/plugins/mysql.yml @@ -1,4 +1,4 @@ -mode: whitelist # whitelist(default),all +mode: whitelist # whitelist(default),overlay metrics: - name: mysql_queries type: COUNTER @@ -14,20 +14,12 @@ metrics: - name: mysql_variables_max_connections type: GAUGE comment: "最大连接数" - - name: mysql_connections_threshold - type: GAUGE - expr: mysql_threads_connected / mysql_variables_max_connections - comment: "连接数阈值 < 0.8" - name: mysql_innodb_buffer_pool_read_requests type: COUNTER comment: "innodb缓冲池查询总数" - name: mysql_innodb_buffer_pool_reads type: COUNTER comment: "innodb从磁盘查询数" - - name: mysql_innodb_buffer_read_threshold - type: COUNTER - expr: (mysql_innodb_buffer_pool_read_requests - mysql_innodb_buffer_pool_reads) / mysql_innodb_buffer_pool_read_requests - comment: "磁盘查询报警阈值 < 0.95" - name: mysql_binary_files_count type: COUNTER - name: mysql_binary_size_bytes diff --git a/src/models/mon_collect_rule.go b/src/models/mon_collect_rule.go index f3a75783..6d9fc265 100644 --- a/src/models/mon_collect_rule.go +++ b/src/models/mon_collect_rule.go @@ -15,10 +15,10 @@ const ( type CollectRule struct { Id int64 `json:"id"` Nid int64 `json:"nid"` - Step int `json:"step" description:"interval"` + Step int64 `json:"step" description:"interval"` Timeout int `json:"timeout"` CollectType string `json:"collect_type" description:"plugin name"` - Name string `json:"name"` + Name string `json:"name" describes:"customize name"` Region string `json:"region"` Comment string `json:"comment"` Data json.RawMessage `json:"data"` @@ -33,6 +33,10 @@ type validator interface { Validate() error } +func (p CollectRule) PluginName() string { + return p.CollectType +} + func (p *CollectRule) Validate(v ...interface{}) error { if p.Name == "" { return fmt.Errorf("invalid collectRule.name") diff --git a/src/modules/prober/cache/cache.go b/src/modules/prober/cache/cache.go index 9c5f306b..bd20cadc 100644 --- a/src/modules/prober/cache/cache.go +++ b/src/modules/prober/cache/cache.go @@ -13,6 +13,6 @@ func Init(ctx context.Context) error { CollectRule = NewCollectRuleCache(&config.Config.CollectRule) CollectRule.start(ctx) MetricHistory = NewHistory() - InitPluginsConfig(config.Config) + config.InitPluginsConfig(config.Config) return nil } diff --git a/src/modules/prober/cache/collect_rule.go b/src/modules/prober/cache/collectrule.go similarity index 100% rename from src/modules/prober/cache/collect_rule.go rename to src/modules/prober/cache/collectrule.go diff --git a/src/modules/prober/cache/plugins_config.go b/src/modules/prober/config/plugin.go similarity index 52% rename from src/modules/prober/cache/plugins_config.go rename to src/modules/prober/config/plugin.go index 550c22b3..3aa2f79a 100644 --- a/src/modules/prober/cache/plugins_config.go +++ b/src/modules/prober/config/plugin.go @@ -1,4 +1,4 @@ -package cache +package config import ( "fmt" @@ -6,15 +6,24 @@ import ( "path/filepath" "strings" + "github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/modules/monapi/collector" - "github.com/didi/nightingale/src/modules/prober/config" "github.com/didi/nightingale/src/modules/prober/expr" "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" "gopkg.in/yaml.v2" ) -type MetricConfig struct { +var ( + pluginConfigs map[string]*PluginConfig +) + +const ( + PluginModeWhitelist = iota + PluginModeOverlay +) + +type Metric struct { Name string `yaml:"name"` Type string `yaml:"type"` Comment string `yaml:"comment"` @@ -23,23 +32,19 @@ type MetricConfig struct { } type PluginConfig struct { - Metrics []*MetricConfig `yaml:"metrics"` - Mode string `yaml:"mode"` - mode int `yaml:"-"` + Name string + Mode int + Metrics map[string]*Metric + ExprMetrics map[string]*Metric } -type CachePluginConfig struct { - Name string - Mode int - Metrics map[string]*MetricConfig +type pluginConfig struct { + Metrics []*Metric `yaml:"metrics"` + Mode string `yaml:"mode"` + mode int `yaml:"-"` } -const ( - PluginModeWhitelist = iota - PluginModeOverlay -) - -func (p *PluginConfig) Validate() error { +func (p *pluginConfig) Validate() error { switch strings.ToLower(p.Mode) { case "whitelist": p.mode = PluginModeWhitelist @@ -51,19 +56,12 @@ func (p *PluginConfig) Validate() error { return nil } -var ( - metricsConfig map[string]*MetricConfig - metricsExpr map[string]*CachePluginConfig -) - -func InitPluginsConfig(cf *config.ConfYaml) { - metricsConfig = make(map[string]*MetricConfig) - metricsExpr = make(map[string]*CachePluginConfig) - plugins := collector.GetRemoteCollectors() - for _, plugin := range plugins { - cacheConfig := newCachePluginConfig() - config := PluginConfig{} - metricsExpr[plugin] = cacheConfig +func InitPluginsConfig(cf *ConfYaml) { + pluginConfigs = make(map[string]*PluginConfig) + for _, plugin := range collector.GetRemoteCollectors() { + c := pluginConfig{} + config := newPluginConfig() + pluginConfigs[plugin] = config file := filepath.Join(cf.PluginsConfig, plugin+".yml") b, err := ioutil.ReadFile(file) @@ -72,49 +70,53 @@ func InitPluginsConfig(cf *config.ConfYaml) { continue } - if err := yaml.Unmarshal(b, &config); err != nil { + if err := yaml.Unmarshal(b, &c); err != nil { logger.Warningf("yaml.Unmarshal %s err %s", plugin, err) continue } - if err := config.Validate(); err != nil { + if err := c.Validate(); err != nil { logger.Warningf("%s Validate() err %s", plugin, err) continue } - cacheConfig.Name = plugin - cacheConfig.Mode = config.mode + config.Name = plugin + config.Mode = c.mode - for _, v := range config.Metrics { - if _, ok := metricsConfig[v.Name]; ok { - panic(fmt.Sprintf("plugin %s metrics %s is already exists", plugin, v.Name)) - } - if v.Expr == "" { - // nomore - metricsConfig[v.Name] = v - } else { + for _, v := range c.Metrics { + if v.Expr != "" { err := v.parse() if err != nil { panic(fmt.Sprintf("plugin %s metrics %s expr %s parse err %s", plugin, v.Name, v.Expr, err)) } - cacheConfig.Metrics[v.Name] = v + config.ExprMetrics[v.Name] = v + } else { + config.Metrics[v.Name] = v } } logger.Infof("loaded plugin config %s", file) } } -func (p *MetricConfig) parse() (err error) { +func (p *Metric) parse() (err error) { p.notations, err = expr.NewNotations([]byte(p.Expr)) return } -func (p *MetricConfig) Calc(vars map[string]float64) (float64, error) { +func (p *Metric) Calc(vars map[string]*dataobj.MetricValue) (float64, error) { return p.notations.Calc(vars) } -func Metric(metric string, typ telegraf.ValueType) (c *MetricConfig, ok bool) { - c, ok = metricsConfig[metric] +func GetMetric(plugin, metric string, typ telegraf.ValueType) (c *Metric, ok bool) { + p, ok := pluginConfigs[plugin] + if !ok { + return + } + + c, ok = p.Metrics[metric] + if !ok { + c, ok = p.ExprMetrics[metric] + } if !ok { return } @@ -126,8 +128,8 @@ func Metric(metric string, typ telegraf.ValueType) (c *MetricConfig, ok bool) { return } -func GetMetricExprs(pluginName string) (c *CachePluginConfig, ok bool) { - c, ok = metricsExpr[pluginName] +func GetPluginConfig(pluginName string) (c *PluginConfig, ok bool) { + c, ok = pluginConfigs[pluginName] return } @@ -148,8 +150,9 @@ func metricType(typ telegraf.ValueType) string { } } -func newCachePluginConfig() *CachePluginConfig { - return &CachePluginConfig{ - Metrics: map[string]*MetricConfig{}, +func newPluginConfig() *PluginConfig { + return &PluginConfig{ + Metrics: map[string]*Metric{}, + ExprMetrics: map[string]*Metric{}, } } diff --git a/src/modules/prober/expr/expr.go b/src/modules/prober/expr/expr.go index 5268ea2a..ec3b7711 100644 --- a/src/modules/prober/expr/expr.go +++ b/src/modules/prober/expr/expr.go @@ -7,6 +7,7 @@ import ( "go/token" "strconv" + "github.com/didi/nightingale/src/common/dataobj" "github.com/toolkits/pkg/logger" ) @@ -61,7 +62,7 @@ func (s *StackFloat) Push(f float64) { *s = append(*s, f) } func (s *StackFloat) Pop() float64 { n := (*s)[len(*s)-1]; *s = (*s)[:len(*s)-1]; return n } func (s *StackFloat) Len() int { return len(*s) } -func (rpn Notations) Calc(vars map[string]float64) (float64, error) { +func (rpn Notations) Calc(vars map[string]*dataobj.MetricValue) (float64, error) { var s StackFloat for i := 0; i < rpn.Len(); i++ { tn := rpn[i] @@ -70,8 +71,8 @@ func (rpn Notations) Calc(vars map[string]float64) (float64, error) { if v, ok := vars[tn.tokenVariable]; !ok { return 0, fmt.Errorf("variable %s is not set", tn.tokenVariable) } else { - logger.Debugf("get %s %f", tn.tokenVariable, v) - s.Push(v) + logger.Debugf("get %s %f", tn.tokenVariable, v.Value) + s.Push(v.Value) } case tokenConst: s.Push(tn.tokenConst) diff --git a/src/modules/prober/manager/rule_entity.go b/src/modules/prober/manager/collectrule.go similarity index 62% rename from src/modules/prober/manager/rule_entity.go rename to src/modules/prober/manager/collectrule.go index cde17a07..73899ce8 100644 --- a/src/modules/prober/manager/rule_entity.go +++ b/src/modules/prober/manager/collectrule.go @@ -9,22 +9,22 @@ import ( "github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/models" "github.com/didi/nightingale/src/modules/monapi/collector" - "github.com/didi/nightingale/src/modules/prober/cache" + "github.com/didi/nightingale/src/modules/prober/config" "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" ) // not thread-safe -type ruleEntity struct { +type collectRule struct { sync.RWMutex telegraf.Input - rule *models.CollectRule + *models.CollectRule tags map[string]string precision time.Duration metrics []*dataobj.MetricValue } -func newRuleEntity(rule *models.CollectRule) (*ruleEntity, error) { +func newCollectRule(rule *models.CollectRule) (*collectRule, error) { c, err := collector.GetCollector(rule.CollectType) if err != nil { return nil, err @@ -40,73 +40,87 @@ func newRuleEntity(rule *models.CollectRule) (*ruleEntity, error) { return nil, err } - return &ruleEntity{ - Input: input, - rule: rule, - tags: tags, - metrics: []*dataobj.MetricValue{}, - precision: time.Second, + return &collectRule{ + Input: input, + CollectRule: rule, + tags: tags, + metrics: []*dataobj.MetricValue{}, + precision: time.Second, }, nil } -// calc metrics with expression -func (p *ruleEntity) calc() error { +// prepareMetrics +func (p *collectRule) prepareMetrics() error { if len(p.metrics) == 0 { return nil } - sample := p.metrics[0] + ts := p.metrics[0].Timestamp + nid := strconv.FormatInt(p.Nid, 10) - configs, ok := cache.GetMetricExprs(p.rule.CollectType) + pluginConfig, ok := config.GetPluginConfig(p.PluginName()) if !ok { return nil } - vars := map[string]float64{} + vars := map[string]*dataobj.MetricValue{} for _, v := range p.metrics { logger.Debugf("get v[%s] %f", v.Metric, v.Value) - vars[v.Metric] = v.Value + vars[v.Metric] = v } - for _, config := range configs.Metrics { - f, err := config.Calc(vars) + p.metrics = p.metrics[:0] + for _, metric := range pluginConfig.ExprMetrics { + f, err := metric.Calc(vars) if err != nil { logger.Debugf("calc err %s", err) continue } p.metrics = append(p.metrics, &dataobj.MetricValue{ - Nid: sample.Nid, - Metric: config.Name, - Timestamp: sample.Timestamp, - Step: sample.Step, - CounterType: config.Type, - TagsMap: sample.TagsMap, + Nid: nid, + Metric: metric.Name, + Timestamp: ts, + Step: p.Step, + CounterType: metric.Type, + TagsMap: p.tags, Value: f, ValueUntyped: f, }) } - if configs.Mode == cache.PluginModeOverlay { - for k, v := range vars { - if _, ok := configs.Metrics[k]; ok { + for k, v := range vars { + if metric, ok := pluginConfig.Metrics[k]; ok { + p.metrics = append(p.metrics, &dataobj.MetricValue{ + Nid: nid, + Metric: k, + Timestamp: ts, + Step: p.Step, + CounterType: metric.Type, + TagsMap: v.TagsMap, + Value: v.Value, + ValueUntyped: v.ValueUntyped, + }) + } else { + if pluginConfig.Mode == config.PluginModeWhitelist { continue } p.metrics = append(p.metrics, &dataobj.MetricValue{ - Nid: sample.Nid, + Nid: nid, Metric: k, - Timestamp: sample.Timestamp, - Step: sample.Step, + Timestamp: ts, + Step: p.Step, CounterType: "GAUGE", - TagsMap: sample.TagsMap, - Value: v, - ValueUntyped: v, + TagsMap: v.TagsMap, + Value: v.Value, + ValueUntyped: v.ValueUntyped, }) + } } return nil } -func (p *ruleEntity) update(rule *models.CollectRule) error { - if p.rule.LastUpdated == rule.LastUpdated { +func (p *collectRule) update(rule *models.CollectRule) error { + if p.CollectRule.LastUpdated == rule.LastUpdated { return nil } @@ -123,14 +137,14 @@ func (p *ruleEntity) update(rule *models.CollectRule) error { } p.Input = input - p.rule = rule + p.CollectRule = rule p.tags = tags return nil } // https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/ -func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { +func (p *collectRule) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { tags := map[string]string{} for _, v := range metric.TagList() { tags[v.Key] = v.Value @@ -140,10 +154,8 @@ func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { tags[k] = v } - nid := strconv.FormatInt(p.rule.Nid, 10) name := metric.Name() ts := metric.Time().Unix() - step := int64(p.rule.Step) // deprecated fields := metric.Fields() ms := make([]*dataobj.MetricValue, 0, len(fields)) @@ -153,17 +165,9 @@ func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { continue } - c, ok := cache.Metric(name+"_"+k, metric.Type()) - if !ok { - continue - } - ms = append(ms, &dataobj.MetricValue{ - Nid: nid, - Metric: c.Name, + Metric: name + "_" + k, Timestamp: ts, - Step: step, - CounterType: c.Type, TagsMap: tags, Value: f, ValueUntyped: f, @@ -173,7 +177,7 @@ func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { return ms } -func (p *ruleEntity) AddFields( +func (p *collectRule) AddFields( measurement string, fields map[string]interface{}, tags map[string]string, @@ -182,7 +186,7 @@ func (p *ruleEntity) AddFields( p.addFields(measurement, tags, fields, telegraf.Untyped, t...) } -func (p *ruleEntity) AddGauge( +func (p *collectRule) AddGauge( measurement string, fields map[string]interface{}, tags map[string]string, @@ -191,7 +195,7 @@ func (p *ruleEntity) AddGauge( p.addFields(measurement, tags, fields, telegraf.Gauge, t...) } -func (p *ruleEntity) AddCounter( +func (p *collectRule) AddCounter( measurement string, fields map[string]interface{}, tags map[string]string, @@ -200,7 +204,7 @@ func (p *ruleEntity) AddCounter( p.addFields(measurement, tags, fields, telegraf.Counter, t...) } -func (p *ruleEntity) AddSummary( +func (p *collectRule) AddSummary( measurement string, fields map[string]interface{}, tags map[string]string, @@ -209,7 +213,7 @@ func (p *ruleEntity) AddSummary( p.addFields(measurement, tags, fields, telegraf.Summary, t...) } -func (p *ruleEntity) AddHistogram( +func (p *collectRule) AddHistogram( measurement string, fields map[string]interface{}, tags map[string]string, @@ -218,20 +222,20 @@ func (p *ruleEntity) AddHistogram( p.addFields(measurement, tags, fields, telegraf.Histogram, t...) } -func (p *ruleEntity) AddMetric(m telegraf.Metric) { +func (p *collectRule) AddMetric(m telegraf.Metric) { m.SetTime(m.Time().Round(p.precision)) if metrics := p.MakeMetric(m); m != nil { p.pushMetrics(metrics) } } -func (p *ruleEntity) pushMetrics(metrics []*dataobj.MetricValue) { +func (p *collectRule) pushMetrics(metrics []*dataobj.MetricValue) { p.Lock() defer p.Unlock() p.metrics = append(p.metrics, metrics...) } -func (p *ruleEntity) addFields( +func (p *collectRule) addFields( measurement string, tags map[string]string, fields map[string]interface{}, @@ -249,18 +253,18 @@ func (p *ruleEntity) addFields( // AddError passes a runtime error to the accumulator. // The error will be tagged with the plugin name and written to the log. -func (p *ruleEntity) AddError(err error) { +func (p *collectRule) AddError(err error) { if err == nil { return } - log.Printf("Error in plugin: %v", err) + logger.Debugf("Error in plugin: %v", err) } -func (p *ruleEntity) SetPrecision(precision time.Duration) { +func (p *collectRule) SetPrecision(precision time.Duration) { p.precision = precision } -func (p *ruleEntity) getTime(t []time.Time) time.Time { +func (p *collectRule) getTime(t []time.Time) time.Time { var timestamp time.Time if len(t) > 0 { timestamp = t[0] @@ -270,6 +274,6 @@ func (p *ruleEntity) getTime(t []time.Time) time.Time { return timestamp.Round(p.precision) } -func (p *ruleEntity) WithTracking(maxTracked int) telegraf.TrackingAccumulator { +func (p *collectRule) WithTracking(maxTracked int) telegraf.TrackingAccumulator { return nil } diff --git a/src/modules/prober/manager/manager.go b/src/modules/prober/manager/manager.go index 96f0263b..059a8186 100644 --- a/src/modules/prober/manager/manager.go +++ b/src/modules/prober/manager/manager.go @@ -16,20 +16,20 @@ import ( ) type manager struct { - ctx context.Context - cache *cache.CollectRuleCache - config *config.ConfYaml - heap ruleSummaryHeap - index map[int64]*ruleEntity // add at cache.C , del at executeAt check - worker []worker - tx chan *ruleEntity + ctx context.Context + cache *cache.CollectRuleCache + config *config.ConfYaml + heap ruleSummaryHeap + index map[int64]*collectRule // add at cache.C , del at executeAt check + worker []worker + collectRuleCh chan *collectRule } func NewManager(cfg *config.ConfYaml, cache *cache.CollectRuleCache) *manager { return &manager{ cache: cache, config: cfg, - index: make(map[int64]*ruleEntity), + index: make(map[int64]*collectRule), } } @@ -37,15 +37,13 @@ func (p *manager) Start(ctx context.Context) error { workerProcesses := p.config.WorkerProcesses p.ctx = ctx - p.tx = make(chan *ruleEntity, 1) + p.collectRuleCh = make(chan *collectRule, 1) heap.Init(&p.heap) p.worker = make([]worker, workerProcesses) for i := 0; i < workerProcesses; i++ { - p.worker[i].rx = p.tx + p.worker[i].collectRuleCh = p.collectRuleCh p.worker[i].ctx = ctx - // p.worker[i].acc = p.acc - p.worker[i].loop(i) } @@ -65,7 +63,7 @@ func (p *manager) loop() { case <-p.ctx.Done(): return case <-p.cache.C: - if err := p.SyncRules(); err != nil { + if err := p.AddRules(); err != nil { log.Printf("manager.SyncRules err %s", err) } case <-tick.C: @@ -89,36 +87,37 @@ func (p *manager) schedule() error { } summary := heap.Pop(&p.heap).(*ruleSummary) - rule, ok := p.cache.Get(summary.id) + ruleConfig, ok := p.cache.Get(summary.id) if !ok { // drop it if not exist in cache delete(p.index, summary.id) continue } - entity, ok := p.index[rule.Id] + rule, ok := p.index[ruleConfig.Id] if !ok { // impossible - log.Printf("manager.index[%d] not exists", rule.Id) - // let's fix it - p.index[entity.rule.Id] = entity + log.Printf("manager.index[%d] not exists", ruleConfig.Id) + continue } // update rule - if err := entity.update(rule); err != nil { + if err := rule.update(ruleConfig); err != nil { logger.Warningf("ruleEntity update err %s", err) } - p.tx <- entity + p.collectRuleCh <- rule - summary.executeAt = now + int64(rule.Step) + summary.executeAt = now + int64(ruleConfig.Step) heap.Push(&p.heap, summary) continue } } -func (p *manager) SyncRules() error { +// AddRules add new rule to p.index from cache +// update / cleanup will be done by p.schedule() -> ruleEntity.update() +func (p *manager) AddRules() error { for _, v := range p.cache.GetAll() { if _, ok := p.index[v.Id]; !ok { p.AddRule(v) @@ -128,7 +127,7 @@ func (p *manager) SyncRules() error { } func (p *manager) AddRule(rule *models.CollectRule) error { - ruleEntity, err := newRuleEntity(rule) + ruleEntity, err := newCollectRule(rule) if err != nil { return err } @@ -141,11 +140,6 @@ func (p *manager) AddRule(rule *models.CollectRule) error { return nil } -type collectRule interface { - telegraf.Input - tags() map[string]string -} - func telegrafInput(rule *models.CollectRule) (telegraf.Input, error) { c, err := collector.GetCollector(rule.CollectType) if err != nil { @@ -155,9 +149,9 @@ func telegrafInput(rule *models.CollectRule) (telegraf.Input, error) { } type worker struct { - ctx context.Context - cache *cache.CollectRuleCache - rx chan *ruleEntity + ctx context.Context + cache *cache.CollectRuleCache + collectRuleCh chan *collectRule } func (p *worker) loop(id int) { @@ -166,8 +160,8 @@ func (p *worker) loop(id int) { select { case <-p.ctx.Done(): return - case entity := <-p.rx: - if err := p.do(entity); err != nil { + case rule := <-p.collectRuleCh: + if err := p.do(rule); err != nil { log.Printf("work[%d].do err %s", id, err) } } @@ -175,20 +169,20 @@ func (p *worker) loop(id int) { }() } -func (p *worker) do(entity *ruleEntity) error { - entity.metrics = entity.metrics[:0] +func (p *worker) do(rule *collectRule) error { + rule.metrics = rule.metrics[:0] // telegraf - err := entity.Input.Gather(entity) - if len(entity.metrics) == 0 { + err := rule.Input.Gather(rule) + if len(rule.metrics) == 0 { return err } // eval expression metrics - entity.calc() + rule.prepareMetrics() // send - core.Push(entity.metrics) + core.Push(rule.metrics) return err }