diff --git a/docker/initsql/a-n9e.sql b/docker/initsql/a-n9e.sql index 3ad36cb6..7dfd8b7b 100644 --- a/docker/initsql/a-n9e.sql +++ b/docker/initsql/a-n9e.sql @@ -52,7 +52,7 @@ insert into user_group_member(group_id, user_id) values(1, 1); CREATE TABLE `configs` ( `id` bigint unsigned not null auto_increment, `ckey` varchar(191) not null, - `cval` varchar(1024) not null default '', + `cval` varchar(4096) not null default '', PRIMARY KEY (`id`), UNIQUE KEY (`ckey`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; @@ -500,3 +500,13 @@ CREATE TABLE `task_record` KEY (`create_at`, `group_id`), KEY (`create_by`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; + +CREATE TABLE `alerting_engines` +( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `instance` varchar(128) not null default '' comment 'instance identification, e.g. 10.9.0.9:9090', + `cluster` varchar(128) not null default '' comment 'target reader cluster', + `clock` bigint not null, + PRIMARY KEY (`id`), + UNIQUE KEY (`instance`) +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; diff --git a/etc/server.conf b/etc/server.conf index 30f4020d..d6bdd015 100644 --- a/etc/server.conf +++ b/etc/server.conf @@ -13,6 +13,9 @@ EngineDelay = 120 DisableUsageReport = false +# config | database +ReaderFrom = "config" + [Log] # log write dir Dir = "logs" @@ -155,15 +158,8 @@ BasicAuthUser = "" BasicAuthPass = "" # timeout settings, unit: ms Timeout = 30000 -DialTimeout = 10000 -TLSHandshakeTimeout = 30000 -ExpectContinueTimeout = 1000 -IdleConnTimeout = 90000 -# time duration, unit: ms -KeepAlive = 30000 -MaxConnsPerHost = 0 -MaxIdleConns = 100 -MaxIdleConnsPerHost = 10 +DialTimeout = 3000 +MaxIdleConnsPerHost = 100 [WriterOpt] # queue channel count diff --git a/src/models/alert_mute.go b/src/models/alert_mute.go index c81cf985..6589bab4 100644 --- a/src/models/alert_mute.go +++ b/src/models/alert_mute.go @@ -71,7 +71,7 @@ func (m *AlertMute) Verify() error { } if m.Etime <= m.Btime { - return fmt.Errorf("Oops... etime(%d) <= btime(%d)", m.Etime, m.Btime) + return fmt.Errorf("oops... etime(%d) <= btime(%d)", m.Etime, m.Btime) } if err := m.Parse(); err != nil { diff --git a/src/models/alerting_engine.go b/src/models/alerting_engine.go new file mode 100644 index 00000000..5ff99c35 --- /dev/null +++ b/src/models/alerting_engine.go @@ -0,0 +1,81 @@ +package models + +import "time" + +type AlertingEngines struct { + Id int64 `json:"id" gorm:"primaryKey"` + Instance string `json:"instance"` + Cluster string `json:"cluster"` // reader cluster + Clock int64 `json:"clock"` +} + +func (e *AlertingEngines) TableName() string { + return "alerting_engines" +} + +// UpdateCluster 页面上用户会给各个n9e-server分配要关联的目标集群是什么 +func (e *AlertingEngines) UpdateCluster(c string) error { + e.Cluster = c + return DB().Model(e).Select("cluster").Updates(e).Error +} + +// AlertingEngineGetCluster 根据实例名获取对应的集群名字 +func AlertingEngineGetCluster(instance string) (string, error) { + var objs []AlertingEngines + err := DB().Where("instance=?", instance).Find(&objs).Error + if err != nil { + return "", err + } + + if len(objs) == 0 { + return "", nil + } + + return objs[0].Cluster, nil +} + +// AlertingEngineGets 拉取列表数据,用户要在页面上看到所有 n9e-server 实例列表,然后为其分配 cluster +func AlertingEngineGets(where string, args ...interface{}) ([]*AlertingEngines, error) { + var objs []*AlertingEngines + var err error + session := DB().Order("instance") + if where == "" { + err = session.Find(&objs).Error + } else { + err = session.Where(where, args...).Find(&objs).Error + } + return objs, err +} + +func AlertingEngineGetsInstances(where string, args ...interface{}) ([]string, error) { + var arr []string + var err error + session := DB().Model(new(AlertingEngines)).Order("instance") + if where == "" { + err = session.Pluck("instance", &arr).Error + } else { + err = session.Where(where, args...).Pluck("instance", &arr).Error + } + return arr, err +} + +func AlertingEngineHeartbeat(instance string) error { + var total int64 + err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Count(&total).Error + if err != nil { + return err + } + + if total == 0 { + // insert + err = DB().Create(&AlertingEngines{ + Instance: instance, + Clock: time.Now().Unix(), + }).Error + } else { + // update + err = DB().Model(new(AlertingEngines)).Where("instance=?", instance).Update("clock", time.Now().Unix()).Error + } + + return err +} diff --git a/src/server/config/config.go b/src/server/config/config.go index 9bca30f4..423f2811 100644 --- a/src/server/config/config.go +++ b/src/server/config/config.go @@ -70,6 +70,10 @@ func MustLoad(fpaths ...string) { C.EngineDelay = 120 } + if C.ReaderFrom == "" { + C.ReaderFrom = "config" + } + if C.Heartbeat.IP == "" { // auto detect // C.Heartbeat.IP = fmt.Sprint(GetOutboundIP()) @@ -187,6 +191,7 @@ type Config struct { AnomalyDataApi []string EngineDelay int64 DisableUsageReport bool + ReaderFrom string Log logx.Config HTTP httpx.Config BasicAuth gin.Accounts @@ -207,15 +212,9 @@ type ReaderOptions struct { BasicAuthUser string BasicAuthPass string - Timeout int64 - DialTimeout int64 - TLSHandshakeTimeout int64 - ExpectContinueTimeout int64 - IdleConnTimeout int64 - KeepAlive int64 + Timeout int64 + DialTimeout int64 - MaxConnsPerHost int - MaxIdleConns int MaxIdleConnsPerHost int Headers []string diff --git a/src/server/engine/worker.go b/src/server/engine/worker.go index 915b4a23..f9ebd2cd 100644 --- a/src/server/engine/worker.go +++ b/src/server/engine/worker.go @@ -3,6 +3,7 @@ package engine import ( "context" "fmt" + "log" "sort" "strings" "sync" @@ -166,6 +167,11 @@ func (r *RuleEval) Work() { return } + if reader.Client == nil { + logger.Error("reader.Client is nil") + return + } + var value model.Value var err error if r.rule.Algorithm == "" { @@ -636,6 +642,11 @@ func (r RecordingRuleEval) Work() { return } + if reader.Client == nil { + log.Println("reader.Client is nil") + return + } + value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now()) if err != nil { logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err) diff --git a/src/server/naming/heartbeat.go b/src/server/naming/heartbeat.go index 949cd24a..aaaa6a1e 100644 --- a/src/server/naming/heartbeat.go +++ b/src/server/naming/heartbeat.go @@ -4,57 +4,45 @@ import ( "context" "fmt" "sort" - "strconv" "strings" "time" "github.com/toolkits/pkg/logger" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/server/config" - "github.com/didi/nightingale/v5/src/storage" ) // local servers var localss string func Heartbeat(ctx context.Context) error { - if err := heartbeat(ctx); err != nil { + if err := heartbeat(); err != nil { fmt.Println("failed to heartbeat:", err) return err } - go loopHeartbeat(ctx) + go loopHeartbeat() return nil } -func loopHeartbeat(ctx context.Context) { +func loopHeartbeat() { interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond for { time.Sleep(interval) - if err := heartbeat(ctx); err != nil { + if err := heartbeat(); err != nil { logger.Warning(err) } } } -// hash struct: -// /server/heartbeat/Default -> { -// 10.2.3.4:19000 => $timestamp -// 10.2.3.5:19000 => $timestamp -// } -func redisKey(cluster string) string { - return fmt.Sprintf("/server/heartbeat/%s", cluster) -} - -func heartbeat(ctx context.Context) error { - now := time.Now().Unix() - key := redisKey(config.C.ClusterName) - err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err() +func heartbeat() error { + err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint) if err != nil { return err } - servers, err := ActiveServers(ctx, config.C.ClusterName) + servers, err := ActiveServers() if err != nil { return err } @@ -69,37 +57,12 @@ func heartbeat(ctx context.Context) error { return nil } -func clearDeadServer(ctx context.Context, cluster, endpoint string) { - key := redisKey(cluster) - err := storage.Redis.HDel(ctx, key, endpoint).Err() - if err != nil { - logger.Warningf("failed to hdel %s %s, error: %v", key, endpoint, err) - } -} - -func ActiveServers(ctx context.Context, cluster string) ([]string, error) { - ret, err := storage.Redis.HGetAll(ctx, redisKey(cluster)).Result() +func ActiveServers() ([]string, error) { + cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint) if err != nil { return nil, err } - now := time.Now().Unix() - dur := int64(20) - - actives := make([]string, 0, len(ret)) - for endpoint, clockstr := range ret { - clock, err := strconv.ParseInt(clockstr, 10, 64) - if err != nil { - continue - } - - if now-clock > dur { - clearDeadServer(ctx, cluster, endpoint) - continue - } - - actives = append(actives, endpoint) - } - - return actives, nil + // 30秒内有心跳,就认为是活的 + return models.AlertingEngineGetsInstances("cluster = ? and clock > ?", cluster, time.Now().Unix()-30) } diff --git a/src/server/naming/leader.go b/src/server/naming/leader.go index 0909be55..b154e827 100644 --- a/src/server/naming/leader.go +++ b/src/server/naming/leader.go @@ -1,7 +1,6 @@ package naming import ( - "context" "sort" "github.com/didi/nightingale/v5/src/server/config" @@ -9,7 +8,7 @@ import ( ) func IamLeader() (bool, error) { - servers, err := ActiveServers(context.Background(), config.C.ClusterName) + servers, err := ActiveServers() if err != nil { logger.Errorf("failed to get active servers: %v", err) return false, err diff --git a/src/server/reader/reader.go b/src/server/reader/reader.go index 43a5e907..5377d763 100644 --- a/src/server/reader/reader.go +++ b/src/server/reader/reader.go @@ -1,34 +1,48 @@ package reader import ( + "encoding/json" + "fmt" "net" "net/http" + "strings" "time" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/pkg/prom" "github.com/didi/nightingale/v5/src/server/config" "github.com/prometheus/client_golang/api" + "github.com/toolkits/pkg/logger" ) var Client prom.API +var LocalPromOption PromOption -func Init(opts config.ReaderOptions) error { +func Init() error { + rf := strings.ToLower(strings.TrimSpace(config.C.ReaderFrom)) + if rf == "" || rf == "config" { + return initFromConfig() + } + + if rf == "database" { + return initFromDatabase() + } + + return fmt.Errorf("invalid configuration ReaderFrom: %s", rf) +} + +func initFromConfig() error { + opts := config.C.Reader cli, err := api.NewClient(api.Config{ Address: opts.Url, RoundTripper: &http.Transport{ // TLSClientConfig: tlsConfig, Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ - Timeout: time.Duration(opts.DialTimeout) * time.Millisecond, - KeepAlive: time.Duration(opts.KeepAlive) * time.Millisecond, + Timeout: time.Duration(opts.DialTimeout) * time.Millisecond, }).DialContext, ResponseHeaderTimeout: time.Duration(opts.Timeout) * time.Millisecond, - TLSHandshakeTimeout: time.Duration(opts.TLSHandshakeTimeout) * time.Millisecond, - ExpectContinueTimeout: time.Duration(opts.ExpectContinueTimeout) * time.Millisecond, - MaxConnsPerHost: opts.MaxConnsPerHost, - MaxIdleConns: opts.MaxIdleConns, MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost, - IdleConnTimeout: time.Duration(opts.IdleConnTimeout) * time.Millisecond, }, }) @@ -44,3 +58,114 @@ func Init(opts config.ReaderOptions) error { return nil } + +func initFromDatabase() error { + go func() { + for { + loadFromDatabase() + time.Sleep(time.Second) + } + }() + return nil +} + +type PromOption struct { + Url string + User string + Pass string + Headers []string + Timeout int64 + DialTimeout int64 + MaxIdleConnsPerHost int +} + +func (po *PromOption) Equal(target PromOption) bool { + if po.Url != target.Url { + return false + } + + if po.User != target.User { + return false + } + + if po.Pass != target.Pass { + return false + } + + if po.Timeout != target.Timeout { + return false + } + + if po.DialTimeout != target.DialTimeout { + return false + } + + if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost { + return false + } + + if len(po.Headers) != len(target.Headers) { + return false + } + + for i := 0; i < len(po.Headers); i++ { + if po.Headers[i] != target.Headers[i] { + return false + } + } + + return true +} + +func loadFromDatabase() { + cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint) + if err != nil { + logger.Errorf("failed to get current cluster, error: %v", err) + return + } + + ckey := "prom." + cluster + ".option" + cval, err := models.ConfigsGet(ckey) + if err != nil { + logger.Errorf("failed to get ckey: %s, error: %v", ckey, err) + return + } + + if cval == "" { + Client = nil + return + } + + var po PromOption + err = json.Unmarshal([]byte(cval), &po) + if err != nil { + logger.Errorf("failed to unmarshal PromOption: %s", err) + return + } + + if Client == nil || !LocalPromOption.Equal(po) { + cli, err := api.NewClient(api.Config{ + Address: po.Url, + RoundTripper: &http.Transport{ + // TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(po.DialTimeout) * time.Millisecond, + }).DialContext, + ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond, + MaxIdleConnsPerHost: po.MaxIdleConnsPerHost, + }, + }) + + if err != nil { + logger.Errorf("failed to NewPromClient: %v", err) + return + } + + Client = prom.NewAPI(cli, prom.ClientOptions{ + BasicAuthUser: po.User, + BasicAuthPass: po.Pass, + Headers: po.Headers, + }) + } +} diff --git a/src/server/router/router.go b/src/server/router/router.go index 8f9ea7d7..221e7cad 100644 --- a/src/server/router/router.go +++ b/src/server/router/router.go @@ -69,7 +69,7 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) { }) r.GET("/servers/active", func(c *gin.Context) { - lst, err := naming.ActiveServers(c.Request.Context(), config.C.ClusterName) + lst, err := naming.ActiveServers() ginx.NewRender(c).Data(lst, err) }) diff --git a/src/server/router/router_prom.go b/src/server/router/router_prom.go index 91e4e416..7d2b57dd 100644 --- a/src/server/router/router_prom.go +++ b/src/server/router/router_prom.go @@ -38,6 +38,11 @@ func queryPromql(c *gin.Context) { var f promqlForm ginx.BindJSON(c, &f) + if reader.Client == nil { + c.String(500, "reader.Client is nil") + return + } + value, warnings, err := reader.Client.Query(c.Request.Context(), f.PromQL, time.Now()) if err != nil { c.String(500, "promql:%s error:%v", f.PromQL, err) diff --git a/src/server/server.go b/src/server/server.go index d36d208e..32035865 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -125,7 +125,7 @@ func (s Server) initialize() (func(), error) { } // init prometheus remote reader - if err = reader.Init(config.C.Reader); err != nil { + if err = reader.Init(); err != nil { return fns.Ret(), err } diff --git a/src/server/usage/usage.go b/src/server/usage/usage.go index 1edecaed..860c82bc 100644 --- a/src/server/usage/usage.go +++ b/src/server/usage/usage.go @@ -30,6 +30,10 @@ type Usage struct { } func getSamples() (float64, error) { + if reader.Client == nil { + return 0, fmt.Errorf("reader.Client is nil") + } + value, warns, err := reader.Client.Query(context.Background(), request, time.Now()) if err != nil { return 0, err @@ -55,10 +59,7 @@ func Report() { } func report() { - sps, err := getSamples() - if err != nil { - return - } + sps, _ := getSamples() hostname, err := os.Hostname() if err != nil { diff --git a/src/webapi/config/config.go b/src/webapi/config/config.go index 7ff8d7ad..3ad93805 100644 --- a/src/webapi/config/config.go +++ b/src/webapi/config/config.go @@ -113,7 +113,6 @@ type ClusterOptions struct { Timeout int64 DialTimeout int64 - KeepAlive int64 UseTLS bool tls.ClientConfig diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index 2ad98e76..b06da7b6 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/pkg/prom" "github.com/didi/nightingale/v5/src/webapi/config" "github.com/prometheus/client_golang/api" @@ -29,10 +30,44 @@ type ClustersType struct { mutex *sync.RWMutex } +type PromOption struct { + Url string + User string + Pass string + Headers []string + Timeout int64 + DialTimeout int64 + MaxIdleConnsPerHost int +} + func (cs *ClustersType) Put(name string, cluster *ClusterType) { cs.mutex.Lock() + defer cs.mutex.Unlock() + cs.datas[name] = cluster - cs.mutex.Unlock() + + // 把配置信息写入DB一份,这样n9e-server就可以直接从DB读取了 + po := PromOption{ + Url: cluster.Opts.Prom, + User: cluster.Opts.BasicAuthUser, + Pass: cluster.Opts.BasicAuthPass, + Headers: cluster.Opts.Headers, + Timeout: cluster.Opts.Timeout, + DialTimeout: cluster.Opts.DialTimeout, + MaxIdleConnsPerHost: cluster.Opts.MaxIdleConnsPerHost, + } + + bs, err := json.Marshal(po) + if err != nil { + logger.Fatal("failed to marshal PromOption:", err) + return + } + + key := "prom." + name + ".option" + err = models.ConfigsSet(key, string(bs)) + if err != nil { + logger.Fatal("failed to set PromOption ", key, " to database, error: ", err) + } } func (cs *ClustersType) Get(name string) (*ClusterType, bool) {