2022-04-13 20:12:20 +08:00
|
|
|
package agent
|
|
|
|
|
|
|
|
import (
|
2022-04-13 23:35:43 +08:00
|
|
|
"fmt"
|
|
|
|
"strings"
|
2022-04-14 15:04:23 +08:00
|
|
|
"sync"
|
2022-04-13 20:12:20 +08:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"flashcat.cloud/categraf/config"
|
|
|
|
"flashcat.cloud/categraf/inputs"
|
|
|
|
"flashcat.cloud/categraf/types"
|
|
|
|
"flashcat.cloud/categraf/writer"
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/prometheus/prompb"
|
|
|
|
)
|
|
|
|
|
2022-04-14 15:09:14 +08:00
|
|
|
// agentHostnameLabelKey is the label name under which convert stores the
// configured hostname when a sample does not already carry that label.
const agentHostnameLabelKey = "agent_hostname"
|
|
|
|
|
2022-04-14 00:05:48 +08:00
|
|
|
type Reader struct {
|
2022-04-13 20:12:20 +08:00
|
|
|
Instance inputs.Input
|
|
|
|
Queue chan *types.Sample
|
|
|
|
}
|
|
|
|
|
2022-04-14 00:05:48 +08:00
|
|
|
// InputReaders is the package-level registry of readers, keyed by string.
// NOTE(review): the key is presumably the input name — confirm against the
// code that populates this map; nothing in this part of the file writes to it.
var InputReaders = map[string]*Reader{}
|
2022-04-13 20:12:20 +08:00
|
|
|
|
2022-04-14 00:05:48 +08:00
|
|
|
func (c *Reader) Start() {
|
2022-04-13 20:12:20 +08:00
|
|
|
// start consumer goroutines
|
2022-04-14 00:05:48 +08:00
|
|
|
go read(c.Queue)
|
2022-04-13 20:12:20 +08:00
|
|
|
|
|
|
|
// start collector goroutines
|
|
|
|
c.Instance.StartGoroutines(c.Queue)
|
|
|
|
}
|
|
|
|
|
2022-04-14 00:05:48 +08:00
|
|
|
func read(queue chan *types.Sample) {
|
2022-04-13 23:56:04 +08:00
|
|
|
batch := config.Config.WriterOpt.Batch
|
|
|
|
if batch <= 0 {
|
|
|
|
batch = 2000
|
|
|
|
}
|
|
|
|
|
2022-04-13 20:12:20 +08:00
|
|
|
series := make([]*prompb.TimeSeries, 0, batch)
|
|
|
|
|
|
|
|
var count int
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case item := <-queue:
|
|
|
|
series = append(series, convert(item))
|
|
|
|
count++
|
|
|
|
if count >= batch {
|
|
|
|
postSeries(series)
|
|
|
|
count = 0
|
|
|
|
series = make([]*prompb.TimeSeries, 0, batch)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
if len(series) > 0 {
|
|
|
|
postSeries(series)
|
|
|
|
count = 0
|
|
|
|
series = make([]*prompb.TimeSeries, 0, batch)
|
|
|
|
}
|
|
|
|
time.Sleep(time.Millisecond * 100)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func postSeries(series []*prompb.TimeSeries) {
|
2022-04-13 23:35:43 +08:00
|
|
|
if config.Config.TestMode {
|
|
|
|
for i := 0; i < len(series); i++ {
|
|
|
|
var sb strings.Builder
|
|
|
|
|
|
|
|
sb.WriteString(">> ")
|
|
|
|
|
|
|
|
for j := range series[i].Labels {
|
|
|
|
sb.WriteString(series[i].Labels[j].Name)
|
|
|
|
sb.WriteString("=")
|
|
|
|
sb.WriteString(series[i].Labels[j].Value)
|
|
|
|
sb.WriteString(" ")
|
|
|
|
}
|
|
|
|
|
|
|
|
for j := range series[i].Samples {
|
|
|
|
sb.WriteString(fmt.Sprint(series[i].Samples[j].Timestamp))
|
|
|
|
sb.WriteString(" ")
|
|
|
|
sb.WriteString(fmt.Sprint(series[i].Samples[j].Value))
|
|
|
|
}
|
|
|
|
|
|
|
|
fmt.Println(sb.String())
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-04-14 15:04:23 +08:00
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
for key := range writer.Writers {
|
|
|
|
wg.Add(1)
|
|
|
|
go func(key string) {
|
|
|
|
defer wg.Done()
|
|
|
|
writer.Writers[key].Write(series)
|
|
|
|
}(key)
|
2022-04-13 20:12:20 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func convert(item *types.Sample) *prompb.TimeSeries {
|
2022-04-13 23:56:04 +08:00
|
|
|
if item.Labels == nil {
|
|
|
|
item.Labels = make(map[string]string)
|
|
|
|
}
|
|
|
|
|
|
|
|
// add label: agent_hostname
|
2022-04-14 15:09:14 +08:00
|
|
|
if _, has := item.Labels[agentHostnameLabelKey]; !has {
|
2022-04-13 23:56:04 +08:00
|
|
|
if !config.Config.Global.OmitHostname {
|
2022-04-14 15:09:14 +08:00
|
|
|
item.Labels[agentHostnameLabelKey] = config.Config.Global.Hostname
|
2022-04-13 23:56:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// add global labels
|
|
|
|
for k, v := range config.Config.Global.Labels {
|
|
|
|
if _, has := item.Labels[k]; !has {
|
|
|
|
item.Labels[k] = v
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-13 20:12:20 +08:00
|
|
|
pt := &prompb.TimeSeries{}
|
|
|
|
|
2022-04-14 18:34:27 +08:00
|
|
|
timestamp := item.Timestamp.UnixMilli()
|
|
|
|
if config.Config.Global.Precision == "s" {
|
|
|
|
timestamp = item.Timestamp.Unix()
|
2022-04-13 20:12:20 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
pt.Samples = append(pt.Samples, prompb.Sample{
|
2022-04-14 18:34:27 +08:00
|
|
|
Timestamp: timestamp,
|
2022-04-13 20:12:20 +08:00
|
|
|
Value: item.Value,
|
|
|
|
})
|
|
|
|
|
|
|
|
// add label: metric
|
|
|
|
pt.Labels = append(pt.Labels, &prompb.Label{
|
|
|
|
Name: model.MetricNameLabel,
|
|
|
|
Value: item.Metric,
|
|
|
|
})
|
|
|
|
|
|
|
|
// add other labels
|
|
|
|
for k, v := range item.Labels {
|
|
|
|
pt.Labels = append(pt.Labels, &prompb.Label{
|
|
|
|
Name: k,
|
|
|
|
Value: v,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return pt
|
|
|
|
}
|