diff --git a/conf/example.input.exec/exec.toml b/conf/example.input.exec/exec.toml index 51e1d1e..0493d10 100644 --- a/conf/example.input.exec/exec.toml +++ b/conf/example.input.exec/exec.toml @@ -4,6 +4,7 @@ # # collect interval # interval = 15 +[[instances]] # commands, support glob commands = [ "/opt/categraf/scripts/*.sh" @@ -12,5 +13,8 @@ commands = [ # # timeout for each command to complete # timeout = 5 +# # interval = global.interval * interval_times +# interval_times = 1 + # # mesurement,labelkey1=labelval1,labelkey2=labelval2 field1=1.2,field2=2.3 -# data_format = "influx" \ No newline at end of file +# data_format = "influx" diff --git a/inputs/exec/exec.go b/inputs/exec/exec.go index e9bfa7e..ceef54d 100644 --- a/inputs/exec/exec.go +++ b/inputs/exec/exec.go @@ -10,6 +10,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "flashcat.cloud/categraf/config" @@ -25,21 +26,24 @@ import ( const inputName = "exec" const MaxStderrBytes int = 512 +type ExecInstance struct { + Commands []string `toml:"commands"` + Timeout config.Duration `toml:"timeout"` + IntervalTimes int64 `toml:"interval_times"` + DataFormat string `toml:"data_format"` + parser parser.Parser +} + type Exec struct { PrintConfigs bool `toml:"print_configs"` Interval config.Duration `toml:"interval"` - Commands []string `toml:"commands"` - Timeout config.Duration `toml:"timeout"` - DataFormat string `toml:"data_format"` - - parser parser.Parser + Instances []ExecInstance `toml:"instances"` + Counter uint64 } func init() { inputs.Add(inputName, func() inputs.Input { - return &Exec{ - Timeout: config.Duration(time.Second * 5), - } + return &Exec{} }) } @@ -54,24 +58,55 @@ func (e *Exec) GetInterval() config.Duration { } func (e *Exec) Init() error { - if e.DataFormat == "" { - e.parser = influx.NewSeriesParser() - return nil - } + for i := 0; i < len(e.Instances); i++ { + if e.Instances[i].DataFormat == "" || e.Instances[i].DataFormat == "influx" { + e.Instances[i].parser = influx.NewSeriesParser() + } else { + return fmt.Errorf("data_format(%s) not supported", e.Instances[i].DataFormat) + } - switch e.DataFormat { - case "influx": - e.parser = influx.NewSeriesParser() - default: - return fmt.Errorf("data_format(%s) not supported", e.DataFormat) + if e.Instances[i].Timeout == 0 { + e.Instances[i].Timeout = config.Duration(time.Second * 5) + } } return nil } func (e *Exec) Gather() (samples []*types.Sample) { + atomic.AddUint64(&e.Counter, 1) + + slist := list.NewSafeList() + + var wg sync.WaitGroup + wg.Add(len(e.Instances)) + for i := range e.Instances { + ins := e.Instances[i] + go e.GatherOnce(&wg, slist, ins) + } + + wg.Wait() + + interfaceList := slist.PopBackAll() + for i := 0; i < len(interfaceList); i++ { + samples = append(samples, interfaceList[i].(*types.Sample)) + } + + return +} + +func (e *Exec) GatherOnce(wg *sync.WaitGroup, slist *list.SafeList, ins ExecInstance) { + defer wg.Done() + + if ins.IntervalTimes > 0 { + counter := atomic.LoadUint64(&e.Counter) + if counter%uint64(ins.IntervalTimes) != 0 { + return + } + } + var commands []string - for _, pattern := range e.Commands { + for _, pattern := range ins.Commands { cmdAndArgs := strings.SplitN(pattern, " ", 2) if len(cmdAndArgs) == 0 { continue @@ -106,34 +141,25 @@ func (e *Exec) Gather() (samples []*types.Sample) { return } - slist := list.NewSafeList() - - var wg sync.WaitGroup - wg.Add(len(commands)) + var waitCommands sync.WaitGroup + waitCommands.Add(len(commands)) for _, command := range commands { - go e.ProcessCommand(slist, command, &wg) + go e.ProcessCommand(slist, command, ins, &waitCommands) } - wg.Wait() - - interfaceList := slist.PopBackAll() - for i := 0; i < len(interfaceList); i++ { - samples = append(samples, interfaceList[i].(*types.Sample)) - } - - return + waitCommands.Wait() } -func (e *Exec) ProcessCommand(slist *list.SafeList, command string, wg *sync.WaitGroup) { +func (e *Exec) ProcessCommand(slist *list.SafeList, command string, ins ExecInstance, wg *sync.WaitGroup) { defer wg.Done() - out, errbuf, runErr := commandRun(command, time.Duration(e.Timeout)) + out, errbuf, runErr := commandRun(command, time.Duration(ins.Timeout)) if runErr != nil || len(errbuf) > 0 { log.Println("E! exec_command:", command, "error:", runErr, "stderr:", string(errbuf)) return } - metrics, err := e.parser.Parse(out) + metrics, err := ins.parser.Parse(out) if err != nil { log.Println("E! failed to parse command stdout:", err) return