refactor: fix conflicts

This commit is contained in:
710leo 2021-08-20 19:52:37 +08:00
commit 55ec76a23d
6 changed files with 193 additions and 47 deletions

View File

@ -86,6 +86,8 @@ func configRoutes(r *gin.Engine) {
pages.DELETE("/user-group/:id", login(), userGroupDel) pages.DELETE("/user-group/:id", login(), userGroupDel)
pages.GET("/classpaths", login(), classpathListGets) pages.GET("/classpaths", login(), classpathListGets)
pages.GET("/classpaths/tree", login(), classpathListNodeGets)
pages.GET("/classpaths/tree-node/:id", login(), classpathListNodeGetsById)
pages.POST("/classpaths", login(), classpathAdd) pages.POST("/classpaths", login(), classpathAdd)
pages.PUT("/classpath/:id", login(), classpathPut) pages.PUT("/classpath/:id", login(), classpathPut)
pages.DELETE("/classpath/:id", login(), classpathDel) pages.DELETE("/classpath/:id", login(), classpathDel)
@ -197,13 +199,13 @@ func configRoutes(r *gin.Engine) {
v1.POST("/tag-values", GetTagValues) v1.POST("/tag-values", GetTagValues)
v1.POST("/tag-pairs", GetTagPairs) v1.POST("/tag-pairs", GetTagPairs)
v1.POST("/tag-metrics", GetMetrics) v1.POST("/tag-metrics", GetMetrics)
v1.POST("/push", PushData)
v1.GET("/collect-rules-belong-to-ident", collectRuleGetsByIdent)
v1.GET("/collect-rules-summary", collectRuleSummaryGetByIdent)
v1.GET("/can-do-op-by-name", login(), canDoOpByName) v1.GET("/can-do-op-by-name", login(), canDoOpByName)
v1.GET("/can-do-op-by-token", login(), canDoOpByToken) v1.GET("/can-do-op-by-token", login(), canDoOpByToken)
v1.POST("/push", PushData)
v1.GET("/collect-rules-belong-to-ident", collectRuleGetsByIdent)
v1.GET("/collect-rules-summary", collectRuleSummaryGetByIdent)
} }
push := r.Group("/v1/n9e/series").Use(gzip.Gzip(gzip.DefaultCompression)) push := r.Group("/v1/n9e/series").Use(gzip.Gzip(gzip.DefaultCompression))

View File

@ -24,6 +24,21 @@ func classpathListGets(c *gin.Context) {
}, nil) }, nil)
} }
func classpathListNodeGets(c *gin.Context) {
query := queryStr(c, "query", "")
list, err := models.ClasspathNodeGets(query)
dangerous(err)
renderData(c, list, nil)
}
func classpathListNodeGetsById(c *gin.Context) {
cp := Classpath(urlParamInt64(c, "id"))
children, err := cp.DirectChildren()
renderData(c, children, err)
}
func classpathFavoriteGet(c *gin.Context) { func classpathFavoriteGet(c *gin.Context) {
lst, err := loginUser(c).FavoriteClasspaths() lst, err := loginUser(c).FavoriteClasspaths()
renderData(c, lst, err) renderData(c, lst, err)

View File

@ -449,14 +449,14 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert
} }
now := time.Now().Unix() now := time.Now().Unix()
lastEvent, exists := LastEvents.Get(event.HashId) lastEvent, exists := LastEvents.Get(event.RuleId, event.HashId)
switch event.IsPromePull { switch event.IsPromePull {
case 0: case 0:
// push型的 && 与条件型的 // push型的 && 与条件型的
if exists && lastEvent.IsPromePull == 1 { if exists && lastEvent.IsPromePull == 1 {
// 之前内存中的事件是pull型的先清空内存中的事件 // 之前内存中的事件是pull型的先清空内存中的事件
LastEvents.Del(event.HashId) LastEvents.Del(event.RuleId, event.HashId)
} }
if isTriggered { if isTriggered {
@ -476,7 +476,7 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert
// pull型的产生的事件一定是触发了阈值的即这个case里不存在recovery的场景recovery的场景用resolve_timeout的cron来处理 // pull型的产生的事件一定是触发了阈值的即这个case里不存在recovery的场景recovery的场景用resolve_timeout的cron来处理
if exists && lastEvent.IsPromePull == 0 { if exists && lastEvent.IsPromePull == 0 {
// 之前内存中的事件是push型的先清空内存中的事件 // 之前内存中的事件是push型的先清空内存中的事件
LastEvents.Del(event.HashId) LastEvents.Del(event.RuleId, event.HashId)
} }
// 1. 第一次来并且AlertDuration=0直接发送 // 1. 第一次来并且AlertDuration=0直接发送
@ -490,7 +490,7 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert
SendEvent(event) SendEvent(event)
} else { } else {
// 只有一条事件显然无法满足for AlertDuration的时间放到内存里等待 // 只有一条事件显然无法满足for AlertDuration的时间放到内存里等待
LastEvents.Set(event.HashId, event) LastEvents.Set(event)
} }
return return
} }
@ -529,7 +529,7 @@ func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.Alert
func SendEvent(event *models.AlertEvent) { func SendEvent(event *models.AlertEvent) {
// update last event // update last event
LastEvents.Set(event.HashId, event) LastEvents.Set(event)
ok := EventQueue.PushFront(event) ok := EventQueue.PushFront(event)
if !ok { if !ok {
logger.Errorf("push event:%v err", event) logger.Errorf("push event:%v err", event)

View File

@ -4,58 +4,85 @@ import (
"sync" "sync"
"time" "time"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/models" "github.com/didi/nightingale/v5/models"
"github.com/toolkits/pkg/logger"
) )
// rule_id -> hash_id -> *models.AlertEvent
type SafeEventMap struct { type SafeEventMap struct {
sync.RWMutex sync.RWMutex
M map[string]*models.AlertEvent M map[int64]map[string]*models.AlertEvent
} }
var ( var (
LastEvents = &SafeEventMap{M: make(map[string]*models.AlertEvent)} LastEvents = &SafeEventMap{M: make(map[int64]map[string]*models.AlertEvent)}
) )
func (s *SafeEventMap) Get(key string) (*models.AlertEvent, bool) { func (s *SafeEventMap) Get(ruleId int64, hashId string) (*models.AlertEvent, bool) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
event, exists := s.M[key]
return event, exists m, has := s.M[ruleId]
if !has {
return nil, false
}
event, has := m[hashId]
return event, has
} }
func (s *SafeEventMap) Set(key string, event *models.AlertEvent) { func (s *SafeEventMap) Set(event *models.AlertEvent) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
s.M[key] = event
m, has := s.M[event.RuleId]
if !has {
m = make(map[string]*models.AlertEvent)
m[event.HashId] = event
s.M[event.RuleId] = m
} else {
s.M[event.RuleId][event.HashId] = event
}
} }
func (s *SafeEventMap) Del(key string) { func (s *SafeEventMap) Del(ruleId int64, hashId string) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
delete(s.M, key)
_, has := s.M[ruleId]
if !has {
return
}
delete(s.M[ruleId], hashId)
} }
func (s *SafeEventMap) DeleteOrSendRecovery(promql string, toKeepKeys map[string]struct{}) { func (s *SafeEventMap) DeleteOrSendRecovery(ruleId int64, toKeepKeys map[string]struct{}) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
for k, ev := range s.M {
m, has := s.M[ruleId]
if !has {
return
}
for k, ev := range m {
if _, loaded := toKeepKeys[k]; loaded { if _, loaded := toKeepKeys[k]; loaded {
continue continue
} }
if ev.ReadableExpression == promql {
logger.Debugf("[to_del][ev.IsRecovery:%+v][ev.LastSend:%+v][promql:%v]", ev.IsRecovery, ev.LastSend, promql) // 如果因为promql修改导致本来是告警状态变成了恢复也接受
now := time.Now().Unix() logger.Debugf("[to_del][ev.IsRecovery:%+v][ev.LastSend:%+v]", ev.IsRecovery, ev.LastSend)
// promql 没查询到结果,需要将告警标记为已恢复并发送
// 同时需要满足 已经发送过触发信息,并且时间差满足 大于AlertDuration // promql 没查询到结果,需要将告警标记为已恢复并发送
// 为了避免 发送告警后 一个点 断点了就立即发送恢复信息的case // 同时需要满足 已经发送过触发信息,并且时间差满足 大于AlertDuration
if ev.IsAlert() && ev.LastSend && now-ev.TriggerTime > ev.AlertDuration { // 为了避免 发送告警后 一个点 断点了就立即发送恢复信息的case
logger.Debugf("[prom.alert.MarkRecov][promql:%v][ev.RuleName:%v]", promql, ev.RuleName) now := time.Now().Unix()
ev.MarkRecov() if ev.IsAlert() && ev.LastSend && now-ev.TriggerTime > ev.AlertDuration {
EventQueue.PushFront(ev) logger.Debugf("[prom.alert.MarkRecov][ev.RuleName:%v]", ev.RuleName)
delete(s.M, k) ev.MarkRecov()
} EventQueue.PushFront(ev)
delete(s.M[ruleId], k)
} }
} }
} }

View File

@ -41,19 +41,22 @@ type RuleEval struct {
ctx context.Context ctx context.Context
} }
func (re *RuleEval) start() { func (re RuleEval) start() {
logger.Debugf("[prome_pull_alert_start][RuleEval: %+v]", re) go func(re RuleEval) {
go func(re *RuleEval) { logger.Debugf("[prome_pull_alert_start][RuleEval: %+v]", re)
if re.R.PullExpr.EvaluationInterval <= 0 { if re.R.PullExpr.EvaluationInterval <= 0 {
re.R.PullExpr.EvaluationInterval = DEFAULT_PULL_ALERT_INTERVAL re.R.PullExpr.EvaluationInterval = DEFAULT_PULL_ALERT_INTERVAL
} }
sleepDuration := time.Duration(re.R.PullExpr.EvaluationInterval) * time.Second
for { for {
select { select {
case <-re.ctx.Done(): case <-re.ctx.Done():
return return
case <-re.quiteChan: case <-re.quiteChan:
return return
case <-time.After(time.Duration(re.R.PullExpr.EvaluationInterval) * time.Second): default:
} }
// 获取backend的prometheus DataSource // 获取backend的prometheus DataSource
@ -67,12 +70,13 @@ func (re *RuleEval) start() {
promVector := pb.QueryVector(re.R.PullExpr.PromQl) promVector := pb.QueryVector(re.R.PullExpr.PromQl)
handlePromqlVector(promVector, re.R) handlePromqlVector(promVector, re.R)
}
time.Sleep(sleepDuration)
}
}(re) }(re)
} }
func (r *RuleEval) stop() { func (r RuleEval) stop() {
logger.Debugf("[prome_pull_alert_stop][RuleEval: %+v]", r) logger.Debugf("[prome_pull_alert_stop][RuleEval: %+v]", r)
close(r.quiteChan) close(r.quiteChan)
} }
@ -103,17 +107,17 @@ func (rm *RuleManager) SyncRules(ctx context.Context, rules []models.AlertRule)
} }
// 停止旧的 // 停止旧的
for hash, t := range rm.activeRules { for hash := range rm.activeRules {
if _, loaded := thisAllRules[hash]; !loaded { if _, loaded := thisAllRules[hash]; !loaded {
t.stop() rm.activeRules[hash].stop()
delete(rm.activeRules, hash) delete(rm.activeRules, hash)
} }
} }
rm.targetMtx.Unlock() rm.targetMtx.Unlock()
// 开启新的 // 开启新的
for _, t := range thisNewRules { for hash := range thisNewRules {
t.start() thisNewRules[hash].start()
} }
} }
@ -121,7 +125,7 @@ func handlePromqlVector(pv promql.Vector, r models.AlertRule) {
toKeepKeys := map[string]struct{}{} toKeepKeys := map[string]struct{}{}
if len(pv) == 0 { if len(pv) == 0 {
// 说明没触发或者没查询到删掉rule-id开头的所有event // 说明没触发或者没查询到删掉rule-id开头的所有event
LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys) LastEvents.DeleteOrSendRecovery(r.Id, toKeepKeys)
return return
} }
@ -191,6 +195,6 @@ func handlePromqlVector(pv promql.Vector, r models.AlertRule) {
logger.Debugf("[handlePromqlVector_has_value][event:%+v]\n", event) logger.Debugf("[handlePromqlVector_has_value][event:%+v]\n", event)
sendEventIfNeed([]bool{true}, event, &r) sendEventIfNeed([]bool{true}, event, &r)
} }
LastEvents.DeleteOrSendRecovery(r.PullExpr.PromQl, toKeepKeys) LastEvents.DeleteOrSendRecovery(r.Id, toKeepKeys)
} }

View File

@ -19,6 +19,14 @@ type Classpath struct {
UpdateBy string `json:"update_by"` UpdateBy string `json:"update_by"`
} }
type ClasspathNode struct {
Id int64 `json:"id"`
Path string `json:"path"`
Note string `json:"note"`
Preset int `json:"preset"`
Children []*ClasspathNode `json:"children"`
}
func (c *Classpath) TableName() string { func (c *Classpath) TableName() string {
return "classpath" return "classpath"
} }
@ -104,7 +112,6 @@ func ClasspathGets(query string, limit, offset int) ([]Classpath, error) {
q := "%" + query + "%" q := "%" + query + "%"
session = session.Where("path like ?", q) session = session.Where("path like ?", q)
} }
var objs []Classpath var objs []Classpath
err := session.Find(&objs) err := session.Find(&objs)
if err != nil { if err != nil {
@ -151,7 +158,7 @@ func ClasspathGet(where string, args ...interface{}) (*Classpath, error) {
func ClasspathGetsByPrefix(prefix string) ([]Classpath, error) { func ClasspathGetsByPrefix(prefix string) ([]Classpath, error) {
var objs []Classpath var objs []Classpath
err := DB.Where("path like ?", prefix+"%").Find(&objs) err := DB.Where("path like ?", prefix+"%").OrderBy("path").Find(&objs)
if err != nil { if err != nil {
logger.Errorf("mysql.error: query classpath fail: %v", err) logger.Errorf("mysql.error: query classpath fail: %v", err)
return objs, internalServerError return objs, internalServerError
@ -218,3 +225,94 @@ func (c *Classpath) AddResources(idents []string) error {
func (c *Classpath) DelResources(idents []string) error { func (c *Classpath) DelResources(idents []string) error {
return ClasspathResourceDel(c.Id, idents) return ClasspathResourceDel(c.Id, idents)
} }
func ClasspathNodeGets(query string) ([]*ClasspathNode, error) {
session := DB.OrderBy("path")
if query != "" {
q := "%" + query + "%"
session = session.Where("path like ?", q)
}
var objs []Classpath
err := session.Find(&objs)
if err != nil {
logger.Errorf("mysql.error: query classpath fail: %v", err)
return []*ClasspathNode{}, internalServerError
}
if len(objs) == 0 {
return []*ClasspathNode{}, nil
}
pcs := ClasspathNodeAllChildren(objs)
return pcs, nil
}
func (cp *Classpath) DirectChildren() ([]Classpath, error) {
var pcs []Classpath
objs, err := ClasspathGetsByPrefix(cp.Path)
if err != nil {
logger.Errorf("mysql.error: query prefix classpath fail: %v", err)
return []Classpath{}, internalServerError
}
if len(objs) < 2 {
return []Classpath{}, nil
}
pre := objs[1]
path := pre.Path[len(objs[0].Path):]
pre.Path = path
pcs = append(pcs, pre)
for _, cp := range objs[2:] {
has := strings.HasPrefix(cp.Path, pre.Path)
if !has {
path := cp.Path[len(objs[0].Path):]
pre.Path = path
pcs = append(pcs, pre)
pre = cp
}
}
return pcs, nil
}
func ClasspathNodeAllChildren(cps []Classpath) []*ClasspathNode {
var node ClasspathNode
for _, cp := range cps {
ListInsert(cp, &node)
}
return node.Children
}
func ListInsert(obj Classpath, node *ClasspathNode) {
path := obj.Path
has := true
for {
if len(node.Children) == 0 {
break
}
children := node.Children[len(node.Children)-1]
prefix := children.Path
has = strings.HasPrefix(path, prefix)
if !has {
break
}
path = path[len(prefix):]
node = children
}
newNode := ToClasspathNode(obj, path)
node.Children = append(node.Children, &newNode)
}
func ToClasspathNode(cp Classpath, path string) ClasspathNode {
var obj ClasspathNode
obj.Id = cp.Id
obj.Path = path
obj.Note = cp.Note
obj.Preset = cp.Preset
obj.Children = []*ClasspathNode{}
return obj
}