From f81888cd8ab60384a57a389c588753116037397e Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Sun, 22 May 2022 16:56:58 +0800 Subject: [PATCH 1/5] get prometheus info from api. code skelton --- src/webapi/prom/prom.go | 62 +++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index 849a6b9c..9eab3e5b 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -3,6 +3,7 @@ package prom import ( "net" "net/http" + "strings" "sync" "time" @@ -15,26 +16,71 @@ type ClusterType struct { } type ClustersType struct { - datas map[string]ClusterType + datas map[string]*ClusterType mutex *sync.RWMutex } -func (cs *ClustersType) Put(name string, cluster ClusterType) { +func (cs *ClustersType) Put(name string, cluster *ClusterType) { cs.mutex.Lock() cs.datas[name] = cluster cs.mutex.Unlock() } -func (cs *ClustersType) Get(name string) (ClusterType, bool) { - cs.mutex.RLock() - defer cs.mutex.RUnlock() +func (cs *ClustersType) Get(name string) (*ClusterType, bool) { + cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom)) + cs.mutex.RLock() c, has := cs.datas[name] - return c, has + cs.mutex.RUnlock() + if has { + return c, true + } + + if cf == "" || cf == "config" { + return nil, false + } + + // read from api + if cf == "api" { + return cs.GetFromAPI(name) + } + + return nil, false +} + +func (cs *ClustersType) GetFromAPI(name string) (*ClusterType, bool) { + // get from api, parse body + // 1. not found? return nil, false + // 2. found? new ClusterType, put, return + opt := config.ClusterOptions{ + Name: "", + Prom: "", + BasicAuthUser: "", + BasicAuthPass: "", + Timeout: 60000, + DialTimeout: 5000, + MaxIdleConnsPerHost: 32, + } + + cluster := &ClusterType{ + Opts: opt, + Transport: &http.Transport{ + // TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(opt.DialTimeout) * time.Millisecond, + }).DialContext, + ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond, + MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost, + }, + } + + cs.Put(opt.Name, cluster) + return cluster, true } var Clusters = ClustersType{ - datas: make(map[string]ClusterType), + datas: make(map[string]*ClusterType), mutex: new(sync.RWMutex), } @@ -46,7 +92,7 @@ func Init() error { opts := config.C.Clusters for i := 0; i < len(opts); i++ { - cluster := ClusterType{ + cluster := &ClusterType{ Opts: opts[i], Transport: &http.Transport{ // TLSClientConfig: tlsConfig, From 06224e4b202a2fc76d3cb5b995cb730917aaf647 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Sun, 22 May 2022 17:03:57 +0800 Subject: [PATCH 2/5] refactor --- src/webapi/prom/prom.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index 9eab3e5b..06a0c8f5 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -53,7 +53,7 @@ func (cs *ClustersType) GetFromAPI(name string) (*ClusterType, bool) { // 1. not found? return nil, false // 2. found? new ClusterType, put, return opt := config.ClusterOptions{ - Name: "", + Name: name, Prom: "", BasicAuthUser: "", BasicAuthPass: "", From 2f724075b211389ca56f34de5f930ad296ffec4c Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Mon, 23 May 2022 13:13:35 +0800 Subject: [PATCH 3/5] loop load clusters from api --- src/webapi/prom/prom.go | 191 +++++++++++++++++++++++++++++----------- 1 file changed, 142 insertions(+), 49 deletions(-) diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index 06a0c8f5..be5b2df3 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -1,6 +1,10 @@ package prom import ( + "encoding/json" + "fmt" + "io" + "math/rand" "net" "net/http" "strings" @@ -8,6 +12,8 @@ import ( "time" "github.com/didi/nightingale/v5/src/webapi/config" + "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/net/httplib" ) type ClusterType struct { @@ -27,56 +33,10 @@ func (cs *ClustersType) Put(name string, cluster *ClusterType) { } func (cs *ClustersType) Get(name string) (*ClusterType, bool) { - cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom)) - cs.mutex.RLock() c, has := cs.datas[name] cs.mutex.RUnlock() - if has { - return c, true - } - - if cf == "" || cf == "config" { - return nil, false - } - - // read from api - if cf == "api" { - return cs.GetFromAPI(name) - } - - return nil, false -} - -func (cs *ClustersType) GetFromAPI(name string) (*ClusterType, bool) { - // get from api, parse body - // 1. not found? return nil, false - // 2. found? new ClusterType, put, return - opt := config.ClusterOptions{ - Name: name, - Prom: "", - BasicAuthUser: "", - BasicAuthPass: "", - Timeout: 60000, - DialTimeout: 5000, - MaxIdleConnsPerHost: 32, - } - - cluster := &ClusterType{ - Opts: opt, - Transport: &http.Transport{ - // TLSClientConfig: tlsConfig, - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: time.Duration(opt.DialTimeout) * time.Millisecond, - }).DialContext, - ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond, - MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost, - }, - } - - cs.Put(opt.Name, cluster) - return cluster, true + return c, has } var Clusters = ClustersType{ @@ -85,10 +45,19 @@ var Clusters = ClustersType{ } func Init() error { - if config.C.ClustersFrom != "" && config.C.ClustersFrom != "config" { - return nil + cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom)) + if cf == "" || cf == "config" { + return initClustersFromConfig() } + if cf == "api" { + return initClustersFromAPI() + } + + return fmt.Errorf("invalid configuration ClustersFrom: %s", cf) +} + +func initClustersFromConfig() error { opts := config.C.Clusters for i := 0; i < len(opts); i++ { @@ -109,3 +78,127 @@ func Init() error { return nil } + +type DSReply struct { + RequestID string `json:"request_id"` + Data struct { + P int `json:"p"` + Limit int `json:"limit"` + Total int `json:"total"` + Items []struct { + ID int `json:"id"` + PluginID int `json:"plugin_id"` + Name string `json:"name"` + Status string `json:"status"` + Category string `json:"category"` + PluginType string `json:"plugin_type"` + PluginTypeName string `json:"plugin_type_name"` + Settings struct { + PrometheusAddr string `json:"prometheus.addr"` + PrometheusBasic struct { + PrometheusUser string `json:"promethues.user"` + PrometheusPass string `json:"promethues.password"` + } `json:"promethues.basic"` + PrometheusTimeout int64 `json:"prometheus.timeout"` + } `json:"settings,omitempty"` + CreatedAt int `json:"created_at"` + UpdatedAt int `json:"updated_at"` + LastTime int `json:"last_time"` + } `json:"items"` + } `json:"data"` +} + +func initClustersFromAPI() error { + go func() { + for { + loadClustersFromAPI() + time.Sleep(time.Second * 3) + } + }() + return nil +} + +func loadClustersFromAPI() { + urls := config.C.ClustersFromAPIs + if len(urls) == 0 { + logger.Error("configuration(ClustersFromAPIs) empty") + return + } + + var reply DSReply + + count := len(urls) + for _, i := range rand.Perm(count) { + url := urls[i] + + res, err := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).Response() + if err != nil { + logger.Errorf("curl %s fail: %v", url, err) + continue + } + + if res.StatusCode != 200 { + logger.Errorf("curl %s fail, status code: %d", url, res.StatusCode) + continue + } + + defer res.Body.Close() + + jsonBytes, err := io.ReadAll(res.Body) + if err != nil { + logger.Errorf("read response body of %s fail: %v", url, err) + continue + } + + err = json.Unmarshal(jsonBytes, &reply) + if err != nil { + logger.Errorf("unmarshal response body of %s fail: %v", url, err) + continue + } + + break + } + + for _, item := range reply.Data.Items { + if item.Settings.PrometheusAddr == "" { + continue + } + + if item.Name == "" { + continue + } + + old, has := Clusters.Get(item.Name) + if !has || + old.Opts.BasicAuthUser != item.Settings.PrometheusBasic.PrometheusUser || + old.Opts.BasicAuthPass != item.Settings.PrometheusBasic.PrometheusPass || + old.Opts.Timeout != item.Settings.PrometheusTimeout || + old.Opts.Prom != item.Settings.PrometheusAddr { + opt := config.ClusterOptions{ + Name: item.Name, + Prom: item.Settings.PrometheusAddr, + BasicAuthUser: item.Settings.PrometheusBasic.PrometheusUser, + BasicAuthPass: item.Settings.PrometheusBasic.PrometheusPass, + Timeout: item.Settings.PrometheusTimeout, + DialTimeout: 5000, + MaxIdleConnsPerHost: 32, + } + + cluster := &ClusterType{ + Opts: opt, + Transport: &http.Transport{ + // TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(opt.DialTimeout) * time.Millisecond, + }).DialContext, + ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond, + MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost, + }, + } + + Clusters.Put(item.Name, cluster) + continue + } + } +} From fd29d183123120e6033d56158533f899c171de75 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Mon, 23 May 2022 13:29:08 +0800 Subject: [PATCH 4/5] delete no use code --- src/webapi/prom/prom.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index be5b2df3..1fb4a2c4 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -82,18 +82,9 @@ func initClustersFromConfig() error { type DSReply struct { RequestID string `json:"request_id"` Data struct { - P int `json:"p"` - Limit int `json:"limit"` - Total int `json:"total"` Items []struct { - ID int `json:"id"` - PluginID int `json:"plugin_id"` - Name string `json:"name"` - Status string `json:"status"` - Category string `json:"category"` - PluginType string `json:"plugin_type"` - PluginTypeName string `json:"plugin_type_name"` - Settings struct { + Name string `json:"name"` + Settings struct { PrometheusAddr string `json:"prometheus.addr"` PrometheusBasic struct { PrometheusUser string `json:"promethues.user"` @@ -101,9 +92,6 @@ type DSReply struct { } `json:"promethues.basic"` PrometheusTimeout int64 `json:"prometheus.timeout"` } `json:"settings,omitempty"` - CreatedAt int `json:"created_at"` - UpdatedAt int `json:"updated_at"` - LastTime int `json:"last_time"` } `json:"items"` } `json:"data"` } From c2f2a7d5e25e661bb678979c2b26617cc0721c48 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Mon, 23 May 2022 13:31:05 +0800 Subject: [PATCH 5/5] use post method to get datasources --- src/webapi/prom/prom.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/webapi/prom/prom.go b/src/webapi/prom/prom.go index 1fb4a2c4..c4a7ccb9 100644 --- a/src/webapi/prom/prom.go +++ b/src/webapi/prom/prom.go @@ -119,7 +119,7 @@ func loadClustersFromAPI() { for _, i := range rand.Perm(count) { url := urls[i] - res, err := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).Response() + res, err := httplib.Post(url).SetTimeout(time.Duration(3000) * time.Millisecond).Response() if err != nil { logger.Errorf("curl %s fail: %v", url, err) continue