From 80ee54898a35b4403b7c5b7c10ef43b42a8d1a9e Mon Sep 17 00:00:00 2001 From: Yening Qin <710leo@gmail.com> Date: Mon, 22 Aug 2022 14:17:17 +0800 Subject: [PATCH] feat: alert rule support cate (#1123) * alert rule support cate * his_event add cate * change RecoverEvent time * add get event api * event query by cate --- docker/initsql/a-n9e.sql | 5 +++++ src/models/alert_cur_event.go | 14 ++++++++++++-- src/models/alert_his_event.go | 13 +++++++++++-- src/models/alert_mute.go | 1 + src/models/alert_rule.go | 15 ++++++++++++++- src/models/alert_subscribe.go | 1 + src/server/config/config.go | 1 - src/server/engine/worker.go | 20 ++++++++++++++------ src/server/router/router_event.go | 3 ++- src/webapi/router/router.go | 1 + src/webapi/router/router_alert_cur_event.go | 16 +++++++++++++--- src/webapi/router/router_alert_his_event.go | 10 ++++++++-- src/webapi/router/router_alert_rule.go | 10 +++++++++- 13 files changed, 91 insertions(+), 19 deletions(-) diff --git a/docker/initsql/a-n9e.sql b/docker/initsql/a-n9e.sql index 7dfd8b7b..fed8c9a6 100644 --- a/docker/initsql/a-n9e.sql +++ b/docker/initsql/a-n9e.sql @@ -226,6 +226,7 @@ CREATE TABLE `chart_share` ( CREATE TABLE `alert_rule` ( `id` bigint unsigned not null auto_increment, `group_id` bigint not null default 0 comment 'busi group id', + `cate` varchar(128) not null, `cluster` varchar(128) not null, `name` varchar(255) not null, `note` varchar(1024) not null default '', @@ -264,6 +265,7 @@ CREATE TABLE `alert_mute` ( `id` bigint unsigned not null auto_increment, `group_id` bigint not null default 0 comment 'busi group id', `prod` varchar(255) not null default '', + `cate` varchar(128) not null, `cluster` varchar(128) not null, `tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value', `cause` varchar(255) not null default '', @@ -279,6 +281,7 @@ CREATE TABLE `alert_mute` ( CREATE TABLE `alert_subscribe` ( `id` bigint unsigned not null auto_increment, `group_id` bigint not null default 0 comment 'busi group id', + `cate` varchar(128) not null, `cluster` varchar(128) not null, `rule_id` bigint not null default 0, `tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value', @@ -380,6 +383,7 @@ insert into alert_aggr_view(name, rule, cate) values('By RuleName', 'field:rule_ CREATE TABLE `alert_cur_event` ( `id` bigint unsigned not null comment 'use alert_his_event.id', + `cate` varchar(128) not null, `cluster` varchar(128) not null, `group_id` bigint unsigned not null comment 'busi group id of rule', `group_name` varchar(255) not null default '' comment 'busi group name', @@ -416,6 +420,7 @@ CREATE TABLE `alert_cur_event` ( CREATE TABLE `alert_his_event` ( `id` bigint unsigned not null AUTO_INCREMENT, `is_recovered` tinyint(1) not null, + `cate` varchar(128) not null, `cluster` varchar(128) not null, `group_id` bigint unsigned not null comment 'busi group id of rule', `group_name` varchar(255) not null default '' comment 'busi group name', diff --git a/src/models/alert_cur_event.go b/src/models/alert_cur_event.go index 68508cf3..bff29ae8 100644 --- a/src/models/alert_cur_event.go +++ b/src/models/alert_cur_event.go @@ -12,6 +12,7 @@ import ( type AlertCurEvent struct { Id int64 `json:"id" gorm:"primaryKey"` + Cate string `json:"cate"` Cluster string `json:"cluster"` GroupId int64 `json:"group_id"` // busi group id GroupName string `json:"group_name"` // busi group name @@ -155,6 +156,7 @@ func (e *AlertCurEvent) ToHis() *AlertHisEvent { return &AlertHisEvent{ IsRecovered: isRecovered, + Cate: e.Cate, Cluster: e.Cluster, GroupId: e.GroupId, GroupName: e.GroupName, @@ -249,7 +251,7 @@ func (e *AlertCurEvent) FillNotifyGroups(cache map[int64]*UserGroup) error { return nil } -func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clusters []string, query string) (int64, error) { +func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clusters, cates []string, query string) (int64, error) { session := DB().Model(&AlertCurEvent{}).Where("trigger_time between ? and ? and rule_prod = ?", stime, etime, prod) if bgid > 0 { @@ -264,6 +266,10 @@ func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clu session = session.Where("cluster in ?", clusters) } + if len(cates) > 0 { + session = session.Where("cate in ?", cates) + } + if query != "" { arr := strings.Fields(query) for i := 0; i < len(arr); i++ { @@ -275,7 +281,7 @@ func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clu return Count(session) } -func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clusters []string, query string, limit, offset int) ([]AlertCurEvent, error) { +func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clusters, cates []string, query string, limit, offset int) ([]AlertCurEvent, error) { session := DB().Where("trigger_time between ? and ? and rule_prod = ?", stime, etime, prod) if bgid > 0 { @@ -290,6 +296,10 @@ func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clus session = session.Where("cluster in ?", clusters) } + if len(cates) > 0 { + session = session.Where("cate in ?", cates) + } + if query != "" { arr := strings.Fields(query) for i := 0; i < len(arr); i++ { diff --git a/src/models/alert_his_event.go b/src/models/alert_his_event.go index efd11d25..85af4e44 100644 --- a/src/models/alert_his_event.go +++ b/src/models/alert_his_event.go @@ -7,6 +7,7 @@ import ( type AlertHisEvent struct { Id int64 `json:"id" gorm:"primaryKey"` + Cate string `json:"cate"` IsRecovered int `json:"is_recovered"` Cluster string `json:"cluster"` GroupId int64 `json:"group_id"` @@ -91,7 +92,7 @@ func (e *AlertHisEvent) FillNotifyGroups(cache map[int64]*UserGroup) error { return nil } -func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, recovered int, clusters []string, query string) (int64, error) { +func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, recovered int, clusters, cates []string, query string) (int64, error) { session := DB().Model(&AlertHisEvent{}).Where("last_eval_time between ? and ? and rule_prod = ?", stime, etime, prod) if bgid > 0 { @@ -110,6 +111,10 @@ func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, rec session = session.Where("cluster in ?", clusters) } + if len(cates) > 0 { + session = session.Where("cate in ?", cates) + } + if query != "" { arr := strings.Fields(query) for i := 0; i < len(arr); i++ { @@ -121,7 +126,7 @@ func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, rec return Count(session) } -func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, recovered int, clusters []string, query string, limit, offset int) ([]AlertHisEvent, error) { +func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, recovered int, clusters, cates []string, query string, limit, offset int) ([]AlertHisEvent, error) { session := DB().Where("last_eval_time between ? and ? and rule_prod = ?", stime, etime, prod) if bgid > 0 { @@ -140,6 +145,10 @@ func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, reco session = session.Where("cluster in ?", clusters) } + if len(cates) > 0 { + session = session.Where("cate in ?", cates) + } + if query != "" { arr := strings.Fields(query) for i := 0; i < len(arr); i++ { diff --git a/src/models/alert_mute.go b/src/models/alert_mute.go index 6589bab4..16834a50 100644 --- a/src/models/alert_mute.go +++ b/src/models/alert_mute.go @@ -22,6 +22,7 @@ type TagFilter struct { type AlertMute struct { Id int64 `json:"id" gorm:"primaryKey"` GroupId int64 `json:"group_id"` + Cate string `json:"cate"` Prod string `json:"prod"` // product empty means n9e Cluster string `json:"cluster"` // take effect by clusters, seperated by space Tags ormx.JSONArr `json:"tags"` diff --git a/src/models/alert_rule.go b/src/models/alert_rule.go index d4978403..5651192a 100644 --- a/src/models/alert_rule.go +++ b/src/models/alert_rule.go @@ -16,6 +16,7 @@ import ( type AlertRule struct { Id int64 `json:"id" gorm:"primaryKey"` GroupId int64 `json:"group_id"` // busi group id + Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch) Cluster string `json:"cluster"` // take effect by clusters, seperated by space Name string `json:"name"` // rule name Note string `json:"note"` // will sent in notify @@ -336,7 +337,7 @@ func AlertRuleGetsByCluster(cluster string) ([]*AlertRule, error) { return lr, err } -func AlertRulesGetsBy(prods []string, query, algorithm string) ([]*AlertRule, error) { +func AlertRulesGetsBy(prods []string, query, algorithm, cluster string, cates []string, disabled int) ([]*AlertRule, error) { session := DB().Where("prod in (?)", prods) if query != "" { @@ -351,6 +352,18 @@ func AlertRulesGetsBy(prods []string, query, algorithm string) ([]*AlertRule, er session = session.Where("algorithm = ?", algorithm) } + if cluster != "" { + session = session.Where("cluster like ?", "%"+cluster+"%") + } + + if len(cates) != 0 { + session = session.Where("cate in (?)", cates) + } + + if disabled != -1 { + session = session.Where("disabled = ?", disabled) + } + var lst []*AlertRule err := session.Find(&lst).Error if err == nil { diff --git a/src/models/alert_subscribe.go b/src/models/alert_subscribe.go index 672e3dbe..a448216e 100644 --- a/src/models/alert_subscribe.go +++ b/src/models/alert_subscribe.go @@ -14,6 +14,7 @@ import ( type AlertSubscribe struct { Id int64 `json:"id" gorm:"primaryKey"` GroupId int64 `json:"group_id"` + Cate string `json:"cate"` Cluster string `json:"cluster"` // take effect by clusters, seperated by space RuleId int64 `json:"rule_id"` RuleName string `json:"rule_name" gorm:"-"` // for fe diff --git a/src/server/config/config.go b/src/server/config/config.go index 423f2811..b9079c66 100644 --- a/src/server/config/config.go +++ b/src/server/config/config.go @@ -188,7 +188,6 @@ type Config struct { RunMode string ClusterName string BusiGroupLabelKey string - AnomalyDataApi []string EngineDelay int64 DisableUsageReport bool ReaderFrom string diff --git a/src/server/engine/worker.go b/src/server/engine/worker.go index f9ebd2cd..f3ff2c04 100644 --- a/src/server/engine/worker.go +++ b/src/server/engine/worker.go @@ -174,7 +174,7 @@ func (r *RuleEval) Work() { var value model.Value var err error - if r.rule.Algorithm == "" { + if r.rule.Algorithm == "" && (r.rule.Cate == "" || r.rule.Cate == "prometheus") { var warnings prom.Warnings value, warnings, err = reader.Client.Query(context.Background(), promql, time.Now()) if err != nil { @@ -359,12 +359,10 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) ( // handle target note targetIdent, has := vectors[i].Labels["ident"] targetNote := "" - targetCluster := "" if has { target, exists := memsto.TargetCache.Get(string(targetIdent)) if exists { targetNote = target.Note - targetCluster = target.Cluster // 对于包含ident的告警事件,check一下ident所属bg和rule所属bg是否相同 // 如果告警规则选择了只在本BG生效,那其他BG的机器就不能因此规则产生告警 @@ -396,7 +394,8 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) ( tagsArr := labelMapToArr(tagsMap) sort.Strings(tagsArr) - event.Cluster = targetCluster + event.Cluster = config.C.ClusterName + event.Cate = r.rule.Cate event.Hash = hash event.RuleId = r.rule.Id event.RuleName = r.rule.Name @@ -527,12 +526,21 @@ func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) { } } -func (r *RuleEval) RecoverEvent(hash string, now int64) { +func (r *RuleEval) RecoverEvent(hash string, now int64, value float64) { + curRule := memsto.AlertRuleCache.Get(r.rule.Id) + if curRule == nil { + return + } + r.rule = curRule + + r.pendings.Delete(hash) event, has := r.fires.Get(hash) if !has { return } - r.recoverEvent(hash, event, time.Now().Unix()) + + event.TriggerValue = fmt.Sprintf("%.5f", value) + r.recoverEvent(hash, event, now) } func (r *RuleEval) recoverEvent(hash string, event *models.AlertCurEvent, now int64) { diff --git a/src/server/router/router_event.go b/src/server/router/router_event.go index 4a0c49e7..ee4ec7eb 100644 --- a/src/server/router/router_event.go +++ b/src/server/router/router_event.go @@ -106,7 +106,8 @@ func makeEvent(c *gin.Context) { } else { for _, vector := range events[i].Vectors { hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key)) - go re.RecoverEvent(hash, now) + now := vector.Timestamp + go re.RecoverEvent(hash, now, vector.Value) } } } diff --git a/src/webapi/router/router.go b/src/webapi/router/router.go index 63109222..1484c87a 100644 --- a/src/webapi/router/router.go +++ b/src/webapi/router/router.go @@ -321,5 +321,6 @@ func configRoute(r *gin.Engine, version string) { service.GET("/alert-cur-events", alertCurEventsList) service.GET("/alert-his-events", alertHisEventsList) + service.GET("/alert-his-event/:eid", alertHisEventGet) } } diff --git a/src/webapi/router/router_alert_cur_event.go b/src/webapi/router/router_alert_cur_event.go index 30f49984..1b304dc6 100644 --- a/src/webapi/router/router_alert_cur_event.go +++ b/src/webapi/router/router_alert_cur_event.go @@ -46,9 +46,14 @@ func alertCurEventsCard(c *gin.Context) { clusters := queryClusters(c) rules := parseAggrRules(c) prod := ginx.QueryStr(c, "prod", "") + cate := ginx.QueryStr(c, "cate", "$all") + cates := []string{} + if cate != "$all" { + cates = strings.Split(cate, ",") + } // 最多获取50000个,获取太多也没啥意义 - list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, query, 50000, 0) + list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, cates, query, 50000, 0) ginx.Dangerous(err) cardmap := make(map[string]*AlertCard) @@ -123,11 +128,16 @@ func alertCurEventsList(c *gin.Context) { busiGroupId := ginx.QueryInt64(c, "bgid", 0) clusters := queryClusters(c) prod := ginx.QueryStr(c, "prod", "") + cate := ginx.QueryStr(c, "cate", "$all") + cates := []string{} + if cate != "$all" { + cates = strings.Split(cate, ",") + } - total, err := models.AlertCurEventTotal(prod, busiGroupId, stime, etime, severity, clusters, query) + total, err := models.AlertCurEventTotal(prod, busiGroupId, stime, etime, severity, clusters, cates, query) ginx.Dangerous(err) - list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, query, limit, ginx.Offset(c, limit)) + list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, cates, query, limit, ginx.Offset(c, limit)) ginx.Dangerous(err) cache := make(map[int64]*models.UserGroup) diff --git a/src/webapi/router/router_alert_his_event.go b/src/webapi/router/router_alert_his_event.go index 631b0454..555d4746 100644 --- a/src/webapi/router/router_alert_his_event.go +++ b/src/webapi/router/router_alert_his_event.go @@ -1,6 +1,7 @@ package router import ( + "strings" "time" "github.com/gin-gonic/gin" @@ -35,11 +36,16 @@ func alertHisEventsList(c *gin.Context) { busiGroupId := ginx.QueryInt64(c, "bgid", 0) clusters := queryClusters(c) prod := ginx.QueryStr(c, "prod", "") + cate := ginx.QueryStr(c, "cate", "$all") + cates := []string{} + if cate != "$all" { + cates = strings.Split(cate, ",") + } - total, err := models.AlertHisEventTotal(prod, busiGroupId, stime, etime, severity, recovered, clusters, query) + total, err := models.AlertHisEventTotal(prod, busiGroupId, stime, etime, severity, recovered, clusters, cates, query) ginx.Dangerous(err) - list, err := models.AlertHisEventGets(prod, busiGroupId, stime, etime, severity, recovered, clusters, query, limit, ginx.Offset(c, limit)) + list, err := models.AlertHisEventGets(prod, busiGroupId, stime, etime, severity, recovered, clusters, cates, query, limit, ginx.Offset(c, limit)) ginx.Dangerous(err) cache := make(map[int64]*models.UserGroup) diff --git a/src/webapi/router/router_alert_rule.go b/src/webapi/router/router_alert_rule.go index 244b448f..2940b729 100644 --- a/src/webapi/router/router_alert_rule.go +++ b/src/webapi/router/router_alert_rule.go @@ -29,7 +29,15 @@ func alertRulesGetByService(c *gin.Context) { prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",") query := ginx.QueryStr(c, "query", "") algorithm := ginx.QueryStr(c, "algorithm", "") - ars, err := models.AlertRulesGetsBy(prods, query, algorithm) + cluster := ginx.QueryStr(c, "cluster", "") + cate := ginx.QueryStr(c, "cate", "$all") + cates := []string{} + if cate != "$all" { + cates = strings.Split(cate, ",") + } + + disabled := ginx.QueryInt(c, "disabled", -1) + ars, err := models.AlertRulesGetsBy(prods, query, algorithm, cluster, cates, disabled) if err == nil { cache := make(map[int64]*models.UserGroup) for i := 0; i < len(ars); i++ {