categraf/agent/metrics_reader.go

127 lines
2.5 KiB
Go

package agent
import (
"log"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/house"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/runtimex"
"flashcat.cloud/categraf/types"
"flashcat.cloud/categraf/writer"
)
// metricReplacer rewrites '-', '.', ' ', '\'' and '"' to '_'.
// NOTE(review): presumably used to sanitize metric names; no use is visible
// in this part of the file — confirm against callers.
var metricReplacer = strings.NewReplacer("-", "_", ".", "_", " ", "_", "'", "_", "\"", "_")
// InputReader drives periodic metric collection for a single input plugin.
type InputReader struct {
// inputName identifies the input in log messages.
inputName string
// input is the plugin whose metrics are gathered and forwarded.
input inputs.Input
// quitChan carries the stop signal: Stop sends on it, startInput receives
// from it and then closes it.
quitChan chan struct{}
// runCounter is incremented atomically once per gatherOnce call that has
// instances; instances compare it against their GetIntervalTimes() value.
runCounter uint64
// waitGroup tracks the per-instance goroutines spawned by gatherOnce.
waitGroup sync.WaitGroup
}
// StartInputReader builds an InputReader for the given input, launches its
// collection loop in a new goroutine, and records the reader in
// a.InputReaders under name.
func (a *Agent) StartInputReader(name string, in inputs.Input) {
	r := NewInputReader(name, in)
	go r.startInput()
	a.InputReaders[name] = r
}
// NewInputReader returns an InputReader bound to the given input.
// The quit channel is created with a buffer of one element. The collection
// loop is not started here.
func NewInputReader(inputName string, in inputs.Input) *InputReader {
	reader := new(InputReader)
	reader.inputName = inputName
	reader.input = in
	reader.quitChan = make(chan struct{}, 1)
	return reader
}
// Stop signals the collection loop to exit by sending on quitChan, then
// calls inputs.MayDrop on the underlying input.
//
// NOTE(review): startInput closes quitChan once it has received the signal,
// so calling Stop again after that point would panic on the send.
func (r *InputReader) Stop() {
	var quit struct{}
	r.quitChan <- quit
	inputs.MayDrop(r.input)
}
// startInput runs the collection loop for this reader until a stop signal
// arrives on quitChan.
//
// The pause between cycles is config.GetInterval(), unless the input reports
// a positive GetInterval(), in which case that value is used instead. Each
// cycle calls gatherOnce; in debug mode the cycle is bracketed by log lines
// that include its duration.
//
// The stop signal is honored both before a cycle starts and while waiting
// between cycles, so the goroutine exits promptly instead of sleeping out
// the remainder of the interval. On exit the loop closes quitChan.
func (r *InputReader) startInput() {
	interval := config.GetInterval()
	if r.input.GetInterval() > 0 {
		interval = time.Duration(r.input.GetInterval())
	}

	for {
		// Do not start another cycle if a stop was already requested.
		select {
		case <-r.quitChan:
			close(r.quitChan)
			return
		default:
		}

		var start time.Time
		if config.Config.DebugMode {
			start = time.Now()
			log.Println("D!", r.inputName, ": before gather once")
		}

		r.gatherOnce()

		if config.Config.DebugMode {
			log.Println("D!", r.inputName, ": after gather once,", "duration:", time.Since(start))
		}

		// Wait for the next cycle, but wake up early on a stop signal.
		timer := time.NewTimer(interval)
		select {
		case <-r.quitChan:
			timer.Stop()
			close(r.quitChan)
			return
		case <-timer.C:
		}
	}
}
// gatherOnce runs a single collection cycle for this reader.
//
// It first gathers at the plugin level (inputs.MayGather on the input itself)
// and forwards the processed samples. If the input exposes instances, the run
// counter is incremented and every instance is gathered in its own goroutine;
// the call blocks until all of them have finished. An instance whose
// GetIntervalTimes() is positive is skipped on cycles where the run counter
// is not a multiple of that value.
//
// Panics are recovered and logged rather than propagated, both in the calling
// goroutine and in each per-instance goroutine.
func (r *InputReader) gatherOnce() {
	defer func() {
		if rc := recover(); rc != nil {
			// Log the recovered value, not the receiver.
			log.Println("E!", r.inputName, ": gather metrics panic:", rc, string(runtimex.Stack(3)))
		}
	}()

	// plugin level, for system plugins
	slist := types.NewSampleList()
	inputs.MayGather(r.input, slist)
	r.forward(r.input.Process(slist))

	instances := inputs.MayGetInstances(r.input)
	if len(instances) == 0 {
		return
	}

	atomic.AddUint64(&r.runCounter, 1)

	for i := 0; i < len(instances); i++ {
		r.waitGroup.Add(1)
		go func(ins inputs.Instance) {
			defer r.waitGroup.Done()
			// recover only takes effect in the goroutine that panicked, so
			// the deferred handler at the top of gatherOnce cannot catch a
			// panic raised here; without this one it would crash the process.
			defer func() {
				if rc := recover(); rc != nil {
					log.Println("E!", r.inputName, ": gather metrics panic:", rc, string(runtimex.Stack(3)))
				}
			}()

			it := ins.GetIntervalTimes()
			if it > 0 {
				counter := atomic.LoadUint64(&r.runCounter)
				if counter%uint64(it) != 0 {
					return
				}
			}

			insList := types.NewSampleList()
			inputs.MayGather(ins, insList)
			r.forward(ins.Process(insList))
		}(instances[i])
	}

	r.waitGroup.Wait()
}
// forward drains slist with PopBackAll and hands every sample to both
// writer.PushQueue and house.MetricsHouse.Push, in that order.
// A nil slist is ignored.
func (r *InputReader) forward(slist *types.SampleList) {
	if slist == nil {
		return
	}

	for _, sample := range slist.PopBackAll() {
		writer.PushQueue(sample)
		house.MetricsHouse.Push(sample)
	}
}