nightingale/judge/prome_pull.go

198 lines
4.6 KiB
Go
Raw Normal View History

package judge
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/prometheus/promql"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/backend"
"github.com/didi/nightingale/v5/models"
"github.com/didi/nightingale/v5/vos"
)
const (
// DEFAULT_PULL_ALERT_INTERVAL is the evaluation interval, in seconds, that
// RuleEval.start falls back to when a rule's EvaluationInterval is <= 0.
DEFAULT_PULL_ALERT_INTERVAL = 15
// LABEL_NAME is the label whose value handlePromqlVector uses as the metric
// name instead of recording it as an ordinary tag.
LABEL_NAME = "__name__"
)
// RuleManager keeps the set of pull-alert rules that currently have an
// evaluator registered, keyed by a hash of the rule's identity.
type RuleManager struct {
// targetMtx is held by SyncRules while activeRules is read and modified.
targetMtx sync.Mutex
// activeRules maps the MD5 hash built in SyncRules (rule id, alert duration,
// evaluation interval and PromQL) to the evaluator registered for that rule.
activeRules map[string]RuleEval
}
// pullRuleManager is the package-level RuleManager instance, created at
// package initialization with an empty rule set.
var pullRuleManager = NewRuleManager()
// NewRuleManager returns a RuleManager whose activeRules map is allocated and
// empty, ready to be populated by SyncRules.
func NewRuleManager() *RuleManager {
	rm := new(RuleManager)
	rm.activeRules = make(map[string]RuleEval)
	return rm
}
// RuleEval bundles one alert rule with the signals that control its
// background evaluation loop (see start and stop).
type RuleEval struct {
// R is the alert rule being evaluated.
R models.AlertRule
// quiteChan is closed by stop to make the loop started by start return.
quiteChan chan struct{}
// ctx is the context passed to SyncRules; its cancellation also ends the loop.
ctx context.Context
}
// start launches the background evaluation loop for this rule and returns
// immediately.
//
// Each round the loop waits EvaluationInterval seconds (falling back to
// DEFAULT_PULL_ALERT_INTERVAL when the rule's interval is not positive),
// fetches the "prometheus" data source, runs the rule's PromQL through
// QueryVector and passes the result to handlePromqlVector.
//
// The loop returns when ctx is cancelled, when quiteChan is closed (see stop),
// or when the data source cannot be obtained.
//
// The goroutine operates on a copy of *re that is taken before start returns.
// SyncRules calls start on a range-loop variable; before Go 1.22 that variable
// is reused on every iteration, so keeping the pointer would let the goroutine
// observe whichever rule was assigned to the variable last.
func (re *RuleEval) start() {
	logger.Debugf("[prome_pull_alert_start][RuleEval: %+v]", re)

	go func(ev RuleEval) {
		if ev.R.PullExpr.EvaluationInterval <= 0 {
			ev.R.PullExpr.EvaluationInterval = DEFAULT_PULL_ALERT_INTERVAL
		}
		// The interval never changes after this point, so compute it once.
		interval := time.Duration(ev.R.PullExpr.EvaluationInterval) * time.Second

		for {
			select {
			case <-ev.ctx.Done():
				return
			case <-ev.quiteChan:
				return
			case <-time.After(interval):
			}

			// Fetch the backend's prometheus data source.
			pb, err := backend.GetDataSourceFor("prometheus")
			if err != nil {
				// NOTE(review): this ends the loop for good while the rule
				// stays registered in the manager — confirm that is intended.
				logger.Errorf("[pull_alert][get_prome_datasource_error][err: %v]", err)
				return
			}

			// Run an instant query against prometheus and process the result.
			promVector := pb.QueryVector(ev.R.PullExpr.PromQl)
			handlePromqlVector(promVector, ev.R)
		}
	}(*re)
}
// stop asks the evaluation loop started by start to exit by closing
// quiteChan. It must be called at most once per channel: closing an already
// closed channel panics.
func (re *RuleEval) stop() {
	logger.Debugf("[prome_pull_alert_stop][RuleEval: %+v]", re)
	close(re.quiteChan)
}
// SyncRules reconciles the running evaluators with the given rule list.
//
// Every rule is identified by the MD5 of its id, alert duration, evaluation
// interval and PromQL. Rules whose hash is not yet active get a new RuleEval
// (bound to ctx) which is registered and then started; active evaluators
// whose hash no longer appears in rules are stopped and unregistered.
// Evaluators whose hash is unchanged are left running untouched.
//
// The registry is modified under targetMtx; the new evaluators are started
// after the lock has been released.
func (rm *RuleManager) SyncRules(ctx context.Context, rules []models.AlertRule) {
	// wanted is the set of hashes present in this sync round.
	wanted := make(map[string]struct{}, len(rules))
	// toStart holds the evaluators created in this round. A slice gives every
	// evaluator its own address, so start never receives a pointer to a reused
	// range variable (which, before Go 1.22, would alias all of them).
	var toStart []RuleEval

	rm.targetMtx.Lock()
	for _, r := range rules {
		hash := str.MD5(fmt.Sprintf("rid_%d_%d_%d_%s",
			r.Id,
			r.AlertDuration,
			r.PullExpr.EvaluationInterval,
			r.PullExpr.PromQl,
		))
		wanted[hash] = struct{}{}

		if _, active := rm.activeRules[hash]; active {
			// Already registered: do not allocate a throwaway evaluator.
			continue
		}
		ev := RuleEval{
			R:         r,
			quiteChan: make(chan struct{}, 1),
			ctx:       ctx,
		}
		rm.activeRules[hash] = ev
		toStart = append(toStart, ev)
	}

	// Stop evaluators whose rule disappeared or changed identity.
	for hash, ev := range rm.activeRules {
		if _, keep := wanted[hash]; !keep {
			ev.stop()
			delete(rm.activeRules, hash)
		}
	}
	rm.targetMtx.Unlock()

	// Start the new evaluators.
	for i := range toStart {
		toStart[i].start()
	}
}
// handlePromqlVector turns the result of one rule evaluation into alert
// events.
//
// For every sample in pv it builds a models.AlertEvent (non-recovery,
// IsPromePull set) and hands it to sendEventIfNeed. The event's HashId is the
// MD5 of the rule id and the sample's label set; the LABEL_NAME label becomes
// the metric name and every other label is recorded both in TagMap and in the
// comma-separated "name=value" Tags string.
//
// After all samples are processed, LastEvents.DeleteOrSendRecovery is called
// with the rule's PromQL and the set of hash ids seen in this round. An empty
// pv therefore calls it with an empty set.
func handlePromqlVector(pv promql.Vector, r models.AlertRule) {
	toKeepKeys := make(map[string]struct{}, len(pv))

	for _, s := range pv {
		readableStr := s.Metric.String()
		value := fmt.Sprintf("[vector=%s]: [value=%f]", readableStr, s.Point.V)
		hashId := str.MD5(fmt.Sprintf("s_%d_%s", r.Id, readableStr))
		toKeepKeys[hashId] = struct{}{}

		metricsName := ""
		tagm := make(map[string]string, len(s.Metric))
		tagPairs := make([]string, 0, len(s.Metric))
		for _, l := range s.Metric {
			if l.Name == LABEL_NAME {
				metricsName = l.Value
				continue
			}
			tagPairs = append(tagPairs, l.Name+"="+l.Value)
			tagm[l.Name] = l.Value
		}
		// Join instead of appending "," and trimming afterwards: trimming with
		// strings.TrimRight would also eat commas that belong to the last
		// label value.
		tags := strings.Join(tagPairs, ",")

		// The original note says prometheus returns a 13-digit timestamp;
		// dividing by 1000 reduces it to a 10-digit one.
		triggerTs := s.T / 1e3

		hp := &vos.HPoint{
			Timestamp: triggerTs,
			Value:     vos.JsonFloat(s.V),
		}
		historyArr := []vos.HistoryPoints{{
			Metric: metricsName,
			Tags:   tagm,
			Points: []*vos.HPoint{hp},
		}}

		bs, err := json.Marshal(historyArr)
		if err != nil {
			logger.Errorf("[pull_alert][historyArr_json_Marshal_error][historyArr:%+v][err: %v]", historyArr, err)
			// Skip only this sample. Its hash id is already in toKeepKeys, so
			// no recovery is generated for it, and the remaining samples still
			// get their events.
			continue
		}
		logger.Debugf("[proml.historyArr][metricsName:%v][Tags:%v]\n", metricsName, tagm)

		event := &models.AlertEvent{
			RuleId:   r.Id,
			RuleName: r.Name,
			RuleNote: r.Note,
			// TODO for expr: when this information changes, the hash id has to change too.
			HashId:             hashId,
			IsPromePull:        1,
			IsRecovery:         0,
			Priority:           r.Priority,
			HistoryPoints:      bs,
			TriggerTime:        triggerTs,
			Values:             value,
			NotifyChannels:     r.NotifyChannels,
			NotifyGroups:       r.NotifyGroups,
			NotifyUsers:        r.NotifyUsers,
			RunbookUrl:         r.RunbookUrl,
			ReadableExpression: r.PullExpr.PromQl,
			Tags:               tags,
			AlertDuration:      int64(r.AlertDuration),
			TagMap:             tagm,
		}
		logger.Debugf("[handlePromqlVector_has_value][event:%+v]\n", event)
		sendEventIfNeed([]bool{true}, event, &r)
	}

	// Hash ids that are not in toKeepKeys are handed over for deletion or
	// recovery. With an empty pv the set is empty.
	LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys)
}