categraf/agent/metrics_reader.go

189 lines
3.6 KiB
Go
Raw Normal View History

2022-04-13 20:12:20 +08:00
package agent
import (
"log"
2022-04-13 23:35:43 +08:00
"strings"
2022-04-13 20:12:20 +08:00
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/house"
2022-04-13 20:12:20 +08:00
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/runtimex"
2022-04-13 20:12:20 +08:00
"flashcat.cloud/categraf/types"
"flashcat.cloud/categraf/writer"
2022-04-25 15:34:15 +08:00
"github.com/toolkits/pkg/container/list"
2022-04-13 20:12:20 +08:00
)
2022-04-14 15:09:14 +08:00
// agentHostnameLabelKey is the label key under which gatherOnce stores the
// configured hostname on samples that do not already carry that label.
const agentHostnameLabelKey = "agent_hostname"
2022-06-29 08:57:38 +08:00
// metricReplacer rewrites dashes, dots, spaces, single quotes and double
// quotes in a metric name to underscores.
var metricReplacer = strings.NewReplacer(
	"-", "_",
	".", "_",
	" ", "_",
	"'", "_",
	"\"", "_",
)
2022-06-24 17:00:49 +08:00
type InputReader struct {
2022-07-19 18:43:05 +08:00
inputName string
input inputs.Input
2022-07-19 19:24:06 +08:00
quitChan chan struct{}
queue chan *types.Sample
2022-04-13 20:12:20 +08:00
}
2022-07-19 18:43:05 +08:00
func NewInputReader(inputName string, in inputs.Input) *InputReader {
2022-06-24 17:00:49 +08:00
return &InputReader{
2022-07-19 18:43:05 +08:00
inputName: inputName,
input: in,
quitChan: make(chan struct{}, 1),
queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
2022-06-24 17:00:49 +08:00
}
}
2022-04-13 20:12:20 +08:00
2022-07-19 18:43:05 +08:00
func (r *InputReader) Start() {
2022-04-13 20:12:20 +08:00
// start consumer goroutines
2022-07-19 18:43:05 +08:00
go r.read()
2022-04-13 20:12:20 +08:00
2022-04-16 16:52:23 +08:00
// start collector instance
2022-07-19 18:43:05 +08:00
go r.startInput()
2022-04-13 20:12:20 +08:00
}
2022-06-24 17:00:49 +08:00
func (r *InputReader) Stop() {
r.quitChan <- struct{}{}
2022-06-24 17:14:59 +08:00
r.input.Drop()
2022-06-24 17:00:49 +08:00
}
2022-07-19 18:43:05 +08:00
func (r *InputReader) startInput() {
2022-04-16 16:52:23 +08:00
interval := config.GetInterval()
2022-06-24 17:14:59 +08:00
if r.input.GetInterval() > 0 {
interval = time.Duration(r.input.GetInterval())
2022-04-16 16:52:23 +08:00
}
2022-07-07 10:12:26 +08:00
2022-04-16 16:52:23 +08:00
for {
select {
2022-06-24 17:00:49 +08:00
case <-r.quitChan:
close(r.quitChan)
2022-07-08 13:49:41 +08:00
close(r.queue)
2022-04-16 16:52:23 +08:00
return
default:
time.Sleep(interval)
2022-07-08 13:49:41 +08:00
var start time.Time
if config.Config.DebugMode {
start = time.Now()
2022-07-19 18:43:05 +08:00
log.Println("D!", r.inputName, ": before gather once")
2022-07-08 13:49:41 +08:00
}
2022-07-19 18:43:05 +08:00
r.gatherOnce()
2022-07-08 13:49:41 +08:00
if config.Config.DebugMode {
ms := time.Since(start).Milliseconds()
2022-07-19 18:43:05 +08:00
log.Println("D!", r.inputName, ": after gather once,", "duration:", ms, "ms")
2022-07-08 13:49:41 +08:00
}
2022-04-16 16:52:23 +08:00
}
}
}
2022-07-19 18:43:05 +08:00
func (r *InputReader) gatherOnce() {
2022-04-16 16:52:23 +08:00
defer func() {
2022-07-19 18:43:05 +08:00
if rc := recover(); rc != nil {
log.Println("E!", r.inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
2022-04-16 16:52:23 +08:00
}
}()
2022-04-25 15:34:15 +08:00
// gather
slist := list.NewSafeList()
2022-06-24 17:14:59 +08:00
r.input.Gather(slist)
2022-04-25 15:34:15 +08:00
// handle result
samples := slist.PopBackAll()
2022-04-16 16:52:23 +08:00
2022-07-08 13:49:41 +08:00
size := len(samples)
if size == 0 {
2022-04-16 16:52:23 +08:00
return
}
2022-07-08 13:49:41 +08:00
if config.Config.DebugMode {
2022-07-19 18:43:05 +08:00
log.Println("D!", r.inputName, ": gathered samples size:", size)
2022-07-08 13:49:41 +08:00
}
2022-04-16 16:52:23 +08:00
now := time.Now()
2022-07-08 13:49:41 +08:00
for i := 0; i < size; i++ {
2022-04-19 18:09:16 +08:00
if samples[i] == nil {
continue
}
2022-04-25 15:34:15 +08:00
s := samples[i].(*types.Sample)
if s == nil {
continue
}
2022-04-25 15:34:15 +08:00
if s.Timestamp.IsZero() {
s.Timestamp = now
2022-04-16 16:52:23 +08:00
}
2022-06-24 17:14:59 +08:00
if len(r.input.Prefix()) > 0 {
s.Metric = r.input.Prefix() + "_" + metricReplacer.Replace(s.Metric)
2022-04-18 14:23:00 +08:00
} else {
s.Metric = metricReplacer.Replace(s.Metric)
}
if s.Labels == nil {
s.Labels = make(map[string]string)
}
// add label: agent_hostname
if _, has := s.Labels[agentHostnameLabelKey]; !has {
if !config.Config.Global.OmitHostname {
s.Labels[agentHostnameLabelKey] = config.Config.GetHostname()
}
}
// add global labels
for k, v := range config.Config.Global.Labels {
if _, has := s.Labels[k]; !has {
s.Labels[k] = v
}
2022-04-18 14:23:00 +08:00
}
// write to remote write queue
2022-06-24 17:00:49 +08:00
r.queue <- s
// write to clickhouse queue
house.MetricsHouse.Push(s)
2022-04-16 16:52:23 +08:00
}
}
2022-07-19 18:43:05 +08:00
func (r *InputReader) read() {
2022-04-13 23:56:04 +08:00
batch := config.Config.WriterOpt.Batch
if batch <= 0 {
batch = 2000
}
2022-06-17 20:21:37 +08:00
series := make([]*types.Sample, 0, batch)
2022-04-13 20:12:20 +08:00
var count int
for {
select {
2022-06-24 17:00:49 +08:00
case item, open := <-r.queue:
2022-05-11 17:19:51 +08:00
if !open {
2022-04-15 12:35:45 +08:00
// queue closed
return
}
2022-05-11 17:42:16 +08:00
if item == nil {
continue
}
2022-06-17 20:21:37 +08:00
series = append(series, item)
2022-04-13 20:12:20 +08:00
count++
if count >= batch {
2022-06-17 20:21:37 +08:00
writer.PostSeries(series)
2022-04-13 20:12:20 +08:00
count = 0
2022-06-17 20:21:37 +08:00
series = make([]*types.Sample, 0, batch)
2022-04-13 20:12:20 +08:00
}
default:
if len(series) > 0 {
2022-06-17 20:21:37 +08:00
writer.PostSeries(series)
2022-04-13 20:12:20 +08:00
count = 0
2022-06-17 20:21:37 +08:00
series = make([]*types.Sample, 0, batch)
2022-04-13 20:12:20 +08:00
}
time.Sleep(time.Millisecond * 100)
}
}
}