2022-04-13 20:12:20 +08:00
|
|
|
package agent
|
|
|
|
|
|
|
|
import (
|
2022-04-14 23:00:39 +08:00
|
|
|
"log"
|
2022-04-13 23:35:43 +08:00
|
|
|
"strings"
|
2022-07-24 00:18:00 +08:00
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
2022-04-13 20:12:20 +08:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"flashcat.cloud/categraf/config"
|
2022-06-17 19:04:02 +08:00
|
|
|
"flashcat.cloud/categraf/house"
|
2022-04-13 20:12:20 +08:00
|
|
|
"flashcat.cloud/categraf/inputs"
|
2022-07-03 22:01:41 +08:00
|
|
|
"flashcat.cloud/categraf/pkg/runtimex"
|
2022-04-13 20:12:20 +08:00
|
|
|
"flashcat.cloud/categraf/types"
|
|
|
|
"flashcat.cloud/categraf/writer"
|
|
|
|
)
|
|
|
|
|
2022-06-29 08:57:38 +08:00
|
|
|
var metricReplacer = strings.NewReplacer("-", "_", ".", "_", " ", "_", "'", "_", "\"", "_")
|
2022-06-17 19:04:02 +08:00
|
|
|
|
2022-06-24 17:00:49 +08:00
|
|
|
type InputReader struct {
|
2022-07-24 00:18:00 +08:00
|
|
|
inputName string
|
|
|
|
input inputs.Input
|
|
|
|
quitChan chan struct{}
|
|
|
|
runCounter uint64
|
|
|
|
waitGroup sync.WaitGroup
|
2022-07-23 20:25:27 +08:00
|
|
|
}
|
|
|
|
|
2022-07-23 20:37:47 +08:00
|
|
|
func (a *Agent) StartInputReader(name string, in inputs.Input) {
|
2022-07-23 20:25:27 +08:00
|
|
|
reader := NewInputReader(name, in)
|
|
|
|
go reader.startInput()
|
|
|
|
a.InputReaders[name] = reader
|
2022-04-13 20:12:20 +08:00
|
|
|
}
|
|
|
|
|
2022-07-19 18:43:05 +08:00
|
|
|
func NewInputReader(inputName string, in inputs.Input) *InputReader {
|
2022-06-24 17:00:49 +08:00
|
|
|
return &InputReader{
|
2022-07-19 18:43:05 +08:00
|
|
|
inputName: inputName,
|
|
|
|
input: in,
|
|
|
|
quitChan: make(chan struct{}, 1),
|
2022-06-24 17:00:49 +08:00
|
|
|
}
|
|
|
|
}
|
2022-04-13 20:12:20 +08:00
|
|
|
|
2022-06-24 17:00:49 +08:00
|
|
|
func (r *InputReader) Stop() {
|
|
|
|
r.quitChan <- struct{}{}
|
2022-08-01 19:00:28 +08:00
|
|
|
inputs.MayDrop(r.input)
|
2022-06-24 17:00:49 +08:00
|
|
|
}
|
|
|
|
|
2022-07-19 18:43:05 +08:00
|
|
|
func (r *InputReader) startInput() {
|
2022-04-16 16:52:23 +08:00
|
|
|
interval := config.GetInterval()
|
2022-06-24 17:14:59 +08:00
|
|
|
if r.input.GetInterval() > 0 {
|
|
|
|
interval = time.Duration(r.input.GetInterval())
|
2022-04-16 16:52:23 +08:00
|
|
|
}
|
2022-07-07 10:12:26 +08:00
|
|
|
|
2022-04-16 16:52:23 +08:00
|
|
|
for {
|
|
|
|
select {
|
2022-06-24 17:00:49 +08:00
|
|
|
case <-r.quitChan:
|
|
|
|
close(r.quitChan)
|
2022-04-16 16:52:23 +08:00
|
|
|
return
|
|
|
|
default:
|
2022-07-08 13:49:41 +08:00
|
|
|
var start time.Time
|
|
|
|
if config.Config.DebugMode {
|
|
|
|
start = time.Now()
|
2022-07-19 18:43:05 +08:00
|
|
|
log.Println("D!", r.inputName, ": before gather once")
|
2022-07-08 13:49:41 +08:00
|
|
|
}
|
|
|
|
|
2022-07-19 18:43:05 +08:00
|
|
|
r.gatherOnce()
|
2022-07-08 13:49:41 +08:00
|
|
|
|
|
|
|
if config.Config.DebugMode {
|
2022-07-23 20:37:47 +08:00
|
|
|
log.Println("D!", r.inputName, ": after gather once,", "duration:", time.Since(start))
|
2022-07-08 13:49:41 +08:00
|
|
|
}
|
2022-07-23 20:37:47 +08:00
|
|
|
|
|
|
|
time.Sleep(interval)
|
2022-04-16 16:52:23 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-25 19:27:14 +08:00
|
|
|
func (r *InputReader) gatherOnce() {
|
|
|
|
defer func() {
|
|
|
|
if rc := recover(); rc != nil {
|
|
|
|
log.Println("E!", r.inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
|
|
|
|
}
|
|
|
|
}()
|
2022-07-24 00:18:00 +08:00
|
|
|
|
2022-07-25 19:27:14 +08:00
|
|
|
// plugin level, for system plugins
|
|
|
|
slist := types.NewSampleList()
|
2022-08-01 19:00:28 +08:00
|
|
|
inputs.MayGather(r.input, slist)
|
2022-07-25 19:27:14 +08:00
|
|
|
r.forward(r.input.Process(slist))
|
|
|
|
|
2022-08-01 19:00:28 +08:00
|
|
|
instances := inputs.MayGetInstances(r.input)
|
2022-07-24 00:18:00 +08:00
|
|
|
if len(instances) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
atomic.AddUint64(&r.runCounter, 1)
|
|
|
|
|
|
|
|
for i := 0; i < len(instances); i++ {
|
|
|
|
r.waitGroup.Add(1)
|
2022-07-25 19:27:14 +08:00
|
|
|
go func(ins inputs.Instance) {
|
2022-07-24 00:18:00 +08:00
|
|
|
defer r.waitGroup.Done()
|
|
|
|
|
|
|
|
it := ins.GetIntervalTimes()
|
|
|
|
if it > 0 {
|
|
|
|
counter := atomic.LoadUint64(&r.runCounter)
|
|
|
|
if counter%uint64(it) != 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-25 19:27:14 +08:00
|
|
|
insList := types.NewSampleList()
|
2022-08-01 19:00:28 +08:00
|
|
|
inputs.MayGather(ins, insList)
|
2022-07-25 19:27:14 +08:00
|
|
|
r.forward(ins.Process(insList))
|
|
|
|
}(instances[i])
|
2022-07-24 00:18:00 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
r.waitGroup.Wait()
|
|
|
|
}
|
|
|
|
|
2022-07-25 19:27:14 +08:00
|
|
|
func (r *InputReader) forward(slist *types.SampleList) {
|
2022-08-01 17:43:59 +08:00
|
|
|
if slist == nil {
|
|
|
|
return
|
|
|
|
}
|
2022-07-25 19:27:14 +08:00
|
|
|
arr := slist.PopBackAll()
|
|
|
|
for i := 0; i < len(arr); i++ {
|
|
|
|
writer.PushQueue(arr[i])
|
|
|
|
house.MetricsHouse.Push(arr[i])
|
2022-04-16 16:52:23 +08:00
|
|
|
}
|
|
|
|
}
|