diff --git a/src/models/mon_collect_rule.go b/src/models/mon_collect_rule.go index 65334dbc..0cf34719 100644 --- a/src/models/mon_collect_rule.go +++ b/src/models/mon_collect_rule.go @@ -37,6 +37,10 @@ func (p CollectRule) PluginName() string { return p.CollectType } +func (p *CollectRule) String() string { + return fmt.Sprintf("id %d type %s name %s", p.Id, p.CollectType, p.Name) +} + func (p *CollectRule) Validate(v ...interface{}) error { if p.Name == "" { return fmt.Errorf("invalid collectRule.name") diff --git a/src/modules/monapi/http/router_collect.go b/src/modules/monapi/http/router_collect.go index 490f41f0..276c30a9 100644 --- a/src/modules/monapi/http/router_collect.go +++ b/src/modules/monapi/http/router_collect.go @@ -3,7 +3,6 @@ package http import ( "encoding/json" "fmt" - "log" "regexp" "strings" @@ -90,7 +89,6 @@ func collectRulesGetV2(c *gin.Context) { limit := queryInt(c, "limit", 20) typ := queryStr(c, "type", "") - log.Printf("typ %s", typ) total, list, err := models.GetCollectRules(typ, nid, limit, offset(c, limit, 0)) renderData(c, map[string]interface{}{ diff --git a/src/modules/prober/manager/collectrule.go b/src/modules/prober/manager/collectrule.go index d175f652..c0c6581c 100644 --- a/src/modules/prober/manager/collectrule.go +++ b/src/modules/prober/manager/collectrule.go @@ -1,7 +1,6 @@ package manager import ( - "log" "strconv" "sync" "time" @@ -23,6 +22,7 @@ type collectRule struct { precision time.Duration metrics []*dataobj.MetricValue lastAt int64 + updatedAt int64 } func newCollectRule(rule *models.CollectRule) (*collectRule, error) { @@ -47,6 +47,7 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) { tags: tags, metrics: []*dataobj.MetricValue{}, precision: time.Second, + updatedAt: rule.UpdatedAt, }, nil } @@ -121,15 +122,16 @@ func (p *collectRule) prepareMetrics() error { } func (p *collectRule) update(rule *models.CollectRule) error { - if p.CollectRule.UpdatedAt == rule.UpdatedAt { + if p.updatedAt == rule.UpdatedAt { return nil } + logger.Debugf("update %s", rule) + input, err := telegrafInput(rule) if err != nil { // ignore error, use old config - log.Printf("telegrafInput() id %d type %s name %s err %s", - rule.Id, rule.CollectType, rule.Name, err) + logger.Warningf("telegrafInput %s err %s", rule, err) } tags, err := dataobj.SplitTagsString(rule.Tags) @@ -140,6 +142,7 @@ func (p *collectRule) update(rule *models.CollectRule) error { p.Input = input p.CollectRule = rule p.tags = tags + p.UpdatedAt = rule.UpdatedAt return nil } diff --git a/src/modules/prober/manager/manager.go b/src/modules/prober/manager/manager.go index 904a8c91..465fd8b4 100644 --- a/src/modules/prober/manager/manager.go +++ b/src/modules/prober/manager/manager.go @@ -87,22 +87,22 @@ func (p *manager) schedule() error { } summary := heap.Pop(&p.heap).(*ruleSummary) - ruleConfig, ok := p.cache.Get(summary.id) + latestRule, ok := p.cache.Get(summary.id) if !ok { // drop it if not exist in cache delete(p.index, summary.id) continue } - rule, ok := p.index[ruleConfig.Id] + rule, ok := p.index[latestRule.Id] if !ok { // impossible - log.Printf("manager.index[%d] not exists", ruleConfig.Id) + logger.Warningf("manager.index[%d] not exists", latestRule.Id) continue } // update rule - if err := rule.update(ruleConfig); err != nil { + if err := rule.update(latestRule); err != nil { logger.Warningf("ruleEntity update err %s", err) } @@ -110,9 +110,9 @@ func (p *manager) schedule() error { logger.Debugf("%s %s %d lastAt %ds before nextAt %ds later", rule.CollectType, rule.Name, rule.Id, - now-rule.lastAt, ruleConfig.Step) + now-rule.lastAt, rule.Step) - summary.activeAt = now + int64(ruleConfig.Step) + summary.activeAt = now + int64(rule.Step) rule.lastAt = now heap.Push(&p.heap, summary)