loop load clusters from api

This commit is contained in:
Ulric Qin 2022-05-23 13:13:35 +08:00
parent 06224e4b20
commit 2f724075b2
1 changed files with 142 additions and 49 deletions

View File

@ -1,6 +1,10 @@
package prom package prom
import ( import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net" "net"
"net/http" "net/http"
"strings" "strings"
@ -8,6 +12,8 @@ import (
"time" "time"
"github.com/didi/nightingale/v5/src/webapi/config" "github.com/didi/nightingale/v5/src/webapi/config"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
) )
type ClusterType struct { type ClusterType struct {
@ -27,56 +33,10 @@ func (cs *ClustersType) Put(name string, cluster *ClusterType) {
} }
func (cs *ClustersType) Get(name string) (*ClusterType, bool) { func (cs *ClustersType) Get(name string) (*ClusterType, bool) {
cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom))
cs.mutex.RLock() cs.mutex.RLock()
c, has := cs.datas[name] c, has := cs.datas[name]
cs.mutex.RUnlock() cs.mutex.RUnlock()
if has { return c, has
return c, true
}
if cf == "" || cf == "config" {
return nil, false
}
// read from api
if cf == "api" {
return cs.GetFromAPI(name)
}
return nil, false
}
func (cs *ClustersType) GetFromAPI(name string) (*ClusterType, bool) {
// get from api, parse body
// 1. not found? return nil, false
// 2. found? new ClusterType, put, return
opt := config.ClusterOptions{
Name: name,
Prom: "",
BasicAuthUser: "",
BasicAuthPass: "",
Timeout: 60000,
DialTimeout: 5000,
MaxIdleConnsPerHost: 32,
}
cluster := &ClusterType{
Opts: opt,
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
},
}
cs.Put(opt.Name, cluster)
return cluster, true
} }
var Clusters = ClustersType{ var Clusters = ClustersType{
@ -85,10 +45,19 @@ var Clusters = ClustersType{
} }
func Init() error { func Init() error {
if config.C.ClustersFrom != "" && config.C.ClustersFrom != "config" { cf := strings.ToLower(strings.TrimSpace(config.C.ClustersFrom))
return nil if cf == "" || cf == "config" {
return initClustersFromConfig()
} }
if cf == "api" {
return initClustersFromAPI()
}
return fmt.Errorf("invalid configuration ClustersFrom: %s", cf)
}
func initClustersFromConfig() error {
opts := config.C.Clusters opts := config.C.Clusters
for i := 0; i < len(opts); i++ { for i := 0; i < len(opts); i++ {
@ -109,3 +78,127 @@ func Init() error {
return nil return nil
} }
type DSReply struct {
RequestID string `json:"request_id"`
Data struct {
P int `json:"p"`
Limit int `json:"limit"`
Total int `json:"total"`
Items []struct {
ID int `json:"id"`
PluginID int `json:"plugin_id"`
Name string `json:"name"`
Status string `json:"status"`
Category string `json:"category"`
PluginType string `json:"plugin_type"`
PluginTypeName string `json:"plugin_type_name"`
Settings struct {
PrometheusAddr string `json:"prometheus.addr"`
PrometheusBasic struct {
PrometheusUser string `json:"promethues.user"`
PrometheusPass string `json:"promethues.password"`
} `json:"promethues.basic"`
PrometheusTimeout int64 `json:"prometheus.timeout"`
} `json:"settings,omitempty"`
CreatedAt int `json:"created_at"`
UpdatedAt int `json:"updated_at"`
LastTime int `json:"last_time"`
} `json:"items"`
} `json:"data"`
}
func initClustersFromAPI() error {
go func() {
for {
loadClustersFromAPI()
time.Sleep(time.Second * 3)
}
}()
return nil
}
func loadClustersFromAPI() {
urls := config.C.ClustersFromAPIs
if len(urls) == 0 {
logger.Error("configuration(ClustersFromAPIs) empty")
return
}
var reply DSReply
count := len(urls)
for _, i := range rand.Perm(count) {
url := urls[i]
res, err := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).Response()
if err != nil {
logger.Errorf("curl %s fail: %v", url, err)
continue
}
if res.StatusCode != 200 {
logger.Errorf("curl %s fail, status code: %d", url, res.StatusCode)
continue
}
defer res.Body.Close()
jsonBytes, err := io.ReadAll(res.Body)
if err != nil {
logger.Errorf("read response body of %s fail: %v", url, err)
continue
}
err = json.Unmarshal(jsonBytes, &reply)
if err != nil {
logger.Errorf("unmarshal response body of %s fail: %v", url, err)
continue
}
break
}
for _, item := range reply.Data.Items {
if item.Settings.PrometheusAddr == "" {
continue
}
if item.Name == "" {
continue
}
old, has := Clusters.Get(item.Name)
if !has ||
old.Opts.BasicAuthUser != item.Settings.PrometheusBasic.PrometheusUser ||
old.Opts.BasicAuthPass != item.Settings.PrometheusBasic.PrometheusPass ||
old.Opts.Timeout != item.Settings.PrometheusTimeout ||
old.Opts.Prom != item.Settings.PrometheusAddr {
opt := config.ClusterOptions{
Name: item.Name,
Prom: item.Settings.PrometheusAddr,
BasicAuthUser: item.Settings.PrometheusBasic.PrometheusUser,
BasicAuthPass: item.Settings.PrometheusBasic.PrometheusPass,
Timeout: item.Settings.PrometheusTimeout,
DialTimeout: 5000,
MaxIdleConnsPerHost: 32,
}
cluster := &ClusterType{
Opts: opt,
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
},
}
Clusters.Put(item.Name, cluster)
continue
}
}
}