From e7d2c45f9d27da6d7e914fd9fac75f548f64e5c2 Mon Sep 17 00:00:00 2001
From: ulricqin
Date: Mon, 22 Aug 2022 18:39:29 +0800
Subject: [PATCH] Manage bindings of n9e-server and datasource in web (#1127)

* manage bindings of n9e-server and datasource

* fix sync memsto
---
 src/server/config/config.go               |  16 +-
 src/server/config/prom_client.go          |  59 ++++++++
 src/server/config/prom_option.go          |  81 ++++++++++
 src/server/config/reader.go               | 131 ++++++++++++++++
 src/server/engine/engine.go               |   6 +-
 src/server/engine/notify.go               |   9 +-
 src/server/engine/worker.go               |  27 ++--
 src/server/idents/idents.go               |  18 ++-
 src/server/memsto/alert_mute_cache.go     |  28 +++-
 src/server/memsto/alert_rule_cache.go     |  28 +++-
 src/server/memsto/alert_subsribe_cache.go |  28 +++-
 src/server/memsto/busi_group_cache.go     |  16 +-
 src/server/memsto/recording_rule_cache.go |  28 +++-
 src/server/memsto/target_cache.go         |  28 +++-
 src/server/memsto/user_cache.go           |  16 +-
 src/server/memsto/user_group_cache.go     |  16 +-
 src/server/reader/reader.go               | 177 ----------------------
 src/server/router/router_datadog.go       |   5 +-
 src/server/router/router_event.go         |  11 +-
 src/server/router/router_openfalcon.go    |   6 +-
 src/server/router/router_opentsdb.go      |   5 +-
 src/server/router/router_prom.go          |  13 +-
 src/server/server.go                      |   3 +-
 src/server/usage/usage.go                 |  26 ----
 src/server/writer/writer.go               |  12 +-
 25 files changed, 496 insertions(+), 297 deletions(-)
 create mode 100644 src/server/config/prom_client.go
 create mode 100644 src/server/config/prom_option.go
 create mode 100644 src/server/config/reader.go
 delete mode 100644 src/server/reader/reader.go

diff --git a/src/server/config/config.go b/src/server/config/config.go
index b9079c66..ebdda00f 100644
--- a/src/server/config/config.go
+++ b/src/server/config/config.go
@@ -98,7 +98,6 @@ func MustLoad(fpaths ...string) {
     }
 
     C.Heartbeat.Endpoint = fmt.Sprintf("%s:%d", C.Heartbeat.IP, C.HTTP.Port)
-    C.Alerting.RedisPub.ChannelKey = C.Alerting.RedisPub.ChannelPrefix + C.ClusterName
 
     if C.Alerting.Webhook.Enable {
         if C.Alerting.Webhook.Timeout == "" {
@@ -202,23 +201,10 @@ type Config struct {
     DB        ormx.DBConfig
     WriterOpt WriterGlobalOpt
     Writers   []WriterOptions
-    Reader    ReaderOptions
+    Reader    PromOption
     Ibex      Ibex
 }
 
-type ReaderOptions struct {
-    Url           string
-    BasicAuthUser string
-    BasicAuthPass string
-
-    Timeout     int64
-    DialTimeout int64
-
-    MaxIdleConnsPerHost int
-
-    Headers []string
-}
-
 type WriterOptions struct {
     Url           string
     BasicAuthUser string
diff --git a/src/server/config/prom_client.go b/src/server/config/prom_client.go
new file mode 100644
index 00000000..de550511
--- /dev/null
+++ b/src/server/config/prom_client.go
@@ -0,0 +1,59 @@
+package config
+
+import (
+    "sync"
+
+    "github.com/didi/nightingale/v5/src/pkg/prom"
+)
+
+type PromClient struct {
+    prom.API
+    ClusterName string
+    sync.RWMutex
+}
+
+var ReaderClient *PromClient = &PromClient{}
+
+func (pc *PromClient) Set(clusterName string, c prom.API) {
+    pc.Lock()
+    defer pc.Unlock()
+    pc.ClusterName = clusterName
+    pc.API = c
+}
+
+func (pc *PromClient) Get() (string, prom.API) {
+    pc.RLock()
+    defer pc.RUnlock()
+    return pc.ClusterName, pc.API
+}
+
+func (pc *PromClient) GetClusterName() string {
+    pc.RLock()
+    defer pc.RUnlock()
+    return pc.ClusterName
+}
+
+func (pc *PromClient) GetCli() prom.API {
+    pc.RLock()
+    defer pc.RUnlock()
+    return pc.API
+}
+
+func (pc *PromClient) IsNil() bool {
+    if pc == nil {
+        return true
+    }
+
+    pc.RLock()
+    defer pc.RUnlock()
+
+    return pc.API == nil
+}
+
+func (pc *PromClient) Reset() {
+    pc.Lock()
+    defer pc.Unlock()
+
+    pc.ClusterName = ""
+    pc.API = nil
+}
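
The intended usage of the new holder: check IsNil, then take the cluster name and the client in one Get() call, so the pair cannot be swapped by a concurrent rebinding between two separate reads. A minimal sketch of a hypothetical caller (the function and its error handling are illustrative, not part of the patch):

package example // illustrative only

import (
    "context"
    "time"

    "github.com/didi/nightingale/v5/src/server/config"
    "github.com/toolkits/pkg/logger"
)

// queryCurrentCluster shows the PromClient access pattern: IsNil first,
// then one Get() under a single read lock, then query without the lock.
func queryCurrentCluster(promql string) {
    if config.ReaderClient.IsNil() {
        return // no datasource bound to this n9e-server yet
    }

    clusterName, cli := config.ReaderClient.Get()

    value, warnings, err := cli.Query(context.Background(), promql, time.Now())
    if err != nil {
        logger.Errorf("cluster %s promql %s error: %v", clusterName, promql, err)
        return
    }
    if len(warnings) > 0 {
        logger.Warningf("cluster %s promql %s warnings: %v", clusterName, promql, warnings)
    }
    _ = value // real callers hand this to conv.ConvertVectors etc.
}
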
pc.ClusterName = "" + pc.API = nil +} diff --git a/src/server/config/prom_option.go b/src/server/config/prom_option.go new file mode 100644 index 00000000..3dad6dae --- /dev/null +++ b/src/server/config/prom_option.go @@ -0,0 +1,81 @@ +package config + +import "sync" + +type PromOption struct { + Url string + BasicAuthUser string + BasicAuthPass string + + Timeout int64 + DialTimeout int64 + + MaxIdleConnsPerHost int + + Headers []string +} + +func (po *PromOption) Equal(target PromOption) bool { + if po.Url != target.Url { + return false + } + + if po.BasicAuthUser != target.BasicAuthUser { + return false + } + + if po.BasicAuthPass != target.BasicAuthPass { + return false + } + + if po.Timeout != target.Timeout { + return false + } + + if po.DialTimeout != target.DialTimeout { + return false + } + + if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost { + return false + } + + if len(po.Headers) != len(target.Headers) { + return false + } + + for i := 0; i < len(po.Headers); i++ { + if po.Headers[i] != target.Headers[i] { + return false + } + } + + return true +} + +type PromOptionsStruct struct { + Data map[string]PromOption + sync.RWMutex +} + +func (pos *PromOptionsStruct) Set(clusterName string, po PromOption) { + pos.Lock() + pos.Data[clusterName] = po + pos.Unlock() +} + +func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) { + pos.Lock() + pos.Data = map[string]PromOption{clusterName: po} + pos.Unlock() +} + +func (pos *PromOptionsStruct) Get(clusterName string) (PromOption, bool) { + pos.RLock() + defer pos.RUnlock() + ret, has := pos.Data[clusterName] + return ret, has +} + +// Data key is cluster name +var PromOptions = &PromOptionsStruct{Data: make(map[string]PromOption)} diff --git a/src/server/config/reader.go b/src/server/config/reader.go new file mode 100644 index 00000000..4098c8f2 --- /dev/null +++ b/src/server/config/reader.go @@ -0,0 +1,131 @@ +package config + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "strings" + "time" + + "github.com/didi/nightingale/v5/src/models" + "github.com/didi/nightingale/v5/src/pkg/prom" + "github.com/prometheus/client_golang/api" + "github.com/toolkits/pkg/logger" +) + +func InitReader() error { + rf := strings.ToLower(strings.TrimSpace(C.ReaderFrom)) + if rf == "" || rf == "config" { + return setClientFromPromOption(C.ClusterName, C.Reader) + } + + if rf == "database" { + return initFromDatabase() + } + + return fmt.Errorf("invalid configuration ReaderFrom: %s", rf) +} + +func initFromDatabase() error { + go func() { + for { + loadFromDatabase() + time.Sleep(time.Second) + } + }() + return nil +} + +func loadFromDatabase() { + cluster, err := models.AlertingEngineGetCluster(C.Heartbeat.Endpoint) + if err != nil { + logger.Errorf("failed to get current cluster, error: %v", err) + return + } + + if cluster == "" { + ReaderClient.Reset() + logger.Warning("no datasource binded to me") + return + } + + ckey := "prom." 
+ cluster + ".option" + cval, err := models.ConfigsGet(ckey) + if err != nil { + logger.Errorf("failed to get ckey: %s, error: %v", ckey, err) + return + } + + if cval == "" { + ReaderClient.Reset() + return + } + + var po PromOption + err = json.Unmarshal([]byte(cval), &po) + if err != nil { + logger.Errorf("failed to unmarshal PromOption: %s", err) + return + } + + if ReaderClient.IsNil() { + // first time + if err = setClientFromPromOption(cluster, po); err != nil { + logger.Errorf("failed to setClientFromPromOption: %v", err) + return + } + + PromOptions.Sets(cluster, po) + return + } + + localPo, has := PromOptions.Get(cluster) + if !has || !localPo.Equal(po) { + if err = setClientFromPromOption(cluster, po); err != nil { + logger.Errorf("failed to setClientFromPromOption: %v", err) + return + } + + PromOptions.Sets(cluster, po) + return + } +} + +func newClientFromPromOption(po PromOption) (api.Client, error) { + return api.NewClient(api.Config{ + Address: po.Url, + RoundTripper: &http.Transport{ + // TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(po.DialTimeout) * time.Millisecond, + }).DialContext, + ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond, + MaxIdleConnsPerHost: po.MaxIdleConnsPerHost, + }, + }) +} + +func setClientFromPromOption(clusterName string, po PromOption) error { + if clusterName == "" { + return fmt.Errorf("argument clusterName is blank") + } + + if po.Url == "" { + return fmt.Errorf("prometheus url is blank") + } + + cli, err := newClientFromPromOption(po) + if err != nil { + return fmt.Errorf("failed to newClientFromPromOption: %v", err) + } + + ReaderClient.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{ + BasicAuthUser: po.BasicAuthUser, + BasicAuthPass: po.BasicAuthPass, + Headers: po.Headers, + })) + + return nil +} diff --git a/src/server/engine/engine.go b/src/server/engine/engine.go index 7648e974..f15f5412 100644 --- a/src/server/engine/engine.go +++ b/src/server/engine/engine.go @@ -53,6 +53,10 @@ func Reload() { func reportQueueSize() { for { time.Sleep(time.Second) - promstat.GaugeAlertQueueSize.WithLabelValues(config.C.ClusterName).Set(float64(EventQueue.Len())) + clusterName := config.ReaderClient.GetClusterName() + if clusterName == "" { + continue + } + promstat.GaugeAlertQueueSize.WithLabelValues(clusterName).Set(float64(EventQueue.Len())) } } diff --git a/src/server/engine/notify.go b/src/server/engine/notify.go index 213e5879..200981fc 100644 --- a/src/server/engine/notify.go +++ b/src/server/engine/notify.go @@ -101,12 +101,13 @@ func genNotice(event *models.AlertCurEvent) Notice { return Notice{Event: event, Tpls: ntpls} } -func alertingRedisPub(bs []byte) { +func alertingRedisPub(clusterName string, bs []byte) { + channelKey := config.C.Alerting.RedisPub.ChannelPrefix + clusterName // pub all alerts to redis if config.C.Alerting.RedisPub.Enable { - err := storage.Redis.Publish(context.Background(), config.C.Alerting.RedisPub.ChannelKey, bs).Err() + err := storage.Redis.Publish(context.Background(), channelKey, bs).Err() if err != nil { - logger.Errorf("event_notify: redis publish %s err: %v", config.C.Alerting.RedisPub.ChannelKey, err) + logger.Errorf("event_notify: redis publish %s err: %v", channelKey, err) } } } @@ -249,7 +250,7 @@ func notify(event *models.AlertCurEvent) { return } - alertingRedisPub(stdinBytes) + alertingRedisPub(event.Cluster, stdinBytes) alertingWebhook(event) handleNotice(notice, stdinBytes) diff --git 
diff --git a/src/server/engine/worker.go b/src/server/engine/worker.go
index f3ff2c04..448bbf9e 100644
--- a/src/server/engine/worker.go
+++ b/src/server/engine/worker.go
@@ -20,7 +20,6 @@ import (
     "github.com/didi/nightingale/v5/src/server/config"
     "github.com/didi/nightingale/v5/src/server/memsto"
     "github.com/didi/nightingale/v5/src/server/naming"
-    "github.com/didi/nightingale/v5/src/server/reader"
     promstat "github.com/didi/nightingale/v5/src/server/stat"
 )
@@ -167,16 +166,18 @@ func (r *RuleEval) Work() {
         return
     }
 
-    if reader.Client == nil {
-        logger.Error("reader.Client is nil")
+    if config.ReaderClient.IsNil() {
+        logger.Error("reader client is nil")
         return
     }
 
+    clusterName, readerClient := config.ReaderClient.Get()
+
     var value model.Value
     var err error
     if r.rule.Algorithm == "" && (r.rule.Cate == "" || r.rule.Cate == "prometheus") {
         var warnings prom.Warnings
-        value, warnings, err = reader.Client.Query(context.Background(), promql, time.Now())
+        value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
         if err != nil {
             logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
             //notifyToMaintainer(err, "failed to query prometheus")
@@ -191,7 +192,7 @@
         logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
     }
 
-    r.Judge(conv.ConvertVectors(value))
+    r.Judge(clusterName, conv.ConvertVectors(value))
 }
 
 type WorkersType struct {
@@ -306,10 +307,10 @@ func (ws *WorkersType) BuildRe(rids []int64) {
     }
 }
 
-func (r *RuleEval) Judge(vectors []conv.Vector) {
+func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) {
     now := time.Now().Unix()
 
-    alertingKeys, ruleExists := r.MakeNewEvent("inner", now, vectors)
+    alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
     if !ruleExists {
         return
     }
@@ -318,7 +319,7 @@
     r.recoverRule(alertingKeys, now)
 }
 
-func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) (map[string]struct{}, bool) {
+func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vectors []conv.Vector) (map[string]struct{}, bool) {
     // Some rule settings (notify receivers, callbacks, ...) may have changed;
     // such changes do not restart the worker but do affect alert handling,
     // so re-read the rule from memsto.AlertRuleCache and override it here.
@@ -394,7 +395,7 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) (
     tagsArr := labelMapToArr(tagsMap)
     sort.Strings(tagsArr)
 
-    event.Cluster = config.C.ClusterName
+    event.Cluster = clusterName
     event.Cate = r.rule.Cate
     event.Hash = hash
     event.RuleId = r.rule.Id
@@ -583,7 +584,7 @@ func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
         r.fires.Set(event.Hash, event)
     }
 
-    promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc()
+    promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
     LogEvent(event, "push_queue")
     if !EventQueue.PushFront(event) {
         logger.Warningf("event_push_queue: queue is full")
@@ -650,12 +651,12 @@ func (r RecordingRuleEval) Work() {
         return
     }
 
-    if reader.Client == nil {
-        log.Println("reader.Client is nil")
+    if config.ReaderClient.IsNil() {
+        log.Println("reader client is nil")
         return
     }
 
-    value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now())
+    value, warnings, err := config.ReaderClient.GetCli().Query(context.Background(), promql, time.Now())
     if err != nil {
         logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
         return
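
Worth spelling out why Work() snapshots both values with one Get() instead of calling GetClusterName() and GetCli() separately: the binding can be swapped by loadFromDatabase between two reads, so a split read could tag cluster A's events with cluster B's name. A hypothetical sketch of the pattern:

package example

import "github.com/didi/nightingale/v5/src/server/config"

// evalOnce sketches the snapshot rule used by RuleEval.Work: one Get()
// holds a single read lock, so every event produced by this evaluation
// carries the cluster name that matches the client actually queried.
func evalOnce() {
    if config.ReaderClient.IsNil() {
        return
    }

    clusterName, cli := config.ReaderClient.Get()
    _, _ = clusterName, cli // query cli, stamp events with clusterName
}
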
diff --git a/src/server/idents/idents.go b/src/server/idents/idents.go
index aa69a5da..23922b51 100644
--- a/src/server/idents/idents.go
+++ b/src/server/idents/idents.go
@@ -41,6 +41,10 @@ func toRedis() {
         return
     }
 
+    if config.ReaderClient.IsNil() {
+        return
+    }
+
     now := time.Now().Unix()
 
     // clean old idents
@@ -49,7 +53,7 @@
             Idents.Remove(key)
         } else {
             // use now as timestamp to redis
-            err := storage.Redis.HSet(context.Background(), redisKey(config.C.ClusterName), key, now).Err()
+            err := storage.Redis.HSet(context.Background(), redisKey(config.ReaderClient.GetClusterName()), key, now).Err()
             if err != nil {
                 logger.Errorf("redis hset idents failed: %v", err)
             }
@@ -103,8 +107,14 @@ func pushMetrics() {
         return
     }
 
+    clusterName := config.ReaderClient.GetClusterName()
+    if clusterName == "" {
+        logger.Warning("cluster name is blank")
+        return
+    }
+
     // get all the target heartbeat timestamp
-    ret, err := storage.Redis.HGetAll(context.Background(), redisKey(config.C.ClusterName)).Result()
+    ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result()
     if err != nil {
         logger.Errorf("handle_idents: redis hgetall fail: %v", err)
         return
@@ -121,7 +131,7 @@
         }
 
         if now-clock > dur {
-            clearDeadIdent(context.Background(), config.C.ClusterName, ident)
+            clearDeadIdent(context.Background(), clusterName, ident)
         } else {
             actives[ident] = struct{}{}
         }
@@ -153,7 +163,7 @@ func pushMetrics() {
     if !has {
         // target not exists
         target = &models.Target{
-            Cluster:  config.C.ClusterName,
+            Cluster:  clusterName,
             Ident:    active,
             Tags:     "",
             TagsJSON: []string{},
diff --git a/src/server/memsto/alert_mute_cache.go b/src/server/memsto/alert_mute_cache.go
index 582a477d..723a20d7 100644
--- a/src/server/memsto/alert_mute_cache.go
+++ b/src/server/memsto/alert_mute_cache.go
@@ -27,6 +27,15 @@ var AlertMuteCache = AlertMuteCacheType{
     mutes: make(map[int64][]*models.AlertMute),
 }
 
+func (amc *AlertMuteCacheType) Reset() {
+    amc.Lock()
+    defer amc.Unlock()
+
+    amc.statTotal = -1
+    amc.statLastUpdated = -1
+    amc.mutes = make(map[int64][]*models.AlertMute)
+}
+
 func (amc *AlertMuteCacheType) StatChanged(total, lastUpdated int64) bool {
     if amc.statTotal == total && amc.statLastUpdated == lastUpdated {
         return false
@@ -90,19 +99,26 @@ func loopSyncAlertMutes() {
 func syncAlertMutes() error {
     start := time.Now()
 
-    stat, err := models.AlertMuteStatistics(config.C.ClusterName)
+    clusterName := config.ReaderClient.GetClusterName()
+    if clusterName == "" {
+        AlertMuteCache.Reset()
+        logger.Warning("cluster name is blank")
+        return nil
+    }
+
+    stat, err := models.AlertMuteStatistics(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec AlertMuteStatistics")
     }
 
     if !AlertMuteCache.StatChanged(stat.Total, stat.LastUpdated) {
-        promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(0)
-        promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(0)
+        promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(0)
+        promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(0)
         logger.Debug("alert mutes not changed")
         return nil
     }
 
-    lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName)
+    lst, err := models.AlertMuteGetsByCluster(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster")
     }
@@ -122,8 +138,8 @@
     AlertMuteCache.Set(oks, stat.Total, stat.LastUpdated)
 
     ms := time.Since(start).Milliseconds()
-    promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(ms))
-    promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(len(lst)))
+    promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(ms))
+    promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(len(lst)))
 
     logger.Infof("timer: sync mutes done, cost: %dms, number: %d", ms, len(lst))
 
     return nil
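
The Reset/StatChanged pairing is what makes rebinding safe: Reset drops the data and sets the statistics sentinels to -1, so the next sync against any real cluster must see a change and rebuild. A hypothetical regression test sketch, assuming Set records the statistics it is given, as its call sites in syncAlertMutes suggest:

package memsto

import "testing"

// After Reset, statTotal/statLastUpdated are -1, so previously seen
// statistics must be treated as changed and trigger a full resync.
func TestResetForcesResync(t *testing.T) {
    AlertMuteCache.Set(nil, 10, 1660000000) // warmed cache: total=10
    if AlertMuteCache.StatChanged(10, 1660000000) {
        t.Fatal("unchanged stats should not report a change")
    }

    AlertMuteCache.Reset()
    if !AlertMuteCache.StatChanged(10, 1660000000) {
        t.Fatal("after Reset, the same stats must be seen as changed")
    }
}
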
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(ms)) - promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(len(lst))) + promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(ms)) + promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(len(lst))) logger.Infof("timer: sync mutes done, cost: %dms, number: %d", ms, len(lst)) return nil diff --git a/src/server/memsto/alert_rule_cache.go b/src/server/memsto/alert_rule_cache.go index d412d985..c4902a9d 100644 --- a/src/server/memsto/alert_rule_cache.go +++ b/src/server/memsto/alert_rule_cache.go @@ -27,6 +27,15 @@ var AlertRuleCache = AlertRuleCacheType{ rules: make(map[int64]*models.AlertRule), } +func (arc *AlertRuleCacheType) Reset() { + arc.Lock() + defer arc.Unlock() + + arc.statTotal = -1 + arc.statLastUpdated = -1 + arc.rules = make(map[int64]*models.AlertRule) +} + func (arc *AlertRuleCacheType) StatChanged(total, lastUpdated int64) bool { if arc.statTotal == total && arc.statLastUpdated == lastUpdated { return false @@ -87,19 +96,26 @@ func loopSyncAlertRules() { func syncAlertRules() error { start := time.Now() - stat, err := models.AlertRuleStatistics(config.C.ClusterName) + clusterName := config.ReaderClient.GetClusterName() + if clusterName == "" { + AlertRuleCache.Reset() + logger.Warning("cluster name is blank") + return nil + } + + stat, err := models.AlertRuleStatistics(clusterName) if err != nil { return errors.WithMessage(err, "failed to exec AlertRuleStatistics") } if !AlertRuleCache.StatChanged(stat.Total, stat.LastUpdated) { - promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(0) - promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(0) + promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_rules").Set(0) + promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_rules").Set(0) logger.Debug("alert rules not changed") return nil } - lst, err := models.AlertRuleGetsByCluster(config.C.ClusterName) + lst, err := models.AlertRuleGetsByCluster(clusterName) if err != nil { return errors.WithMessage(err, "failed to exec AlertRuleGetsByCluster") } @@ -112,8 +128,8 @@ func syncAlertRules() error { AlertRuleCache.Set(m, stat.Total, stat.LastUpdated) ms := time.Since(start).Milliseconds() - promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(float64(ms)) - promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(float64(len(m))) + promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_rules").Set(float64(ms)) + promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_rules").Set(float64(len(m))) logger.Infof("timer: sync rules done, cost: %dms, number: %d", ms, len(m)) return nil diff --git a/src/server/memsto/alert_subsribe_cache.go b/src/server/memsto/alert_subsribe_cache.go index 9f319e14..4b2a11e4 100644 --- a/src/server/memsto/alert_subsribe_cache.go +++ b/src/server/memsto/alert_subsribe_cache.go @@ -27,6 +27,15 @@ var AlertSubscribeCache = AlertSubscribeCacheType{ subs: make(map[int64][]*models.AlertSubscribe), } +func (c *AlertSubscribeCacheType) Reset() { + c.Lock() + defer c.Unlock() + + c.statTotal = -1 + c.statLastUpdated = -1 + c.subs = make(map[int64][]*models.AlertSubscribe) +} + func (c *AlertSubscribeCacheType) StatChanged(total, lastUpdated int64) bool { if c.statTotal == total && 
diff --git a/src/server/memsto/alert_subsribe_cache.go b/src/server/memsto/alert_subsribe_cache.go
index 9f319e14..4b2a11e4 100644
--- a/src/server/memsto/alert_subsribe_cache.go
+++ b/src/server/memsto/alert_subsribe_cache.go
@@ -27,6 +27,15 @@ var AlertSubscribeCache = AlertSubscribeCacheType{
     subs: make(map[int64][]*models.AlertSubscribe),
 }
 
+func (c *AlertSubscribeCacheType) Reset() {
+    c.Lock()
+    defer c.Unlock()
+
+    c.statTotal = -1
+    c.statLastUpdated = -1
+    c.subs = make(map[int64][]*models.AlertSubscribe)
+}
+
 func (c *AlertSubscribeCacheType) StatChanged(total, lastUpdated int64) bool {
     if c.statTotal == total && c.statLastUpdated == lastUpdated {
         return false
@@ -93,19 +102,26 @@ func loopSyncAlertSubscribes() {
 func syncAlertSubscribes() error {
     start := time.Now()
 
-    stat, err := models.AlertSubscribeStatistics(config.C.ClusterName)
+    clusterName := config.ReaderClient.GetClusterName()
+    if clusterName == "" {
+        AlertSubscribeCache.Reset()
+        logger.Warning("cluster name is blank")
+        return nil
+    }
+
+    stat, err := models.AlertSubscribeStatistics(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec AlertSubscribeStatistics")
     }
 
     if !AlertSubscribeCache.StatChanged(stat.Total, stat.LastUpdated) {
-        promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(0)
-        promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(0)
+        promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_subscribes").Set(0)
+        promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_subscribes").Set(0)
         logger.Debug("alert subscribes not changed")
         return nil
     }
 
-    lst, err := models.AlertSubscribeGetsByCluster(config.C.ClusterName)
+    lst, err := models.AlertSubscribeGetsByCluster(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec AlertSubscribeGetsByCluster")
     }
@@ -125,8 +141,8 @@
     AlertSubscribeCache.Set(subs, stat.Total, stat.LastUpdated)
 
     ms := time.Since(start).Milliseconds()
-    promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(float64(ms))
-    promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(float64(len(lst)))
+    promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_subscribes").Set(float64(ms))
+    promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_subscribes").Set(float64(len(lst)))
 
     logger.Infof("timer: sync subscribes done, cost: %dms, number: %d", ms, len(lst))
 
     return nil
diff --git a/src/server/memsto/busi_group_cache.go b/src/server/memsto/busi_group_cache.go
index 947baaa5..a32b641e 100644
--- a/src/server/memsto/busi_group_cache.go
+++ b/src/server/memsto/busi_group_cache.go
@@ -79,9 +79,14 @@ func syncBusiGroups() error {
         return errors.WithMessage(err, "failed to exec BusiGroupStatistics")
     }
 
+    clusterName := config.ReaderClient.GetClusterName()
+
     if !BusiGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
-        promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(0)
-        promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(0)
+        if clusterName != "" {
+            promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_busi_groups").Set(0)
+            promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_busi_groups").Set(0)
+        }
+
         logger.Debug("busi_group not changed")
         return nil
     }
@@ -94,8 +99,11 @@
     BusiGroupCache.Set(m, stat.Total, stat.LastUpdated)
 
     ms := time.Since(start).Milliseconds()
-    promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(float64(ms))
-    promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(float64(len(m)))
+    if clusterName != "" {
+        promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_busi_groups").Set(float64(ms))
+        promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_busi_groups").Set(float64(len(m)))
+    }
+
     logger.Infof("timer: sync busi groups done, cost: %dms, number: %d", ms, len(m))
 
     return nil
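
Note the asymmetry: per-cluster caches (mutes, rules, subscribes, targets, recording rules) Reset and skip the sync while unbound, whereas global caches (busi groups, users, user groups) keep syncing and only skip the per-cluster gauges, since there is no cluster label to attach. A hypothetical helper capturing that repeated guard:

package example

import (
    "github.com/didi/nightingale/v5/src/server/config"
    promstat "github.com/didi/nightingale/v5/src/server/stat"
)

// setSyncStats records sync metrics only when the server is bound to a
// cluster; global caches call it unconditionally and it no-ops otherwise.
func setSyncStats(name string, ms, num float64) {
    cn := config.ReaderClient.GetClusterName()
    if cn == "" {
        return
    }
    promstat.GaugeCronDuration.WithLabelValues(cn, name).Set(ms)
    promstat.GaugeSyncNumber.WithLabelValues(cn, name).Set(num)
}
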
diff --git a/src/server/memsto/recording_rule_cache.go b/src/server/memsto/recording_rule_cache.go
index 61305de1..7670bd28 100644
--- a/src/server/memsto/recording_rule_cache.go
+++ b/src/server/memsto/recording_rule_cache.go
@@ -26,6 +26,15 @@ var RecordingRuleCache = RecordingRuleCacheType{
     rules: make(map[int64]*models.RecordingRule),
 }
 
+func (rrc *RecordingRuleCacheType) Reset() {
+    rrc.Lock()
+    defer rrc.Unlock()
+
+    rrc.statTotal = -1
+    rrc.statLastUpdated = -1
+    rrc.rules = make(map[int64]*models.RecordingRule)
+}
+
 func (rrc *RecordingRuleCacheType) StatChanged(total, lastUpdated int64) bool {
     if rrc.statTotal == total && rrc.statLastUpdated == lastUpdated {
         return false
@@ -86,19 +95,26 @@ func loopSyncRecordingRules() {
 func syncRecordingRules() error {
     start := time.Now()
 
-    stat, err := models.RecordingRuleStatistics(config.C.ClusterName)
+    clusterName := config.ReaderClient.GetClusterName()
+    if clusterName == "" {
+        RecordingRuleCache.Reset()
+        logger.Warning("cluster name is blank")
+        return nil
+    }
+
+    stat, err := models.RecordingRuleStatistics(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec RecordingRuleStatistics")
     }
 
     if !RecordingRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
-        promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
-        promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
+        promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_recording_rules").Set(0)
+        promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_recording_rules").Set(0)
         logger.Debug("recording rules not changed")
         return nil
     }
 
-    lst, err := models.RecordingRuleGetsByCluster(config.C.ClusterName)
+    lst, err := models.RecordingRuleGetsByCluster(clusterName)
    if err != nil {
         return errors.WithMessage(err, "failed to exec RecordingRuleGetsByCluster")
     }
@@ -111,8 +127,8 @@
     RecordingRuleCache.Set(m, stat.Total, stat.LastUpdated)
 
     ms := time.Since(start).Milliseconds()
-    promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(ms))
-    promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(len(m)))
+    promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_recording_rules").Set(float64(ms))
+    promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_recording_rules").Set(float64(len(m)))
 
     logger.Infof("timer: sync recording rules done, cost: %dms, number: %d", ms, len(m))
 
     return nil
diff --git a/src/server/memsto/target_cache.go b/src/server/memsto/target_cache.go
index 1cc77644..9e5e04b4 100644
--- a/src/server/memsto/target_cache.go
+++ b/src/server/memsto/target_cache.go
@@ -31,6 +31,15 @@ var TargetCache = TargetCacheType{
     targets: make(map[string]*models.Target),
 }
 
+func (tc *TargetCacheType) Reset() {
+    tc.Lock()
+    defer tc.Unlock()
+
+    tc.statTotal = -1
+    tc.statLastUpdated = -1
+    tc.targets = make(map[string]*models.Target)
+}
+
 func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
     if tc.statTotal == total && tc.statLastUpdated == lastUpdated {
         return false
@@ -94,19 +103,26 @@ func loopSyncTargets() {
 func syncTargets() error {
     start := time.Now()
 
-    stat, err := models.TargetStatistics(config.C.ClusterName)
+    clusterName := config.ReaderClient.GetClusterName()
+    if clusterName == "" {
+        TargetCache.Reset()
+        logger.Warning("cluster name is blank")
+        return nil
+    }
+
+    stat, err := models.TargetStatistics(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec TargetStatistics")
     }
 
     if !TargetCache.StatChanged(stat.Total, stat.LastUpdated) {
-        promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_targets").Set(0)
-        promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_targets").Set(0)
+        promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_targets").Set(0)
+        promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_targets").Set(0)
         logger.Debug("targets not changed")
         return nil
     }
 
-    lst, err := models.TargetGetsByCluster(config.C.ClusterName)
+    lst, err := models.TargetGetsByCluster(clusterName)
     if err != nil {
         return errors.WithMessage(err, "failed to exec TargetGetsByCluster")
     }
@@ -129,8 +145,8 @@
     TargetCache.Set(m, stat.Total, stat.LastUpdated)
 
     ms := time.Since(start).Milliseconds()
-    promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_targets").Set(float64(ms))
-    promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_targets").Set(float64(len(lst)))
+    promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_targets").Set(float64(ms))
+    promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_targets").Set(float64(len(lst)))
 
     logger.Infof("timer: sync targets done, cost: %dms, number: %d", ms, len(lst))
 
     return nil
diff --git a/src/server/memsto/user_cache.go b/src/server/memsto/user_cache.go
index 4cee8c2b..1d1b7fb7 100644
--- a/src/server/memsto/user_cache.go
+++ b/src/server/memsto/user_cache.go
@@ -124,9 +124,14 @@ func syncUsers() error {
         return errors.WithMessage(err, "failed to exec UserStatistics")
     }
 
+    clusterName := config.ReaderClient.GetClusterName()
+
     if !UserCache.StatChanged(stat.Total, stat.LastUpdated) {
-        promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_users").Set(0)
-        promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_users").Set(0)
+        if clusterName != "" {
+            promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_users").Set(0)
+            promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_users").Set(0)
+        }
+
         logger.Debug("users not changed")
         return nil
     }
@@ -144,8 +149,11 @@
     UserCache.Set(m, stat.Total, stat.LastUpdated)
 
     ms := time.Since(start).Milliseconds()
-    promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_users").Set(float64(ms))
-    promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_users").Set(float64(len(m)))
+    if clusterName != "" {
+        promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_users").Set(float64(ms))
+        promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_users").Set(float64(len(m)))
+    }
+
     logger.Infof("timer: sync users done, cost: %dms, number: %d", ms, len(m))
 
     return nil
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_user_groups").Set(0) + } + logger.Debug("user_group not changed") return nil } @@ -145,8 +150,11 @@ func syncUserGroups() error { UserGroupCache.Set(m, stat.Total, stat.LastUpdated) ms := time.Since(start).Milliseconds() - promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(float64(ms)) - promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(float64(len(m))) + if clusterName != "" { + promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(float64(ms)) + promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_user_groups").Set(float64(len(m))) + } + logger.Infof("timer: sync user groups done, cost: %dms, number: %d", ms, len(m)) return nil diff --git a/src/server/reader/reader.go b/src/server/reader/reader.go deleted file mode 100644 index c7b27a64..00000000 --- a/src/server/reader/reader.go +++ /dev/null @@ -1,177 +0,0 @@ -package reader - -import ( - "encoding/json" - "fmt" - "net" - "net/http" - "strings" - "time" - - "github.com/didi/nightingale/v5/src/models" - "github.com/didi/nightingale/v5/src/pkg/prom" - "github.com/didi/nightingale/v5/src/server/config" - "github.com/prometheus/client_golang/api" - "github.com/toolkits/pkg/logger" -) - -var Client prom.API -var LocalPromOption PromOption - -func Init() error { - rf := strings.ToLower(strings.TrimSpace(config.C.ReaderFrom)) - if rf == "" || rf == "config" { - return initFromConfig() - } - - if rf == "database" { - return initFromDatabase() - } - - return fmt.Errorf("invalid configuration ReaderFrom: %s", rf) -} - -func initFromConfig() error { - opts := config.C.Reader - - if opts.Url == "" { - logger.Warning("reader url is blank") - return nil - } - - cli, err := api.NewClient(api.Config{ - Address: opts.Url, - RoundTripper: &http.Transport{ - // TLSClientConfig: tlsConfig, - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: time.Duration(opts.DialTimeout) * time.Millisecond, - }).DialContext, - ResponseHeaderTimeout: time.Duration(opts.Timeout) * time.Millisecond, - MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost, - }, - }) - - if err != nil { - return err - } - - Client = prom.NewAPI(cli, prom.ClientOptions{ - BasicAuthUser: opts.BasicAuthUser, - BasicAuthPass: opts.BasicAuthPass, - Headers: opts.Headers, - }) - - return nil -} - -func initFromDatabase() error { - go func() { - for { - loadFromDatabase() - time.Sleep(time.Second) - } - }() - return nil -} - -type PromOption struct { - Url string - User string - Pass string - Headers []string - Timeout int64 - DialTimeout int64 - MaxIdleConnsPerHost int -} - -func (po *PromOption) Equal(target PromOption) bool { - if po.Url != target.Url { - return false - } - - if po.User != target.User { - return false - } - - if po.Pass != target.Pass { - return false - } - - if po.Timeout != target.Timeout { - return false - } - - if po.DialTimeout != target.DialTimeout { - return false - } - - if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost { - return false - } - - if len(po.Headers) != len(target.Headers) { - return false - } - - for i := 0; i < len(po.Headers); i++ { - if po.Headers[i] != target.Headers[i] { - return false - } - } - - return true -} - -func loadFromDatabase() { - cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint) - if err != nil { - logger.Errorf("failed to get current cluster, error: %v", err) - return - } - - ckey := "prom." 
+ cluster + ".option" - cval, err := models.ConfigsGet(ckey) - if err != nil { - logger.Errorf("failed to get ckey: %s, error: %v", ckey, err) - return - } - - if cval == "" { - Client = nil - return - } - - var po PromOption - err = json.Unmarshal([]byte(cval), &po) - if err != nil { - logger.Errorf("failed to unmarshal PromOption: %s", err) - return - } - - if Client == nil || !LocalPromOption.Equal(po) { - cli, err := api.NewClient(api.Config{ - Address: po.Url, - RoundTripper: &http.Transport{ - // TLSClientConfig: tlsConfig, - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: time.Duration(po.DialTimeout) * time.Millisecond, - }).DialContext, - ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond, - MaxIdleConnsPerHost: po.MaxIdleConnsPerHost, - }, - }) - - if err != nil { - logger.Errorf("failed to NewPromClient: %v", err) - return - } - - Client = prom.NewAPI(cli, prom.ClientOptions{ - BasicAuthUser: po.User, - BasicAuthPass: po.Pass, - Headers: po.Headers, - }) - } -} diff --git a/src/server/router/router_datadog.go b/src/server/router/router_datadog.go index 01e79267..ceb8cc82 100644 --- a/src/server/router/router_datadog.go +++ b/src/server/router/router_datadog.go @@ -269,7 +269,10 @@ func datadogSeries(c *gin.Context) { } if succ > 0 { - promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(succ)) + cn := config.ReaderClient.GetClusterName() + if cn != "" { + promstat.CounterSampleTotal.WithLabelValues(cn, "datadog").Add(float64(succ)) + } idents.Idents.MSet(ids) } diff --git a/src/server/router/router_event.go b/src/server/router/router_event.go index ee4ec7eb..b5aaa5bf 100644 --- a/src/server/router/router_event.go +++ b/src/server/router/router_event.go @@ -63,7 +63,11 @@ func pushEventToQueue(c *gin.Context) { event.NotifyChannels = strings.Join(event.NotifyChannelsJSON, " ") event.NotifyGroups = strings.Join(event.NotifyGroupsJSON, " ") - promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc() + cn := config.ReaderClient.GetClusterName() + if cn != "" { + promstat.CounterAlertsTotal.WithLabelValues(cn).Inc() + } + engine.LogEvent(event, "http_push_queue") if !engine.EventQueue.PushFront(event) { msg := fmt.Sprintf("event:%+v push_queue err: queue is full", event) @@ -77,6 +81,7 @@ type eventForm struct { Alert bool `json:"alert"` Vectors []conv.Vector `json:"vectors"` RuleId int64 `json:"rule_id"` + Cluster string `json:"cluster"` } func judgeEvent(c *gin.Context) { @@ -86,7 +91,7 @@ func judgeEvent(c *gin.Context) { if !exists { ginx.Bomb(200, "rule not exists") } - re.Judge(form.Vectors) + re.Judge(form.Cluster, form.Vectors) ginx.NewRender(c).Message(nil) } @@ -102,7 +107,7 @@ func makeEvent(c *gin.Context) { } if events[i].Alert { - go re.MakeNewEvent("http", now, events[i].Vectors) + go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors) } else { for _, vector := range events[i].Vectors { hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key)) diff --git a/src/server/router/router_openfalcon.go b/src/server/router/router_openfalcon.go index 4eb885b6..c8943955 100644 --- a/src/server/router/router_openfalcon.go +++ b/src/server/router/router_openfalcon.go @@ -214,7 +214,11 @@ func falconPush(c *gin.Context) { } if succ > 0 { - promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "openfalcon").Add(float64(succ)) + cn := config.ReaderClient.GetClusterName() + if cn != "" { + promstat.CounterSampleTotal.WithLabelValues(cn, 
"openfalcon").Add(float64(succ)) + } + idents.Idents.MSet(ids) } diff --git a/src/server/router/router_opentsdb.go b/src/server/router/router_opentsdb.go index 27d1d37e..c7b167a3 100644 --- a/src/server/router/router_opentsdb.go +++ b/src/server/router/router_opentsdb.go @@ -208,7 +208,10 @@ func handleOpenTSDB(c *gin.Context) { } if succ > 0 { - promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(succ)) + cn := config.ReaderClient.GetClusterName() + if cn != "" { + promstat.CounterSampleTotal.WithLabelValues(cn, "opentsdb").Add(float64(succ)) + } idents.Idents.MSet(ids) } diff --git a/src/server/router/router_prom.go b/src/server/router/router_prom.go index 7d2b57dd..30601457 100644 --- a/src/server/router/router_prom.go +++ b/src/server/router/router_prom.go @@ -17,7 +17,6 @@ import ( "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/idents" "github.com/didi/nightingale/v5/src/server/memsto" - "github.com/didi/nightingale/v5/src/server/reader" promstat "github.com/didi/nightingale/v5/src/server/stat" "github.com/didi/nightingale/v5/src/server/writer" ) @@ -38,12 +37,12 @@ func queryPromql(c *gin.Context) { var f promqlForm ginx.BindJSON(c, &f) - if reader.Client == nil { - c.String(500, "reader.Client is nil") + if config.ReaderClient.IsNil() { + c.String(500, "reader client is nil") return } - value, warnings, err := reader.Client.Query(c.Request.Context(), f.PromQL, time.Now()) + value, warnings, err := config.ReaderClient.GetCli().Query(c.Request.Context(), f.PromQL, time.Now()) if err != nil { c.String(500, "promql:%s error:%v", f.PromQL, err) return @@ -147,7 +146,11 @@ func remoteWrite(c *gin.Context) { writer.Writers.PushSample(metric, req.Timeseries[i]) } - promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count)) + cn := config.ReaderClient.GetClusterName() + if cn != "" { + promstat.CounterSampleTotal.WithLabelValues(cn, "prometheus").Add(float64(count)) + } + idents.Idents.MSet(ids) } diff --git a/src/server/server.go b/src/server/server.go index 32035865..46432b5f 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -18,7 +18,6 @@ import ( "github.com/didi/nightingale/v5/src/server/idents" "github.com/didi/nightingale/v5/src/server/memsto" "github.com/didi/nightingale/v5/src/server/naming" - "github.com/didi/nightingale/v5/src/server/reader" "github.com/didi/nightingale/v5/src/server/router" "github.com/didi/nightingale/v5/src/server/stat" "github.com/didi/nightingale/v5/src/server/usage" @@ -125,7 +124,7 @@ func (s Server) initialize() (func(), error) { } // init prometheus remote reader - if err = reader.Init(); err != nil { + if err = config.InitReader(); err != nil { return fns.Ret(), err } diff --git a/src/server/usage/usage.go b/src/server/usage/usage.go index 66dfa578..3fd0e6ac 100644 --- a/src/server/usage/usage.go +++ b/src/server/usage/usage.go @@ -2,7 +2,6 @@ package usage import ( "bytes" - "context" "encoding/json" "fmt" "io/ioutil" @@ -12,8 +11,6 @@ import ( "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/pkg/version" - "github.com/didi/nightingale/v5/src/server/common/conv" - "github.com/didi/nightingale/v5/src/server/reader" ) const ( @@ -29,28 +26,6 @@ type Usage struct { Version string `json:"version"` } -func getSamples() (float64, error) { - if reader.Client == nil { - return 0, fmt.Errorf("reader.Client is nil") - } - - value, warns, err := reader.Client.Query(context.Background(), request, 
diff --git a/src/server/usage/usage.go b/src/server/usage/usage.go
index 66dfa578..3fd0e6ac 100644
--- a/src/server/usage/usage.go
+++ b/src/server/usage/usage.go
@@ -2,7 +2,6 @@ package usage
 
 import (
     "bytes"
-    "context"
     "encoding/json"
     "fmt"
     "io/ioutil"
@@ -12,8 +11,6 @@ import (
 
     "github.com/didi/nightingale/v5/src/models"
     "github.com/didi/nightingale/v5/src/pkg/version"
-    "github.com/didi/nightingale/v5/src/server/common/conv"
-    "github.com/didi/nightingale/v5/src/server/reader"
 )
 
 const (
@@ -29,28 +26,6 @@ type Usage struct {
     Version string `json:"version"`
 }
 
-func getSamples() (float64, error) {
-    if reader.Client == nil {
-        return 0, fmt.Errorf("reader.Client is nil")
-    }
-
-    value, warns, err := reader.Client.Query(context.Background(), request, time.Now())
-    if err != nil {
-        return 0, err
-    }
-
-    if len(warns) > 0 {
-        return 0, fmt.Errorf("occur some warnings: %v", warns)
-    }
-
-    lst := conv.ConvertVectors(value)
-    if len(lst) == 0 {
-        return 0, fmt.Errorf("convert result is empty")
-    }
-
-    return lst[0].Value, nil
-}
-
 func Report() {
     for {
         time.Sleep(time.Minute * 10)
@@ -59,7 +34,6 @@ func Report() {
 }
 
 func report() {
-    // sps, _ := getSamples()
     tnum, err := models.TargetTotalCount()
     if err != nil {
         return
diff --git a/src/server/writer/writer.go b/src/server/writer/writer.go
index 7d8b8723..d5de8099 100644
--- a/src/server/writer/writer.go
+++ b/src/server/writer/writer.go
@@ -49,7 +49,10 @@ func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[
     start := time.Now()
 
     defer func() {
-        promstat.ForwardDuration.WithLabelValues(config.C.ClusterName, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
+        cn := config.ReaderClient.GetClusterName()
+        if cn != "" {
+            promstat.ForwardDuration.WithLabelValues(cn, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
+        }
     }()
 
     req := &prompb.WriteRequest{
@@ -240,11 +243,16 @@ func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
 }
 
 func reportChanSize() {
     for {
         time.Sleep(time.Second * 3)
+
+        clusterName := config.ReaderClient.GetClusterName()
+        if clusterName == "" {
+            continue
+        }
         for i, c := range Writers.chans {
             size := len(c)
-            promstat.GaugeSampleQueueSize.WithLabelValues(config.C.ClusterName, fmt.Sprint(i)).Set(float64(size))
+            promstat.GaugeSampleQueueSize.WithLabelValues(clusterName, fmt.Sprint(i)).Set(float64(size))
         }
     }
 }
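
Since Config.Reader is now a PromOption, the static binding path (ReaderFrom empty or "config") is driven entirely by the config file; with ReaderFrom = "database" the section is ignored and the binding comes from the web UI. A sketch of the relevant server.conf fragment, assuming the usual TOML layout of n9e server configs (section and key names mirror the PromOption fields above; values are examples):

ClusterName = "Default"
ReaderFrom = "config"

[Reader]
Url = "http://127.0.0.1:9090"
BasicAuthUser = ""
BasicAuthPass = ""
# both timeouts are in milliseconds, per the net.Dialer usage in reader.go
Timeout = 30000
DialTimeout = 3000
MaxIdleConnsPerHost = 100
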