diff --git a/src/model/event.go b/src/model/event.go index a1922525..0d59a13b 100644 --- a/src/model/event.go +++ b/src/model/event.go @@ -29,6 +29,9 @@ type Event struct { Nid int64 `json:"nid"` NeedUpgrade int `json:"need_upgrade"` AlertUpgrade string `json:"alert_upgrade"` + RecvUserIDs []int64 `json:"recv_user_ids"` + RecvUserObjs []User `json:"recv_user_objs"` + RealUpgrade bool `json:"real_upgrade"` } type EventDetail struct { diff --git a/src/modules/monapi/cron/event_consumer.go b/src/modules/monapi/cron/event_consumer.go index 9207ff04..ffc23ede 100644 --- a/src/modules/monapi/cron/event_consumer.go +++ b/src/modules/monapi/cron/event_consumer.go @@ -33,51 +33,17 @@ func consume(event *model.Event) { } if event.NeedUpgrade == 1 { - needUpgrade, needNotify := isAlertUpgrade(event) - if needUpgrade { - var alertUpgrade model.EventAlertUpgrade - if err := json.Unmarshal([]byte(event.AlertUpgrade), &alertUpgrade); err != nil { - logger.Errorf("AlertUpgrade unmarshal failed, event: %+v, err: %v", event, err) - return - } - - if event.EventType == config.ALERT { - err := model.UpdateEventCurPriority(event.HashId, alertUpgrade.Level) - if err != nil { - logger.Errorf("UpdateEventCurPriority failed, err: %v, event: %+v", err, event) - return - } - } - err := model.UpdateEventPriority(event.Id, alertUpgrade.Level) - if err != nil { - logger.Errorf("UpdateEventPriority failed, err: %v, event: %+v", err, event) - return - } - event.Priority = alertUpgrade.Level - - SetEventStatus(event, model.STATUS_UPGRADE) - - if needNotify { - if NeedCallback(event.Sid) { - if err := PushCallbackEvent(event); err != nil { - logger.Errorf("push event to callback queue failed, callbackEvent: %+v", event) - } - logger.Infof("push event to callback queue succ, event hashid: %v", event.HashId) - - SetEventStatus(event, model.STATUS_CALLBACK) - } - - go notify.DoNotify(true, event) - SetEventStatus(event, model.STATUS_SEND) - return - } - - SetEventStatus(event, model.STATUS_CONVERGE) - return - } + event.RealUpgrade = needUpgrade(event) } - if isInConverge(event, false) { + if event.RealUpgrade { + if err := updatePriority(event); err != nil { + return + } + SetEventStatus(event, model.STATUS_UPGRADE) + } + + if isInConverge(event) { SetEventStatus(event, model.STATUS_CONVERGE) return } @@ -91,20 +57,107 @@ func consume(event *model.Event) { SetEventStatus(event, model.STATUS_CALLBACK) } + if err := fillRecvs(event); err != nil { + return + } + // 没有配置报警接收人,修改event状态为无接收人 - if strings.TrimSpace(event.Users) == "[]" && strings.TrimSpace(event.Groups) == "[]" { + if len(event.RecvUserObjs) == 0 { SetEventStatus(event, model.STATUS_NONEUSER) return } - go notify.DoNotify(false, event) + go notify.DoNotify(event) SetEventStatus(event, model.STATUS_SEND) } +func updatePriority(event *model.Event) error { + var alertUpgrade model.EventAlertUpgrade + if err := json.Unmarshal([]byte(event.AlertUpgrade), &alertUpgrade); err != nil { + logger.Errorf("AlertUpgrade unmarshal failed, event: %+v, err: %v", event, err) + return err + } + + if event.EventType == config.ALERT { + err := model.UpdateEventCurPriority(event.HashId, alertUpgrade.Level) + if err != nil { + logger.Errorf("UpdateEventCurPriority failed, err: %v, event: %+v", err, event) + return err + } + } + err := model.UpdateEventPriority(event.Id, alertUpgrade.Level) + if err != nil { + logger.Errorf("UpdateEventPriority failed, err: %v, event: %+v", err, event) + return err + } + event.Priority = alertUpgrade.Level + return nil +} + +func fillRecvs(event *model.Event) error { + userIds, err := getUserIds(event.Users, event.Groups) + if err != nil { + logger.Errorf("notify failed, get users id failed, event: %+v, err: %v", event, err) + return err + } + + if event.RealUpgrade { + // 这里即使报错也不返回,只是打印个日志,无非就是拿不到升级配置里边的users信息,没所谓 + var alertUpgrade model.EventAlertUpgrade + if err := json.Unmarshal([]byte(event.AlertUpgrade), &alertUpgrade); err != nil { + logger.Errorf("unmarshal EventAlertUpgrade fail: %v", err) + } + + upgradeUserIds, err := getUserIds(alertUpgrade.Users, alertUpgrade.Groups) + if err != nil { + logger.Errorf("upgrade notify failed, get upgrade users id failed, event: %+v, err: %v", event, err) + } + + if upgradeUserIds != nil { + userIds = append(userIds, upgradeUserIds...) + } + } + + event.RecvUserIDs = userIds + userObjs, err := model.UserGetByIds(userIds) + if err != nil { + logger.Errorf("notify failed, get user by id failed, event: %+v, err: %v", event, err) + return err + } + + event.RecvUserObjs = userObjs + return nil +} + +func getUserIds(users, groups string) ([]int64, error) { + var userIds []int64 + + if err := json.Unmarshal([]byte(users), &userIds); err != nil { + logger.Errorf("unmarshal users failed, users: %s, err: %v", users, err) + return nil, nil + } + + var groupIds []int64 + if err := json.Unmarshal([]byte(groups), &groupIds); err != nil { + logger.Errorf("unmarshal groups failed, groups: %s, err: %v", groups, err) + return nil, nil + } + + teamUsers, err := model.UserIdGetByTeamIds(groupIds) + if err != nil { + logger.Errorf("get user id by team id failed, err: %v", err) + return nil, err + } + + userIds = append(userIds, teamUsers...) + + return userIds, nil +} + // isInConverge 包含2种情况 // 1. 用户配置了N秒之内只报警M次 // 2. 用户配置了不发送recovery通知 -func isInConverge(event *model.Event, isUpgrade bool) bool { +func isInConverge(event *model.Event) bool { stra, exists := mcache.StraCache.GetById(event.Sid) if !exists { logger.Errorf("sid not found, event: %+v", event) @@ -150,7 +203,7 @@ func isInConverge(event *model.Event, isUpgrade bool) bool { startTs = recoveryTs } - cnt, err := model.EventCnt(event.HashId, model.ParseEtime(startTs), model.ParseEtime(now), isUpgrade) + cnt, err := model.EventCnt(event.HashId, model.ParseEtime(startTs), model.ParseEtime(now), event.RealUpgrade) if err != nil { logger.Errorf("get event count failed, err: %v", err) return false @@ -168,10 +221,11 @@ func isInConverge(event *model.Event, isUpgrade bool) bool { // 1,认领的报警不需要升级 // 2,忽略的报警不需要升级 // 3,屏蔽的报警不需要升级 -func isAlertUpgrade(event *model.Event) (needUpgrade, needNotify bool) { +func needUpgrade(event *model.Event) bool { alertUpgradeKey := PrefixAlertUpgrade + fmt.Sprint(event.HashId) eventAlertKey := PrefixAlertTime + fmt.Sprint(event.HashId) + // 如果告警恢复了 if event.EventType == config.RECOVERY { // 之前如果残留了upgrade的redis记录,现在恢复了,相当于一个新的周期要开始了,自然要删除老旧记录 if redisc.HasKey(alertUpgradeKey) { @@ -188,24 +242,24 @@ func isAlertUpgrade(event *model.Event) (needUpgrade, needNotify bool) { // 之前升级过,即老板已经知道了,那现在恢复了,就需要把恢复通知发给老板 // 如果配置了静默恢复呢?配置了升级的告警,显然是重要的告警,并且此时老板已经知道了,哪能静默恢复呢... // 老板收到升级告警了,但是恢复了之后,就一定要让他收到告警恢复的通知,忽略用户的"静默恢复"的配置项 - return true, true + return true } // 之前没有升级过,老板压根不知道这个事,现在恢复了,自然也不需要知道 - return false, false + return false } // 这是一个alert,not recovery,但是告警事件都找不到了,还升级通知个毛线 eventCur, err := model.EventCurGet("hashid", event.HashId) if err != nil { logger.Errorf("AlertUpgrade failed:get event_cur failed, event: %+v, err: %v", event, err) - return false, false + return false } // 告警事件都找不到了,还升级通知个毛线 if eventCur == nil { logger.Infof("AlertUpgrade failed:get event_cur is nil, event hashid: %v", event.HashId) - return false, false + return false } now := time.Now().Unix() @@ -214,7 +268,7 @@ func isAlertUpgrade(event *model.Event) (needUpgrade, needNotify bool) { var alertUpgrade model.EventAlertUpgrade if err = json.Unmarshal([]byte(event.AlertUpgrade), &alertUpgrade); err != nil { logger.Errorf("AlertUpgrade unmarshal failed, event: %+v, err: %v", event, err) - return false, false + return false } upgradeDuration := int64(alertUpgrade.Duration) @@ -222,12 +276,12 @@ func isAlertUpgrade(event *model.Event) (needUpgrade, needNotify bool) { // 说明告警已经被认领 claimants := strings.TrimSpace(eventCur.Claimants) if claimants != "[]" && claimants != "" { - return false, false + return false } // 告警已经忽略了 if eventCur.IgnoreAlert == 1 { - return false, false + return false } // 告警之后,比如30分钟没有处理,就需要升级,那首先得知道首次告警时间 @@ -235,32 +289,25 @@ func isAlertUpgrade(event *model.Event) (needUpgrade, needNotify bool) { err := redisc.SetWithTTL(eventAlertKey, now, 30*24*3600) if err != nil { logger.Errorf("set eventAlertKey failed, eventAlertKey: %v, err: %v", eventAlertKey, err) - return false, false } + + // 之前没有eventAlertKey,说明是第一次报警,不需要升级 + return false } // 比如:没到30分钟呢,不用升级 firstAlertTime := redisc.GET(eventAlertKey) if now-firstAlertTime < upgradeDuration { - return false, false + return false } err = redisc.SetWithTTL(alertUpgradeKey, 1, 30*24*3600) if err != nil { logger.Errorf("set alertUpgradeKey failed, alertUpgradeKey: %v, err: %v", alertUpgradeKey, err) - return false, false + return false } - // 还没有升级之前可能已经发过多次告警,并且已经触发了收敛,这时触发升级的告警,可千万不能被收敛 - // 比如1h内最多报1一次,在1分钟的时候触发告警并发送,6分钟、11分钟、16分钟的时候又触发但被收敛 - // 要求20分钟未处理则升级,虽然此时仍然在1h时间内,但是升级的情况需要单独来看之前是否有"已升级并且已发送"的事件 - // 显然,在这个场景下,前面只有"已发送"和"已收敛"的事件,没有"已升级并且已发送"的事件 - // 所以在21分钟的时候,应该触发升级并发送,在26分钟、31分钟的时候,都是"已升级并且已收敛" - if isInConverge(event, true) { - return true, false - } - - return true, true + return true } func SetEventStatus(event *model.Event, status string) { diff --git a/src/modules/monapi/notify/notify.go b/src/modules/monapi/notify/notify.go index 1929e2bb..3f4ba217 100644 --- a/src/modules/monapi/notify/notify.go +++ b/src/modules/monapi/notify/notify.go @@ -27,31 +27,6 @@ type Message struct { IsUpgrade bool `json:"is_upgrade"` } -func getUserIds(users, groups string) ([]int64, error) { - var userIds []int64 - - if err := json.Unmarshal([]byte(users), &userIds); err != nil { - logger.Errorf("unmarshal users failed, users: %s, err: %v", users, err) - return nil, nil - } - - var groupIds []int64 - if err := json.Unmarshal([]byte(groups), &groupIds); err != nil { - logger.Errorf("unmarshal groups failed, groups: %s, err: %v", groups, err) - return nil, nil - } - - teamUsers, err := model.UserIdGetByTeamIds(groupIds) - if err != nil { - logger.Errorf("get user id by team id failed, err: %v", err) - return nil, err - } - - userIds = append(userIds, teamUsers...) - - return userIds, nil -} - func genClaimLink(event *model.Event) string { eventCur, err := model.EventCurGet("hashid", event.HashId) if err != nil { @@ -121,46 +96,11 @@ func genEndpoint(event *model.Event) string { } // DoNotify 除了原始event信息之外,再附加一些通过查库才能得到的信息交给下游处理 -func DoNotify(isUpgrade bool, event *model.Event) { +func DoNotify(event *model.Event) { if event == nil { return } - userIds, err := getUserIds(event.Users, event.Groups) - if err != nil { - logger.Errorf("notify failed, get users id failed, event: %+v, err: %v", event, err) - return - } - - prio := fmt.Sprintf("p%v", event.Priority) - - if isUpgrade { - // 如果是触发了告警升级,就需要把要升级的人的信息也拿到 - alertUpgradeString := event.AlertUpgrade - var alertUpgrade model.EventAlertUpgrade - if err = json.Unmarshal([]byte(alertUpgradeString), &alertUpgrade); err != nil { - logger.Errorf("") - } - - upgradeUserIds, err := getUserIds(alertUpgrade.Users, alertUpgrade.Groups) - if err != nil { - logger.Errorf("upgrade notify failed, get upgrade users id failed, event: %+v, err: %v", event, err) - } - - if upgradeUserIds != nil { - userIds = append(userIds, upgradeUserIds...) - } - - // 升级了,告警级别也要相应变成升级策略里配置的级别 - prio = fmt.Sprintf("p%v", alertUpgrade.Level) - } - - users, err := model.UserGetByIds(userIds) - if err != nil { - logger.Errorf("notify failed, get user by id failed, event: %+v, err: %v", event, err) - return - } - message := Message{ Event: event, ClaimLink: genClaimLink(event), @@ -170,10 +110,11 @@ func DoNotify(isUpgrade bool, event *model.Event) { Metrics: genMetrics(event), ReadableTags: genTags(event), ReadableEndpoint: genEndpoint(event), - IsUpgrade: isUpgrade, + IsUpgrade: event.RealUpgrade, } - notifyTypes := config.Get().Notify[prio] + notifyTypes := config.Get().Notify[fmt.Sprintf("p%v", event.Priority)] + users := event.RecvUserObjs for i := 0; i < len(notifyTypes); i++ { switch notifyTypes[i] {