read prom url from database (#1119)

* add model alerting_engine

* heartbeat using db

* reader.Client from database

* fix sql
This commit is contained in:
ulricqin 2022-08-17 17:20:42 +08:00 committed by GitHub
parent b92e4abf86
commit b4ddd03691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 310 additions and 86 deletions

View File

@ -52,7 +52,7 @@ insert into user_group_member(group_id, user_id) values(1, 1);
CREATE TABLE `configs` ( CREATE TABLE `configs` (
`id` bigint unsigned not null auto_increment, `id` bigint unsigned not null auto_increment,
`ckey` varchar(191) not null, `ckey` varchar(191) not null,
`cval` varchar(1024) not null default '', `cval` varchar(4096) not null default '',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY (`ckey`) UNIQUE KEY (`ckey`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
@ -500,3 +500,13 @@ CREATE TABLE `task_record`
KEY (`create_at`, `group_id`), KEY (`create_at`, `group_id`),
KEY (`create_by`) KEY (`create_by`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE `alerting_engines`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,
`instance` varchar(128) not null default '' comment 'instance identification, e.g. 10.9.0.9:9090',
`cluster` varchar(128) not null default '' comment 'target reader cluster',
`clock` bigint not null,
PRIMARY KEY (`id`),
UNIQUE KEY (`instance`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

View File

@ -13,6 +13,9 @@ EngineDelay = 120
DisableUsageReport = false DisableUsageReport = false
# config | database
ReaderFrom = "config"
[Log] [Log]
# log write dir # log write dir
Dir = "logs" Dir = "logs"
@ -155,15 +158,8 @@ BasicAuthUser = ""
BasicAuthPass = "" BasicAuthPass = ""
# timeout settings, unit: ms # timeout settings, unit: ms
Timeout = 30000 Timeout = 30000
DialTimeout = 10000 DialTimeout = 3000
TLSHandshakeTimeout = 30000 MaxIdleConnsPerHost = 100
ExpectContinueTimeout = 1000
IdleConnTimeout = 90000
# time duration, unit: ms
KeepAlive = 30000
MaxConnsPerHost = 0
MaxIdleConns = 100
MaxIdleConnsPerHost = 10
[WriterOpt] [WriterOpt]
# queue channel count # queue channel count

View File

@ -71,7 +71,7 @@ func (m *AlertMute) Verify() error {
} }
if m.Etime <= m.Btime { if m.Etime <= m.Btime {
return fmt.Errorf("Oops... etime(%d) <= btime(%d)", m.Etime, m.Btime) return fmt.Errorf("oops... etime(%d) <= btime(%d)", m.Etime, m.Btime)
} }
if err := m.Parse(); err != nil { if err := m.Parse(); err != nil {

View File

@ -0,0 +1,81 @@
package models
import "time"
type AlertingEngines struct {
Id int64 `json:"id" gorm:"primaryKey"`
Instance string `json:"instance"`
Cluster string `json:"cluster"` // reader cluster
Clock int64 `json:"clock"`
}
func (e *AlertingEngines) TableName() string {
return "alerting_engines"
}
// UpdateCluster 页面上用户会给各个n9e-server分配要关联的目标集群是什么
func (e *AlertingEngines) UpdateCluster(c string) error {
e.Cluster = c
return DB().Model(e).Select("cluster").Updates(e).Error
}
// AlertingEngineGetCluster 根据实例名获取对应的集群名字
func AlertingEngineGetCluster(instance string) (string, error) {
var objs []AlertingEngines
err := DB().Where("instance=?", instance).Find(&objs).Error
if err != nil {
return "", err
}
if len(objs) == 0 {
return "", nil
}
return objs[0].Cluster, nil
}
// AlertingEngineGets 拉取列表数据,用户要在页面上看到所有 n9e-server 实例列表,然后为其分配 cluster
func AlertingEngineGets(where string, args ...interface{}) ([]*AlertingEngines, error) {
var objs []*AlertingEngines
var err error
session := DB().Order("instance")
if where == "" {
err = session.Find(&objs).Error
} else {
err = session.Where(where, args...).Find(&objs).Error
}
return objs, err
}
func AlertingEngineGetsInstances(where string, args ...interface{}) ([]string, error) {
var arr []string
var err error
session := DB().Model(new(AlertingEngines)).Order("instance")
if where == "" {
err = session.Pluck("instance", &arr).Error
} else {
err = session.Where(where, args...).Pluck("instance", &arr).Error
}
return arr, err
}
func AlertingEngineHeartbeat(instance string) error {
var total int64
err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Count(&total).Error
if err != nil {
return err
}
if total == 0 {
// insert
err = DB().Create(&AlertingEngines{
Instance: instance,
Clock: time.Now().Unix(),
}).Error
} else {
// update
err = DB().Model(new(AlertingEngines)).Where("instance=?", instance).Update("clock", time.Now().Unix()).Error
}
return err
}

View File

@ -70,6 +70,10 @@ func MustLoad(fpaths ...string) {
C.EngineDelay = 120 C.EngineDelay = 120
} }
if C.ReaderFrom == "" {
C.ReaderFrom = "config"
}
if C.Heartbeat.IP == "" { if C.Heartbeat.IP == "" {
// auto detect // auto detect
// C.Heartbeat.IP = fmt.Sprint(GetOutboundIP()) // C.Heartbeat.IP = fmt.Sprint(GetOutboundIP())
@ -187,6 +191,7 @@ type Config struct {
AnomalyDataApi []string AnomalyDataApi []string
EngineDelay int64 EngineDelay int64
DisableUsageReport bool DisableUsageReport bool
ReaderFrom string
Log logx.Config Log logx.Config
HTTP httpx.Config HTTP httpx.Config
BasicAuth gin.Accounts BasicAuth gin.Accounts
@ -207,15 +212,9 @@ type ReaderOptions struct {
BasicAuthUser string BasicAuthUser string
BasicAuthPass string BasicAuthPass string
Timeout int64 Timeout int64
DialTimeout int64 DialTimeout int64
TLSHandshakeTimeout int64
ExpectContinueTimeout int64
IdleConnTimeout int64
KeepAlive int64
MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int MaxIdleConnsPerHost int
Headers []string Headers []string

View File

@ -3,6 +3,7 @@ package engine
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -166,6 +167,11 @@ func (r *RuleEval) Work() {
return return
} }
if reader.Client == nil {
logger.Error("reader.Client is nil")
return
}
var value model.Value var value model.Value
var err error var err error
if r.rule.Algorithm == "" { if r.rule.Algorithm == "" {
@ -636,6 +642,11 @@ func (r RecordingRuleEval) Work() {
return return
} }
if reader.Client == nil {
log.Println("reader.Client is nil")
return
}
value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now()) value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now())
if err != nil { if err != nil {
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err) logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)

View File

@ -4,57 +4,45 @@ import (
"context" "context"
"fmt" "fmt"
"sort" "sort"
"strconv"
"strings" "strings"
"time" "time"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/storage"
) )
// local servers // local servers
var localss string var localss string
func Heartbeat(ctx context.Context) error { func Heartbeat(ctx context.Context) error {
if err := heartbeat(ctx); err != nil { if err := heartbeat(); err != nil {
fmt.Println("failed to heartbeat:", err) fmt.Println("failed to heartbeat:", err)
return err return err
} }
go loopHeartbeat(ctx) go loopHeartbeat()
return nil return nil
} }
func loopHeartbeat(ctx context.Context) { func loopHeartbeat() {
interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond
for { for {
time.Sleep(interval) time.Sleep(interval)
if err := heartbeat(ctx); err != nil { if err := heartbeat(); err != nil {
logger.Warning(err) logger.Warning(err)
} }
} }
} }
// hash struct: func heartbeat() error {
// /server/heartbeat/Default -> { err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint)
// 10.2.3.4:19000 => $timestamp
// 10.2.3.5:19000 => $timestamp
// }
func redisKey(cluster string) string {
return fmt.Sprintf("/server/heartbeat/%s", cluster)
}
func heartbeat(ctx context.Context) error {
now := time.Now().Unix()
key := redisKey(config.C.ClusterName)
err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()
if err != nil { if err != nil {
return err return err
} }
servers, err := ActiveServers(ctx, config.C.ClusterName) servers, err := ActiveServers()
if err != nil { if err != nil {
return err return err
} }
@ -69,37 +57,12 @@ func heartbeat(ctx context.Context) error {
return nil return nil
} }
func clearDeadServer(ctx context.Context, cluster, endpoint string) { func ActiveServers() ([]string, error) {
key := redisKey(cluster) cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint)
err := storage.Redis.HDel(ctx, key, endpoint).Err()
if err != nil {
logger.Warningf("failed to hdel %s %s, error: %v", key, endpoint, err)
}
}
func ActiveServers(ctx context.Context, cluster string) ([]string, error) {
ret, err := storage.Redis.HGetAll(ctx, redisKey(cluster)).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
now := time.Now().Unix() // 30秒内有心跳就认为是活的
dur := int64(20) return models.AlertingEngineGetsInstances("cluster = ? and clock > ?", cluster, time.Now().Unix()-30)
actives := make([]string, 0, len(ret))
for endpoint, clockstr := range ret {
clock, err := strconv.ParseInt(clockstr, 10, 64)
if err != nil {
continue
}
if now-clock > dur {
clearDeadServer(ctx, cluster, endpoint)
continue
}
actives = append(actives, endpoint)
}
return actives, nil
} }

View File

@ -1,7 +1,6 @@
package naming package naming
import ( import (
"context"
"sort" "sort"
"github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/config"
@ -9,7 +8,7 @@ import (
) )
func IamLeader() (bool, error) { func IamLeader() (bool, error) {
servers, err := ActiveServers(context.Background(), config.C.ClusterName) servers, err := ActiveServers()
if err != nil { if err != nil {
logger.Errorf("failed to get active servers: %v", err) logger.Errorf("failed to get active servers: %v", err)
return false, err return false, err

View File

@ -1,34 +1,48 @@
package reader package reader
import ( import (
"encoding/json"
"fmt"
"net" "net"
"net/http" "net/http"
"strings"
"time" "time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom" "github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/config"
"github.com/prometheus/client_golang/api" "github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
) )
var Client prom.API var Client prom.API
var LocalPromOption PromOption
func Init(opts config.ReaderOptions) error { func Init() error {
rf := strings.ToLower(strings.TrimSpace(config.C.ReaderFrom))
if rf == "" || rf == "config" {
return initFromConfig()
}
if rf == "database" {
return initFromDatabase()
}
return fmt.Errorf("invalid configuration ReaderFrom: %s", rf)
}
func initFromConfig() error {
opts := config.C.Reader
cli, err := api.NewClient(api.Config{ cli, err := api.NewClient(api.Config{
Address: opts.Url, Address: opts.Url,
RoundTripper: &http.Transport{ RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig, // TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{ DialContext: (&net.Dialer{
Timeout: time.Duration(opts.DialTimeout) * time.Millisecond, Timeout: time.Duration(opts.DialTimeout) * time.Millisecond,
KeepAlive: time.Duration(opts.KeepAlive) * time.Millisecond,
}).DialContext, }).DialContext,
ResponseHeaderTimeout: time.Duration(opts.Timeout) * time.Millisecond, ResponseHeaderTimeout: time.Duration(opts.Timeout) * time.Millisecond,
TLSHandshakeTimeout: time.Duration(opts.TLSHandshakeTimeout) * time.Millisecond,
ExpectContinueTimeout: time.Duration(opts.ExpectContinueTimeout) * time.Millisecond,
MaxConnsPerHost: opts.MaxConnsPerHost,
MaxIdleConns: opts.MaxIdleConns,
MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost, MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost,
IdleConnTimeout: time.Duration(opts.IdleConnTimeout) * time.Millisecond,
}, },
}) })
@ -44,3 +58,114 @@ func Init(opts config.ReaderOptions) error {
return nil return nil
} }
func initFromDatabase() error {
go func() {
for {
loadFromDatabase()
time.Sleep(time.Second)
}
}()
return nil
}
type PromOption struct {
Url string
User string
Pass string
Headers []string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
}
func (po *PromOption) Equal(target PromOption) bool {
if po.Url != target.Url {
return false
}
if po.User != target.User {
return false
}
if po.Pass != target.Pass {
return false
}
if po.Timeout != target.Timeout {
return false
}
if po.DialTimeout != target.DialTimeout {
return false
}
if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost {
return false
}
if len(po.Headers) != len(target.Headers) {
return false
}
for i := 0; i < len(po.Headers); i++ {
if po.Headers[i] != target.Headers[i] {
return false
}
}
return true
}
func loadFromDatabase() {
cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint)
if err != nil {
logger.Errorf("failed to get current cluster, error: %v", err)
return
}
ckey := "prom." + cluster + ".option"
cval, err := models.ConfigsGet(ckey)
if err != nil {
logger.Errorf("failed to get ckey: %s, error: %v", ckey, err)
return
}
if cval == "" {
Client = nil
return
}
var po PromOption
err = json.Unmarshal([]byte(cval), &po)
if err != nil {
logger.Errorf("failed to unmarshal PromOption: %s", err)
return
}
if Client == nil || !LocalPromOption.Equal(po) {
cli, err := api.NewClient(api.Config{
Address: po.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
},
})
if err != nil {
logger.Errorf("failed to NewPromClient: %v", err)
return
}
Client = prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: po.User,
BasicAuthPass: po.Pass,
Headers: po.Headers,
})
}
}

View File

@ -69,7 +69,7 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) {
}) })
r.GET("/servers/active", func(c *gin.Context) { r.GET("/servers/active", func(c *gin.Context) {
lst, err := naming.ActiveServers(c.Request.Context(), config.C.ClusterName) lst, err := naming.ActiveServers()
ginx.NewRender(c).Data(lst, err) ginx.NewRender(c).Data(lst, err)
}) })

View File

@ -38,6 +38,11 @@ func queryPromql(c *gin.Context) {
var f promqlForm var f promqlForm
ginx.BindJSON(c, &f) ginx.BindJSON(c, &f)
if reader.Client == nil {
c.String(500, "reader.Client is nil")
return
}
value, warnings, err := reader.Client.Query(c.Request.Context(), f.PromQL, time.Now()) value, warnings, err := reader.Client.Query(c.Request.Context(), f.PromQL, time.Now())
if err != nil { if err != nil {
c.String(500, "promql:%s error:%v", f.PromQL, err) c.String(500, "promql:%s error:%v", f.PromQL, err)

View File

@ -125,7 +125,7 @@ func (s Server) initialize() (func(), error) {
} }
// init prometheus remote reader // init prometheus remote reader
if err = reader.Init(config.C.Reader); err != nil { if err = reader.Init(); err != nil {
return fns.Ret(), err return fns.Ret(), err
} }

View File

@ -30,6 +30,10 @@ type Usage struct {
} }
func getSamples() (float64, error) { func getSamples() (float64, error) {
if reader.Client == nil {
return 0, fmt.Errorf("reader.Client is nil")
}
value, warns, err := reader.Client.Query(context.Background(), request, time.Now()) value, warns, err := reader.Client.Query(context.Background(), request, time.Now())
if err != nil { if err != nil {
return 0, err return 0, err
@ -55,10 +59,7 @@ func Report() {
} }
func report() { func report() {
sps, err := getSamples() sps, _ := getSamples()
if err != nil {
return
}
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {

View File

@ -113,7 +113,6 @@ type ClusterOptions struct {
Timeout int64 Timeout int64
DialTimeout int64 DialTimeout int64
KeepAlive int64
UseTLS bool UseTLS bool
tls.ClientConfig tls.ClientConfig

View File

@ -11,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom" "github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config" "github.com/didi/nightingale/v5/src/webapi/config"
"github.com/prometheus/client_golang/api" "github.com/prometheus/client_golang/api"
@ -29,10 +30,44 @@ type ClustersType struct {
mutex *sync.RWMutex mutex *sync.RWMutex
} }
type PromOption struct {
Url string
User string
Pass string
Headers []string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
}
func (cs *ClustersType) Put(name string, cluster *ClusterType) { func (cs *ClustersType) Put(name string, cluster *ClusterType) {
cs.mutex.Lock() cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.datas[name] = cluster cs.datas[name] = cluster
cs.mutex.Unlock()
// 把配置信息写入DB一份这样n9e-server就可以直接从DB读取了
po := PromOption{
Url: cluster.Opts.Prom,
User: cluster.Opts.BasicAuthUser,
Pass: cluster.Opts.BasicAuthPass,
Headers: cluster.Opts.Headers,
Timeout: cluster.Opts.Timeout,
DialTimeout: cluster.Opts.DialTimeout,
MaxIdleConnsPerHost: cluster.Opts.MaxIdleConnsPerHost,
}
bs, err := json.Marshal(po)
if err != nil {
logger.Fatal("failed to marshal PromOption:", err)
return
}
key := "prom." + name + ".option"
err = models.ConfigsSet(key, string(bs))
if err != nil {
logger.Fatal("failed to set PromOption ", key, " to database, error: ", err)
}
} }
func (cs *ClustersType) Get(name string) (*ClusterType, bool) { func (cs *ClustersType) Get(name string) (*ClusterType, bool) {