Adjust some file and variable names for the prober module (#503)

* move plugins_config.go to the config dir

* add mongodb and redis yml files
This commit is contained in:
yubo 2021-01-11 22:08:27 +08:00 committed by GitHub
parent a9cf307cbf
commit 7161c1ac4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 164 additions and 166 deletions

View File

@ -1,4 +1,4 @@
mode: whitelist # whitelist(default),all mode: whitelist # whitelist(default),overlay
metrics: metrics:
- name: mysql_queries - name: mysql_queries
type: COUNTER type: COUNTER
@ -14,20 +14,12 @@ metrics:
- name: mysql_variables_max_connections - name: mysql_variables_max_connections
type: GAUGE type: GAUGE
comment: "最大连接数" comment: "最大连接数"
- name: mysql_connections_threshold
type: GAUGE
expr: mysql_threads_connected / mysql_variables_max_connections
comment: "连接数阈值 < 0.8"
- name: mysql_innodb_buffer_pool_read_requests - name: mysql_innodb_buffer_pool_read_requests
type: COUNTER type: COUNTER
comment: "innodb缓冲池查询总数" comment: "innodb缓冲池查询总数"
- name: mysql_innodb_buffer_pool_reads - name: mysql_innodb_buffer_pool_reads
type: COUNTER type: COUNTER
comment: "innodb从磁盘查询数" comment: "innodb从磁盘查询数"
- name: mysql_innodb_buffer_read_threshold
type: COUNTER
expr: (mysql_innodb_buffer_pool_read_requests - mysql_innodb_buffer_pool_reads) / mysql_innodb_buffer_pool_read_requests
comment: "磁盘查询报警阈值 < 0.95"
- name: mysql_binary_files_count - name: mysql_binary_files_count
type: COUNTER type: COUNTER
- name: mysql_binary_size_bytes - name: mysql_binary_size_bytes

View File

@ -15,10 +15,10 @@ const (
type CollectRule struct { type CollectRule struct {
Id int64 `json:"id"` Id int64 `json:"id"`
Nid int64 `json:"nid"` Nid int64 `json:"nid"`
Step int `json:"step" description:"interval"` Step int64 `json:"step" description:"interval"`
Timeout int `json:"timeout"` Timeout int `json:"timeout"`
CollectType string `json:"collect_type" description:"plugin name"` CollectType string `json:"collect_type" description:"plugin name"`
Name string `json:"name"` Name string `json:"name" describes:"customize name"`
Region string `json:"region"` Region string `json:"region"`
Comment string `json:"comment"` Comment string `json:"comment"`
Data json.RawMessage `json:"data"` Data json.RawMessage `json:"data"`
@ -33,6 +33,10 @@ type validator interface {
Validate() error Validate() error
} }
func (p CollectRule) PluginName() string {
return p.CollectType
}
func (p *CollectRule) Validate(v ...interface{}) error { func (p *CollectRule) Validate(v ...interface{}) error {
if p.Name == "" { if p.Name == "" {
return fmt.Errorf("invalid collectRule.name") return fmt.Errorf("invalid collectRule.name")

View File

@ -13,6 +13,6 @@ func Init(ctx context.Context) error {
CollectRule = NewCollectRuleCache(&config.Config.CollectRule) CollectRule = NewCollectRuleCache(&config.Config.CollectRule)
CollectRule.start(ctx) CollectRule.start(ctx)
MetricHistory = NewHistory() MetricHistory = NewHistory()
InitPluginsConfig(config.Config) config.InitPluginsConfig(config.Config)
return nil return nil
} }

View File

@ -1,4 +1,4 @@
package cache package config
import ( import (
"fmt" "fmt"
@ -6,15 +6,24 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/monapi/collector" "github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/prober/config"
"github.com/didi/nightingale/src/modules/prober/expr" "github.com/didi/nightingale/src/modules/prober/expr"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
type MetricConfig struct { var (
pluginConfigs map[string]*PluginConfig
)
const (
PluginModeWhitelist = iota
PluginModeOverlay
)
type Metric struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Type string `yaml:"type"` Type string `yaml:"type"`
Comment string `yaml:"comment"` Comment string `yaml:"comment"`
@ -23,23 +32,19 @@ type MetricConfig struct {
} }
type PluginConfig struct { type PluginConfig struct {
Metrics []*MetricConfig `yaml:"metrics"` Name string
Mode int
Metrics map[string]*Metric
ExprMetrics map[string]*Metric
}
type pluginConfig struct {
Metrics []*Metric `yaml:"metrics"`
Mode string `yaml:"mode"` Mode string `yaml:"mode"`
mode int `yaml:"-"` mode int `yaml:"-"`
} }
type CachePluginConfig struct { func (p *pluginConfig) Validate() error {
Name string
Mode int
Metrics map[string]*MetricConfig
}
const (
PluginModeWhitelist = iota
PluginModeOverlay
)
func (p *PluginConfig) Validate() error {
switch strings.ToLower(p.Mode) { switch strings.ToLower(p.Mode) {
case "whitelist": case "whitelist":
p.mode = PluginModeWhitelist p.mode = PluginModeWhitelist
@ -51,19 +56,12 @@ func (p *PluginConfig) Validate() error {
return nil return nil
} }
var ( func InitPluginsConfig(cf *ConfYaml) {
metricsConfig map[string]*MetricConfig pluginConfigs = make(map[string]*PluginConfig)
metricsExpr map[string]*CachePluginConfig for _, plugin := range collector.GetRemoteCollectors() {
) c := pluginConfig{}
config := newPluginConfig()
func InitPluginsConfig(cf *config.ConfYaml) { pluginConfigs[plugin] = config
metricsConfig = make(map[string]*MetricConfig)
metricsExpr = make(map[string]*CachePluginConfig)
plugins := collector.GetRemoteCollectors()
for _, plugin := range plugins {
cacheConfig := newCachePluginConfig()
config := PluginConfig{}
metricsExpr[plugin] = cacheConfig
file := filepath.Join(cf.PluginsConfig, plugin+".yml") file := filepath.Join(cf.PluginsConfig, plugin+".yml")
b, err := ioutil.ReadFile(file) b, err := ioutil.ReadFile(file)
@ -72,49 +70,53 @@ func InitPluginsConfig(cf *config.ConfYaml) {
continue continue
} }
if err := yaml.Unmarshal(b, &config); err != nil { if err := yaml.Unmarshal(b, &c); err != nil {
logger.Warningf("yaml.Unmarshal %s err %s", plugin, err) logger.Warningf("yaml.Unmarshal %s err %s", plugin, err)
continue continue
} }
if err := config.Validate(); err != nil { if err := c.Validate(); err != nil {
logger.Warningf("%s Validate() err %s", plugin, err) logger.Warningf("%s Validate() err %s", plugin, err)
continue continue
} }
cacheConfig.Name = plugin config.Name = plugin
cacheConfig.Mode = config.mode config.Mode = c.mode
for _, v := range config.Metrics { for _, v := range c.Metrics {
if _, ok := metricsConfig[v.Name]; ok { if v.Expr != "" {
panic(fmt.Sprintf("plugin %s metrics %s is already exists", plugin, v.Name))
}
if v.Expr == "" {
// nomore
metricsConfig[v.Name] = v
} else {
err := v.parse() err := v.parse()
if err != nil { if err != nil {
panic(fmt.Sprintf("plugin %s metrics %s expr %s parse err %s", panic(fmt.Sprintf("plugin %s metrics %s expr %s parse err %s",
plugin, v.Name, v.Expr, err)) plugin, v.Name, v.Expr, err))
} }
cacheConfig.Metrics[v.Name] = v config.ExprMetrics[v.Name] = v
} else {
config.Metrics[v.Name] = v
} }
} }
logger.Infof("loaded plugin config %s", file) logger.Infof("loaded plugin config %s", file)
} }
} }
func (p *MetricConfig) parse() (err error) { func (p *Metric) parse() (err error) {
p.notations, err = expr.NewNotations([]byte(p.Expr)) p.notations, err = expr.NewNotations([]byte(p.Expr))
return return
} }
func (p *MetricConfig) Calc(vars map[string]float64) (float64, error) { func (p *Metric) Calc(vars map[string]*dataobj.MetricValue) (float64, error) {
return p.notations.Calc(vars) return p.notations.Calc(vars)
} }
func Metric(metric string, typ telegraf.ValueType) (c *MetricConfig, ok bool) { func GetMetric(plugin, metric string, typ telegraf.ValueType) (c *Metric, ok bool) {
c, ok = metricsConfig[metric] p, ok := pluginConfigs[plugin]
if !ok {
return
}
c, ok = p.Metrics[metric]
if !ok {
c, ok = p.ExprMetrics[metric]
}
if !ok { if !ok {
return return
} }
@ -126,8 +128,8 @@ func Metric(metric string, typ telegraf.ValueType) (c *MetricConfig, ok bool) {
return return
} }
func GetMetricExprs(pluginName string) (c *CachePluginConfig, ok bool) { func GetPluginConfig(pluginName string) (c *PluginConfig, ok bool) {
c, ok = metricsExpr[pluginName] c, ok = pluginConfigs[pluginName]
return return
} }
@ -148,8 +150,9 @@ func metricType(typ telegraf.ValueType) string {
} }
} }
func newCachePluginConfig() *CachePluginConfig { func newPluginConfig() *PluginConfig {
return &CachePluginConfig{ return &PluginConfig{
Metrics: map[string]*MetricConfig{}, Metrics: map[string]*Metric{},
ExprMetrics: map[string]*Metric{},
} }
} }

View File

@ -7,6 +7,7 @@ import (
"go/token" "go/token"
"strconv" "strconv"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
) )
@ -61,7 +62,7 @@ func (s *StackFloat) Push(f float64) { *s = append(*s, f) }
func (s *StackFloat) Pop() float64 { n := (*s)[len(*s)-1]; *s = (*s)[:len(*s)-1]; return n } func (s *StackFloat) Pop() float64 { n := (*s)[len(*s)-1]; *s = (*s)[:len(*s)-1]; return n }
func (s *StackFloat) Len() int { return len(*s) } func (s *StackFloat) Len() int { return len(*s) }
func (rpn Notations) Calc(vars map[string]float64) (float64, error) { func (rpn Notations) Calc(vars map[string]*dataobj.MetricValue) (float64, error) {
var s StackFloat var s StackFloat
for i := 0; i < rpn.Len(); i++ { for i := 0; i < rpn.Len(); i++ {
tn := rpn[i] tn := rpn[i]
@ -70,8 +71,8 @@ func (rpn Notations) Calc(vars map[string]float64) (float64, error) {
if v, ok := vars[tn.tokenVariable]; !ok { if v, ok := vars[tn.tokenVariable]; !ok {
return 0, fmt.Errorf("variable %s is not set", tn.tokenVariable) return 0, fmt.Errorf("variable %s is not set", tn.tokenVariable)
} else { } else {
logger.Debugf("get %s %f", tn.tokenVariable, v) logger.Debugf("get %s %f", tn.tokenVariable, v.Value)
s.Push(v) s.Push(v.Value)
} }
case tokenConst: case tokenConst:
s.Push(tn.tokenConst) s.Push(tn.tokenConst)

View File

@ -9,22 +9,22 @@ import (
"github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/models" "github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/monapi/collector" "github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/prober/cache" "github.com/didi/nightingale/src/modules/prober/config"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
) )
// not thread-safe // not thread-safe
type ruleEntity struct { type collectRule struct {
sync.RWMutex sync.RWMutex
telegraf.Input telegraf.Input
rule *models.CollectRule *models.CollectRule
tags map[string]string tags map[string]string
precision time.Duration precision time.Duration
metrics []*dataobj.MetricValue metrics []*dataobj.MetricValue
} }
func newRuleEntity(rule *models.CollectRule) (*ruleEntity, error) { func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
c, err := collector.GetCollector(rule.CollectType) c, err := collector.GetCollector(rule.CollectType)
if err != nil { if err != nil {
return nil, err return nil, err
@ -40,73 +40,87 @@ func newRuleEntity(rule *models.CollectRule) (*ruleEntity, error) {
return nil, err return nil, err
} }
return &ruleEntity{ return &collectRule{
Input: input, Input: input,
rule: rule, CollectRule: rule,
tags: tags, tags: tags,
metrics: []*dataobj.MetricValue{}, metrics: []*dataobj.MetricValue{},
precision: time.Second, precision: time.Second,
}, nil }, nil
} }
// calc metrics with expression // prepareMetrics
func (p *ruleEntity) calc() error { func (p *collectRule) prepareMetrics() error {
if len(p.metrics) == 0 { if len(p.metrics) == 0 {
return nil return nil
} }
sample := p.metrics[0] ts := p.metrics[0].Timestamp
nid := strconv.FormatInt(p.Nid, 10)
configs, ok := cache.GetMetricExprs(p.rule.CollectType) pluginConfig, ok := config.GetPluginConfig(p.PluginName())
if !ok { if !ok {
return nil return nil
} }
vars := map[string]float64{} vars := map[string]*dataobj.MetricValue{}
for _, v := range p.metrics { for _, v := range p.metrics {
logger.Debugf("get v[%s] %f", v.Metric, v.Value) logger.Debugf("get v[%s] %f", v.Metric, v.Value)
vars[v.Metric] = v.Value vars[v.Metric] = v
} }
for _, config := range configs.Metrics { p.metrics = p.metrics[:0]
f, err := config.Calc(vars) for _, metric := range pluginConfig.ExprMetrics {
f, err := metric.Calc(vars)
if err != nil { if err != nil {
logger.Debugf("calc err %s", err) logger.Debugf("calc err %s", err)
continue continue
} }
p.metrics = append(p.metrics, &dataobj.MetricValue{ p.metrics = append(p.metrics, &dataobj.MetricValue{
Nid: sample.Nid, Nid: nid,
Metric: config.Name, Metric: metric.Name,
Timestamp: sample.Timestamp, Timestamp: ts,
Step: sample.Step, Step: p.Step,
CounterType: config.Type, CounterType: metric.Type,
TagsMap: sample.TagsMap, TagsMap: p.tags,
Value: f, Value: f,
ValueUntyped: f, ValueUntyped: f,
}) })
} }
if configs.Mode == cache.PluginModeOverlay {
for k, v := range vars { for k, v := range vars {
if _, ok := configs.Metrics[k]; ok { if metric, ok := pluginConfig.Metrics[k]; ok {
p.metrics = append(p.metrics, &dataobj.MetricValue{
Nid: nid,
Metric: k,
Timestamp: ts,
Step: p.Step,
CounterType: metric.Type,
TagsMap: v.TagsMap,
Value: v.Value,
ValueUntyped: v.ValueUntyped,
})
} else {
if pluginConfig.Mode == config.PluginModeWhitelist {
continue continue
} }
p.metrics = append(p.metrics, &dataobj.MetricValue{ p.metrics = append(p.metrics, &dataobj.MetricValue{
Nid: sample.Nid, Nid: nid,
Metric: k, Metric: k,
Timestamp: sample.Timestamp, Timestamp: ts,
Step: sample.Step, Step: p.Step,
CounterType: "GAUGE", CounterType: "GAUGE",
TagsMap: sample.TagsMap, TagsMap: v.TagsMap,
Value: v, Value: v.Value,
ValueUntyped: v, ValueUntyped: v.ValueUntyped,
}) })
} }
} }
return nil return nil
} }
func (p *ruleEntity) update(rule *models.CollectRule) error { func (p *collectRule) update(rule *models.CollectRule) error {
if p.rule.LastUpdated == rule.LastUpdated { if p.CollectRule.LastUpdated == rule.LastUpdated {
return nil return nil
} }
@ -123,14 +137,14 @@ func (p *ruleEntity) update(rule *models.CollectRule) error {
} }
p.Input = input p.Input = input
p.rule = rule p.CollectRule = rule
p.tags = tags p.tags = tags
return nil return nil
} }
// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/ // https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/
func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { func (p *collectRule) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue {
tags := map[string]string{} tags := map[string]string{}
for _, v := range metric.TagList() { for _, v := range metric.TagList() {
tags[v.Key] = v.Value tags[v.Key] = v.Value
@ -140,10 +154,8 @@ func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue {
tags[k] = v tags[k] = v
} }
nid := strconv.FormatInt(p.rule.Nid, 10)
name := metric.Name() name := metric.Name()
ts := metric.Time().Unix() ts := metric.Time().Unix()
step := int64(p.rule.Step) // deprecated
fields := metric.Fields() fields := metric.Fields()
ms := make([]*dataobj.MetricValue, 0, len(fields)) ms := make([]*dataobj.MetricValue, 0, len(fields))
@ -153,17 +165,9 @@ func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue {
continue continue
} }
c, ok := cache.Metric(name+"_"+k, metric.Type())
if !ok {
continue
}
ms = append(ms, &dataobj.MetricValue{ ms = append(ms, &dataobj.MetricValue{
Nid: nid, Metric: name + "_" + k,
Metric: c.Name,
Timestamp: ts, Timestamp: ts,
Step: step,
CounterType: c.Type,
TagsMap: tags, TagsMap: tags,
Value: f, Value: f,
ValueUntyped: f, ValueUntyped: f,
@ -173,7 +177,7 @@ func (p *ruleEntity) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue {
return ms return ms
} }
func (p *ruleEntity) AddFields( func (p *collectRule) AddFields(
measurement string, measurement string,
fields map[string]interface{}, fields map[string]interface{},
tags map[string]string, tags map[string]string,
@ -182,7 +186,7 @@ func (p *ruleEntity) AddFields(
p.addFields(measurement, tags, fields, telegraf.Untyped, t...) p.addFields(measurement, tags, fields, telegraf.Untyped, t...)
} }
func (p *ruleEntity) AddGauge( func (p *collectRule) AddGauge(
measurement string, measurement string,
fields map[string]interface{}, fields map[string]interface{},
tags map[string]string, tags map[string]string,
@ -191,7 +195,7 @@ func (p *ruleEntity) AddGauge(
p.addFields(measurement, tags, fields, telegraf.Gauge, t...) p.addFields(measurement, tags, fields, telegraf.Gauge, t...)
} }
func (p *ruleEntity) AddCounter( func (p *collectRule) AddCounter(
measurement string, measurement string,
fields map[string]interface{}, fields map[string]interface{},
tags map[string]string, tags map[string]string,
@ -200,7 +204,7 @@ func (p *ruleEntity) AddCounter(
p.addFields(measurement, tags, fields, telegraf.Counter, t...) p.addFields(measurement, tags, fields, telegraf.Counter, t...)
} }
func (p *ruleEntity) AddSummary( func (p *collectRule) AddSummary(
measurement string, measurement string,
fields map[string]interface{}, fields map[string]interface{},
tags map[string]string, tags map[string]string,
@ -209,7 +213,7 @@ func (p *ruleEntity) AddSummary(
p.addFields(measurement, tags, fields, telegraf.Summary, t...) p.addFields(measurement, tags, fields, telegraf.Summary, t...)
} }
func (p *ruleEntity) AddHistogram( func (p *collectRule) AddHistogram(
measurement string, measurement string,
fields map[string]interface{}, fields map[string]interface{},
tags map[string]string, tags map[string]string,
@ -218,20 +222,20 @@ func (p *ruleEntity) AddHistogram(
p.addFields(measurement, tags, fields, telegraf.Histogram, t...) p.addFields(measurement, tags, fields, telegraf.Histogram, t...)
} }
func (p *ruleEntity) AddMetric(m telegraf.Metric) { func (p *collectRule) AddMetric(m telegraf.Metric) {
m.SetTime(m.Time().Round(p.precision)) m.SetTime(m.Time().Round(p.precision))
if metrics := p.MakeMetric(m); m != nil { if metrics := p.MakeMetric(m); m != nil {
p.pushMetrics(metrics) p.pushMetrics(metrics)
} }
} }
func (p *ruleEntity) pushMetrics(metrics []*dataobj.MetricValue) { func (p *collectRule) pushMetrics(metrics []*dataobj.MetricValue) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
p.metrics = append(p.metrics, metrics...) p.metrics = append(p.metrics, metrics...)
} }
func (p *ruleEntity) addFields( func (p *collectRule) addFields(
measurement string, measurement string,
tags map[string]string, tags map[string]string,
fields map[string]interface{}, fields map[string]interface{},
@ -249,18 +253,18 @@ func (p *ruleEntity) addFields(
// AddError passes a runtime error to the accumulator. // AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log. // The error will be tagged with the plugin name and written to the log.
func (p *ruleEntity) AddError(err error) { func (p *collectRule) AddError(err error) {
if err == nil { if err == nil {
return return
} }
log.Printf("Error in plugin: %v", err) logger.Debugf("Error in plugin: %v", err)
} }
func (p *ruleEntity) SetPrecision(precision time.Duration) { func (p *collectRule) SetPrecision(precision time.Duration) {
p.precision = precision p.precision = precision
} }
func (p *ruleEntity) getTime(t []time.Time) time.Time { func (p *collectRule) getTime(t []time.Time) time.Time {
var timestamp time.Time var timestamp time.Time
if len(t) > 0 { if len(t) > 0 {
timestamp = t[0] timestamp = t[0]
@ -270,6 +274,6 @@ func (p *ruleEntity) getTime(t []time.Time) time.Time {
return timestamp.Round(p.precision) return timestamp.Round(p.precision)
} }
func (p *ruleEntity) WithTracking(maxTracked int) telegraf.TrackingAccumulator { func (p *collectRule) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
return nil return nil
} }

View File

@ -20,16 +20,16 @@ type manager struct {
cache *cache.CollectRuleCache cache *cache.CollectRuleCache
config *config.ConfYaml config *config.ConfYaml
heap ruleSummaryHeap heap ruleSummaryHeap
index map[int64]*ruleEntity // add at cache.C , del at executeAt check index map[int64]*collectRule // add at cache.C , del at executeAt check
worker []worker worker []worker
tx chan *ruleEntity collectRuleCh chan *collectRule
} }
func NewManager(cfg *config.ConfYaml, cache *cache.CollectRuleCache) *manager { func NewManager(cfg *config.ConfYaml, cache *cache.CollectRuleCache) *manager {
return &manager{ return &manager{
cache: cache, cache: cache,
config: cfg, config: cfg,
index: make(map[int64]*ruleEntity), index: make(map[int64]*collectRule),
} }
} }
@ -37,15 +37,13 @@ func (p *manager) Start(ctx context.Context) error {
workerProcesses := p.config.WorkerProcesses workerProcesses := p.config.WorkerProcesses
p.ctx = ctx p.ctx = ctx
p.tx = make(chan *ruleEntity, 1) p.collectRuleCh = make(chan *collectRule, 1)
heap.Init(&p.heap) heap.Init(&p.heap)
p.worker = make([]worker, workerProcesses) p.worker = make([]worker, workerProcesses)
for i := 0; i < workerProcesses; i++ { for i := 0; i < workerProcesses; i++ {
p.worker[i].rx = p.tx p.worker[i].collectRuleCh = p.collectRuleCh
p.worker[i].ctx = ctx p.worker[i].ctx = ctx
// p.worker[i].acc = p.acc
p.worker[i].loop(i) p.worker[i].loop(i)
} }
@ -65,7 +63,7 @@ func (p *manager) loop() {
case <-p.ctx.Done(): case <-p.ctx.Done():
return return
case <-p.cache.C: case <-p.cache.C:
if err := p.SyncRules(); err != nil { if err := p.AddRules(); err != nil {
log.Printf("manager.SyncRules err %s", err) log.Printf("manager.SyncRules err %s", err)
} }
case <-tick.C: case <-tick.C:
@ -89,36 +87,37 @@ func (p *manager) schedule() error {
} }
summary := heap.Pop(&p.heap).(*ruleSummary) summary := heap.Pop(&p.heap).(*ruleSummary)
rule, ok := p.cache.Get(summary.id) ruleConfig, ok := p.cache.Get(summary.id)
if !ok { if !ok {
// drop it if not exist in cache // drop it if not exist in cache
delete(p.index, summary.id) delete(p.index, summary.id)
continue continue
} }
entity, ok := p.index[rule.Id] rule, ok := p.index[ruleConfig.Id]
if !ok { if !ok {
// impossible // impossible
log.Printf("manager.index[%d] not exists", rule.Id) log.Printf("manager.index[%d] not exists", ruleConfig.Id)
// let's fix it continue
p.index[entity.rule.Id] = entity
} }
// update rule // update rule
if err := entity.update(rule); err != nil { if err := rule.update(ruleConfig); err != nil {
logger.Warningf("ruleEntity update err %s", err) logger.Warningf("ruleEntity update err %s", err)
} }
p.tx <- entity p.collectRuleCh <- rule
summary.executeAt = now + int64(rule.Step) summary.executeAt = now + int64(ruleConfig.Step)
heap.Push(&p.heap, summary) heap.Push(&p.heap, summary)
continue continue
} }
} }
func (p *manager) SyncRules() error { // AddRules add new rule to p.index from cache
// update / cleanup will be done by p.schedule() -> ruleEntity.update()
func (p *manager) AddRules() error {
for _, v := range p.cache.GetAll() { for _, v := range p.cache.GetAll() {
if _, ok := p.index[v.Id]; !ok { if _, ok := p.index[v.Id]; !ok {
p.AddRule(v) p.AddRule(v)
@ -128,7 +127,7 @@ func (p *manager) SyncRules() error {
} }
func (p *manager) AddRule(rule *models.CollectRule) error { func (p *manager) AddRule(rule *models.CollectRule) error {
ruleEntity, err := newRuleEntity(rule) ruleEntity, err := newCollectRule(rule)
if err != nil { if err != nil {
return err return err
} }
@ -141,11 +140,6 @@ func (p *manager) AddRule(rule *models.CollectRule) error {
return nil return nil
} }
type collectRule interface {
telegraf.Input
tags() map[string]string
}
func telegrafInput(rule *models.CollectRule) (telegraf.Input, error) { func telegrafInput(rule *models.CollectRule) (telegraf.Input, error) {
c, err := collector.GetCollector(rule.CollectType) c, err := collector.GetCollector(rule.CollectType)
if err != nil { if err != nil {
@ -157,7 +151,7 @@ func telegrafInput(rule *models.CollectRule) (telegraf.Input, error) {
type worker struct { type worker struct {
ctx context.Context ctx context.Context
cache *cache.CollectRuleCache cache *cache.CollectRuleCache
rx chan *ruleEntity collectRuleCh chan *collectRule
} }
func (p *worker) loop(id int) { func (p *worker) loop(id int) {
@ -166,8 +160,8 @@ func (p *worker) loop(id int) {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
return return
case entity := <-p.rx: case rule := <-p.collectRuleCh:
if err := p.do(entity); err != nil { if err := p.do(rule); err != nil {
log.Printf("work[%d].do err %s", id, err) log.Printf("work[%d].do err %s", id, err)
} }
} }
@ -175,20 +169,20 @@ func (p *worker) loop(id int) {
}() }()
} }
func (p *worker) do(entity *ruleEntity) error { func (p *worker) do(rule *collectRule) error {
entity.metrics = entity.metrics[:0] rule.metrics = rule.metrics[:0]
// telegraf // telegraf
err := entity.Input.Gather(entity) err := rule.Input.Gather(rule)
if len(entity.metrics) == 0 { if len(rule.metrics) == 0 {
return err return err
} }
// eval expression metrics // eval expression metrics
entity.calc() rule.prepareMetrics()
// send // send
core.Push(entity.metrics) core.Push(rule.metrics)
return err return err
} }