Manage bindings of n9e-server and datasource in web (#1127)

* manage bindings of n9e-server and datasource

* fix sync memsto
ulricqin 2022-08-22 18:39:29 +08:00 committed by GitHub
parent 80ee54898a
commit e7d2c45f9d
25 changed files with 496 additions and 297 deletions

View File

@ -98,7 +98,6 @@ func MustLoad(fpaths ...string) {
}
C.Heartbeat.Endpoint = fmt.Sprintf("%s:%d", C.Heartbeat.IP, C.HTTP.Port)
C.Alerting.RedisPub.ChannelKey = C.Alerting.RedisPub.ChannelPrefix + C.ClusterName
if C.Alerting.Webhook.Enable {
if C.Alerting.Webhook.Timeout == "" {
@ -202,23 +201,10 @@ type Config struct {
DB ormx.DBConfig
WriterOpt WriterGlobalOpt
Writers []WriterOptions
Reader ReaderOptions
Reader PromOption
Ibex Ibex
}
type ReaderOptions struct {
Url string
BasicAuthUser string
BasicAuthPass string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
Headers []string
}
type WriterOptions struct {
Url string
BasicAuthUser string

View File

@ -0,0 +1,59 @@
package config
import (
"sync"
"github.com/didi/nightingale/v5/src/pkg/prom"
)
type PromClient struct {
prom.API
ClusterName string
sync.RWMutex
}
var ReaderClient *PromClient = &PromClient{}
func (pc *PromClient) Set(clusterName string, c prom.API) {
pc.Lock()
defer pc.Unlock()
pc.ClusterName = clusterName
pc.API = c
}
func (pc *PromClient) Get() (string, prom.API) {
pc.RLock()
defer pc.RUnlock()
return pc.ClusterName, pc.API
}
func (pc *PromClient) GetClusterName() string {
pc.RLock()
defer pc.RUnlock()
return pc.ClusterName
}
func (pc *PromClient) GetCli() prom.API {
pc.RLock()
defer pc.RUnlock()
return pc.API
}
func (pc *PromClient) IsNil() bool {
if pc == nil {
return true
}
pc.RLock()
defer pc.RUnlock()
return pc.API == nil
}
func (pc *PromClient) Reset() {
pc.Lock()
defer pc.Unlock()
pc.ClusterName = ""
pc.API = nil
}
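
For orientation, a minimal consumer-side sketch (not part of this commit) of how callers are expected to use the shared client. It reuses the Query signature and import paths that appear elsewhere in this diff; the package name and query string are hypothetical:

    package example

    import (
        "context"
        "time"

        "github.com/didi/nightingale/v5/src/server/config"
        "github.com/toolkits/pkg/logger"
    )

    func queryUp() {
        // Nothing to do until the web UI has bound a datasource to this n9e-server.
        if config.ReaderClient.IsNil() {
            return
        }
        // Get returns the cluster name and the client under a single read lock,
        // so metrics for this query can be labeled with the cluster that served it.
        clusterName, cli := config.ReaderClient.Get()
        value, warnings, err := cli.Query(context.Background(), "up", time.Now())
        if err != nil {
            logger.Errorf("cluster %s query failed: %v", clusterName, err)
            return
        }
        logger.Debugf("cluster %s warnings:%v value:%v", clusterName, warnings, value)
    }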

View File

@ -0,0 +1,81 @@
package config
import "sync"
type PromOption struct {
Url string
BasicAuthUser string
BasicAuthPass string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
Headers []string
}
func (po *PromOption) Equal(target PromOption) bool {
if po.Url != target.Url {
return false
}
if po.BasicAuthUser != target.BasicAuthUser {
return false
}
if po.BasicAuthPass != target.BasicAuthPass {
return false
}
if po.Timeout != target.Timeout {
return false
}
if po.DialTimeout != target.DialTimeout {
return false
}
if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost {
return false
}
if len(po.Headers) != len(target.Headers) {
return false
}
for i := 0; i < len(po.Headers); i++ {
if po.Headers[i] != target.Headers[i] {
return false
}
}
return true
}
type PromOptionsStruct struct {
Data map[string]PromOption
sync.RWMutex
}
func (pos *PromOptionsStruct) Set(clusterName string, po PromOption) {
pos.Lock()
pos.Data[clusterName] = po
pos.Unlock()
}
func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) {
pos.Lock()
pos.Data = map[string]PromOption{clusterName: po}
pos.Unlock()
}
func (pos *PromOptionsStruct) Get(clusterName string) (PromOption, bool) {
pos.RLock()
defer pos.RUnlock()
ret, has := pos.Data[clusterName]
return ret, has
}
// Data key is cluster name
var PromOptions = &PromOptionsStruct{Data: make(map[string]PromOption)}
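
The web side is expected to store one such PromOption per cluster, as JSON under the "prom.<cluster>.option" config key read by reader.go below. A hedged sketch of the serialized form, assuming Go's default JSON field names (the struct declares no tags); all values are hypothetical:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Local mirror of config.PromOption so the sketch compiles on its own.
    type PromOption struct {
        Url                 string
        BasicAuthUser       string
        BasicAuthPass       string
        Timeout             int64
        DialTimeout         int64
        MaxIdleConnsPerHost int
        Headers             []string
    }

    func main() {
        po := PromOption{
            Url:                 "http://127.0.0.1:9090",
            Timeout:             60000, // interpreted as milliseconds by the reader
            DialTimeout:         5000,  // milliseconds
            MaxIdleConnsPerHost: 32,
        }
        bs, _ := json.Marshal(po)
        fmt.Println(string(bs))
        // {"Url":"http://127.0.0.1:9090","BasicAuthUser":"","BasicAuthPass":"",
        //  "Timeout":60000,"DialTimeout":5000,"MaxIdleConnsPerHost":32,"Headers":null}
    }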

src/server/config/reader.go (new file, 131 lines)
View File

@ -0,0 +1,131 @@
package config
import (
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
)
func InitReader() error {
rf := strings.ToLower(strings.TrimSpace(C.ReaderFrom))
if rf == "" || rf == "config" {
return setClientFromPromOption(C.ClusterName, C.Reader)
}
if rf == "database" {
return initFromDatabase()
}
return fmt.Errorf("invalid configuration ReaderFrom: %s", rf)
}
func initFromDatabase() error {
go func() {
for {
loadFromDatabase()
time.Sleep(time.Second)
}
}()
return nil
}
func loadFromDatabase() {
cluster, err := models.AlertingEngineGetCluster(C.Heartbeat.Endpoint)
if err != nil {
logger.Errorf("failed to get current cluster, error: %v", err)
return
}
if cluster == "" {
ReaderClient.Reset()
logger.Warning("no datasource binded to me")
return
}
ckey := "prom." + cluster + ".option"
cval, err := models.ConfigsGet(ckey)
if err != nil {
logger.Errorf("failed to get ckey: %s, error: %v", ckey, err)
return
}
if cval == "" {
ReaderClient.Reset()
return
}
var po PromOption
err = json.Unmarshal([]byte(cval), &po)
if err != nil {
logger.Errorf("failed to unmarshal PromOption: %s", err)
return
}
if ReaderClient.IsNil() {
// first time
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
return
}
PromOptions.Sets(cluster, po)
return
}
localPo, has := PromOptions.Get(cluster)
if !has || !localPo.Equal(po) {
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
return
}
PromOptions.Sets(cluster, po)
return
}
}
func newClientFromPromOption(po PromOption) (api.Client, error) {
return api.NewClient(api.Config{
Address: po.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
},
})
}
func setClientFromPromOption(clusterName string, po PromOption) error {
if clusterName == "" {
return fmt.Errorf("argument clusterName is blank")
}
if po.Url == "" {
return fmt.Errorf("prometheus url is blank")
}
cli, err := newClientFromPromOption(po)
if err != nil {
return fmt.Errorf("failed to newClientFromPromOption: %v", err)
}
ReaderClient.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: po.BasicAuthUser,
BasicAuthPass: po.BasicAuthPass,
Headers: po.Headers,
}))
return nil
}
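
Read together, "database" mode is a small contract between the web UI and this poller; a summary sketch (lookup calls and key name copied from loadFromDatabase above, the helper name is illustrative and not part of the commit):

    // Illustrative only; would live inside package config.
    func describeBinding() error {
        // 1) The web UI binds a datasource (cluster) to this n9e-server's heartbeat endpoint.
        cluster, err := models.AlertingEngineGetCluster(C.Heartbeat.Endpoint)
        if err != nil || cluster == "" {
            return err // nothing bound yet
        }
        // 2) The datasource options are stored as a JSON PromOption under a per-cluster key.
        cval, err := models.ConfigsGet("prom." + cluster + ".option")
        if err != nil || cval == "" {
            return err
        }
        var po PromOption
        if err := json.Unmarshal([]byte(cval), &po); err != nil {
            return err
        }
        // 3) Timeout and DialTimeout are applied as milliseconds when the client is (re)built.
        return setClientFromPromOption(cluster, po)
    }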

View File

@ -53,6 +53,10 @@ func Reload() {
func reportQueueSize() {
for {
time.Sleep(time.Second)
promstat.GaugeAlertQueueSize.WithLabelValues(config.C.ClusterName).Set(float64(EventQueue.Len()))
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
continue
}
promstat.GaugeAlertQueueSize.WithLabelValues(clusterName).Set(float64(EventQueue.Len()))
}
}

View File

@ -101,12 +101,13 @@ func genNotice(event *models.AlertCurEvent) Notice {
return Notice{Event: event, Tpls: ntpls}
}
func alertingRedisPub(bs []byte) {
func alertingRedisPub(clusterName string, bs []byte) {
channelKey := config.C.Alerting.RedisPub.ChannelPrefix + clusterName
// pub all alerts to redis
if config.C.Alerting.RedisPub.Enable {
err := storage.Redis.Publish(context.Background(), config.C.Alerting.RedisPub.ChannelKey, bs).Err()
err := storage.Redis.Publish(context.Background(), channelKey, bs).Err()
if err != nil {
logger.Errorf("event_notify: redis publish %s err: %v", config.C.Alerting.RedisPub.ChannelKey, err)
logger.Errorf("event_notify: redis publish %s err: %v", channelKey, err)
}
}
}
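
One consequence of publishing to ChannelPrefix + cluster: consumers that previously listened on the single precomputed ChannelKey now need a subscription per cluster. A hedged consumer sketch, assuming a go-redis v8 client like the one behind storage.Redis; the address, prefix, and cluster name are hypothetical:

    package main

    import (
        "context"
        "fmt"

        "github.com/go-redis/redis/v8"
    )

    func main() {
        rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
        // The channel name must be Alerting.RedisPub.ChannelPrefix + <cluster name>.
        sub := rdb.Subscribe(context.Background(), "/alerts/"+"Default")
        defer sub.Close()
        for msg := range sub.Channel() {
            fmt.Println("alert event:", msg.Payload)
        }
    }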
@ -249,7 +250,7 @@ func notify(event *models.AlertCurEvent) {
return
}
alertingRedisPub(stdinBytes)
alertingRedisPub(event.Cluster, stdinBytes)
alertingWebhook(event)
handleNotice(notice, stdinBytes)

View File

@ -20,7 +20,6 @@ import (
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
"github.com/didi/nightingale/v5/src/server/reader"
promstat "github.com/didi/nightingale/v5/src/server/stat"
)
@ -167,16 +166,18 @@ func (r *RuleEval) Work() {
return
}
if reader.Client == nil {
logger.Error("reader.Client is nil")
if config.ReaderClient.IsNil() {
logger.Error("reader client is nil")
return
}
clusterName, readerClient := config.ReaderClient.Get()
var value model.Value
var err error
if r.rule.Algorithm == "" && (r.rule.Cate == "" || r.rule.Cate == "prometheus") {
var warnings prom.Warnings
value, warnings, err = reader.Client.Query(context.Background(), promql, time.Now())
value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
//notifyToMaintainer(err, "failed to query prometheus")
@ -191,7 +192,7 @@ func (r *RuleEval) Work() {
logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
}
r.Judge(conv.ConvertVectors(value))
r.Judge(clusterName, conv.ConvertVectors(value))
}
type WorkersType struct {
@ -306,10 +307,10 @@ func (ws *WorkersType) BuildRe(rids []int64) {
}
}
func (r *RuleEval) Judge(vectors []conv.Vector) {
func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) {
now := time.Now().Unix()
alertingKeys, ruleExists := r.MakeNewEvent("inner", now, vectors)
alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
if !ruleExists {
return
}
@ -318,7 +319,7 @@ func (r *RuleEval) Judge(vectors []conv.Vector) {
r.recoverRule(alertingKeys, now)
}
func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) (map[string]struct{}, bool) {
func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vectors []conv.Vector) (map[string]struct{}, bool) {
// Some of the rule's settings may have changed since the worker started, e.g. alert receivers, callbacks, etc.
// Such changes do not trigger a worker restart, but they do affect how alerts are handled,
// so fetch the latest rule from memsto.AlertRuleCache here and overwrite it.
@ -394,7 +395,7 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, vectors []conv.Vector) (
tagsArr := labelMapToArr(tagsMap)
sort.Strings(tagsArr)
event.Cluster = config.C.ClusterName
event.Cluster = clusterName
event.Cate = r.rule.Cate
event.Hash = hash
event.RuleId = r.rule.Id
@ -583,7 +584,7 @@ func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
r.fires.Set(event.Hash, event)
}
promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc()
promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
LogEvent(event, "push_queue")
if !EventQueue.PushFront(event) {
logger.Warningf("event_push_queue: queue is full")
@ -650,12 +651,12 @@ func (r RecordingRuleEval) Work() {
return
}
if reader.Client == nil {
log.Println("reader.Client is nil")
if config.ReaderClient.IsNil() {
log.Println("reader client is nil")
return
}
value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now())
value, warnings, err := config.ReaderClient.GetCli().Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
return

View File

@ -41,6 +41,10 @@ func toRedis() {
return
}
if config.ReaderClient.IsNil() {
return
}
now := time.Now().Unix()
// clean old idents
@ -49,7 +53,7 @@ func toRedis() {
Idents.Remove(key)
} else {
// use now as timestamp to redis
err := storage.Redis.HSet(context.Background(), redisKey(config.C.ClusterName), key, now).Err()
err := storage.Redis.HSet(context.Background(), redisKey(config.ReaderClient.GetClusterName()), key, now).Err()
if err != nil {
logger.Errorf("redis hset idents failed: %v", err)
}
@ -103,8 +107,14 @@ func pushMetrics() {
return
}
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
logger.Warning("cluster name is blank")
return
}
// get all the target heartbeat timestamp
ret, err := storage.Redis.HGetAll(context.Background(), redisKey(config.C.ClusterName)).Result()
ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result()
if err != nil {
logger.Errorf("handle_idents: redis hgetall fail: %v", err)
return
@ -121,7 +131,7 @@ func pushMetrics() {
}
if now-clock > dur {
clearDeadIdent(context.Background(), config.C.ClusterName, ident)
clearDeadIdent(context.Background(), clusterName, ident)
} else {
actives[ident] = struct{}{}
}
@ -153,7 +163,7 @@ func pushMetrics() {
if !has {
// target not exists
target = &models.Target{
Cluster: config.C.ClusterName,
Cluster: clusterName,
Ident: active,
Tags: "",
TagsJSON: []string{},

View File

@ -27,6 +27,15 @@ var AlertMuteCache = AlertMuteCacheType{
mutes: make(map[int64][]*models.AlertMute),
}
func (amc *AlertMuteCacheType) Reset() {
amc.Lock()
defer amc.Unlock()
amc.statTotal = -1
amc.statLastUpdated = -1
amc.mutes = make(map[int64][]*models.AlertMute)
}
func (amc *AlertMuteCacheType) StatChanged(total, lastUpdated int64) bool {
if amc.statTotal == total && amc.statLastUpdated == lastUpdated {
return false
@ -90,19 +99,26 @@ func loopSyncAlertMutes() {
func syncAlertMutes() error {
start := time.Now()
stat, err := models.AlertMuteStatistics(config.C.ClusterName)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertMuteCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.AlertMuteStatistics(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteStatistics")
}
if !AlertMuteCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(0)
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(0)
logger.Debug("alert mutes not changed")
return nil
}
lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName)
lst, err := models.AlertMuteGetsByCluster(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster")
}
@ -122,8 +138,8 @@ func syncAlertMutes() error {
AlertMuteCache.Set(oks, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(len(lst)))
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(len(lst)))
logger.Infof("timer: sync mutes done, cost: %dms, number: %d", ms, len(lst))
return nil

View File

@ -27,6 +27,15 @@ var AlertRuleCache = AlertRuleCacheType{
rules: make(map[int64]*models.AlertRule),
}
func (arc *AlertRuleCacheType) Reset() {
arc.Lock()
defer arc.Unlock()
arc.statTotal = -1
arc.statLastUpdated = -1
arc.rules = make(map[int64]*models.AlertRule)
}
func (arc *AlertRuleCacheType) StatChanged(total, lastUpdated int64) bool {
if arc.statTotal == total && arc.statLastUpdated == lastUpdated {
return false
@ -87,19 +96,26 @@ func loopSyncAlertRules() {
func syncAlertRules() error {
start := time.Now()
stat, err := models.AlertRuleStatistics(config.C.ClusterName)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertRuleCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.AlertRuleStatistics(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleStatistics")
}
if !AlertRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(0)
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_rules").Set(0)
logger.Debug("alert rules not changed")
return nil
}
lst, err := models.AlertRuleGetsByCluster(config.C.ClusterName)
lst, err := models.AlertRuleGetsByCluster(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleGetsByCluster")
}
@ -112,8 +128,8 @@ func syncAlertRules() error {
AlertRuleCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(float64(len(m)))
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_rules").Set(float64(len(m)))
logger.Infof("timer: sync rules done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@ -27,6 +27,15 @@ var AlertSubscribeCache = AlertSubscribeCacheType{
subs: make(map[int64][]*models.AlertSubscribe),
}
func (c *AlertSubscribeCacheType) Reset() {
c.Lock()
defer c.Unlock()
c.statTotal = -1
c.statLastUpdated = -1
c.subs = make(map[int64][]*models.AlertSubscribe)
}
func (c *AlertSubscribeCacheType) StatChanged(total, lastUpdated int64) bool {
if c.statTotal == total && c.statLastUpdated == lastUpdated {
return false
@ -93,19 +102,26 @@ func loopSyncAlertSubscribes() {
func syncAlertSubscribes() error {
start := time.Now()
stat, err := models.AlertSubscribeStatistics(config.C.ClusterName)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertSubscribeCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.AlertSubscribeStatistics(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertSubscribeStatistics")
}
if !AlertSubscribeCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(0)
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_subscribes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_subscribes").Set(0)
logger.Debug("alert subscribes not changed")
return nil
}
lst, err := models.AlertSubscribeGetsByCluster(config.C.ClusterName)
lst, err := models.AlertSubscribeGetsByCluster(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertSubscribeGetsByCluster")
}
@ -125,8 +141,8 @@ func syncAlertSubscribes() error {
AlertSubscribeCache.Set(subs, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(float64(len(lst)))
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_subscribes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_subscribes").Set(float64(len(lst)))
logger.Infof("timer: sync subscribes done, cost: %dms, number: %d", ms, len(lst))
return nil

View File

@ -79,9 +79,14 @@ func syncBusiGroups() error {
return errors.WithMessage(err, "failed to exec BusiGroupStatistics")
}
clusterName := config.ReaderClient.GetClusterName()
if !BusiGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(0)
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_busi_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_busi_groups").Set(0)
}
logger.Debug("busi_group not changed")
return nil
}
@ -94,8 +99,11 @@ func syncBusiGroups() error {
BusiGroupCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(float64(len(m)))
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_busi_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_busi_groups").Set(float64(len(m)))
}
logger.Infof("timer: sync busi groups done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@ -26,6 +26,15 @@ var RecordingRuleCache = RecordingRuleCacheType{
rules: make(map[int64]*models.RecordingRule),
}
func (rrc *RecordingRuleCacheType) Reset() {
rrc.Lock()
defer rrc.Unlock()
rrc.statTotal = -1
rrc.statLastUpdated = -1
rrc.rules = make(map[int64]*models.RecordingRule)
}
func (rrc *RecordingRuleCacheType) StatChanged(total, lastUpdated int64) bool {
if rrc.statTotal == total && rrc.statLastUpdated == lastUpdated {
return false
@ -86,19 +95,26 @@ func loopSyncRecordingRules() {
func syncRecordingRules() error {
start := time.Now()
stat, err := models.RecordingRuleStatistics(config.C.ClusterName)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
RecordingRuleCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.RecordingRuleStatistics(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleStatistics")
}
if !RecordingRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_recording_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_recording_rules").Set(0)
logger.Debug("recoding rules not changed")
return nil
}
lst, err := models.RecordingRuleGetsByCluster(config.C.ClusterName)
lst, err := models.RecordingRuleGetsByCluster(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleGetsByCluster")
}
@ -111,8 +127,8 @@ func syncRecordingRules() error {
RecordingRuleCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(len(m)))
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_recording_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_recording_rules").Set(float64(len(m)))
logger.Infof("timer: sync recording rules done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@ -31,6 +31,15 @@ var TargetCache = TargetCacheType{
targets: make(map[string]*models.Target),
}
func (tc *TargetCacheType) Reset() {
tc.Lock()
defer tc.Unlock()
tc.statTotal = -1
tc.statLastUpdated = -1
tc.targets = make(map[string]*models.Target)
}
func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
if tc.statTotal == total && tc.statLastUpdated == lastUpdated {
return false
@ -94,19 +103,26 @@ func loopSyncTargets() {
func syncTargets() error {
start := time.Now()
stat, err := models.TargetStatistics(config.C.ClusterName)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
TargetCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.TargetStatistics(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec TargetStatistics")
}
if !TargetCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_targets").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_targets").Set(0)
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_targets").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_targets").Set(0)
logger.Debug("targets not changed")
return nil
}
lst, err := models.TargetGetsByCluster(config.C.ClusterName)
lst, err := models.TargetGetsByCluster(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec TargetGetsByCluster")
}
@ -129,8 +145,8 @@ func syncTargets() error {
TargetCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_targets").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_targets").Set(float64(len(lst)))
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_targets").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_targets").Set(float64(len(lst)))
logger.Infof("timer: sync targets done, cost: %dms, number: %d", ms, len(lst))
return nil

View File

@ -124,9 +124,14 @@ func syncUsers() error {
return errors.WithMessage(err, "failed to exec UserStatistics")
}
clusterName := config.ReaderClient.GetClusterName()
if !UserCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_users").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_users").Set(0)
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_users").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_users").Set(0)
}
logger.Debug("users not changed")
return nil
}
@ -144,8 +149,11 @@ func syncUsers() error {
UserCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_users").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_users").Set(float64(len(m)))
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_users").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_users").Set(float64(len(m)))
}
logger.Infof("timer: sync users done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@ -106,9 +106,14 @@ func syncUserGroups() error {
return errors.WithMessage(err, "failed to exec UserGroupStatistics")
}
clusterName := config.ReaderClient.GetClusterName()
if !UserGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(0)
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_user_groups").Set(0)
}
logger.Debug("user_group not changed")
return nil
}
@ -145,8 +150,11 @@ func syncUserGroups() error {
UserGroupCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(float64(len(m)))
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_user_groups").Set(float64(len(m)))
}
logger.Infof("timer: sync user groups done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@ -1,177 +0,0 @@
package reader
import (
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
)
var Client prom.API
var LocalPromOption PromOption
func Init() error {
rf := strings.ToLower(strings.TrimSpace(config.C.ReaderFrom))
if rf == "" || rf == "config" {
return initFromConfig()
}
if rf == "database" {
return initFromDatabase()
}
return fmt.Errorf("invalid configuration ReaderFrom: %s", rf)
}
func initFromConfig() error {
opts := config.C.Reader
if opts.Url == "" {
logger.Warning("reader url is blank")
return nil
}
cli, err := api.NewClient(api.Config{
Address: opts.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost,
},
})
if err != nil {
return err
}
Client = prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opts.BasicAuthUser,
BasicAuthPass: opts.BasicAuthPass,
Headers: opts.Headers,
})
return nil
}
func initFromDatabase() error {
go func() {
for {
loadFromDatabase()
time.Sleep(time.Second)
}
}()
return nil
}
type PromOption struct {
Url string
User string
Pass string
Headers []string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
}
func (po *PromOption) Equal(target PromOption) bool {
if po.Url != target.Url {
return false
}
if po.User != target.User {
return false
}
if po.Pass != target.Pass {
return false
}
if po.Timeout != target.Timeout {
return false
}
if po.DialTimeout != target.DialTimeout {
return false
}
if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost {
return false
}
if len(po.Headers) != len(target.Headers) {
return false
}
for i := 0; i < len(po.Headers); i++ {
if po.Headers[i] != target.Headers[i] {
return false
}
}
return true
}
func loadFromDatabase() {
cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint)
if err != nil {
logger.Errorf("failed to get current cluster, error: %v", err)
return
}
ckey := "prom." + cluster + ".option"
cval, err := models.ConfigsGet(ckey)
if err != nil {
logger.Errorf("failed to get ckey: %s, error: %v", ckey, err)
return
}
if cval == "" {
Client = nil
return
}
var po PromOption
err = json.Unmarshal([]byte(cval), &po)
if err != nil {
logger.Errorf("failed to unmarshal PromOption: %s", err)
return
}
if Client == nil || !LocalPromOption.Equal(po) {
cli, err := api.NewClient(api.Config{
Address: po.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
},
})
if err != nil {
logger.Errorf("failed to NewPromClient: %v", err)
return
}
Client = prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: po.User,
BasicAuthPass: po.Pass,
Headers: po.Headers,
})
}
}

View File

@ -269,7 +269,10 @@ func datadogSeries(c *gin.Context) {
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(succ))
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "datadog").Add(float64(succ))
}
idents.Idents.MSet(ids)
}

View File

@ -63,7 +63,11 @@ func pushEventToQueue(c *gin.Context) {
event.NotifyChannels = strings.Join(event.NotifyChannelsJSON, " ")
event.NotifyGroups = strings.Join(event.NotifyGroupsJSON, " ")
promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc()
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterAlertsTotal.WithLabelValues(cn).Inc()
}
engine.LogEvent(event, "http_push_queue")
if !engine.EventQueue.PushFront(event) {
msg := fmt.Sprintf("event:%+v push_queue err: queue is full", event)
@ -77,6 +81,7 @@ type eventForm struct {
Alert bool `json:"alert"`
Vectors []conv.Vector `json:"vectors"`
RuleId int64 `json:"rule_id"`
Cluster string `json:"cluster"`
}
func judgeEvent(c *gin.Context) {
@ -86,7 +91,7 @@ func judgeEvent(c *gin.Context) {
if !exists {
ginx.Bomb(200, "rule not exists")
}
re.Judge(form.Vectors)
re.Judge(form.Cluster, form.Vectors)
ginx.NewRender(c).Message(nil)
}
@ -102,7 +107,7 @@ func makeEvent(c *gin.Context) {
}
if events[i].Alert {
go re.MakeNewEvent("http", now, events[i].Vectors)
go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors)
} else {
for _, vector := range events[i].Vectors {
hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key))

View File

@ -214,7 +214,11 @@ func falconPush(c *gin.Context) {
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "openfalcon").Add(float64(succ))
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "openfalcon").Add(float64(succ))
}
idents.Idents.MSet(ids)
}

View File

@ -208,7 +208,10 @@ func handleOpenTSDB(c *gin.Context) {
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(succ))
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "opentsdb").Add(float64(succ))
}
idents.Idents.MSet(ids)
}

View File

@ -17,7 +17,6 @@ import (
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/reader"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/writer"
)
@ -38,12 +37,12 @@ func queryPromql(c *gin.Context) {
var f promqlForm
ginx.BindJSON(c, &f)
if reader.Client == nil {
c.String(500, "reader.Client is nil")
if config.ReaderClient.IsNil() {
c.String(500, "reader client is nil")
return
}
value, warnings, err := reader.Client.Query(c.Request.Context(), f.PromQL, time.Now())
value, warnings, err := config.ReaderClient.GetCli().Query(c.Request.Context(), f.PromQL, time.Now())
if err != nil {
c.String(500, "promql:%s error:%v", f.PromQL, err)
return
@ -147,7 +146,11 @@ func remoteWrite(c *gin.Context) {
writer.Writers.PushSample(metric, req.Timeseries[i])
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count))
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "prometheus").Add(float64(count))
}
idents.Idents.MSet(ids)
}

View File

@ -18,7 +18,6 @@ import (
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
"github.com/didi/nightingale/v5/src/server/reader"
"github.com/didi/nightingale/v5/src/server/router"
"github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/usage"
@ -125,7 +124,7 @@ func (s Server) initialize() (func(), error) {
}
// init prometheus remote reader
if err = reader.Init(); err != nil {
if err = config.InitReader(); err != nil {
return fns.Ret(), err
}

View File

@ -2,7 +2,6 @@ package usage
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@ -12,8 +11,6 @@ import (
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/version"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/reader"
)
const (
@ -29,28 +26,6 @@ type Usage struct {
Version string `json:"version"`
}
func getSamples() (float64, error) {
if reader.Client == nil {
return 0, fmt.Errorf("reader.Client is nil")
}
value, warns, err := reader.Client.Query(context.Background(), request, time.Now())
if err != nil {
return 0, err
}
if len(warns) > 0 {
return 0, fmt.Errorf("occur some warnings: %v", warns)
}
lst := conv.ConvertVectors(value)
if len(lst) == 0 {
return 0, fmt.Errorf("convert result is empty")
}
return lst[0].Value, nil
}
func Report() {
for {
time.Sleep(time.Minute * 10)
@ -59,7 +34,6 @@ func Report() {
}
func report() {
// sps, _ := getSamples()
tnum, err := models.TargetTotalCount()
if err != nil {
return

View File

@ -49,7 +49,10 @@ func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[
start := time.Now()
defer func() {
promstat.ForwardDuration.WithLabelValues(config.C.ClusterName, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.ForwardDuration.WithLabelValues(cn, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
}
}()
req := &prompb.WriteRequest{
@ -240,11 +243,16 @@ func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
}
func reportChanSize() {
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
return
}
for {
time.Sleep(time.Second * 3)
for i, c := range Writers.chans {
size := len(c)
promstat.GaugeSampleQueueSize.WithLabelValues(config.C.ClusterName, fmt.Sprint(i)).Set(float64(size))
promstat.GaugeSampleQueueSize.WithLabelValues(clusterName, fmt.Sprint(i)).Set(float64(size))
}
}
}