198 lines
4.6 KiB
Go
198 lines
4.6 KiB
Go
package judge
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/prometheus/prometheus/promql"
|
||
"github.com/toolkits/pkg/logger"
|
||
"github.com/toolkits/pkg/str"
|
||
|
||
"github.com/didi/nightingale/v5/backend"
|
||
"github.com/didi/nightingale/v5/models"
|
||
"github.com/didi/nightingale/v5/vos"
|
||
)
|
||
|
||
// DEFAULT_PULL_ALERT_INTERVAL is the evaluation interval, in seconds, that
// RuleEval.start falls back to when a rule's PullExpr.EvaluationInterval is
// zero or negative.
const DEFAULT_PULL_ALERT_INTERVAL = 15

// LABEL_NAME is the label that handlePromqlVector treats as the metric name;
// it is left out of the tag string and tag map built for an alert event.
const LABEL_NAME = "__name__"
|
||
|
||
type RuleManager struct {
|
||
targetMtx sync.Mutex
|
||
activeRules map[string]RuleEval
|
||
}
|
||
|
||
// pullRuleManager is a package-level RuleManager created through
// NewRuleManager when the package's variables are initialized.
var pullRuleManager = NewRuleManager()
|
||
|
||
func NewRuleManager() *RuleManager {
|
||
return &RuleManager{
|
||
activeRules: make(map[string]RuleEval),
|
||
}
|
||
}
|
||
|
||
type RuleEval struct {
|
||
R models.AlertRule
|
||
quiteChan chan struct{}
|
||
ctx context.Context
|
||
}
|
||
|
||
func (re *RuleEval) start() {
|
||
logger.Debugf("[prome_pull_alert_start][RuleEval: %+v]", re)
|
||
go func(re *RuleEval) {
|
||
if re.R.PullExpr.EvaluationInterval <= 0 {
|
||
re.R.PullExpr.EvaluationInterval = DEFAULT_PULL_ALERT_INTERVAL
|
||
}
|
||
for {
|
||
select {
|
||
case <-re.ctx.Done():
|
||
return
|
||
case <-re.quiteChan:
|
||
return
|
||
case <-time.After(time.Duration(re.R.PullExpr.EvaluationInterval) * time.Second):
|
||
}
|
||
|
||
// 获取backend的prometheus DataSource
|
||
pb, err := backend.GetDataSourceFor("prometheus")
|
||
if err != nil {
|
||
logger.Errorf("[pull_alert][get_prome_datasource_error][err: %v]", err)
|
||
return
|
||
}
|
||
|
||
// 调prometheus instance query 查询数据
|
||
promVector := pb.QueryVector(re.R.PullExpr.PromQl)
|
||
|
||
handlePromqlVector(promVector, re.R)
|
||
}
|
||
|
||
}(re)
|
||
}
|
||
|
||
func (r *RuleEval) stop() {
|
||
logger.Debugf("[prome_pull_alert_stop][RuleEval: %+v]", r)
|
||
close(r.quiteChan)
|
||
}
|
||
|
||
func (rm *RuleManager) SyncRules(ctx context.Context, rules []models.AlertRule) {
|
||
|
||
thisNewRules := make(map[string]RuleEval)
|
||
thisAllRules := make(map[string]RuleEval)
|
||
|
||
rm.targetMtx.Lock()
|
||
for _, r := range rules {
|
||
newR := RuleEval{
|
||
R: r,
|
||
quiteChan: make(chan struct{}, 1),
|
||
ctx: ctx,
|
||
}
|
||
hash := str.MD5(fmt.Sprintf("rid_%d_%d_%d_%s",
|
||
r.Id,
|
||
r.AlertDuration,
|
||
r.PullExpr.EvaluationInterval,
|
||
r.PullExpr.PromQl,
|
||
))
|
||
thisAllRules[hash] = newR
|
||
if _, loaded := rm.activeRules[hash]; !loaded {
|
||
thisNewRules[hash] = newR
|
||
rm.activeRules[hash] = newR
|
||
}
|
||
}
|
||
|
||
// 停止旧的
|
||
for hash, t := range rm.activeRules {
|
||
if _, loaded := thisAllRules[hash]; !loaded {
|
||
t.stop()
|
||
delete(rm.activeRules, hash)
|
||
}
|
||
}
|
||
rm.targetMtx.Unlock()
|
||
|
||
// 开启新的
|
||
for _, t := range thisNewRules {
|
||
t.start()
|
||
}
|
||
}
|
||
|
||
func handlePromqlVector(pv promql.Vector, r models.AlertRule) {
|
||
toKeepKeys := map[string]struct{}{}
|
||
if len(pv) == 0 {
|
||
// 说明没触发,或者没查询到,删掉rule-id开头的所有event
|
||
LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys)
|
||
|
||
return
|
||
}
|
||
|
||
for _, s := range pv {
|
||
readableStr := s.Metric.String()
|
||
|
||
value := fmt.Sprintf("[vector=%s]: [value=%f]", readableStr, s.Point.V)
|
||
hashId := str.MD5(fmt.Sprintf("s_%d_%s", r.Id, readableStr))
|
||
toKeepKeys[hashId] = struct{}{}
|
||
tags := ""
|
||
tagm := make(map[string]string)
|
||
metricsName := ""
|
||
for _, l := range s.Metric {
|
||
if l.Name == LABEL_NAME {
|
||
metricsName = l.Value
|
||
continue
|
||
}
|
||
tags += fmt.Sprintf("%s=%s,", l.Name, l.Value)
|
||
tagm[l.Name] = l.Value
|
||
|
||
}
|
||
|
||
tags = strings.TrimRight(tags, ",")
|
||
// prometheus查询返回 13位时间戳
|
||
triggerTs := s.T / 1e3
|
||
//triggerTs := time.Now().Unix()
|
||
historyArr := make([]vos.HistoryPoints, 0)
|
||
|
||
hp := &vos.HPoint{
|
||
Timestamp: triggerTs,
|
||
Value: vos.JsonFloat(s.V),
|
||
}
|
||
historyArr = append(historyArr, vos.HistoryPoints{
|
||
Metric: metricsName,
|
||
Tags: tagm,
|
||
Points: []*vos.HPoint{hp},
|
||
})
|
||
bs, err := json.Marshal(historyArr)
|
||
if err != nil {
|
||
logger.Errorf("[pull_alert][historyArr_json_Marshal_error][historyArr:%+v][err: %v]", historyArr, err)
|
||
return
|
||
}
|
||
logger.Debugf("[proml.historyArr][metricsName:%v][Tags:%v]\n", metricsName, tagm)
|
||
|
||
event := &models.AlertEvent{
|
||
RuleId: r.Id,
|
||
RuleName: r.Name,
|
||
RuleNote: r.Note,
|
||
// TODO for expr 这些信息变化 hashid 也要变,
|
||
HashId: hashId,
|
||
IsPromePull: 1,
|
||
IsRecovery: 0,
|
||
Priority: r.Priority,
|
||
HistoryPoints: bs,
|
||
TriggerTime: triggerTs,
|
||
Values: value,
|
||
NotifyChannels: r.NotifyChannels,
|
||
NotifyGroups: r.NotifyGroups,
|
||
NotifyUsers: r.NotifyUsers,
|
||
RunbookUrl: r.RunbookUrl,
|
||
ReadableExpression: r.PullExpr.PromQl,
|
||
Tags: tags,
|
||
AlertDuration: int64(r.AlertDuration),
|
||
TagMap: tagm,
|
||
}
|
||
|
||
logger.Debugf("[handlePromqlVector_has_value][event:%+v]\n", event)
|
||
sendEventIfNeed([]bool{true}, event, &r)
|
||
}
|
||
LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys)
|
||
|
||
}
|