add remote write api support

This commit is contained in:
Ulric Qin 2021-12-16 16:59:51 +08:00
parent a71edc4040
commit 0f65a1f5dd
3 changed files with 96 additions and 5 deletions

1
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/gin-gonic/gin v1.7.4
github.com/go-ldap/ldap/v3 v3.4.1
github.com/go-redis/redis/v8 v8.11.3
github.com/gogo/protobuf v1.1.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4

View File

@ -71,6 +71,8 @@ func configRoute(r *gin.Engine, version string) {
}
r.POST("/opentsdb/put", handleOpenTSDB)
r.POST("/prometheus/v1/write", remoteWrite)
r.POST("/prometheus/v1/query", queryPromql)
r.GET("/memory/alert-rule", alertRuleGet)
r.GET("/memory/idents", identsGets)
@ -80,7 +82,5 @@ func configRoute(r *gin.Engine, version string) {
r.GET("/memory/user", userGet)
r.GET("/memory/user-group", userGroupGet)
r.POST("/prom/vectors", vectorsPost)
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
}

View File

@ -1,21 +1,32 @@
package router
import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/ginx"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/engine"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/reader"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/writer"
)
type vectorsForm struct {
type promqlForm struct {
PromQL string `json:"promql"`
}
func vectorsPost(c *gin.Context) {
var f vectorsForm
func queryPromql(c *gin.Context) {
var f promqlForm
ginx.BindJSON(c, &f)
value, warnings, err := reader.Reader.Client.Query(c.Request.Context(), f.PromQL, time.Now())
@ -31,3 +42,82 @@ func vectorsPost(c *gin.Context) {
c.JSON(200, engine.ConvertVectors(value))
}
func remoteWrite(c *gin.Context) {
req, err := DecodeWriteRequest(c.Request.Body)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
count := len(req.Timeseries)
if count == 0 {
c.String(200, "")
return
}
var (
now = time.Now().Unix()
ids = make(map[string]interface{})
lst = make([]interface{}, 0, count)
)
for i := 0; i < count; i++ {
labels := req.Timeseries[i].GetLabels()
ident := ""
for _, label := range labels {
if label.GetName() == "ident" {
ident = label.GetValue()
}
}
if len(ident) > 0 {
// register host
ids[ident] = now
// fill tags
target, has := memsto.TargetCache.Get(ident)
if has {
for key, value := range target.TagsMap {
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, &prompb.Label{
Name: key,
Value: value,
})
}
}
}
lst[i] = req.Timeseries[i]
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count))
idents.Idents.MSet(ids)
if writer.Writers.PushQueue(lst) {
c.String(200, "")
} else {
c.String(http.StatusInternalServerError, "wirter queue full")
}
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
compressed, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}