From 2f724075b211389ca56f34de5f930ad296ffec4c Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Mon, 23 May 2022 13:13:35 +0800 Subject: [PATCH] loop load clusters from api --- src/webapi/prom/prom.go | 191 +++++++++++++++++++++++++++++----------- 1 file changed, 142 insertions(+), 49 deletions(-) diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index 06a0c8f5..be5b2df3 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -1,6 +1,10 @@ package prom import ( + "encoding/json" + "fmt" + "io" + "math/rand" "net" "net/http" "strings" @@ -8,6 +12,8 @@ import ( "time" "github.com/didi/nightingale/v5/src/webapi/config" + "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/net/httplib" ) type ClusterType struct { @@ -27,56 +33,10 @@ func (cs *ClustersType) Put(name string, cluster *ClusterType) { } func (cs *ClustersType) Get(name string) (*ClusterType, bool) { - cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom)) - cs.mutex.RLock() c, has := cs.datas[name] cs.mutex.RUnlock() - if has { - return c, true - } - - if cf == "" || cf == "config" { - return nil, false - } - - // read from api - if cf == "api" { - return cs.GetFromAPI(name) - } - - return nil, false -} - -func (cs *ClustersType) GetFromAPI(name string) (*ClusterType, bool) { - // get from api, parse body - // 1. not found? return nil, false - // 2. found? new ClusterType, put, return - opt := config.ClusterOptions{ - Name: name, - Prom: "", - BasicAuthUser: "", - BasicAuthPass: "", - Timeout: 60000, - DialTimeout: 5000, - MaxIdleConnsPerHost: 32, - } - - cluster := &ClusterType{ - Opts: opt, - Transport: &http.Transport{ - // TLSClientConfig: tlsConfig, - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: time.Duration(opt.DialTimeout) * time.Millisecond, - }).DialContext, - ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond, - MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost, - }, - } - - cs.Put(opt.Name, cluster) - return cluster, true + return c, has } var Clusters = ClustersType{ @@ -85,10 +45,19 @@ var Clusters = ClustersType{ } func Init() error { - if config.C.ClustersFrom != "" && config.C.ClustersFrom != "config" { - return nil + cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom)) + if cf == "" || cf == "config" { + return initClustersFromConfig() } + if cf == "api" { + return initClustersFromAPI() + } + + return fmt.Errorf("invalid configuration ClustersFrom: %s", cf) +} + +func initClustersFromConfig() error { opts := config.C.Clusters for i := 0; i < len(opts); i++ { @@ -109,3 +78,127 @@ func Init() error { return nil } + +type DSReply struct { + RequestID string `json:"request_id"` + Data struct { + P int `json:"p"` + Limit int `json:"limit"` + Total int `json:"total"` + Items []struct { + ID int `json:"id"` + PluginID int `json:"plugin_id"` + Name string `json:"name"` + Status string `json:"status"` + Category string `json:"category"` + PluginType string `json:"plugin_type"` + PluginTypeName string `json:"plugin_type_name"` + Settings struct { + PrometheusAddr string `json:"prometheus.addr"` + PrometheusBasic struct { + PrometheusUser string `json:"promethues.user"` + PrometheusPass string `json:"promethues.password"` + } `json:"promethues.basic"` + PrometheusTimeout int64 `json:"prometheus.timeout"` + } `json:"settings,omitempty"` + CreatedAt int `json:"created_at"` + UpdatedAt int `json:"updated_at"` + LastTime int `json:"last_time"` + } `json:"items"` + } `json:"data"` +} + +func initClustersFromAPI() error { + go func() { + for { + loadClustersFromAPI() + time.Sleep(time.Second * 3) + } + }() + return nil +} + +func loadClustersFromAPI() { + urls := config.C.ClustersFromAPIs + if len(urls) == 0 { + logger.Error("configuration(ClustersFromAPIs) empty") + return + } + + var reply DSReply + + count := len(urls) + for _, i := range rand.Perm(count) { + url := urls[i] + + res, err := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).Response() + if err != nil { + logger.Errorf("curl %s fail: %v", url, err) + continue + } + + if res.StatusCode != 200 { + logger.Errorf("curl %s fail, status code: %d", url, res.StatusCode) + continue + } + + defer res.Body.Close() + + jsonBytes, err := io.ReadAll(res.Body) + if err != nil { + logger.Errorf("read response body of %s fail: %v", url, err) + continue + } + + err = json.Unmarshal(jsonBytes, &reply) + if err != nil { + logger.Errorf("unmarshal response body of %s fail: %v", url, err) + continue + } + + break + } + + for _, item := range reply.Data.Items { + if item.Settings.PrometheusAddr == "" { + continue + } + + if item.Name == "" { + continue + } + + old, has := Clusters.Get(item.Name) + if !has || + old.Opts.BasicAuthUser != item.Settings.PrometheusBasic.PrometheusUser || + old.Opts.BasicAuthPass != item.Settings.PrometheusBasic.PrometheusPass || + old.Opts.Timeout != item.Settings.PrometheusTimeout || + old.Opts.Prom != item.Settings.PrometheusAddr { + opt := config.ClusterOptions{ + Name: item.Name, + Prom: item.Settings.PrometheusAddr, + BasicAuthUser: item.Settings.PrometheusBasic.PrometheusUser, + BasicAuthPass: item.Settings.PrometheusBasic.PrometheusPass, + Timeout: item.Settings.PrometheusTimeout, + DialTimeout: 5000, + MaxIdleConnsPerHost: 32, + } + + cluster := &ClusterType{ + Opts: opt, + Transport: &http.Transport{ + // TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(opt.DialTimeout) * time.Millisecond, + }).DialContext, + ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond, + MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost, + }, + } + + Clusters.Put(item.Name, cluster) + continue + } + } +}