control interval for instance level

This commit is contained in:
Ulric Qin 2022-04-19 12:08:44 +08:00
parent 1b5aa869d8
commit e41629ffe6
2 changed files with 65 additions and 35 deletions

View File

@ -4,6 +4,7 @@
# # collect interval # # collect interval
# interval = 15 # interval = 15
[[instances]]
# commands, support glob # commands, support glob
commands = [ commands = [
"/opt/categraf/scripts/*.sh" "/opt/categraf/scripts/*.sh"
@ -12,5 +13,8 @@ commands = [
# # timeout for each command to complete # # timeout for each command to complete
# timeout = 5 # timeout = 5
# # interval = global.interval * interval_times
# interval_times = 1
# # mesurement,labelkey1=labelval1,labelkey2=labelval2 field1=1.2,field2=2.3 # # mesurement,labelkey1=labelval1,labelkey2=labelval2 field1=1.2,field2=2.3
# data_format = "influx" # data_format = "influx"

View File

@ -10,6 +10,7 @@ import (
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"flashcat.cloud/categraf/config" "flashcat.cloud/categraf/config"
@ -25,21 +26,24 @@ import (
const inputName = "exec" const inputName = "exec"
const MaxStderrBytes int = 512 const MaxStderrBytes int = 512
type ExecInstance struct {
Commands []string `toml:"commands"`
Timeout config.Duration `toml:"timeout"`
IntervalTimes int64 `toml:"interval_times"`
DataFormat string `toml:"data_format"`
parser parser.Parser
}
type Exec struct { type Exec struct {
PrintConfigs bool `toml:"print_configs"` PrintConfigs bool `toml:"print_configs"`
Interval config.Duration `toml:"interval"` Interval config.Duration `toml:"interval"`
Commands []string `toml:"commands"` Instances []ExecInstance `toml:"instances"`
Timeout config.Duration `toml:"timeout"` Counter uint64
DataFormat string `toml:"data_format"`
parser parser.Parser
} }
func init() { func init() {
inputs.Add(inputName, func() inputs.Input { inputs.Add(inputName, func() inputs.Input {
return &Exec{ return &Exec{}
Timeout: config.Duration(time.Second * 5),
}
}) })
} }
@ -54,24 +58,55 @@ func (e *Exec) GetInterval() config.Duration {
} }
func (e *Exec) Init() error { func (e *Exec) Init() error {
if e.DataFormat == "" { for i := 0; i < len(e.Instances); i++ {
e.parser = influx.NewSeriesParser() if e.Instances[i].DataFormat == "" || e.Instances[i].DataFormat == "influx" {
return nil e.Instances[i].parser = influx.NewSeriesParser()
} } else {
return fmt.Errorf("data_format(%s) not supported", e.Instances[i].DataFormat)
}
switch e.DataFormat { if e.Instances[i].Timeout == 0 {
case "influx": e.Instances[i].Timeout = config.Duration(time.Second * 5)
e.parser = influx.NewSeriesParser() }
default:
return fmt.Errorf("data_format(%s) not supported", e.DataFormat)
} }
return nil return nil
} }
func (e *Exec) Gather() (samples []*types.Sample) { func (e *Exec) Gather() (samples []*types.Sample) {
atomic.AddUint64(&e.Counter, 1)
slist := list.NewSafeList()
var wg sync.WaitGroup
wg.Add(len(e.Instances))
for i := range e.Instances {
ins := e.Instances[i]
go e.GatherOnce(&wg, slist, ins)
}
wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (e *Exec) GatherOnce(wg *sync.WaitGroup, slist *list.SafeList, ins ExecInstance) {
defer wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&e.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
var commands []string var commands []string
for _, pattern := range e.Commands { for _, pattern := range ins.Commands {
cmdAndArgs := strings.SplitN(pattern, " ", 2) cmdAndArgs := strings.SplitN(pattern, " ", 2)
if len(cmdAndArgs) == 0 { if len(cmdAndArgs) == 0 {
continue continue
@ -106,34 +141,25 @@ func (e *Exec) Gather() (samples []*types.Sample) {
return return
} }
slist := list.NewSafeList() var waitCommands sync.WaitGroup
waitCommands.Add(len(commands))
var wg sync.WaitGroup
wg.Add(len(commands))
for _, command := range commands { for _, command := range commands {
go e.ProcessCommand(slist, command, &wg) go e.ProcessCommand(slist, command, ins, &waitCommands)
} }
wg.Wait() waitCommands.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
} }
func (e *Exec) ProcessCommand(slist *list.SafeList, command string, wg *sync.WaitGroup) { func (e *Exec) ProcessCommand(slist *list.SafeList, command string, ins ExecInstance, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
out, errbuf, runErr := commandRun(command, time.Duration(e.Timeout)) out, errbuf, runErr := commandRun(command, time.Duration(ins.Timeout))
if runErr != nil || len(errbuf) > 0 { if runErr != nil || len(errbuf) > 0 {
log.Println("E! exec_command:", command, "error:", runErr, "stderr:", string(errbuf)) log.Println("E! exec_command:", command, "error:", runErr, "stderr:", string(errbuf))
return return
} }
metrics, err := e.parser.Parse(out) metrics, err := ins.parser.Parse(out)
if err != nil { if err != nil {
log.Println("E! failed to parse command stdout:", err) log.Println("E! failed to parse command stdout:", err)
return return