refactor agent

This commit is contained in:
Ulric Qin 2022-06-24 16:35:46 +08:00
parent b7d3925d53
commit de571b21f2
5 changed files with 175 additions and 98 deletions

View File

@ -1,17 +1,7 @@
package agent
import (
"errors"
"fmt"
"log"
"path"
"strings"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/cfg"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/file"
// auto registry
_ "flashcat.cloud/categraf/inputs/conntrack"
@ -46,8 +36,6 @@ import (
_ "flashcat.cloud/categraf/inputs/zookeeper"
)
const inputFilePrefix = "input."
// Agent is the top-level runtime object owning the log agent and the
// metrics input plugins. InputFilters, when non-empty, restricts which
// input plugins are started (keys are plugin names without the
// "input." directory prefix).
type Agent struct {
	InputFilters map[string]struct{}
}
@ -60,101 +48,21 @@ func NewAgent(filters map[string]struct{}) *Agent {
// Start boots the sub-agents in order: the log agent first, then the
// metrics inputs. Errors returned by the input-starting calls are
// discarded here.
// NOTE(review): this is a rendered diff — both a.startInputs()
// (pre-refactor name) and a.startMetricsAgent() (post-refactor name)
// appear; the final file keeps only one of them. Confirm against the
// repository before relying on this text.
func (a *Agent) Start() {
	log.Println("I! agent starting")
	a.startLogAgent()
	a.startInputs()
	a.startMetricsAgent()
	log.Println("I! agent started")
}
// Stop shuts the agent down: stops the log agent and tears down every
// registered input reader (signal its goroutine to quit, close its
// sample queue, drop the input instance).
// NOTE(review): rendered diff residue — the bare stopLogAgent() call
// and the inline InputReaders loop are the pre-refactor form; the
// a.stopLogAgent()/a.stopMetricsAgent() calls are the post-refactor
// form (the loop moved into stopMetricsAgent). The final file contains
// only one form; confirm against the repository.
func (a *Agent) Stop() {
	log.Println("I! agent stopping")
	stopLogAgent()
	for name := range InputReaders {
		// ask the reader goroutine to quit, then release its resources
		InputReaders[name].QuitChan <- struct{}{}
		close(InputReaders[name].Queue)
		InputReaders[name].Instance.Drop()
	}
	a.stopLogAgent()
	a.stopMetricsAgent()
	log.Println("I! agent stopped")
}
// Reload restarts the agent in place: a full Stop followed by Start,
// which re-reads the input configuration directories.
func (a *Agent) Reload() {
	log.Println("I! agent reloading")
	a.Stop()
	a.Start()
}
// startInputs discovers input plugins from the config directory,
// applies the optional filter list, initializes each plugin and starts
// a Reader that collects its samples into a queue.
func (a *Agent) startInputs() error {
	names, err := a.getInputsByDirs()
	if err != nil {
		return err
	}

	if len(names) == 0 {
		log.Println("I! no inputs")
		return nil
	}

	filtering := len(a.InputFilters) > 0
	for _, name := range names {
		// honor the filter list when one was supplied
		if filtering {
			if _, ok := a.InputFilters[name]; !ok {
				continue
			}
		}

		creator, ok := inputs.InputCreators[name]
		if !ok {
			log.Println("E! input:", name, "not supported")
			continue
		}

		// construct the input instance and load its configuration files
		ins := creator()
		cfg.LoadConfigs(path.Join(config.Config.ConfigDir, inputFilePrefix+name), ins)

		if err = ins.Init(); err != nil {
			// an empty-instances error just means the plugin is not
			// configured; anything else is worth logging
			if !errors.Is(err, types.ErrInstancesEmpty) {
				log.Println("E! failed to init input:", name, "error:", err)
			}
			continue
		}

		rd := &Reader{
			Instance: ins,
			QuitChan: make(chan struct{}, 1),
			Queue:    make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
		}

		log.Println("I! input:", name, "started")
		rd.Start()
		InputReaders[name] = rd
	}

	return nil
}
// getInputsByDirs returns the names of input plugins to start, derived
// from subdirectories of ConfigDir that carry the "input." prefix (the
// prefix is stripped from the returned names).
func (a *Agent) getInputsByDirs() ([]string, error) {
	dirs, err := file.DirsUnder(config.Config.ConfigDir)
	if err != nil {
		return nil, fmt.Errorf("failed to get dirs under %s : %v", config.Config.ConfigDir, err)
	}
	count := len(dirs)
	if count == 0 {
		return dirs, nil
	}
	names := make([]string, 0, count)
	for i := 0; i < count; i++ {
		if strings.HasPrefix(dirs[i], inputFilePrefix) {
			// strip the "input." prefix to get the plugin name
			names = append(names, dirs[i][len(inputFilePrefix):])
		}
	}
	return names, nil
	// NOTE(review): the statement below is unreachable here; in the
	// rendered diff it belongs to the new Reload() body, not to this
	// function. Confirm against the repository.
	log.Println("I! agent reloaded")
}

View File

@ -183,7 +183,7 @@ func (a *Agent) startLogAgent() {
}
}
func stopLogAgent() {
func (a *Agent) stopLogAgent() {
if logAgent != nil {
logAgent.Stop()
}

99
agent/metrics_agent.go Normal file
View File

@ -0,0 +1,99 @@
package agent
import (
"errors"
"fmt"
"log"
"path"
"strings"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/cfg"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/file"
)
const inputFilePrefix = "input."
// startMetricsAgent scans the config directory for input plugins,
// applies the optional filter list, initializes each plugin and spins
// up a Reader goroutine that gathers its samples.
func (a *Agent) startMetricsAgent() error {
	names, err := a.getInputsByDirs()
	if err != nil {
		return err
	}

	if len(names) == 0 {
		log.Println("I! no inputs")
		return nil
	}

	useFilter := len(a.InputFilters) > 0
	for _, name := range names {
		// skip plugins not present in the filter list, when one is set
		if useFilter {
			if _, ok := a.InputFilters[name]; !ok {
				continue
			}
		}

		creator, ok := inputs.InputCreators[name]
		if !ok {
			log.Println("E! input:", name, "not supported")
			continue
		}

		// build the input instance and feed it its config directory
		ins := creator()
		cfg.LoadConfigs(path.Join(config.Config.ConfigDir, inputFilePrefix+name), ins)

		if err = ins.Init(); err != nil {
			// ErrInstancesEmpty only means the plugin has no configured
			// instances; every other init failure is logged
			if !errors.Is(err, types.ErrInstancesEmpty) {
				log.Println("E! failed to init input:", name, "error:", err)
			}
			continue
		}

		rd := &Reader{
			Instance: ins,
			QuitChan: make(chan struct{}, 1),
			Queue:    make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
		}

		log.Println("I! input:", name, "started")
		rd.Start()
		InputReaders[name] = rd
	}

	return nil
}
// getInputsByDirs lists the subdirectories of ConfigDir whose names
// carry the "input." prefix and returns those names with the prefix
// stripped — these are the input plugins to start.
func (a *Agent) getInputsByDirs() ([]string, error) {
	dirs, err := file.DirsUnder(config.Config.ConfigDir)
	if err != nil {
		return nil, fmt.Errorf("failed to get dirs under %s : %v", config.Config.ConfigDir, err)
	}

	if len(dirs) == 0 {
		return dirs, nil
	}

	names := make([]string, 0, len(dirs))
	for _, dir := range dirs {
		if strings.HasPrefix(dir, inputFilePrefix) {
			names = append(names, strings.TrimPrefix(dir, inputFilePrefix))
		}
	}
	return names, nil
}
// stopMetricsAgent tears down every registered input reader: signal
// its goroutine to quit, close its sample queue, and drop the input
// instance.
func (a *Agent) stopMetricsAgent() {
	for _, reader := range InputReaders {
		reader.QuitChan <- struct{}{}
		close(reader.Queue)
		reader.Instance.Drop()
	}
}

View File

@ -0,0 +1,70 @@
# # collect interval
# interval = 15
############################################################################
# !!! uncomment [[instances]] to enable this plugin
# [[instances]]
# # interval = global.interval * interval_times
# interval_times = 1
# append some labels to metrics
# labels = { cluster="cloud-n9e-es" }
## specify a list of one or more Elasticsearch servers
## you can add username and password to your url to use basic authentication:
## servers = ["http://user:pass@localhost:9200"]
servers = ["http://localhost:9200"]
## Timeout for HTTP requests to the elastic search server(s)
http_timeout = "5s"
## When local is true (the default), the node will read only its own stats.
## Set local to false when you want to read the node stats from all nodes
## of the cluster.
local = true
## Set cluster_health to true when you want to obtain cluster health stats
cluster_health = false
## Adjust cluster_health_level when you want to obtain detailed health stats
## The options are
## - indices (default)
## - cluster
# cluster_health_level = "indices"
## Set cluster_stats to true when you want to obtain cluster stats.
cluster_stats = false
## Only gather cluster_stats from the master node. For this to work, local must be set to true
cluster_stats_only_from_master = true
## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
indices_include = ["_all"]
## One of "shards", "cluster", "indices"
## Currently only "shards" is implemented
indices_level = "shards"
## node_stats is a list of sub-stats that you want to have gathered. Valid options
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
## "breaker". Per default, all stats are gathered.
# node_stats = ["jvm", "http"]
## HTTP Basic Authentication username and password.
# username = ""
# password = ""
## Optional TLS Config
# use_tls = false
# tls_ca = "/etc/categraf/ca.pem"
# tls_cert = "/etc/categraf/cert.pem"
# tls_key = "/etc/categraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
## Sets the number of most recent indices to return for indices that are configured with a date-stamped suffix.
## Each 'indices_include' entry ending with a wildcard (*) or glob matching pattern will group together all indices that match it, and
## sort them by the date or number after the wildcard. Metrics then are gathered for only the 'num_most_recent_indices' amount of most
## recent indices.
# num_most_recent_indices = 0