categraf/agent/agent.go

152 lines
3.4 KiB
Go
Raw Normal View History

2022-04-13 14:09:33 +08:00
package agent
import (
2022-04-25 12:08:56 +08:00
"errors"
2022-04-13 14:09:33 +08:00
"fmt"
2022-04-13 14:19:02 +08:00
"log"
2022-04-13 14:09:33 +08:00
"path"
"strings"
2022-04-13 14:09:33 +08:00
2022-04-13 20:12:20 +08:00
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
2022-04-13 20:12:20 +08:00
"flashcat.cloud/categraf/pkg/cfg"
"flashcat.cloud/categraf/types"
2022-04-13 14:09:33 +08:00
"github.com/toolkits/pkg/file"
2022-04-13 20:12:20 +08:00
// Auto registry: the input plugin packages below are blank-imported only for their side effects.
2022-04-15 12:35:45 +08:00
_ "flashcat.cloud/categraf/inputs/cpu"
2022-04-15 13:58:46 +08:00
_ "flashcat.cloud/categraf/inputs/disk"
2022-04-15 16:44:57 +08:00
_ "flashcat.cloud/categraf/inputs/diskio"
2022-04-18 14:23:00 +08:00
_ "flashcat.cloud/categraf/inputs/exec"
2022-05-14 16:26:52 +08:00
_ "flashcat.cloud/categraf/inputs/http_response"
2022-04-16 23:14:48 +08:00
_ "flashcat.cloud/categraf/inputs/kernel"
_ "flashcat.cloud/categraf/inputs/kernel_vmstat"
_ "flashcat.cloud/categraf/inputs/linux_sysctl_fs"
2022-04-15 11:00:18 +08:00
_ "flashcat.cloud/categraf/inputs/mem"
2022-04-26 11:39:56 +08:00
_ "flashcat.cloud/categraf/inputs/mysql"
2022-04-15 23:06:13 +08:00
_ "flashcat.cloud/categraf/inputs/net"
_ "flashcat.cloud/categraf/inputs/net_response"
2022-04-16 22:45:20 +08:00
_ "flashcat.cloud/categraf/inputs/netstat"
2022-04-20 23:45:02 +08:00
_ "flashcat.cloud/categraf/inputs/ntp"
_ "flashcat.cloud/categraf/inputs/nvidia_smi"
2022-04-18 17:35:16 +08:00
_ "flashcat.cloud/categraf/inputs/oracle"
2022-04-19 17:36:45 +08:00
_ "flashcat.cloud/categraf/inputs/ping"
2022-04-17 10:50:02 +08:00
_ "flashcat.cloud/categraf/inputs/processes"
_ "flashcat.cloud/categraf/inputs/procstat"
2022-04-28 15:20:57 +08:00
_ "flashcat.cloud/categraf/inputs/prometheus"
2022-04-29 16:36:57 +08:00
_ "flashcat.cloud/categraf/inputs/rabbitmq"
_ "flashcat.cloud/categraf/inputs/redis"
2022-04-14 18:34:27 +08:00
_ "flashcat.cloud/categraf/inputs/system"
2022-04-28 10:14:49 +08:00
_ "flashcat.cloud/categraf/inputs/tomcat"
2022-04-13 14:09:33 +08:00
)
2022-04-15 12:35:45 +08:00
// Agent starts, stops and reloads the input readers of this process.
type Agent struct {
	// InputFilters is an optional allow-list keyed by input name. When it is
	// non-empty, only inputs whose name is present as a key are started;
	// when it is empty (or nil), no filtering is applied.
	InputFilters map[string]struct{}
}
2022-04-13 14:09:33 +08:00
2022-04-15 12:35:45 +08:00
func NewAgent(filters map[string]struct{}) *Agent {
return &Agent{
InputFilters: filters,
2022-04-13 14:09:33 +08:00
}
}
2022-04-13 14:19:02 +08:00
func (a *Agent) Start() {
log.Println("I! agent starting")
2022-04-13 17:16:16 +08:00
a.startInputs()
2022-04-13 14:19:02 +08:00
}
func (a *Agent) Stop() {
log.Println("I! agent stopping")
2022-04-14 00:05:48 +08:00
for name := range InputReaders {
2022-04-16 16:52:23 +08:00
InputReaders[name].QuitChan <- struct{}{}
2022-04-14 00:05:48 +08:00
close(InputReaders[name].Queue)
2022-04-18 17:35:16 +08:00
InputReaders[name].Instance.Drop()
2022-04-13 17:16:16 +08:00
}
2022-04-15 12:43:17 +08:00
log.Println("I! agent stopped")
2022-04-13 14:19:02 +08:00
}
func (a *Agent) Reload() {
log.Println("I! agent reloading")
2022-04-13 17:16:16 +08:00
a.Stop()
a.Start()
2022-04-13 14:19:02 +08:00
}
2022-04-13 17:16:16 +08:00
func (a *Agent) startInputs() error {
names, err := a.getInputsByDirs()
if err != nil {
return err
}
if len(names) == 0 {
log.Println("I! no inputs")
return nil
}
for _, name := range names {
2022-04-15 12:35:45 +08:00
if len(a.InputFilters) > 0 {
// do filter
if _, has := a.InputFilters[name]; !has {
continue
}
}
creator, has := inputs.InputCreators[name]
if !has {
log.Println("E! input:", name, "not supported")
continue
}
// construct input instance
instance := creator()
// set configurations for input instance
2022-04-13 20:12:20 +08:00
cfg.LoadConfigs(path.Join(config.Config.ConfigDir, "input."+name), instance)
2022-04-15 12:35:45 +08:00
if err = instance.Init(); err != nil {
2022-04-25 12:08:56 +08:00
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}
2022-04-16 16:52:23 +08:00
reader := &Reader{
Instance: instance,
2022-04-16 16:52:23 +08:00
QuitChan: make(chan struct{}, 1),
Queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
}
2022-04-13 17:16:16 +08:00
log.Println("I! input:", name, "started")
2022-04-16 16:52:23 +08:00
reader.Start()
2022-04-16 16:52:23 +08:00
InputReaders[name] = reader
}
return nil
}
// input dir should has prefix input.
2022-04-13 17:16:16 +08:00
func (a *Agent) getInputsByDirs() ([]string, error) {
2022-04-13 20:12:20 +08:00
dirs, err := file.DirsUnder(config.Config.ConfigDir)
if err != nil {
2022-04-13 20:12:20 +08:00
return nil, fmt.Errorf("failed to get dirs under %s : %v", config.Config.ConfigDir, err)
}
count := len(dirs)
if count == 0 {
return dirs, nil
}
names := make([]string, 0, count)
for i := 0; i < count; i++ {
if strings.HasPrefix(dirs[i], "input.") {
names = append(names, dirs[i][6:])
}
}
return names, nil
}