bugfix: replace ref with instantiated variable for prober.rules.updatedAt (#536)
* add mon.plugins.redis descriptions * bugfix: add region field for instances/heartbeat * bugfix: replace ref with instantiated variable for prober.rules.updatedAt
This commit is contained in:
parent
25c31fcb2e
commit
8fe3d2b0b3
|
@ -37,6 +37,10 @@ func (p CollectRule) PluginName() string {
|
||||||
return p.CollectType
|
return p.CollectType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *CollectRule) String() string {
|
||||||
|
return fmt.Sprintf("id %d type %s name %s", p.Id, p.CollectType, p.Name)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *CollectRule) Validate(v ...interface{}) error {
|
func (p *CollectRule) Validate(v ...interface{}) error {
|
||||||
if p.Name == "" {
|
if p.Name == "" {
|
||||||
return fmt.Errorf("invalid collectRule.name")
|
return fmt.Errorf("invalid collectRule.name")
|
||||||
|
|
|
@ -3,7 +3,6 @@ package http
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -90,7 +89,6 @@ func collectRulesGetV2(c *gin.Context) {
|
||||||
limit := queryInt(c, "limit", 20)
|
limit := queryInt(c, "limit", 20)
|
||||||
typ := queryStr(c, "type", "")
|
typ := queryStr(c, "type", "")
|
||||||
|
|
||||||
log.Printf("typ %s", typ)
|
|
||||||
total, list, err := models.GetCollectRules(typ, nid, limit, offset(c, limit, 0))
|
total, list, err := models.GetCollectRules(typ, nid, limit, offset(c, limit, 0))
|
||||||
|
|
||||||
renderData(c, map[string]interface{}{
|
renderData(c, map[string]interface{}{
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package manager
|
package manager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -23,6 +22,7 @@ type collectRule struct {
|
||||||
precision time.Duration
|
precision time.Duration
|
||||||
metrics []*dataobj.MetricValue
|
metrics []*dataobj.MetricValue
|
||||||
lastAt int64
|
lastAt int64
|
||||||
|
updatedAt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
|
func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
|
||||||
|
@ -47,6 +47,7 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
|
||||||
tags: tags,
|
tags: tags,
|
||||||
metrics: []*dataobj.MetricValue{},
|
metrics: []*dataobj.MetricValue{},
|
||||||
precision: time.Second,
|
precision: time.Second,
|
||||||
|
updatedAt: rule.UpdatedAt,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,15 +122,16 @@ func (p *collectRule) prepareMetrics() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *collectRule) update(rule *models.CollectRule) error {
|
func (p *collectRule) update(rule *models.CollectRule) error {
|
||||||
if p.CollectRule.UpdatedAt == rule.UpdatedAt {
|
if p.updatedAt == rule.UpdatedAt {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Debugf("update %s", rule)
|
||||||
|
|
||||||
input, err := telegrafInput(rule)
|
input, err := telegrafInput(rule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// ignore error, use old config
|
// ignore error, use old config
|
||||||
log.Printf("telegrafInput() id %d type %s name %s err %s",
|
logger.Warningf("telegrafInput %s err %s", rule, err)
|
||||||
rule.Id, rule.CollectType, rule.Name, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tags, err := dataobj.SplitTagsString(rule.Tags)
|
tags, err := dataobj.SplitTagsString(rule.Tags)
|
||||||
|
@ -140,6 +142,7 @@ func (p *collectRule) update(rule *models.CollectRule) error {
|
||||||
p.Input = input
|
p.Input = input
|
||||||
p.CollectRule = rule
|
p.CollectRule = rule
|
||||||
p.tags = tags
|
p.tags = tags
|
||||||
|
p.UpdatedAt = rule.UpdatedAt
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,22 +87,22 @@ func (p *manager) schedule() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
summary := heap.Pop(&p.heap).(*ruleSummary)
|
summary := heap.Pop(&p.heap).(*ruleSummary)
|
||||||
ruleConfig, ok := p.cache.Get(summary.id)
|
latestRule, ok := p.cache.Get(summary.id)
|
||||||
if !ok {
|
if !ok {
|
||||||
// drop it if not exist in cache
|
// drop it if not exist in cache
|
||||||
delete(p.index, summary.id)
|
delete(p.index, summary.id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
rule, ok := p.index[ruleConfig.Id]
|
rule, ok := p.index[latestRule.Id]
|
||||||
if !ok {
|
if !ok {
|
||||||
// impossible
|
// impossible
|
||||||
log.Printf("manager.index[%d] not exists", ruleConfig.Id)
|
logger.Warningf("manager.index[%d] not exists", latestRule.Id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// update rule
|
// update rule
|
||||||
if err := rule.update(ruleConfig); err != nil {
|
if err := rule.update(latestRule); err != nil {
|
||||||
logger.Warningf("ruleEntity update err %s", err)
|
logger.Warningf("ruleEntity update err %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,9 +110,9 @@ func (p *manager) schedule() error {
|
||||||
|
|
||||||
logger.Debugf("%s %s %d lastAt %ds before nextAt %ds later",
|
logger.Debugf("%s %s %d lastAt %ds before nextAt %ds later",
|
||||||
rule.CollectType, rule.Name, rule.Id,
|
rule.CollectType, rule.Name, rule.Id,
|
||||||
now-rule.lastAt, ruleConfig.Step)
|
now-rule.lastAt, rule.Step)
|
||||||
|
|
||||||
summary.activeAt = now + int64(ruleConfig.Step)
|
summary.activeAt = now + int64(rule.Step)
|
||||||
rule.lastAt = now
|
rule.lastAt = now
|
||||||
heap.Push(&p.heap, summary)
|
heap.Push(&p.heap, summary)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue