categraf/agent/reader.go

232 lines
4.4 KiB
Go
Raw Normal View History

2022-04-13 20:12:20 +08:00
package agent
import (
2022-04-13 23:35:43 +08:00
"fmt"
"log"
2022-04-15 18:28:43 +08:00
"sort"
2022-04-13 23:35:43 +08:00
"strings"
2022-04-14 15:04:23 +08:00
"sync"
2022-04-13 20:12:20 +08:00
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"flashcat.cloud/categraf/writer"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
2022-04-25 15:34:15 +08:00
"github.com/toolkits/pkg/container/list"
2022-04-13 20:12:20 +08:00
)
2022-04-14 15:09:14 +08:00
const agentHostnameLabelKey = "agent_hostname"
2022-04-14 00:05:48 +08:00
type Reader struct {
2022-04-13 20:12:20 +08:00
Instance inputs.Input
2022-04-16 16:52:23 +08:00
QuitChan chan struct{}
2022-04-13 20:12:20 +08:00
Queue chan *types.Sample
}
2022-04-14 00:05:48 +08:00
var InputReaders = map[string]*Reader{}
2022-04-13 20:12:20 +08:00
2022-04-16 16:52:23 +08:00
func (r *Reader) Start() {
2022-04-13 20:12:20 +08:00
// start consumer goroutines
2022-04-16 16:52:23 +08:00
go r.read()
2022-04-13 20:12:20 +08:00
2022-04-16 16:52:23 +08:00
// start collector instance
go r.startInstance()
2022-04-13 20:12:20 +08:00
}
2022-04-16 16:52:23 +08:00
func (r *Reader) startInstance() {
interval := config.GetInterval()
2022-04-17 12:54:32 +08:00
if r.Instance.GetInterval() > 0 {
interval = time.Duration(r.Instance.GetInterval())
2022-04-16 16:52:23 +08:00
}
for {
select {
case <-r.QuitChan:
close(r.QuitChan)
return
default:
time.Sleep(interval)
r.gatherOnce()
}
}
}
func (r *Reader) gatherOnce() {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
return
} else {
log.Println("E! gather metrics panic:", r)
}
}
}()
2022-04-25 15:34:15 +08:00
// gather
slist := list.NewSafeList()
r.Instance.Gather(slist)
// handle result
samples := slist.PopBackAll()
2022-04-16 16:52:23 +08:00
if len(samples) == 0 {
return
}
now := time.Now()
for i := 0; i < len(samples); i++ {
2022-04-19 18:09:16 +08:00
if samples[i] == nil {
continue
}
2022-04-25 15:34:15 +08:00
s := samples[i].(*types.Sample)
if s.Timestamp.IsZero() {
s.Timestamp = now
2022-04-16 16:52:23 +08:00
}
if len(r.Instance.Prefix()) > 0 {
s.Metric = r.Instance.Prefix() + "_" + strings.ReplaceAll(s.Metric, "-", "_")
2022-04-18 14:23:00 +08:00
} else {
2022-04-25 15:34:15 +08:00
s.Metric = strings.ReplaceAll(s.Metric, "-", "_")
2022-04-18 14:23:00 +08:00
}
2022-04-25 15:34:15 +08:00
r.Queue <- s
2022-04-16 16:52:23 +08:00
}
}
func (r *Reader) read() {
2022-04-13 23:56:04 +08:00
batch := config.Config.WriterOpt.Batch
if batch <= 0 {
batch = 2000
}
2022-04-13 20:12:20 +08:00
series := make([]*prompb.TimeSeries, 0, batch)
var count int
for {
select {
2022-05-11 17:19:51 +08:00
case item, open := <-r.Queue:
if !open {
2022-04-15 12:35:45 +08:00
// queue closed
return
}
2022-05-11 17:42:16 +08:00
if item == nil {
continue
}
2022-04-13 20:12:20 +08:00
series = append(series, convert(item))
count++
if count >= batch {
postSeries(series)
count = 0
series = make([]*prompb.TimeSeries, 0, batch)
}
default:
if len(series) > 0 {
postSeries(series)
count = 0
series = make([]*prompb.TimeSeries, 0, batch)
}
time.Sleep(time.Millisecond * 100)
}
}
}
func postSeries(series []*prompb.TimeSeries) {
2022-04-13 23:35:43 +08:00
if config.Config.TestMode {
2022-04-15 18:28:43 +08:00
log.Println(">> count:", len(series))
2022-04-13 23:35:43 +08:00
for i := 0; i < len(series); i++ {
var sb strings.Builder
sb.WriteString(">> ")
2022-04-15 18:28:43 +08:00
sort.SliceStable(series[i].Labels, func(x, y int) bool {
return strings.Compare(series[i].Labels[x].Name, series[i].Labels[y].Name) == -1
})
2022-04-13 23:35:43 +08:00
for j := range series[i].Labels {
sb.WriteString(series[i].Labels[j].Name)
sb.WriteString("=")
sb.WriteString(series[i].Labels[j].Value)
sb.WriteString(" ")
}
for j := range series[i].Samples {
sb.WriteString(fmt.Sprint(series[i].Samples[j].Timestamp))
sb.WriteString(" ")
sb.WriteString(fmt.Sprint(series[i].Samples[j].Value))
}
2022-04-26 14:24:52 +08:00
fmt.Println(sb.String())
2022-04-13 23:35:43 +08:00
}
return
}
2022-04-14 15:04:23 +08:00
wg := sync.WaitGroup{}
for key := range writer.Writers {
wg.Add(1)
go func(key string) {
defer wg.Done()
writer.Writers[key].Write(series)
}(key)
2022-04-13 20:12:20 +08:00
}
2022-04-15 18:28:43 +08:00
wg.Wait()
2022-04-13 20:12:20 +08:00
}
func convert(item *types.Sample) *prompb.TimeSeries {
2022-04-13 23:56:04 +08:00
if item.Labels == nil {
item.Labels = make(map[string]string)
}
// add label: agent_hostname
2022-04-14 15:09:14 +08:00
if _, has := item.Labels[agentHostnameLabelKey]; !has {
2022-04-13 23:56:04 +08:00
if !config.Config.Global.OmitHostname {
2022-05-31 16:53:21 +08:00
item.Labels[agentHostnameLabelKey] = config.Config.GetHostname()
2022-04-13 23:56:04 +08:00
}
}
// add global labels
for k, v := range config.Config.Global.Labels {
if _, has := item.Labels[k]; !has {
item.Labels[k] = v
}
}
2022-04-13 20:12:20 +08:00
pt := &prompb.TimeSeries{}
2022-04-14 18:34:27 +08:00
timestamp := item.Timestamp.UnixMilli()
if config.Config.Global.Precision == "s" {
timestamp = item.Timestamp.Unix()
2022-04-13 20:12:20 +08:00
}
pt.Samples = append(pt.Samples, prompb.Sample{
2022-04-14 18:34:27 +08:00
Timestamp: timestamp,
2022-04-13 20:12:20 +08:00
Value: item.Value,
})
// add label: metric
pt.Labels = append(pt.Labels, &prompb.Label{
Name: model.MetricNameLabel,
Value: item.Metric,
})
// add other labels
for k, v := range item.Labels {
2022-06-07 11:23:01 +08:00
k = strings.Replace(k, "/", "_", -1)
k = strings.Replace(k, ".", "_", -1)
k = strings.Replace(k, "-", "_", -1)
k = strings.Replace(k, " ", "_", -1)
2022-04-13 20:12:20 +08:00
pt.Labels = append(pt.Labels, &prompb.Label{
Name: k,
Value: v,
})
}
return pt
}