code refactor

This commit is contained in:
Ulric Qin 2022-09-05 18:09:28 +08:00
parent c74702cea1
commit f400e1f3f4
7 changed files with 122 additions and 104 deletions

View File

@ -63,15 +63,16 @@ type Agent struct {
func NewAgent(filters map[string]struct{}) (*Agent, error) {
agent := &Agent{
InputFilters: filters,
InputReaders: make(map[string]*InputReader),
InputProvider: nil,
InputFilters: filters,
InputReaders: make(map[string]*InputReader),
}
provider, err := inputs.NewProvider(config.Config, agent.Reload)
if err != nil {
return nil, err
}
agent.InputProvider = provider
return agent, nil
}

View File

@ -12,6 +12,7 @@ import (
func (a *Agent) startMetricsAgent() error {
a.InputProvider.LoadConfig()
a.InputProvider.StartReloader()
names, err := a.InputProvider.GetInputs()
if err != nil {
return err
@ -46,6 +47,7 @@ func (a *Agent) startMetricsAgent() error {
log.Println("E! failed to get configuration of plugin:", name, "error:", err)
continue
}
err = cfg.LoadConfigs(configs, input)
if err != nil {
log.Println("E! failed to load configuration of plugin:", name, "error:", err)

View File

@ -19,8 +19,8 @@ precision = "ms"
# global collect interval
interval = 15
# input provider settings; optional: LocalProvider / HttpRemoteProvider
providers = ["LocalProvider"]
# input provider settings; optional: local / http
providers = ["local"]
# [global.labels]
# region = "shanghai"
@ -55,7 +55,7 @@ address = ":9100"
print_access = false
run_mode = "release"
[http_remote_provider]
[http_provider]
# HttpRemoteProvider插件通过Http请求的方式获取Categraf的配置
# 通过设置global中的provider为HttpRemoteProvider启用
# example request: GET /categraf/configs?agent=categraf&host=machine1 HTTP/1.1
@ -86,17 +86,22 @@ run_mode = "release"
remote_url = "http://localhost:20000/categraf/configs"
# header settings when request config from remote
headers = {agent = "categraf"}
# headers = ["X-From", "categraf", "X-Xyz", "abc"]
# http basic auth config
basic_auth_user = ""
basic_auth_pass = ""
# tls InsecureSkipVerify
tls_insecure_skip_verify = false
# basic_auth_user = ""
# basic_auth_pass = ""
# http timeout in seconds
timeout = 5
# reload interval in seconds
reload_interval = 120
reload_interval = 120
## Optional TLS Config
# use_tls = false
# tls_ca = "/etc/categraf/ca.pem"
# tls_cert = "/etc/categraf/cert.pem"
# tls_key = "/etc/categraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

View File

@ -76,7 +76,7 @@ type ConfigType struct {
HTTP *HTTP `toml:"http"`
Prometheus *Prometheus `toml:"prometheus"`
HttpRemoteProviderConfig *HttpRemoteProviderConfig `toml:"http_remote_provider"`
HTTPProviderConfig *HTTPProviderConfig `toml:"http_provider"`
}
var Config *ConfigType

View File

@ -1,12 +1,14 @@
package config
type HttpRemoteProviderConfig struct {
RemoteUrl string `toml:"remote_url"`
Headers map[string]string `toml:"headers"`
AuthUsername string `toml:"basic_auth_user"`
AuthPassword string `toml:"basic_auth_pass"`
TlsInsecureSkipVerify bool `toml:"tls_insecure_skip_verify"`
Timeout int `toml:"timeout"`
import "flashcat.cloud/categraf/pkg/tls"
ReloadInterval int `toml:"reload_interval"`
type HTTPProviderConfig struct {
tls.ClientConfig
RemoteUrl string `toml:"remote_url"`
Headers []string `toml:"headers"`
AuthUsername string `toml:"basic_auth_user"`
AuthPassword string `toml:"basic_auth_pass"`
Timeout int `toml:"timeout"`
ReloadInterval int `toml:"reload_interval"`
}

View File

@ -1,7 +1,6 @@
package inputs
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
@ -12,6 +11,7 @@ import (
"sync"
"time"
"flashcat.cloud/categraf/pkg/tls"
"github.com/toolkits/pkg/file"
"flashcat.cloud/categraf/config"
@ -60,13 +60,14 @@ type Provider interface {
}
func NewProvider(c *config.ConfigType, reloadFunc func()) (Provider, error) {
log.Println("I! use input provider: ", c.Global.Providers)
log.Println("I! use input provider:", c.Global.Providers)
providers := make([]Provider, 0, len(c.Global.Providers))
for _, p := range c.Global.Providers {
switch p {
case "HttpRemoteProvider":
provider, err := newHttpRemoteProvider(c, reloadFunc)
name := strings.ToLower(p)
switch name {
case "http":
provider, err := newHTTPProvider(c, reloadFunc)
if err != nil {
return nil, err
}
@ -90,7 +91,7 @@ type ProviderManager struct {
}
func (pm *ProviderManager) Name() string {
return "ProviderManager"
return "pm"
}
func (pm *ProviderManager) StartReloader() {
@ -121,7 +122,7 @@ func (pm *ProviderManager) GetInputs() ([]string, error) {
for _, p := range pm.providers {
pInputs, err := p.GetInputs()
if err != nil {
log.Printf("E! provider manager, GetInputs of %s error, skip\n", p.Name())
log.Printf("E! provider manager, GetInputs of %s error: %v, skip\n", p.Name(), err)
continue
}
for _, inputKey := range pInputs {
@ -141,16 +142,20 @@ func (pm *ProviderManager) GetInputConfig(inputName string) ([]cfg.ConfigWithFor
if providerName != p.Name() {
continue
}
pcwf, err := p.GetInputConfig(inputKey)
if err != nil {
log.Printf("E! provider manager, failed to get config of %s from %s, error: %s\n", inputName, p.Name(), err)
continue
}
cwf = append(cwf, pcwf...)
}
if len(cwf) == 0 {
return nil, fmt.Errorf("provider manager, failed to get config of %s", inputName)
}
return cwf, nil
}
@ -168,17 +173,13 @@ func newLocalProvider(c *config.ConfigType) (*LocalProvider, error) {
}
func (lp *LocalProvider) Name() string {
return "LocalProvider"
return "local"
}
// StartReloader 内部可以检查是否有配置的变更,如果有变更,则可以手动执行reloadFunc来重启插件
func (lp *LocalProvider) StartReloader() {
return
}
func (lp *LocalProvider) StartReloader() {}
func (lp *LocalProvider) StopReloader() {
return
}
func (lp *LocalProvider) StopReloader() {}
func (lp *LocalProvider) LoadConfig() (bool, error) {
dirs, err := file.DirsUnder(lp.configDir)
@ -192,9 +193,11 @@ func (lp *LocalProvider) LoadConfig() (bool, error) {
names = append(names, dir[len(inputFilePrefix):])
}
}
lp.Lock()
lp.inputNames = names
lp.Unlock()
return false, nil
}
@ -203,9 +206,7 @@ func (lp *LocalProvider) GetInputs() ([]string, error) {
defer lp.RUnlock()
inputs := make([]string, 0, len(lp.inputNames))
for _, input := range lp.inputNames {
inputs = append(inputs, input)
}
inputs = append(inputs, lp.inputNames...)
return inputs, nil
}
@ -234,30 +235,31 @@ func (lp *LocalProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat
Format: cfg.GuessFormat(f),
})
}
return cwf, nil
}
type HttpRemoteProvider struct {
type HTTPProvider struct {
sync.RWMutex
RemoteUrl string
Headers map[string]string
AuthUsername string
AuthPassword string
TlsInsecureSkipVerify bool
RemoteUrl string
Headers []string
AuthUsername string
AuthPassword string
Timeout int
ReloadInterval int
tls.ClientConfig
client *http.Client
ch chan struct{}
stopCh chan struct{}
reloadFunc func()
configMap map[string]cfg.ConfigWithFormat
version string
}
type httpRemoteProviderResponse struct {
type httpProviderResponse struct {
// version is signature/md5 of current Config, server side should deal with the Version calculate
Version string `json:"version"`
@ -265,33 +267,35 @@ type httpRemoteProviderResponse struct {
Configs map[string]cfg.ConfigWithFormat `json:"configs"`
}
func (hrp *HttpRemoteProvider) Name() string {
return "HttpRemoteProvider"
func (hrp *HTTPProvider) Name() string {
return "http"
}
func newHttpRemoteProvider(c *config.ConfigType, reloadFunc func()) (*HttpRemoteProvider, error) {
if c.HttpRemoteProviderConfig == nil {
return nil, fmt.Errorf("no http remote provider config found")
func newHTTPProvider(c *config.ConfigType, reloadFunc func()) (*HTTPProvider, error) {
if c.HTTPProviderConfig == nil {
return nil, fmt.Errorf("no http provider config found")
}
httpRemoteProvider := &HttpRemoteProvider{
RemoteUrl: c.HttpRemoteProviderConfig.RemoteUrl,
Headers: c.HttpRemoteProviderConfig.Headers,
AuthUsername: c.HttpRemoteProviderConfig.AuthUsername,
AuthPassword: c.HttpRemoteProviderConfig.AuthPassword,
TlsInsecureSkipVerify: c.HttpRemoteProviderConfig.TlsInsecureSkipVerify,
Timeout: c.HttpRemoteProviderConfig.Timeout,
ReloadInterval: c.HttpRemoteProviderConfig.ReloadInterval,
ch: make(chan struct{}),
reloadFunc: reloadFunc,
provider := &HTTPProvider{
RemoteUrl: c.HTTPProviderConfig.RemoteUrl,
Headers: c.HTTPProviderConfig.Headers,
AuthUsername: c.HTTPProviderConfig.AuthUsername,
AuthPassword: c.HTTPProviderConfig.AuthPassword,
ClientConfig: c.HTTPProviderConfig.ClientConfig,
Timeout: c.HTTPProviderConfig.Timeout,
ReloadInterval: c.HTTPProviderConfig.ReloadInterval,
stopCh: make(chan struct{}),
reloadFunc: reloadFunc,
}
if err := httpRemoteProvider.check(); err != nil {
if err := provider.check(); err != nil {
return nil, err
}
return httpRemoteProvider, nil
return provider, nil
}
func (hrp *HttpRemoteProvider) check() error {
func (hrp *HTTPProvider) check() error {
if hrp.Timeout <= 0 {
hrp.Timeout = 5
}
@ -301,36 +305,36 @@ func (hrp *HttpRemoteProvider) check() error {
}
if !strings.HasPrefix(hrp.RemoteUrl, "http") {
return fmt.Errorf("http remote provider: bad remote url config: %s", hrp.RemoteUrl)
return fmt.Errorf("http provider: bad remote url config: %s", hrp.RemoteUrl)
}
if strings.HasPrefix(hrp.RemoteUrl, "https") && hrp.TlsInsecureSkipVerify {
hrp.client = &http.Client{
Timeout: time.Duration(hrp.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
} else {
hrp.client = &http.Client{
Timeout: time.Duration(hrp.Timeout) * time.Second,
}
tlsc, err := hrp.TLSConfig()
if err != nil {
return err
}
hrp.client = &http.Client{
Timeout: time.Duration(hrp.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: tlsc,
},
}
return nil
}
func (hrp *HttpRemoteProvider) doReq() (*httpRemoteProviderResponse, error) {
func (hrp *HTTPProvider) doReq() (*httpProviderResponse, error) {
req, err := http.NewRequest("GET", hrp.RemoteUrl, nil)
if err != nil {
log.Println("E! http remote provider: build reload config request error", err)
log.Println("E! http provider: build reload config request error:", err)
return nil, err
}
for k, v := range hrp.Headers {
if k == "Host" {
req.Host = v
for i := 0; i < len(hrp.Headers); i += 2 {
req.Header.Add(hrp.Headers[i], hrp.Headers[i+1])
if hrp.Headers[i] == "Host" {
req.Host = hrp.Headers[i+1]
}
req.Header.Add(k, v)
}
if hrp.AuthUsername != "" || hrp.AuthPassword != "" {
@ -348,29 +352,27 @@ func (hrp *HttpRemoteProvider) doReq() (*httpRemoteProviderResponse, error) {
resp, err := hrp.client.Do(req)
if err != nil {
log.Println("E! http remote provider: request reload config error", err)
log.Println("E! http provider: request reload config error:", err)
return nil, err
}
defer resp.Body.Close()
respData, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("E! http remote provider: request reload config error", err)
log.Println("E! http provider: request reload config error:", err)
return nil, err
}
confResp := &httpRemoteProviderResponse{}
confResp := &httpProviderResponse{}
err = json.Unmarshal(respData, confResp)
if err != nil {
log.Println("E! http remote provider: unmarshal result error", err)
log.Println("E! http provider: unmarshal result error:", err)
return nil, err
}
return confResp, nil
}
func (hrp *HttpRemoteProvider) LoadConfig() (bool, error) {
//var confResp *httpRemoteProviderResponse
log.Println("E! http remote provider: start reload config from remote", hrp.RemoteUrl)
func (hrp *HTTPProvider) LoadConfig() (bool, error) {
log.Println("I! http provider: start reload config from remote:", hrp.RemoteUrl)
confResp, err := hrp.doReq()
if err != nil {
@ -381,9 +383,10 @@ func (hrp *HttpRemoteProvider) LoadConfig() (bool, error) {
if confResp.Version == hrp.version {
return false, nil
}
// if config is nil, may some error occurs in server side, ignore this instead of deleting all configs
if confResp.Configs == nil {
log.Println("W! http remote provider: received config is empty")
log.Println("W! http provider: received config is empty")
return false, nil
}
@ -396,13 +399,15 @@ func (hrp *HttpRemoteProvider) LoadConfig() (bool, error) {
news, updates, deletes := compareConfig(hrp.configMap, confResp.Configs)
if len(news) > 0 {
log.Println("I! http remote provider: new inputs", news)
log.Println("I! http provider: new inputs:", news)
}
if len(updates) > 0 {
log.Println("I! http remote provider: updated inputs", updates)
log.Println("I! http provider: updated inputs:", updates)
}
if len(deletes) > 0 {
log.Println("I! http remote provider: deleted inputs", deletes)
log.Println("I! http provider: deleted inputs:", deletes)
}
changed := len(news)+len(updates)+len(deletes) > 0
@ -412,10 +417,11 @@ func (hrp *HttpRemoteProvider) LoadConfig() (bool, error) {
hrp.configMap = confResp.Configs
hrp.version = confResp.Version
}
return changed, nil
}
func (hrp *HttpRemoteProvider) StartReloader() {
func (hrp *HTTPProvider) StartReloader() {
go func() {
for {
select {
@ -427,36 +433,37 @@ func (hrp *HttpRemoteProvider) StartReloader() {
if changed {
hrp.reloadFunc()
}
case <-hrp.ch:
case <-hrp.stopCh:
return
}
}
}()
return
}
func (hrp *HttpRemoteProvider) StopReloader() {
hrp.ch <- struct{}{}
func (hrp *HTTPProvider) StopReloader() {
hrp.stopCh <- struct{}{}
}
func (hrp *HttpRemoteProvider) GetInputs() ([]string, error) {
func (hrp *HTTPProvider) GetInputs() ([]string, error) {
hrp.RLock()
defer hrp.RUnlock()
inputs := make([]string, 0, len(hrp.configMap))
for k, _ := range hrp.configMap {
for k := range hrp.configMap {
inputs = append(inputs, k)
}
return inputs, nil
}
func (hrp *HttpRemoteProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat, error) {
func (hrp *HTTPProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat, error) {
hrp.RLock()
defer hrp.RUnlock()
if conf, has := hrp.configMap[inputKey]; has {
return []cfg.ConfigWithFormat{conf}, nil
}
return nil, nil
}
@ -476,10 +483,11 @@ func compareConfig(cold, cnew map[string]cfg.ConfigWithFormat) (news, updates, d
}
}
for knew, _ := range cnew {
for knew := range cnew {
if _, has := cold[knew]; !has {
news = append(news, knew)
}
}
return
}