2022-04-13 14:09:33 +08:00
|
|
|
package agent
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2022-04-13 14:19:02 +08:00
|
|
|
"log"
|
2022-04-13 14:09:33 +08:00
|
|
|
"path"
|
2022-04-13 16:24:12 +08:00
|
|
|
"strings"
|
2022-04-13 14:09:33 +08:00
|
|
|
|
2022-04-13 20:12:20 +08:00
|
|
|
"flashcat.cloud/categraf/config"
|
2022-04-13 16:24:12 +08:00
|
|
|
"flashcat.cloud/categraf/inputs"
|
2022-04-13 20:12:20 +08:00
|
|
|
"flashcat.cloud/categraf/pkg/cfg"
|
2022-04-13 16:24:12 +08:00
|
|
|
"flashcat.cloud/categraf/types"
|
2022-04-13 14:09:33 +08:00
|
|
|
"github.com/toolkits/pkg/file"
|
2022-04-13 16:24:12 +08:00
|
|
|
|
2022-04-13 20:12:20 +08:00
|
|
|
// auto registry
|
2022-04-15 12:35:45 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/cpu"
|
2022-04-15 13:58:46 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/disk"
|
2022-04-15 16:44:57 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/diskio"
|
2022-04-15 11:00:18 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/mem"
|
2022-04-15 23:06:13 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/net"
|
2022-04-16 22:45:20 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/netstat"
|
2022-04-13 16:24:12 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/redis"
|
2022-04-14 18:34:27 +08:00
|
|
|
_ "flashcat.cloud/categraf/inputs/system"
|
2022-04-13 14:09:33 +08:00
|
|
|
)
|
|
|
|
|
2022-04-15 12:35:45 +08:00
|
|
|
// Agent drives the lifecycle (start, stop, reload) of the input readers.
type Agent struct {
	// InputFilters is the set of input names to run. When it is empty, no
	// filtering is applied and every discovered input is considered.
	InputFilters map[string]struct{}
}
|
2022-04-13 14:09:33 +08:00
|
|
|
|
2022-04-15 12:35:45 +08:00
|
|
|
func NewAgent(filters map[string]struct{}) *Agent {
|
|
|
|
return &Agent{
|
|
|
|
InputFilters: filters,
|
2022-04-13 14:09:33 +08:00
|
|
|
}
|
|
|
|
}
|
2022-04-13 14:19:02 +08:00
|
|
|
|
|
|
|
func (a *Agent) Start() {
|
|
|
|
log.Println("I! agent starting")
|
2022-04-13 16:24:12 +08:00
|
|
|
|
2022-04-13 17:16:16 +08:00
|
|
|
a.startInputs()
|
2022-04-13 14:19:02 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (a *Agent) Stop() {
|
|
|
|
log.Println("I! agent stopping")
|
|
|
|
|
2022-04-14 00:05:48 +08:00
|
|
|
for name := range InputReaders {
|
2022-04-16 16:52:23 +08:00
|
|
|
InputReaders[name].QuitChan <- struct{}{}
|
2022-04-14 00:05:48 +08:00
|
|
|
close(InputReaders[name].Queue)
|
2022-04-13 17:16:16 +08:00
|
|
|
}
|
2022-04-15 12:43:17 +08:00
|
|
|
|
|
|
|
log.Println("I! agent stopped")
|
2022-04-13 14:19:02 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (a *Agent) Reload() {
|
|
|
|
log.Println("I! agent reloading")
|
|
|
|
|
2022-04-13 17:16:16 +08:00
|
|
|
a.Stop()
|
|
|
|
a.Start()
|
2022-04-13 14:19:02 +08:00
|
|
|
}
|
2022-04-13 16:24:12 +08:00
|
|
|
|
2022-04-13 17:16:16 +08:00
|
|
|
func (a *Agent) startInputs() error {
|
|
|
|
names, err := a.getInputsByDirs()
|
2022-04-13 16:24:12 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(names) == 0 {
|
|
|
|
log.Println("I! no inputs")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, name := range names {
|
2022-04-15 12:35:45 +08:00
|
|
|
if len(a.InputFilters) > 0 {
|
|
|
|
// do filter
|
|
|
|
if _, has := a.InputFilters[name]; !has {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-13 16:24:12 +08:00
|
|
|
creator, has := inputs.InputCreators[name]
|
|
|
|
if !has {
|
|
|
|
log.Println("E! input:", name, "not supported")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// construct input instance
|
|
|
|
instance := creator()
|
|
|
|
|
|
|
|
// set configurations for input instance
|
2022-04-13 20:12:20 +08:00
|
|
|
cfg.LoadConfigs(path.Join(config.Config.ConfigDir, "input."+name), instance)
|
2022-04-13 16:24:12 +08:00
|
|
|
|
2022-04-15 12:35:45 +08:00
|
|
|
if err = instance.Init(); err != nil {
|
|
|
|
log.Println("E! failed to init input:", name, "error:", err)
|
2022-04-13 16:24:12 +08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2022-04-16 16:52:23 +08:00
|
|
|
reader := &Reader{
|
2022-04-13 16:24:12 +08:00
|
|
|
Instance: instance,
|
2022-04-16 16:52:23 +08:00
|
|
|
QuitChan: make(chan struct{}, 1),
|
|
|
|
Queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
|
2022-04-13 16:24:12 +08:00
|
|
|
}
|
|
|
|
|
2022-04-13 17:16:16 +08:00
|
|
|
log.Println("I! input:", name, "started")
|
2022-04-16 16:52:23 +08:00
|
|
|
reader.Start()
|
2022-04-13 16:24:12 +08:00
|
|
|
|
2022-04-16 16:52:23 +08:00
|
|
|
InputReaders[name] = reader
|
2022-04-13 16:24:12 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// input dir should has prefix input.
|
2022-04-13 17:16:16 +08:00
|
|
|
func (a *Agent) getInputsByDirs() ([]string, error) {
|
2022-04-13 20:12:20 +08:00
|
|
|
dirs, err := file.DirsUnder(config.Config.ConfigDir)
|
2022-04-13 16:24:12 +08:00
|
|
|
if err != nil {
|
2022-04-13 20:12:20 +08:00
|
|
|
return nil, fmt.Errorf("failed to get dirs under %s : %v", config.Config.ConfigDir, err)
|
2022-04-13 16:24:12 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
count := len(dirs)
|
|
|
|
if count == 0 {
|
|
|
|
return dirs, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
names := make([]string, 0, count)
|
|
|
|
for i := 0; i < count; i++ {
|
|
|
|
if strings.HasPrefix(dirs[i], "input.") {
|
|
|
|
names = append(names, dirs[i][6:])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return names, nil
|
|
|
|
}
|