add datadog receiver

This commit is contained in:
Ulric Qin 2021-12-28 11:00:48 +08:00
parent 86649d8314
commit d57415d23d
1 changed files with 187 additions and 8 deletions

View File

@ -3,25 +3,139 @@ package router
import (
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
type TimeSeries struct {
Series []*Metric `json:"series"`
Series []*DatadogMetric `json:"series"`
}
type Metric struct {
Metric string `json:"metric"`
Points []Point `json:"points"`
Host string `json:"host"`
Tags []string `json:"tags,omitempty"`
type DatadogMetric struct {
Metric string `json:"metric"`
Points []DatadogPoint `json:"points"`
Host string `json:"host"`
Tags []string `json:"tags,omitempty"`
}
type Point [2]float64
type DatadogPoint [2]float64
func (m *DatadogMetric) Clean() error {
if m.Metric == "" {
return fmt.Errorf("metric is blank")
}
return nil
}
func (m *DatadogMetric) ToProm() (*prompb.TimeSeries, string, error) {
pt := &prompb.TimeSeries{}
for _, point := range m.Points {
pt.Samples = append(pt.Samples, prompb.Sample{
// use ms
Timestamp: int64(point[0]) * 1000,
Value: point[1],
})
}
if strings.IndexByte(m.Metric, '.') != -1 {
m.Metric = strings.ReplaceAll(m.Metric, ".", "_")
}
if strings.IndexByte(m.Metric, '-') != -1 {
m.Metric = strings.ReplaceAll(m.Metric, "-", "_")
}
if !model.MetricNameRE.MatchString(m.Metric) {
return nil, "", fmt.Errorf("invalid metric name: %s", m.Metric)
}
pt.Labels = append(pt.Labels, &prompb.Label{
Name: model.MetricNameLabel,
Value: m.Metric,
})
identInTag := ""
hostInTag := ""
for i := 0; i < len(m.Tags); i++ {
arr := strings.SplitN(m.Tags[i], ":", 2)
if len(arr) != 2 {
continue
}
key := arr[0]
if key == "ident" {
// 如果tags中有ident那就用
identInTag = arr[1]
pt.Labels = append(pt.Labels, &prompb.Label{
Name: key,
Value: arr[1],
})
continue
}
if key == "host" {
hostInTag = arr[1]
continue
}
if strings.IndexByte(key, '.') != -1 {
key = strings.ReplaceAll(key, ".", "_")
}
if strings.IndexByte(key, '-') != -1 {
key = strings.ReplaceAll(key, "-", "_")
}
if !model.LabelNameRE.MatchString(key) {
return nil, "", fmt.Errorf("invalid tag name: %s", key)
}
pt.Labels = append(pt.Labels, &prompb.Label{
Name: key,
Value: arr[1],
})
}
if m.Host != "" {
// 以外层为准外层host字段覆盖标签中的host
hostInTag = m.Host
}
if hostInTag != "" {
if identInTag != "" {
pt.Labels = append(pt.Labels, &prompb.Label{
Name: "host",
Value: hostInTag,
})
} else {
pt.Labels = append(pt.Labels, &prompb.Label{
Name: "ident",
Value: hostInTag,
})
}
}
ident := hostInTag
if identInTag != "" {
ident = identInTag
}
return pt, ident, nil
}
func datadogSeries(c *gin.Context) {
apiKey, has := c.GetQuery("api_key")
@ -73,6 +187,71 @@ func datadogSeries(c *gin.Context) {
return
}
// TODO clean and convert
cnt := len(series.Series)
if cnt == 0 {
c.String(400, "series empty")
return
}
var (
succ int
fail int
msg = "data pushed to queue"
list []interface{}
ts = time.Now().Unix()
ids = make(map[string]interface{})
)
for i := 0; i < cnt; i++ {
item := series.Series[i]
if item == nil {
continue
}
if err = item.Clean(); err != nil {
fail++
continue
}
pt, ident, err := item.ToProm()
if err != nil {
fail++
continue
}
if ident != "" {
// register host
ids[ident] = ts
// fill tags
target, has := memsto.TargetCache.Get(ident)
if has {
for key, value := range target.TagsMap {
pt.Labels = append(pt.Labels, &prompb.Label{
Name: key,
Value: value,
})
}
}
}
list = append(list, pt)
succ++
}
if len(list) > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(len(list)))
if !writer.Writers.PushQueue(list) {
msg = "writer queue full"
}
idents.Idents.MSet(ids)
}
c.JSON(200, gin.H{
"succ": succ,
"fail": fail,
"msg": msg,
})
}