diff --git a/src/models/alert_mute.go b/src/models/alert_mute.go index 83f724e1..f941d37c 100644 --- a/src/models/alert_mute.go +++ b/src/models/alert_mute.go @@ -104,9 +104,8 @@ func AlertMuteDel(ids []int64) error { return DB().Where("id in ?", ids).Delete(new(AlertMute)).Error } -func AlertMuteStatistics(cluster string, btime int64) (*Statistics, error) { - session := DB().Model(&AlertMute{}).Select("count(*) as total", "max(create_at) as last_updated").Where("btime <= ?", btime) - +func AlertMuteStatistics(cluster string) (*Statistics, error) { + session := DB().Model(&AlertMute{}).Select("count(*) as total", "max(create_at) as last_updated") if cluster != "" { session = session.Where("cluster = ?", cluster) } @@ -120,7 +119,7 @@ func AlertMuteStatistics(cluster string, btime int64) (*Statistics, error) { return stats[0], nil } -func AlertMuteGetsByCluster(cluster string, btime int64) ([]*AlertMute, error) { +func AlertMuteGetsByCluster(cluster string) ([]*AlertMute, error) { // clean expired first buf := int64(30) err := DB().Where("etime < ?", time.Now().Unix()+buf).Delete(new(AlertMute)).Error @@ -129,7 +128,7 @@ func AlertMuteGetsByCluster(cluster string, btime int64) ([]*AlertMute, error) { } // get my cluster's mutes - session := DB().Model(&AlertMute{}).Where("btime <= ?", btime) + session := DB().Model(&AlertMute{}) if cluster != "" { session = session.Where("cluster = ?", cluster) } diff --git a/src/server/config/config.go b/src/server/config/config.go index 4a2f192e..47c31783 100644 --- a/src/server/config/config.go +++ b/src/server/config/config.go @@ -12,7 +12,6 @@ import ( "github.com/didi/nightingale/v5/src/pkg/httpx" "github.com/didi/nightingale/v5/src/pkg/logx" - "github.com/didi/nightingale/v5/src/server/naming" "github.com/didi/nightingale/v5/src/server/reader" "github.com/didi/nightingale/v5/src/server/writer" "github.com/didi/nightingale/v5/src/storage" @@ -77,7 +76,6 @@ func MustLoad(fpaths ...string) { } C.Heartbeat.Endpoint = 
fmt.Sprintf("%s:%d", C.Heartbeat.IP, C.HTTP.Port) - C.Heartbeat.Cluster = C.ClusterName C.Alerting.RedisPub.ChannelKey = C.Alerting.RedisPub.ChannelPrefix + C.ClusterName @@ -93,7 +91,7 @@ type Config struct { Log logx.Config HTTP httpx.Config BasicAuth gin.Accounts - Heartbeat naming.HeartbeatConfig + Heartbeat HeartbeatConfig Alerting Alerting NoData NoData Redis storage.RedisConfig @@ -106,6 +104,12 @@ type Config struct { Ibex Ibex } +type HeartbeatConfig struct { + IP string + Interval int64 + Endpoint string +} + type Alerting struct { NotifyScriptPath string NotifyConcurrency int diff --git a/src/server/engine/repeat.go b/src/server/engine/repeat.go index 3e6ddfd6..7d9a652c 100644 --- a/src/server/engine/repeat.go +++ b/src/server/engine/repeat.go @@ -4,10 +4,12 @@ import ( "context" "time" + "github.com/toolkits/pkg/logger" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/memsto" - "github.com/toolkits/pkg/logger" + "github.com/didi/nightingale/v5/src/server/naming" ) func loopRepeat(ctx context.Context) { @@ -24,6 +26,17 @@ func loopRepeat(ctx context.Context) { // 拉取未恢复的告警表中需要重复通知的数据 func repeat() { + isLeader, err := naming.IamLeader() + if err != nil { + logger.Errorf("repeat: %v", err) + return + } + + if !isLeader { + logger.Info("repeat: i am not leader") + return + } + events, err := models.AlertCurEventNeedRepeat(config.C.ClusterName) if err != nil { logger.Errorf("repeat: AlertCurEventNeedRepeat: %v", err) @@ -35,35 +48,39 @@ func repeat() { } for i := 0; i < len(events); i++ { - event := events[i] - rule := memsto.AlertRuleCache.Get(event.RuleId) + rule := memsto.AlertRuleCache.Get(events[i].RuleId) if rule == nil { + // 可能告警规则已经被删了,理论上不应该出现这种情况,因为删除告警规则的时候,会顺带删除活跃告警,无论如何,自保一下 continue } - if rule.NotifyRepeatStep == 0 { - // 用户后来调整了这个字段,不让继续发送了 - continue - } + repeatOne(events[i], rule) - event.DB2Mem() - - // 重复通知的告警,应该用新的时间来判断是否生效和是否屏蔽, - // 
不能使用TriggerTime,因为TriggerTime是触发时的时间,是一个比较老的时间 - // 先发了告警,又做了屏蔽,本质是不想发了,如果继续用TriggerTime判断,就还是会发,不符合预期 - if isNoneffective(event.NotifyRepeatNext, rule) { - continue - } - - if isMuted(event, event.NotifyRepeatNext) { - continue - } - - fillUsers(event) - notify(event) - - if err = event.IncRepeatStep(int64(rule.NotifyRepeatStep * 60)); err != nil { + if err = events[i].IncRepeatStep(int64(rule.NotifyRepeatStep * 60)); err != nil { logger.Errorf("repeat: IncRepeatStep: %v", err) } } } + +func repeatOne(event *models.AlertCurEvent, rule *models.AlertRule) { + if rule.NotifyRepeatStep == 0 { + // 用户后来调整了这个字段,不让继续发送了 + return + } + + event.DB2Mem() + + // 重复通知的告警,应该用新的时间来判断是否生效和是否屏蔽, + // 不能使用TriggerTime,因为TriggerTime是触发时的时间,是一个比较老的时间 + // 先发了告警,又做了屏蔽,本质是不想发了,如果继续用TriggerTime判断,就还是会发,不符合预期 + if isNoneffective(event.NotifyRepeatNext, rule) { + return + } + + if isMuted(event, event.NotifyRepeatNext) { + return + } + + fillUsers(event) + notify(event) +} diff --git a/src/server/engine/worker.go b/src/server/engine/worker.go index 40e0aafa..46283d07 100644 --- a/src/server/engine/worker.go +++ b/src/server/engine/worker.go @@ -187,6 +187,10 @@ func (r RuleEval) judge(vectors []Vector) { alertingKeys := make(map[string]struct{}) now := time.Now().Unix() for i := 0; i < count; i++ { + // compute hash + hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key)) + alertingKeys[hash] = struct{}{} + // rule disabled in this time span? 
if isNoneffective(vectors[i].Timestamp, r.rule) { continue @@ -226,10 +230,6 @@ func (r RuleEval) judge(vectors []Vector) { continue } - // compute hash - hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key)) - alertingKeys[hash] = struct{}{} - tagsArr := labelMapToArr(tagsMap) sort.Strings(tagsArr) diff --git a/src/server/idents/idents.go b/src/server/idents/idents.go index a3ab67f5..5cd7d8d0 100644 --- a/src/server/idents/idents.go +++ b/src/server/idents/idents.go @@ -3,7 +3,6 @@ package idents import ( "context" "fmt" - "sort" "strconv" "time" @@ -92,20 +91,13 @@ func loopPushMetrics(ctx context.Context) { } func pushMetrics() { - servers, err := naming.ActiveServers(context.Background(), config.C.ClusterName) + isLeader, err := naming.IamLeader() if err != nil { - logger.Errorf("handle_idents: failed to get active servers: %v", err) + logger.Errorf("handle_idents: %v", err) return } - if len(servers) == 0 { - logger.Errorf("handle_idents: active servers empty") - return - } - - sort.Strings(servers) - - if config.C.Heartbeat.Endpoint != servers[0] { + if !isLeader { logger.Info("handle_idents: i am not leader") return } diff --git a/src/server/memsto/alert_mute_cache.go b/src/server/memsto/alert_mute_cache.go index 9efa17de..582a477d 100644 --- a/src/server/memsto/alert_mute_cache.go +++ b/src/server/memsto/alert_mute_cache.go @@ -89,9 +89,8 @@ func loopSyncAlertMutes() { func syncAlertMutes() error { start := time.Now() - btime := start.Unix() - int64(30) - stat, err := models.AlertMuteStatistics(config.C.ClusterName, btime) + stat, err := models.AlertMuteStatistics(config.C.ClusterName) if err != nil { return errors.WithMessage(err, "failed to exec AlertMuteStatistics") } @@ -103,7 +102,7 @@ func syncAlertMutes() error { return nil } - lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName, btime) + lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName) if err != nil { return errors.WithMessage(err, "failed to exec 
AlertMuteGetsByCluster") } diff --git a/src/server/naming/heartbeat.go b/src/server/naming/heartbeat.go index ab4dff0a..949cd24a 100644 --- a/src/server/naming/heartbeat.go +++ b/src/server/naming/heartbeat.go @@ -10,34 +10,28 @@ import ( "github.com/toolkits/pkg/logger" + "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/storage" ) // local servers var localss string -type HeartbeatConfig struct { - IP string - Interval int64 - Endpoint string - Cluster string -} - -func Heartbeat(ctx context.Context, cfg HeartbeatConfig) error { - if err := heartbeat(ctx, cfg); err != nil { +func Heartbeat(ctx context.Context) error { + if err := heartbeat(ctx); err != nil { fmt.Println("failed to heartbeat:", err) return err } - go loopHeartbeat(ctx, cfg) + go loopHeartbeat(ctx) return nil } -func loopHeartbeat(ctx context.Context, cfg HeartbeatConfig) { - interval := time.Duration(cfg.Interval) * time.Millisecond +func loopHeartbeat(ctx context.Context) { + interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond for { time.Sleep(interval) - if err := heartbeat(ctx, cfg); err != nil { + if err := heartbeat(ctx); err != nil { logger.Warning(err) } } @@ -52,15 +46,15 @@ func redisKey(cluster string) string { return fmt.Sprintf("/server/heartbeat/%s", cluster) } -func heartbeat(ctx context.Context, cfg HeartbeatConfig) error { +func heartbeat(ctx context.Context) error { now := time.Now().Unix() - key := redisKey(cfg.Cluster) - err := storage.Redis.HSet(ctx, key, cfg.Endpoint, now).Err() + key := redisKey(config.C.ClusterName) + err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err() if err != nil { return err } - servers, err := ActiveServers(ctx, cfg.Cluster) + servers, err := ActiveServers(ctx, config.C.ClusterName) if err != nil { return err } diff --git a/src/server/naming/leader.go b/src/server/naming/leader.go new file mode 100644 index 00000000..0909be55 --- /dev/null +++ 
b/src/server/naming/leader.go @@ -0,0 +1,30 @@ +package naming + +import ( + "context" + "errors" + "sort" + + "github.com/didi/nightingale/v5/src/server/config" + "github.com/toolkits/pkg/logger" +) + +// IamLeader reports whether this server is the cluster leader, i.e. whether its +// heartbeat endpoint sorts first among the active servers of config.C.ClusterName. +// It returns a non-nil error when the active server list cannot be fetched or is empty. +func IamLeader() (bool, error) { + servers, err := ActiveServers(context.Background(), config.C.ClusterName) + if err != nil { + logger.Errorf("failed to get active servers: %v", err) + return false, err + } + + if len(servers) == 0 { + logger.Errorf("active servers empty") + return false, errors.New("active servers empty") + } + + sort.Strings(servers) + + return config.C.Heartbeat.Endpoint == servers[0], nil +} diff --git a/src/server/server.go b/src/server/server.go index 57c55d94..f69c336e 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -134,7 +134,7 @@ func (s Server) initialize() (func(), error) { memsto.Sync() // start heartbeat - if err = naming.Heartbeat(ctx, config.C.Heartbeat); err != nil { + if err = naming.Heartbeat(ctx); err != nil { return fns.Ret(), err }