feat: support handle event api (#1113)

* support handle event service api
This commit is contained in:
Yening Qin 2022-08-17 11:22:49 +08:00 committed by GitHub
parent a1c458b764
commit b92e4abf86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 291 additions and 92 deletions

View File

@ -336,7 +336,7 @@ func AlertRuleGetsByCluster(cluster string) ([]*AlertRule, error) {
return lr, err
}
func AlertRulesGetsBy(prods []string, query string) ([]*AlertRule, error) {
func AlertRulesGetsBy(prods []string, query, algorithm string) ([]*AlertRule, error) {
session := DB().Where("prod in (?)", prods)
if query != "" {
@ -347,6 +347,10 @@ func AlertRulesGetsBy(prods []string, query string) ([]*AlertRule, error) {
}
}
if algorithm != "" {
session = session.Where("algorithm = ?", algorithm)
}
var lst []*AlertRule
err := session.Find(&lst).Error
if err == nil {

View File

@ -3,15 +3,14 @@ package engine
import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
@ -60,25 +59,88 @@ func filterRules() {
}
Workers.Build(mines)
RuleEvalForExternal.Build()
}
type RuleEval struct {
rule *models.AlertRule
fires map[string]*models.AlertCurEvent
pendings map[string]*models.AlertCurEvent
fires *AlertCurEventMap
pendings *AlertCurEventMap
quit chan struct{}
}
func (r RuleEval) Stop() {
type AlertCurEventMap struct {
sync.RWMutex
Data map[string]*models.AlertCurEvent
}
func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) {
a.Lock()
defer a.Unlock()
a.Data = data
}
func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) {
a.Lock()
defer a.Unlock()
a.Data[key] = value
}
func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) {
a.RLock()
defer a.RUnlock()
event, exists := a.Data[key]
return event, exists
}
func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) {
a.Lock()
defer a.Unlock()
event, exists := a.Data[key]
if !exists {
return
}
event.LastEvalTime = lastEvalTime
}
func (a *AlertCurEventMap) Delete(key string) {
a.Lock()
defer a.Unlock()
delete(a.Data, key)
}
func (a *AlertCurEventMap) Keys() []string {
a.RLock()
defer a.RUnlock()
keys := make([]string, 0, len(a.Data))
for k := range a.Data {
keys = append(keys, k)
}
return keys
}
func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent {
a.RLock()
defer a.RUnlock()
return a.Data
}
func NewAlertCurEventMap() *AlertCurEventMap {
return &AlertCurEventMap{
Data: make(map[string]*models.AlertCurEvent),
}
}
func (r *RuleEval) Stop() {
logger.Infof("rule_eval:%d stopping", r.RuleID())
close(r.quit)
}
func (r RuleEval) RuleID() int64 {
func (r *RuleEval) RuleID() int64 {
return r.rule.Id
}
func (r RuleEval) Start() {
func (r *RuleEval) Start() {
logger.Infof("rule_eval:%d started", r.RuleID())
for {
select {
@ -97,12 +159,7 @@ func (r RuleEval) Start() {
}
}
type AnomalyPoint struct {
Data model.Matrix `json:"data"`
Err string `json:"error"`
}
func (r RuleEval) Work() {
func (r *RuleEval) Work() {
promql := strings.TrimSpace(r.rule.PromQl)
if promql == "" {
logger.Errorf("rule_eval:%d promql is blank", r.RuleID())
@ -126,34 +183,17 @@ func (r RuleEval) Work() {
return
}
logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
} else {
var res AnomalyPoint
count := len(config.C.AnomalyDataApi)
for _, i := range rand.Perm(count) {
url := fmt.Sprintf("%s?rid=%d", config.C.AnomalyDataApi[i], r.rule.Id)
err = httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).ToJSON(&res)
if err != nil {
logger.Errorf("curl %s fail: %v", url, err)
continue
}
if res.Err != "" {
logger.Errorf("curl %s fail: %s", url, res.Err)
continue
}
value = res.Data
logger.Debugf("curl %s get: %+v", url, res.Data)
}
}
r.judge(conv.ConvertVectors(value))
r.Judge(conv.ConvertVectors(value))
}
type WorkersType struct {
rules map[string]RuleEval
rules map[string]*RuleEval
recordRules map[string]RecordingRuleEval
}
var Workers = &WorkersType{rules: make(map[string]RuleEval), recordRules: make(map[string]RecordingRuleEval)}
var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)}
func (ws *WorkersType) Build(rids []int64) {
rules := make(map[string]*models.AlertRule)
@ -199,12 +239,13 @@ func (ws *WorkersType) Build(rids []int64) {
elst[i].DB2Mem()
firemap[elst[i].Hash] = elst[i]
}
re := RuleEval{
fires := NewAlertCurEventMap()
fires.SetAll(firemap)
re := &RuleEval{
rule: rules[hash],
quit: make(chan struct{}),
fires: firemap,
pendings: make(map[string]*models.AlertCurEvent),
fires: fires,
pendings: NewAlertCurEventMap(),
}
go re.Start()
@ -259,20 +300,31 @@ func (ws *WorkersType) BuildRe(rids []int64) {
}
}
func (r RuleEval) judge(vectors []conv.Vector) {
func (r *RuleEval) Judge(vectors []conv.Vector) {
now := time.Now().Unix()
alertingKeys, ruleExists := r.MakeNewEvent("inner", now, vectors)
if !ruleExists {
return
}
// handle recovered events
r.recoverRule(alertingKeys, now)
}
func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) (map[string]struct{}, bool) {
// 有可能rule的一些配置已经发生变化比如告警接收人、callbacks等
// 这些信息的修改是不会引起worker restart的但是确实会影响告警处理逻辑
// 所以这里直接从memsto.AlertRuleCache中获取并覆盖
curRule := memsto.AlertRuleCache.Get(r.rule.Id)
if curRule == nil {
return
return map[string]struct{}{}, false
}
r.rule = curRule
count := len(vectors)
alertingKeys := make(map[string]struct{})
now := time.Now().Unix()
for i := 0; i < count; i++ {
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
@ -364,12 +416,15 @@ func (r RuleEval) judge(vectors []conv.Vector) {
event.Tags = strings.Join(tagsArr, ",,")
event.IsRecovered = false
event.LastEvalTime = now
r.handleNewEvent(event)
if from != "inner" {
event.LastEvalTime = event.TriggerTime
}
// handle recovered events
r.recoverRule(alertingKeys, now)
r.handleNewEvent(event)
}
return alertingKeys, true
}
func readableValue(value float64) string {
@ -393,26 +448,30 @@ func labelMapToArr(m map[string]string) []string {
return labelStrings
}
func (r RuleEval) handleNewEvent(event *models.AlertCurEvent) {
func (r *RuleEval) handleNewEvent(event *models.AlertCurEvent) {
if event.PromForDuration == 0 {
r.fireEvent(event)
return
}
_, has := r.pendings[event.Hash]
var preTriggerTime int64
preEvent, has := r.pendings.Get(event.Hash)
if has {
r.pendings[event.Hash].LastEvalTime = event.LastEvalTime
r.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
preTriggerTime = preEvent.TriggerTime
} else {
r.pendings[event.Hash] = event
r.pendings.Set(event.Hash, event)
preTriggerTime = event.TriggerTime
}
if r.pendings[event.Hash].LastEvalTime-r.pendings[event.Hash].TriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
r.fireEvent(event)
}
}
func (r RuleEval) fireEvent(event *models.AlertCurEvent) {
if fired, has := r.fires[event.Hash]; has {
r.fires[event.Hash].LastEvalTime = event.LastEvalTime
func (r *RuleEval) fireEvent(event *models.AlertCurEvent) {
if fired, has := r.fires.Get(event.Hash); has {
r.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
if r.rule.NotifyRepeatStep == 0 {
// 说明不想重复通知那就直接返回了nothing to do
@ -445,29 +504,41 @@ func (r RuleEval) fireEvent(event *models.AlertCurEvent) {
}
}
func (r RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
for hash := range r.pendings {
func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
for _, hash := range r.pendings.Keys() {
if _, has := alertingKeys[hash]; has {
continue
}
r.pendings.Delete(hash)
}
for hash, event := range r.fires.GetAll() {
if _, has := alertingKeys[hash]; has {
continue
}
delete(r.pendings, hash)
r.recoverEvent(hash, event, now)
}
}
for hash, event := range r.fires {
if _, has := alertingKeys[hash]; has {
continue
func (r *RuleEval) RecoverEvent(hash string, now int64) {
event, has := r.fires.Get(hash)
if !has {
return
}
r.recoverEvent(hash, event, time.Now().Unix())
}
func (r *RuleEval) recoverEvent(hash string, event *models.AlertCurEvent, now int64) {
// 如果配置了留观时长,就不能立马恢复了
if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration {
continue
return
}
// 没查到触发阈值的vector姑且就认为这个vector的值恢复了
// 我确实无法分辨是prom中有值但是未满足阈值所以没返回还是prom中确实丢了一些点导致没有数据可以返回尴尬
delete(r.fires, hash)
delete(r.pendings, hash)
r.fires.Delete(hash)
r.pendings.Delete(hash)
event.IsRecovered = true
event.LastEvalTime = now
@ -490,13 +561,12 @@ func (r RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
event.NotifyGroups = r.rule.NotifyGroups
event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON
r.pushEventToQueue(event)
}
}
func (r RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
if !event.IsRecovered {
event.LastSentTime = event.LastEvalTime
r.fires[event.Hash] = event
r.fires.Set(event.Hash, event)
}
promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc()
@ -505,6 +575,7 @@ func (r RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
logger.Warningf("event_push_queue: queue is full")
}
}
func filterRecordingRules() {
ids := memsto.RecordingRuleCache.GetRuleIds()
@ -582,3 +653,82 @@ func (r RecordingRuleEval) Work() {
}
}
}
type RuleEvalForExternalType struct {
sync.RWMutex
rules map[int64]RuleEval
}
var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[int64]RuleEval)}
func (re *RuleEvalForExternalType) Build() {
rids := memsto.AlertRuleCache.GetRuleIds()
rules := make(map[int64]*models.AlertRule)
for i := 0; i < len(rids); i++ {
rule := memsto.AlertRuleCache.Get(rids[i])
if rule == nil {
continue
}
re.Lock()
rules[rule.Id] = rule
re.Unlock()
}
// stop old
for rid := range re.rules {
if _, has := rules[rid]; !has {
re.Lock()
delete(re.rules, rid)
re.Unlock()
}
}
// start new
re.Lock()
defer re.Unlock()
for rid := range rules {
if _, has := re.rules[rid]; has {
// already exists
continue
}
elst, err := models.AlertCurEventGetByRule(rules[rid].Id)
if err != nil {
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
continue
}
firemap := make(map[string]*models.AlertCurEvent)
for i := 0; i < len(elst); i++ {
elst[i].DB2Mem()
firemap[elst[i].Hash] = elst[i]
}
fires := NewAlertCurEventMap()
fires.SetAll(firemap)
newRe := RuleEval{
rule: rules[rid],
quit: make(chan struct{}),
fires: fires,
pendings: NewAlertCurEventMap(),
}
re.rules[rid] = newRe
}
}
func (re *RuleEvalForExternalType) Get(rid int64) (RuleEval, bool) {
rule := memsto.AlertRuleCache.Get(rid)
if rule == nil {
return RuleEval{}, false
}
re.RLock()
defer re.RUnlock()
if ret, has := re.rules[rid]; has {
// already exists
return ret, has
}
return RuleEval{}, false
}

View File

@ -103,6 +103,8 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) {
service := r.Group("/v1/n9e")
service.POST("/event", pushEventToQueue)
service.POST("/make-event", makeEvent)
service.POST("/judge-event", judgeEvent)
}
func stat() gin.HandlerFunc {

View File

@ -3,8 +3,10 @@ package router
import (
"fmt"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/engine"
promstat "github.com/didi/nightingale/v5/src/server/stat"
@ -12,6 +14,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
func pushEventToQueue(c *gin.Context) {
@ -69,3 +72,43 @@ func pushEventToQueue(c *gin.Context) {
}
ginx.NewRender(c).Message(nil)
}
type eventForm struct {
Alert bool `json:"alert"`
Vectors []conv.Vector `json:"vectors"`
RuleId int64 `json:"rule_id"`
}
func judgeEvent(c *gin.Context) {
var form eventForm
ginx.BindJSON(c, &form)
re, exists := engine.RuleEvalForExternal.Get(form.RuleId)
if !exists {
ginx.Bomb(200, "rule not exists")
}
re.Judge(form.Vectors)
ginx.NewRender(c).Message(nil)
}
func makeEvent(c *gin.Context) {
var events []*eventForm
ginx.BindJSON(c, &events)
now := time.Now().Unix()
for i := 0; i < len(events); i++ {
re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId)
logger.Debugf("handle event:%+v exists:%v", events[i], exists)
if !exists {
ginx.Bomb(200, "rule not exists")
}
if events[i].Alert {
go re.MakeNewEvent("http", now, events[i].Vectors)
} else {
for _, vector := range events[i].Vectors {
hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key))
go re.RecoverEvent(hash, now)
}
}
}
ginx.NewRender(c).Message(nil)
}

View File

@ -26,10 +26,10 @@ func alertRuleGets(c *gin.Context) {
}
func alertRulesGetByService(c *gin.Context) {
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",")
query := ginx.QueryStr(c, "query", "")
ars, err := models.AlertRulesGetsBy(prods, query)
algorithm := ginx.QueryStr(c, "algorithm", "")
ars, err := models.AlertRulesGetsBy(prods, query, algorithm)
if err == nil {
cache := make(map[int64]*models.UserGroup)
for i := 0; i < len(ars); i++ {