From 0b4e3b96563f80d3ca138a9be5a5bc5e355a3d62 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Sat, 14 Aug 2021 23:17:06 +0800 Subject: [PATCH 1/4] fix judge prom --- judge/handler.go | 10 +++--- judge/last_event.go | 77 ++++++++++++++++++++++++++++++--------------- judge/prome_pull.go | 4 +-- 3 files changed, 59 insertions(+), 32 deletions(-) diff --git a/judge/handler.go b/judge/handler.go index 265aee70..0a68b315 100644 --- a/judge/handler.go +++ b/judge/handler.go @@ -449,14 +449,14 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert } now := time.Now().Unix() - lastEvent, exists := LastEvents.Get(event.HashId) + lastEvent, exists := LastEvents.Get(event.RuleId, event.HashId) switch event.IsPromePull { case 0: // push型的 && 与条件型的 if exists && lastEvent.IsPromePull == 1 { // 之前内存中的事件是pull型的,先清空内存中的事件 - LastEvents.Del(event.HashId) + LastEvents.Del(event.RuleId, event.HashId) } if isTriggered { @@ -476,7 +476,7 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert // pull型的,产生的事件一定是触发了阈值的,即这个case里不存在recovery的场景,recovery的场景用resolve_timeout的cron来处理 if exists && lastEvent.IsPromePull == 0 { // 之前内存中的事件是push型的,先清空内存中的事件 - LastEvents.Del(event.HashId) + LastEvents.Del(event.RuleId, event.HashId) } // 1. 第一次来,并且AlertDuration=0,直接发送 @@ -490,7 +490,7 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert SendEvent(event) } else { // 只有一条事件,显然无法满足for AlertDuration的时间,放到内存里等待 - LastEvents.Set(event.HashId, event) + LastEvents.Set(event) } return } @@ -529,7 +529,7 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert func SendEvent(event *models.AlertEvent) { // update last event - LastEvents.Set(event.HashId, event) + LastEvents.Set(event) ok := EventQueue.PushFront(event) if !ok { logger.Errorf("push event:%v err", event) diff --git a/judge/last_event.go b/judge/last_event.go index 04462450..3333fb58 100644 --- a/judge/last_event.go +++ b/judge/last_event.go @@ -4,58 +4,85 @@ import ( "sync" "time" - "github.com/toolkits/pkg/logger" - "github.com/didi/nightingale/v5/models" + "github.com/toolkits/pkg/logger" ) +// rule_id -> hash_id -> *models.AlertEvent type SafeEventMap struct { sync.RWMutex - M map[string]*models.AlertEvent + M map[int64]map[string]*models.AlertEvent } var ( - LastEvents = &SafeEventMap{M: make(map[string]*models.AlertEvent)} + LastEvents = &SafeEventMap{M: make(map[int64]map[string]*models.AlertEvent)} ) -func (s *SafeEventMap) Get(key string) (*models.AlertEvent, bool) { +func (s *SafeEventMap) Get(ruleId int64, hashId string) (*models.AlertEvent, bool) { s.RLock() defer s.RUnlock() - event, exists := s.M[key] - return event, exists + + m, has := s.M[ruleId] + if !has { + return nil, false + } + + event, has := m[hashId] + return event, has } -func (s *SafeEventMap) Set(key string, event *models.AlertEvent) { +func (s *SafeEventMap) Set(event *models.AlertEvent) { s.Lock() defer s.Unlock() - s.M[key] = event + + m, has := s.M[event.RuleId] + if !has { + m = make(map[string]*models.AlertEvent) + m[event.HashId] = event + s.M[event.RuleId] = m + } else { + s.M[event.RuleId][event.HashId] = event + } } -func (s *SafeEventMap) Del(key string) { +func (s *SafeEventMap) Del(ruleId int64, hashId string) { s.Lock() defer s.Unlock() - delete(s.M, key) + + _, has := s.M[ruleId] + if !has { + return + } + + delete(s.M[ruleId], hashId) } -func (s *SafeEventMap) DeleteOrSendRecovery(promql string, toKeepKeys map[string]struct{}) { +func (s *SafeEventMap) DeleteOrSendRecovery(ruleId int64, toKeepKeys map[string]struct{}) { s.Lock() defer s.Unlock() - for k, ev := range s.M { + + m, has := s.M[ruleId] + if !has { + return + } + + for k, ev := range m { if _, loaded := toKeepKeys[k]; loaded { continue } - if ev.ReadableExpression == promql { - logger.Debugf("[to_del][ev.IsRecovery:%+v][ev.LastSend:%+v][promql:%v]", ev.IsRecovery, ev.LastSend, promql) - now := time.Now().Unix() - // promql 没查询到结果,需要将告警标记为已恢复并发送 - // 同时需要满足 已经发送过触发信息,并且时间差满足 大于AlertDuration - // 为了避免 发送告警后 一个点 断点了就立即发送恢复信息的case - if ev.IsAlert() && ev.LastSend && now-ev.TriggerTime > ev.AlertDuration { - logger.Debugf("[prom.alert.MarkRecov][promql:%v][ev.RuleName:%v]", promql, ev.RuleName) - ev.MarkRecov() - EventQueue.PushFront(ev) - delete(s.M, k) - } + + // 如果因为promql修改,导致本来是告警状态变成了恢复,也接受 + logger.Debugf("[to_del][ev.IsRecovery:%+v][ev.LastSend:%+v]", ev.IsRecovery, ev.LastSend) + + // promql 没查询到结果,需要将告警标记为已恢复并发送 + // 同时需要满足 已经发送过触发信息,并且时间差满足 大于AlertDuration + // 为了避免 发送告警后 一个点 断点了就立即发送恢复信息的case + now := time.Now().Unix() + if ev.IsAlert() && ev.LastSend && now-ev.TriggerTime > ev.AlertDuration { + logger.Debugf("[prom.alert.MarkRecov][ev.RuleName:%v]", ev.RuleName) + ev.MarkRecov() + EventQueue.PushFront(ev) + delete(s.M[ruleId], k) } } } diff --git a/judge/prome_pull.go b/judge/prome_pull.go index 45c58787..a5d09e63 100644 --- a/judge/prome_pull.go +++ b/judge/prome_pull.go @@ -121,7 +121,7 @@ func handlePromqlVector(pv promql.Vector, r models.AlertRule) { toKeepKeys := map[string]struct{}{} if len(pv) == 0 { // 说明没触发,或者没查询到,删掉rule-id开头的所有event - LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys) + LastEvents.DeleteOrSendRecovery(r.Id, toKeepKeys) return } @@ -191,6 +191,6 @@ func handlePromqlVector(pv promql.Vector, r models.AlertRule) { logger.Debugf("[handlePromqlVector_has_value][event:%+v]\n", event) sendEventIfNeed([]bool{true}, event, &r) } - LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys) + LastEvents.DeleteOrSendRecovery(r.Id, toKeepKeys) } From 60a964ae55f69ebaa93eb189609327412e4837f2 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Sun, 15 Aug 2021 10:08:08 +0800 Subject: [PATCH 2/4] bugfix: for range goroutine --- judge/prome_pull.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/judge/prome_pull.go b/judge/prome_pull.go index a5d09e63..5232e0b8 100644 --- a/judge/prome_pull.go +++ b/judge/prome_pull.go @@ -41,9 +41,9 @@ type RuleEval struct { ctx context.Context } -func (re *RuleEval) start() { - logger.Debugf("[prome_pull_alert_start][RuleEval: %+v]", re) - go func(re *RuleEval) { +func (re RuleEval) start() { + go func(re RuleEval) { + logger.Debugf("[prome_pull_alert_start][RuleEval: %+v]", re) if re.R.PullExpr.EvaluationInterval <= 0 { re.R.PullExpr.EvaluationInterval = DEFAULT_PULL_ALERT_INTERVAL } @@ -72,7 +72,7 @@ func (re *RuleEval) start() { }(re) } -func (r *RuleEval) stop() { +func (r RuleEval) stop() { logger.Debugf("[prome_pull_alert_stop][RuleEval: %+v]", r) close(r.quiteChan) } @@ -103,17 +103,17 @@ func (rm *RuleManager) SyncRules(ctx context.Context, rules []models.AlertRule) } // 停止旧的 - for hash, t := range rm.activeRules { + for hash := range rm.activeRules { if _, loaded := thisAllRules[hash]; !loaded { - t.stop() + rm.activeRules[hash].stop() delete(rm.activeRules, hash) } } rm.targetMtx.Unlock() // 开启新的 - for _, t := range thisNewRules { - t.start() + for hash := range thisNewRules { + thisNewRules[hash].start() } } From 1eecb324d06eefe76262dbd71445febbc0a895a3 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Sun, 15 Aug 2021 10:28:37 +0800 Subject: [PATCH 3/4] code refactor: exec pull_prom early --- judge/prome_pull.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/judge/prome_pull.go b/judge/prome_pull.go index 5232e0b8..9c57444d 100644 --- a/judge/prome_pull.go +++ b/judge/prome_pull.go @@ -47,13 +47,16 @@ func (re RuleEval) start() { if re.R.PullExpr.EvaluationInterval <= 0 { re.R.PullExpr.EvaluationInterval = DEFAULT_PULL_ALERT_INTERVAL } + + sleepDuration := time.Duration(re.R.PullExpr.EvaluationInterval) * time.Second + for { select { case <-re.ctx.Done(): return case <-re.quiteChan: return - case <-time.After(time.Duration(re.R.PullExpr.EvaluationInterval) * time.Second): + default: } // 获取backend的prometheus DataSource @@ -67,8 +70,9 @@ func (re RuleEval) start() { promVector := pb.QueryVector(re.R.PullExpr.PromQl) handlePromqlVector(promVector, re.R) - } + time.Sleep(sleepDuration) + } }(re) } From f651af970f7d249b620929819b69680ae6555a69 Mon Sep 17 00:00:00 2001 From: Istil <66511737+Keyun-Istil@users.noreply.github.com> Date: Wed, 18 Aug 2021 16:43:39 +0800 Subject: [PATCH 4/4] feat: support classpaths prefix tree display (#769) --- http/router.go | 4 ++ http/router_classpath.go | 15 ++++++ models/classpath.go | 102 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 119 insertions(+), 2 deletions(-) diff --git a/http/router.go b/http/router.go index 3cb6db6d..4aa79b4d 100644 --- a/http/router.go +++ b/http/router.go @@ -83,6 +83,8 @@ func configRoutes(r *gin.Engine) { pages.DELETE("/user-group/:id", login(), userGroupDel) pages.GET("/classpaths", login(), classpathListGets) + pages.GET("/classpaths/tree", login(), classpathListNodeGets) + pages.GET("/classpaths/tree-node/:id", login(), classpathListNodeGetsById) pages.POST("/classpaths", login(), classpathAdd) pages.PUT("/classpath/:id", login(), classpathPut) pages.DELETE("/classpath/:id", login(), classpathDel) @@ -213,6 +215,8 @@ func configRoutes(r *gin.Engine) { v1.DELETE("/user-group/:id", login(), userGroupDel) v1.GET("/classpaths", login(), classpathListGets) + v1.GET("/classpaths/tree", login(), classpathListNodeGets) + v1.GET("/classpaths/tree-node/:id", login(), classpathListNodeGetsById) v1.POST("/classpaths", login(), classpathAdd) v1.PUT("/classpath/:id", login(), classpathPut) v1.DELETE("/classpath/:id", login(), classpathDel) diff --git a/http/router_classpath.go b/http/router_classpath.go index 76c8d45f..3c80f6ee 100644 --- a/http/router_classpath.go +++ b/http/router_classpath.go @@ -24,6 +24,21 @@ func classpathListGets(c *gin.Context) { }, nil) } +func classpathListNodeGets(c *gin.Context) { + query := queryStr(c, "query", "") + + list, err := models.ClasspathNodeGets(query) + dangerous(err) + + renderData(c, list, nil) +} + +func classpathListNodeGetsById(c *gin.Context) { + cp := Classpath(urlParamInt64(c, "id")) + children, err := cp.DirectChildren() + renderData(c, children, err) +} + func classpathFavoriteGet(c *gin.Context) { lst, err := loginUser(c).FavoriteClasspaths() renderData(c, lst, err) diff --git a/models/classpath.go b/models/classpath.go index 46f0a564..5e68b4da 100644 --- a/models/classpath.go +++ b/models/classpath.go @@ -19,6 +19,14 @@ type Classpath struct { UpdateBy string `json:"update_by"` } +type ClasspathNode struct { + Id int64 `json:"id"` + Path string `json:"path"` + Note string `json:"note"` + Preset int `json:"preset"` + Children []*ClasspathNode `json:"children"` +} + func (c *Classpath) TableName() string { return "classpath" } @@ -104,7 +112,6 @@ func ClasspathGets(query string, limit, offset int) ([]Classpath, error) { q := "%" + query + "%" session = session.Where("path like ?", q) } - var objs []Classpath err := session.Find(&objs) if err != nil { @@ -151,7 +158,7 @@ func ClasspathGet(where string, args ...interface{}) (*Classpath, error) { func ClasspathGetsByPrefix(prefix string) ([]Classpath, error) { var objs []Classpath - err := DB.Where("path like ?", prefix+"%").Find(&objs) + err := DB.Where("path like ?", prefix+"%").OrderBy("path").Find(&objs) if err != nil { logger.Errorf("mysql.error: query classpath fail: %v", err) return objs, internalServerError @@ -218,3 +225,94 @@ func (c *Classpath) AddResources(idents []string) error { func (c *Classpath) DelResources(idents []string) error { return ClasspathResourceDel(c.Id, idents) } + +func ClasspathNodeGets(query string) ([]*ClasspathNode, error) { + session := DB.OrderBy("path") + if query != "" { + q := "%" + query + "%" + session = session.Where("path like ?", q) + } + var objs []Classpath + err := session.Find(&objs) + if err != nil { + logger.Errorf("mysql.error: query classpath fail: %v", err) + return []*ClasspathNode{}, internalServerError + } + + if len(objs) == 0 { + return []*ClasspathNode{}, nil + } + pcs := ClasspathNodeAllChildren(objs) + + return pcs, nil +} + +func (cp *Classpath) DirectChildren() ([]Classpath, error) { + var pcs []Classpath + objs, err := ClasspathGetsByPrefix(cp.Path) + if err != nil { + logger.Errorf("mysql.error: query prefix classpath fail: %v", err) + return []Classpath{}, internalServerError + } + if len(objs) < 2 { + return []Classpath{}, nil + } + + pre := objs[1] + path := pre.Path[len(objs[0].Path):] + pre.Path = path + pcs = append(pcs, pre) + + for _, cp := range objs[2:] { + has := strings.HasPrefix(cp.Path, pre.Path) + if !has { + path := cp.Path[len(objs[0].Path):] + pre.Path = path + pcs = append(pcs, pre) + pre = cp + } + } + + return pcs, nil +} + +func ClasspathNodeAllChildren(cps []Classpath) []*ClasspathNode { + var node ClasspathNode + for _, cp := range cps { + ListInsert(cp, &node) + } + + return node.Children +} + +func ListInsert(obj Classpath, node *ClasspathNode) { + path := obj.Path + has := true + for { + if len(node.Children) == 0 { + break + } + children := node.Children[len(node.Children)-1] + prefix := children.Path + has = strings.HasPrefix(path, prefix) + if !has { + break + } + path = path[len(prefix):] + node = children + } + + newNode := ToClasspathNode(obj, path) + node.Children = append(node.Children, &newNode) +} + +func ToClasspathNode(cp Classpath, path string) ClasspathNode { + var obj ClasspathNode + obj.Id = cp.Id + obj.Path = path + obj.Note = cp.Note + obj.Preset = cp.Preset + obj.Children = []*ClasspathNode{} + + return obj +}