diff --git a/src/models/alert_rule.go b/src/models/alert_rule.go index bb5f336a..d4978403 100644 --- a/src/models/alert_rule.go +++ b/src/models/alert_rule.go @@ -336,7 +336,7 @@ func AlertRuleGetsByCluster(cluster string) ([]*AlertRule, error) { return lr, err } -func AlertRulesGetsBy(prods []string, query string) ([]*AlertRule, error) { +func AlertRulesGetsBy(prods []string, query, algorithm string) ([]*AlertRule, error) { session := DB().Where("prod in (?)", prods) if query != "" { @@ -347,6 +347,10 @@ func AlertRulesGetsBy(prods []string, query string) ([]*AlertRule, error) { } } + if algorithm != "" { + session = session.Where("algorithm = ?", algorithm) + } + var lst []*AlertRule err := session.Find(&lst).Error if err == nil { diff --git a/src/server/engine/worker.go b/src/server/engine/worker.go index a9c1b071..915b4a23 100644 --- a/src/server/engine/worker.go +++ b/src/server/engine/worker.go @@ -3,15 +3,14 @@ package engine import ( "context" "fmt" - "math/rand" "sort" "strings" + "sync" "time" "github.com/didi/nightingale/v5/src/server/writer" "github.com/prometheus/common/model" "github.com/toolkits/pkg/logger" - "github.com/toolkits/pkg/net/httplib" "github.com/toolkits/pkg/str" "github.com/didi/nightingale/v5/src/models" @@ -60,25 +59,88 @@ func filterRules() { } Workers.Build(mines) + RuleEvalForExternal.Build() } type RuleEval struct { rule *models.AlertRule - fires map[string]*models.AlertCurEvent - pendings map[string]*models.AlertCurEvent + fires *AlertCurEventMap + pendings *AlertCurEventMap quit chan struct{} } -func (r RuleEval) Stop() { +type AlertCurEventMap struct { + sync.RWMutex + Data map[string]*models.AlertCurEvent +} + +func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) { + a.Lock() + defer a.Unlock() + a.Data = data +} + +func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) { + a.Lock() + defer a.Unlock() + a.Data[key] = value +} + +func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) { + a.RLock() + defer a.RUnlock() + event, exists := a.Data[key] + return event, exists +} + +func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) { + a.Lock() + defer a.Unlock() + event, exists := a.Data[key] + if !exists { + return + } + event.LastEvalTime = lastEvalTime +} + +func (a *AlertCurEventMap) Delete(key string) { + a.Lock() + defer a.Unlock() + delete(a.Data, key) +} + +func (a *AlertCurEventMap) Keys() []string { + a.RLock() + defer a.RUnlock() + keys := make([]string, 0, len(a.Data)) + for k := range a.Data { + keys = append(keys, k) + } + return keys +} + +func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent { + a.RLock() + defer a.RUnlock() + return a.Data +} + +func NewAlertCurEventMap() *AlertCurEventMap { + return &AlertCurEventMap{ + Data: make(map[string]*models.AlertCurEvent), + } +} + +func (r *RuleEval) Stop() { logger.Infof("rule_eval:%d stopping", r.RuleID()) close(r.quit) } -func (r RuleEval) RuleID() int64 { +func (r *RuleEval) RuleID() int64 { return r.rule.Id } -func (r RuleEval) Start() { +func (r *RuleEval) Start() { logger.Infof("rule_eval:%d started", r.RuleID()) for { select { @@ -97,12 +159,7 @@ func (r RuleEval) Start() { } } -type AnomalyPoint struct { - Data model.Matrix `json:"data"` - Err string `json:"error"` -} - -func (r RuleEval) Work() { +func (r *RuleEval) Work() { promql := strings.TrimSpace(r.rule.PromQl) if promql == "" { logger.Errorf("rule_eval:%d promql is blank", r.RuleID()) @@ -126,34 +183,17 @@ func (r RuleEval) Work() { return } logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value) - } else { - var res AnomalyPoint - count := len(config.C.AnomalyDataApi) - for _, i := range rand.Perm(count) { - url := fmt.Sprintf("%s?rid=%d", config.C.AnomalyDataApi[i], r.rule.Id) - err = httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).ToJSON(&res) - if err != nil { - logger.Errorf("curl %s fail: %v", url, err) - continue - } - if res.Err != "" { - logger.Errorf("curl %s fail: %s", url, res.Err) - continue - } - value = res.Data - logger.Debugf("curl %s get: %+v", url, res.Data) - } } - r.judge(conv.ConvertVectors(value)) + r.Judge(conv.ConvertVectors(value)) } type WorkersType struct { - rules map[string]RuleEval + rules map[string]*RuleEval recordRules map[string]RecordingRuleEval } -var Workers = &WorkersType{rules: make(map[string]RuleEval), recordRules: make(map[string]RecordingRuleEval)} +var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)} func (ws *WorkersType) Build(rids []int64) { rules := make(map[string]*models.AlertRule) @@ -199,12 +239,13 @@ func (ws *WorkersType) Build(rids []int64) { elst[i].DB2Mem() firemap[elst[i].Hash] = elst[i] } - - re := RuleEval{ + fires := NewAlertCurEventMap() + fires.SetAll(firemap) + re := &RuleEval{ rule: rules[hash], quit: make(chan struct{}), - fires: firemap, - pendings: make(map[string]*models.AlertCurEvent), + fires: fires, + pendings: NewAlertCurEventMap(), } go re.Start() @@ -259,20 +300,31 @@ func (ws *WorkersType) BuildRe(rids []int64) { } } -func (r RuleEval) judge(vectors []conv.Vector) { +func (r *RuleEval) Judge(vectors []conv.Vector) { + now := time.Now().Unix() + + alertingKeys, ruleExists := r.MakeNewEvent("inner", now, vectors) + if !ruleExists { + return + } + + // handle recovered events + r.recoverRule(alertingKeys, now) +} + +func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) (map[string]struct{}, bool) { // 有可能rule的一些配置已经发生变化,比如告警接收人、callbacks等 // 这些信息的修改是不会引起worker restart的,但是确实会影响告警处理逻辑 // 所以,这里直接从memsto.AlertRuleCache中获取并覆盖 curRule := memsto.AlertRuleCache.Get(r.rule.Id) if curRule == nil { - return + return map[string]struct{}{}, false } r.rule = curRule count := len(vectors) alertingKeys := make(map[string]struct{}) - now := time.Now().Unix() for i := 0; i < count; i++ { // compute hash hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key)) @@ -364,12 +416,15 @@ func (r RuleEval) judge(vectors []conv.Vector) { event.Tags = strings.Join(tagsArr, ",,") event.IsRecovered = false event.LastEvalTime = now + if from != "inner" { + event.LastEvalTime = event.TriggerTime + } r.handleNewEvent(event) + } - // handle recovered events - r.recoverRule(alertingKeys, now) + return alertingKeys, true } func readableValue(value float64) string { @@ -393,26 +448,30 @@ func labelMapToArr(m map[string]string) []string { return labelStrings } -func (r RuleEval) handleNewEvent(event *models.AlertCurEvent) { +func (r *RuleEval) handleNewEvent(event *models.AlertCurEvent) { if event.PromForDuration == 0 { r.fireEvent(event) return } - _, has := r.pendings[event.Hash] + var preTriggerTime int64 + preEvent, has := r.pendings.Get(event.Hash) if has { - r.pendings[event.Hash].LastEvalTime = event.LastEvalTime + r.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime) + preTriggerTime = preEvent.TriggerTime } else { - r.pendings[event.Hash] = event + r.pendings.Set(event.Hash, event) + preTriggerTime = event.TriggerTime } - if r.pendings[event.Hash].LastEvalTime-r.pendings[event.Hash].TriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) { + + if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) { r.fireEvent(event) } } -func (r RuleEval) fireEvent(event *models.AlertCurEvent) { - if fired, has := r.fires[event.Hash]; has { - r.fires[event.Hash].LastEvalTime = event.LastEvalTime +func (r *RuleEval) fireEvent(event *models.AlertCurEvent) { + if fired, has := r.fires.Get(event.Hash); has { + r.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime) if r.rule.NotifyRepeatStep == 0 { // 说明不想重复通知,那就直接返回了,nothing to do @@ -445,58 +504,69 @@ func (r RuleEval) fireEvent(event *models.AlertCurEvent) { } } -func (r RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) { - for hash := range r.pendings { +func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) { + for _, hash := range r.pendings.Keys() { if _, has := alertingKeys[hash]; has { continue } - - delete(r.pendings, hash) + r.pendings.Delete(hash) } - for hash, event := range r.fires { + for hash, event := range r.fires.GetAll() { if _, has := alertingKeys[hash]; has { continue } - // 如果配置了留观时长,就不能立马恢复了 - if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration { - continue - } - - // 没查到触发阈值的vector,姑且就认为这个vector的值恢复了 - // 我确实无法分辨,是prom中有值但是未满足阈值所以没返回,还是prom中确实丢了一些点导致没有数据可以返回,尴尬 - delete(r.fires, hash) - delete(r.pendings, hash) - - event.IsRecovered = true - event.LastEvalTime = now - // 可能是因为调整了promql才恢复的,所以事件里边要体现最新的promql,否则用户会比较困惑 - // 当然,其实rule的各个字段都可能发生变化了,都更新一下吧 - event.RuleName = r.rule.Name - event.RuleNote = r.rule.Note - event.RuleProd = r.rule.Prod - event.RuleAlgo = r.rule.Algorithm - event.Severity = r.rule.Severity - event.PromForDuration = r.rule.PromForDuration - event.PromQl = r.rule.PromQl - event.PromEvalInterval = r.rule.PromEvalInterval - event.Callbacks = r.rule.Callbacks - event.CallbacksJSON = r.rule.CallbacksJSON - event.RunbookUrl = r.rule.RunbookUrl - event.NotifyRecovered = r.rule.NotifyRecovered - event.NotifyChannels = r.rule.NotifyChannels - event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON - event.NotifyGroups = r.rule.NotifyGroups - event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON - r.pushEventToQueue(event) + r.recoverEvent(hash, event, now) } } -func (r RuleEval) pushEventToQueue(event *models.AlertCurEvent) { +func (r *RuleEval) RecoverEvent(hash string, now int64) { + event, has := r.fires.Get(hash) + if !has { + return + } + r.recoverEvent(hash, event, time.Now().Unix()) +} + +func (r *RuleEval) recoverEvent(hash string, event *models.AlertCurEvent, now int64) { + // 如果配置了留观时长,就不能立马恢复了 + if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration { + return + } + + // 没查到触发阈值的vector,姑且就认为这个vector的值恢复了 + // 我确实无法分辨,是prom中有值但是未满足阈值所以没返回,还是prom中确实丢了一些点导致没有数据可以返回,尴尬 + r.fires.Delete(hash) + r.pendings.Delete(hash) + + event.IsRecovered = true + event.LastEvalTime = now + // 可能是因为调整了promql才恢复的,所以事件里边要体现最新的promql,否则用户会比较困惑 + // 当然,其实rule的各个字段都可能发生变化了,都更新一下吧 + event.RuleName = r.rule.Name + event.RuleNote = r.rule.Note + event.RuleProd = r.rule.Prod + event.RuleAlgo = r.rule.Algorithm + event.Severity = r.rule.Severity + event.PromForDuration = r.rule.PromForDuration + event.PromQl = r.rule.PromQl + event.PromEvalInterval = r.rule.PromEvalInterval + event.Callbacks = r.rule.Callbacks + event.CallbacksJSON = r.rule.CallbacksJSON + event.RunbookUrl = r.rule.RunbookUrl + event.NotifyRecovered = r.rule.NotifyRecovered + event.NotifyChannels = r.rule.NotifyChannels + event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON + event.NotifyGroups = r.rule.NotifyGroups + event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON + r.pushEventToQueue(event) +} + +func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) { if !event.IsRecovered { event.LastSentTime = event.LastEvalTime - r.fires[event.Hash] = event + r.fires.Set(event.Hash, event) } promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc() @@ -505,6 +575,7 @@ func (r RuleEval) pushEventToQueue(event *models.AlertCurEvent) { logger.Warningf("event_push_queue: queue is full") } } + func filterRecordingRules() { ids := memsto.RecordingRuleCache.GetRuleIds() @@ -582,3 +653,82 @@ func (r RecordingRuleEval) Work() { } } } + +type RuleEvalForExternalType struct { + sync.RWMutex + rules map[int64]RuleEval +} + +var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[int64]RuleEval)} + +func (re *RuleEvalForExternalType) Build() { + rids := memsto.AlertRuleCache.GetRuleIds() + rules := make(map[int64]*models.AlertRule) + + for i := 0; i < len(rids); i++ { + rule := memsto.AlertRuleCache.Get(rids[i]) + if rule == nil { + continue + } + + re.Lock() + rules[rule.Id] = rule + re.Unlock() + } + + // stop old + for rid := range re.rules { + if _, has := rules[rid]; !has { + re.Lock() + delete(re.rules, rid) + re.Unlock() + } + } + + // start new + re.Lock() + defer re.Unlock() + for rid := range rules { + if _, has := re.rules[rid]; has { + // already exists + continue + } + + elst, err := models.AlertCurEventGetByRule(rules[rid].Id) + if err != nil { + logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err) + continue + } + + firemap := make(map[string]*models.AlertCurEvent) + for i := 0; i < len(elst); i++ { + elst[i].DB2Mem() + firemap[elst[i].Hash] = elst[i] + } + fires := NewAlertCurEventMap() + fires.SetAll(firemap) + newRe := RuleEval{ + rule: rules[rid], + quit: make(chan struct{}), + fires: fires, + pendings: NewAlertCurEventMap(), + } + + re.rules[rid] = newRe + } +} + +func (re *RuleEvalForExternalType) Get(rid int64) (RuleEval, bool) { + rule := memsto.AlertRuleCache.Get(rid) + if rule == nil { + return RuleEval{}, false + } + + re.RLock() + defer re.RUnlock() + if ret, has := re.rules[rid]; has { + // already exists + return ret, has + } + return RuleEval{}, false +} diff --git a/src/server/router/router.go b/src/server/router/router.go index 415a06db..8f9ea7d7 100644 --- a/src/server/router/router.go +++ b/src/server/router/router.go @@ -103,6 +103,8 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) { service := r.Group("/v1/n9e") service.POST("/event", pushEventToQueue) + service.POST("/make-event", makeEvent) + service.POST("/judge-event", judgeEvent) } func stat() gin.HandlerFunc { diff --git a/src/server/router/router_event.go b/src/server/router/router_event.go index b3c5c168..4a0c49e7 100644 --- a/src/server/router/router_event.go +++ b/src/server/router/router_event.go @@ -3,8 +3,10 @@ package router import ( "fmt" "strings" + "time" "github.com/didi/nightingale/v5/src/models" + "github.com/didi/nightingale/v5/src/server/common/conv" "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/engine" promstat "github.com/didi/nightingale/v5/src/server/stat" @@ -12,6 +14,7 @@ import ( "github.com/gin-gonic/gin" "github.com/toolkits/pkg/ginx" "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/str" ) func pushEventToQueue(c *gin.Context) { @@ -69,3 +72,43 @@ func pushEventToQueue(c *gin.Context) { } ginx.NewRender(c).Message(nil) } + +type eventForm struct { + Alert bool `json:"alert"` + Vectors []conv.Vector `json:"vectors"` + RuleId int64 `json:"rule_id"` +} + +func judgeEvent(c *gin.Context) { + var form eventForm + ginx.BindJSON(c, &form) + re, exists := engine.RuleEvalForExternal.Get(form.RuleId) + if !exists { + ginx.Bomb(200, "rule not exists") + } + re.Judge(form.Vectors) + ginx.NewRender(c).Message(nil) +} + +func makeEvent(c *gin.Context) { + var events []*eventForm + ginx.BindJSON(c, &events) + now := time.Now().Unix() + for i := 0; i < len(events); i++ { + re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId) + logger.Debugf("handle event:%+v exists:%v", events[i], exists) + if !exists { + ginx.Bomb(200, "rule not exists") + } + + if events[i].Alert { + go re.MakeNewEvent("http", now, events[i].Vectors) + } else { + for _, vector := range events[i].Vectors { + hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key)) + go re.RecoverEvent(hash, now) + } + } + } + ginx.NewRender(c).Message(nil) +} diff --git a/src/webapi/router/router_alert_rule.go b/src/webapi/router/router_alert_rule.go index aa882db1..244b448f 100644 --- a/src/webapi/router/router_alert_rule.go +++ b/src/webapi/router/router_alert_rule.go @@ -26,10 +26,10 @@ func alertRuleGets(c *gin.Context) { } func alertRulesGetByService(c *gin.Context) { - prods := strings.Fields(ginx.QueryStr(c, "prods", "")) + prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",") query := ginx.QueryStr(c, "query", "") - - ars, err := models.AlertRulesGetsBy(prods, query) + algorithm := ginx.QueryStr(c, "algorithm", "") + ars, err := models.AlertRulesGetsBy(prods, query, algorithm) if err == nil { cache := make(map[int64]*models.UserGroup) for i := 0; i < len(ars); i++ {