326 lines
8.5 KiB
Go
326 lines
8.5 KiB
Go
package alert
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
"fmt"
|
||
"os/exec"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/didi/nightingale/v5/cache"
|
||
"github.com/didi/nightingale/v5/config"
|
||
"github.com/didi/nightingale/v5/judge"
|
||
"github.com/didi/nightingale/v5/models"
|
||
|
||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||
"github.com/toolkits/pkg/logger"
|
||
"github.com/toolkits/pkg/net/httplib"
|
||
"github.com/toolkits/pkg/sys"
|
||
)
|
||
|
||
func popEvent() {
|
||
sema := semaphore.NewSemaphore(config.Config.Alert.NotifyScriptConcurrency)
|
||
duration := time.Duration(100) * time.Millisecond
|
||
for {
|
||
events := judge.EventQueue.PopBackBy(200)
|
||
if len(events) < 1 {
|
||
time.Sleep(duration)
|
||
continue
|
||
}
|
||
consume(events, sema)
|
||
}
|
||
}
|
||
|
||
func consume(events []interface{}, sema *semaphore.Semaphore) {
|
||
for i := range events {
|
||
if events[i] == nil {
|
||
continue
|
||
}
|
||
|
||
event := events[i].(*models.AlertEvent)
|
||
|
||
alertRule, exists := cache.AlertRules.Get(event.RuleId)
|
||
if !exists {
|
||
logger.Errorf("event_consume: alert rule not found, event:%+v", event)
|
||
continue
|
||
}
|
||
logger.Debugf("[event_consume_success][type:%v][event:%+v]", event.IsPromePull, event)
|
||
if isNoneffective(event, alertRule) {
|
||
// 告警规则非生效时段
|
||
continue
|
||
}
|
||
|
||
event.RuleName = alertRule.Name
|
||
event.RuleNote = alertRule.Note
|
||
event.NotifyChannels = alertRule.NotifyChannels
|
||
classpaths := cache.ResClasspath.GetValues(event.ResIdent)
|
||
sort.Strings(classpaths)
|
||
event.ResClasspaths = strings.Join(classpaths, " ")
|
||
enrichTag(event, alertRule)
|
||
|
||
if isEventMute(event) && event.IsAlert() {
|
||
// 被屏蔽的事件
|
||
event.MarkMuted()
|
||
|
||
if config.Config.Alert.MutedAlertPersist {
|
||
persist(event)
|
||
}
|
||
|
||
continue
|
||
}
|
||
|
||
// 操作数据库
|
||
persist(event)
|
||
|
||
// 不管是告警还是恢复,都触发回调,接收端自己处理
|
||
if alertRule.Callbacks != "" {
|
||
go callback(event, alertRule)
|
||
}
|
||
|
||
uids := genNotifyUserIDs(alertRule)
|
||
if len(uids) == 0 {
|
||
logger.Warningf("event_consume: notify users not found, event_hash_id: %s, rule_id: %d, rule_name: %s, res_ident: %s", event.HashId, event.RuleId, event.RuleName, event.ResIdent)
|
||
continue
|
||
}
|
||
|
||
users := cache.UserCache.GetByIds(uids)
|
||
if len(users) == 0 {
|
||
logger.Warningf("event_consume: notify users not found, event_hash_id: %s, rule_id: %d, rule_name: %s, res_ident: %s", event.HashId, event.RuleId, event.RuleName, event.ResIdent)
|
||
continue
|
||
}
|
||
|
||
alertMsg := AlertMsg{
|
||
Event: event,
|
||
Rule: alertRule,
|
||
Users: users,
|
||
}
|
||
|
||
logger.Infof("event_consume: notify alert:%+v", alertMsg)
|
||
|
||
sema.Acquire()
|
||
go func(alertMsg AlertMsg) {
|
||
defer sema.Release()
|
||
notify(alertMsg)
|
||
}(alertMsg)
|
||
}
|
||
}
|
||
|
||
func genNotifyUserIDs(alertRule *models.AlertRule) []int64 {
|
||
uidMap := make(map[int64]struct{})
|
||
|
||
groupIds := strings.Fields(alertRule.NotifyGroups)
|
||
for _, groupId := range groupIds {
|
||
gid, err := strconv.ParseInt(groupId, 10, 64)
|
||
if err != nil {
|
||
logger.Warningf("event_consume: strconv groupid(%s) fail: %v", groupId, err)
|
||
continue
|
||
}
|
||
|
||
um, exists := cache.UserGroupMember.Get(gid)
|
||
if !exists {
|
||
continue
|
||
}
|
||
|
||
for uid := range um {
|
||
uidMap[uid] = struct{}{}
|
||
}
|
||
}
|
||
|
||
userIds := strings.Fields(alertRule.NotifyUsers)
|
||
for _, userId := range userIds {
|
||
uid, err := strconv.ParseInt(userId, 10, 64)
|
||
if err != nil {
|
||
logger.Warningf("event_consume: strconv userid(%s) fail: %v", userId, err)
|
||
continue
|
||
}
|
||
|
||
uidMap[uid] = struct{}{}
|
||
}
|
||
|
||
uids := make([]int64, 0, len(uidMap))
|
||
for uid := range uidMap {
|
||
uids = append(uids, uid)
|
||
}
|
||
|
||
return uids
|
||
}
|
||
|
||
// 如果是告警,就存库,如果是恢复,就从未恢复的告警表里删除
|
||
func persist(event *models.AlertEvent) {
|
||
if event.IsRecov() {
|
||
logger.Debugf("[event.Recovery.db.DelByHashId]: delete recovery event:%+v", event)
|
||
err := event.DelByHashId()
|
||
if err != nil {
|
||
logger.Warningf("event_consume: delete recovery event err:%v, event:%+v", err, event)
|
||
}
|
||
} else {
|
||
err := event.Add()
|
||
if err != nil {
|
||
logger.Warningf("event_consume: insert alert event err:%v, event:%+v", err, event)
|
||
}
|
||
}
|
||
obj := ToHistoryAlertEvent(event)
|
||
err := obj.Add()
|
||
if err != nil {
|
||
logger.Warningf("event_consume: insert history alert event err:%v, event:%+v", err, event)
|
||
}
|
||
}
|
||
|
||
type AlertMsg struct {
|
||
Event *models.AlertEvent `json:"event"`
|
||
Rule *models.AlertRule `json:"rule"`
|
||
Users []*models.User `json:"users"`
|
||
}
|
||
|
||
func notify(alertMsg AlertMsg) {
|
||
//增加并发控制
|
||
bs, err := json.Marshal(alertMsg)
|
||
if err != nil {
|
||
logger.Errorf("notify: marshal alert %+v err:%v", alertMsg, err)
|
||
}
|
||
|
||
fpath := config.Config.Alert.NotifyScriptPath
|
||
cmd := exec.Command(fpath)
|
||
cmd.Stdin = bytes.NewReader(bs)
|
||
|
||
// combine stdout and stderr
|
||
var buf bytes.Buffer
|
||
cmd.Stdout = &buf
|
||
cmd.Stderr = &buf
|
||
|
||
err = cmd.Start()
|
||
if err != nil {
|
||
logger.Errorf("notify: run cmd err:%v", err)
|
||
return
|
||
}
|
||
|
||
err, isTimeout := sys.WrapTimeout(cmd, time.Duration(10)*time.Second)
|
||
|
||
if isTimeout {
|
||
if err == nil {
|
||
logger.Errorf("notify: timeout and killed process %s", fpath)
|
||
}
|
||
|
||
if err != nil {
|
||
logger.Errorf("notify: kill process %s occur error %v", fpath, err)
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
if err != nil {
|
||
logger.Errorf("notify: exec script %s occur error: %v, output: %s", fpath, err, buf.String())
|
||
return
|
||
}
|
||
|
||
logger.Infof("notify: exec %s output: %s", fpath, buf.String())
|
||
}
|
||
|
||
func callback(event *models.AlertEvent, alertRule *models.AlertRule) {
|
||
urls := strings.Fields(alertRule.Callbacks)
|
||
for _, url := range urls {
|
||
if url == "" {
|
||
continue
|
||
}
|
||
|
||
if !(strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://")) {
|
||
url = "http://" + url
|
||
}
|
||
|
||
resp, code, err := httplib.PostJSON(url, 5*time.Second, event, map[string]string{})
|
||
if err != nil {
|
||
logger.Errorf("callback[%s] fail, callback content: %+v, resp: %s, err: %v, code:%d", url, event, string(resp), err, code)
|
||
} else {
|
||
logger.Infof("callback[%s] succ, callback content: %+v, resp: %s, code:%d", url, event, string(resp), code)
|
||
}
|
||
}
|
||
}
|
||
|
||
func isNoneffective(event *models.AlertEvent, alertRule *models.AlertRule) bool {
|
||
// 生效时间过滤
|
||
if alertRule.Status == models.ALERT_RULE_DISABLED {
|
||
logger.Debugf("event:%+v alert rule:%+v disable", event, alertRule)
|
||
return true
|
||
}
|
||
|
||
tm := time.Unix(event.TriggerTime, 0)
|
||
triggerTime := tm.Format("15:04")
|
||
triggerWeek := strconv.Itoa(int(tm.Weekday()))
|
||
|
||
if alertRule.EnableStime <= alertRule.EnableEtime {
|
||
if triggerTime < alertRule.EnableStime || triggerTime > alertRule.EnableEtime {
|
||
logger.Debugf("event:%+v alert rule:%+v triggerTime Noneffective", event, alertRule)
|
||
return true
|
||
}
|
||
} else {
|
||
if triggerTime < alertRule.EnableStime && triggerTime > alertRule.EnableEtime {
|
||
logger.Debugf("event:%+v alert rule:%+v triggerTime Noneffective", event, alertRule)
|
||
return true
|
||
}
|
||
}
|
||
|
||
alertRule.EnableDaysOfWeek = strings.Replace(alertRule.EnableDaysOfWeek, "7", "0", 1)
|
||
|
||
if !strings.Contains(alertRule.EnableDaysOfWeek, triggerWeek) {
|
||
logger.Debugf("event:%+v alert rule:%+v triggerWeek Noneffective", event, alertRule)
|
||
return true
|
||
}
|
||
|
||
return false
|
||
}
|
||
|
||
// 事件的tags有多种tags组成:ident作为一个tag,数据本身的tags(前期已经把res的tags也附到数据tags里了)、规则的tags
|
||
func enrichTag(event *models.AlertEvent, alertRule *models.AlertRule) {
|
||
if event.ResIdent != "" {
|
||
event.TagMap["ident"] = event.ResIdent
|
||
}
|
||
|
||
if alertRule.AppendTags != "" {
|
||
appendTags := strings.Fields(alertRule.AppendTags)
|
||
for _, tag := range appendTags {
|
||
arr := strings.Split(tag, "=")
|
||
if len(arr) != 2 {
|
||
logger.Warningf("alertRule AppendTags:%+v illagel", alertRule.AppendTags)
|
||
continue
|
||
}
|
||
event.TagMap[arr[0]] = arr[1]
|
||
}
|
||
}
|
||
|
||
var tagList []string
|
||
for key, value := range event.TagMap {
|
||
tagList = append(tagList, fmt.Sprintf("%s=%s", key, value))
|
||
}
|
||
sort.Strings(tagList)
|
||
event.Tags = strings.Join(tagList, " ")
|
||
}
|
||
|
||
func ToHistoryAlertEvent(ae *models.AlertEvent) *models.HistoryAlertEvent {
|
||
var obj models.HistoryAlertEvent
|
||
obj.RuleId = ae.RuleId
|
||
obj.RuleName = ae.RuleName
|
||
obj.RuleNote = ae.RuleNote
|
||
obj.HashId = ae.HashId
|
||
obj.IsPromePull = ae.IsPromePull
|
||
obj.ResClasspaths = ae.ResClasspaths
|
||
obj.ResIdent = ae.ResIdent
|
||
obj.Priority = ae.Priority
|
||
obj.Status = ae.Status
|
||
obj.IsRecovery = ae.IsRecovery
|
||
obj.HistoryPoints = ae.HistoryPoints
|
||
obj.TriggerTime = ae.TriggerTime
|
||
obj.Values = ae.Values
|
||
obj.NotifyChannels = ae.NotifyChannels
|
||
obj.NotifyGroups = ae.NotifyGroups
|
||
obj.NotifyUsers = ae.NotifyUsers
|
||
obj.RunbookUrl = ae.RunbookUrl
|
||
obj.ReadableExpression = ae.ReadableExpression
|
||
obj.Tags = ae.Tags
|
||
obj.NotifyGroupObjs = ae.NotifyGroupObjs
|
||
obj.NotifyUserObjs = ae.NotifyUserObjs
|
||
return &obj
|
||
}
|