diff --git a/go.mod b/go.mod index 08f56fde..8be61aac 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/gin-gonic/gin v1.7.4 github.com/go-ldap/ldap/v3 v3.4.1 github.com/go-redis/redis/v8 v8.11.3 + github.com/gogo/protobuf v1.1.1 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 diff --git a/src/server/router/router.go b/src/server/router/router.go index 8eed8e95..578f8288 100644 --- a/src/server/router/router.go +++ b/src/server/router/router.go @@ -71,6 +71,8 @@ func configRoute(r *gin.Engine, version string) { } r.POST("/opentsdb/put", handleOpenTSDB) + r.POST("/prometheus/v1/write", remoteWrite) + r.POST("/prometheus/v1/query", queryPromql) r.GET("/memory/alert-rule", alertRuleGet) r.GET("/memory/idents", identsGets) @@ -80,7 +82,5 @@ func configRoute(r *gin.Engine, version string) { r.GET("/memory/user", userGet) r.GET("/memory/user-group", userGroupGet) - r.POST("/prom/vectors", vectorsPost) - r.GET("/metrics", gin.WrapH(promhttp.Handler())) } diff --git a/src/server/router/router_prom.go b/src/server/router/router_prom.go index afe78f41..b51879cd 100644 --- a/src/server/router/router_prom.go +++ b/src/server/router/router_prom.go @@ -1,21 +1,32 @@ package router import ( + "io" + "io/ioutil" + "net/http" "time" "github.com/gin-gonic/gin" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" "github.com/toolkits/pkg/ginx" + "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/engine" + "github.com/didi/nightingale/v5/src/server/idents" + "github.com/didi/nightingale/v5/src/server/memsto" "github.com/didi/nightingale/v5/src/server/reader" + promstat "github.com/didi/nightingale/v5/src/server/stat" + "github.com/didi/nightingale/v5/src/server/writer" ) -type vectorsForm struct { +type promqlForm struct { PromQL string `json:"promql"` } -func vectorsPost(c *gin.Context) { - var f vectorsForm +func queryPromql(c *gin.Context) { + var f promqlForm ginx.BindJSON(c, &f) value, warnings, err := reader.Reader.Client.Query(c.Request.Context(), f.PromQL, time.Now()) @@ -31,3 +42,82 @@ func vectorsPost(c *gin.Context) { c.JSON(200, engine.ConvertVectors(value)) } + +func remoteWrite(c *gin.Context) { + req, err := DecodeWriteRequest(c.Request.Body) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + count := len(req.Timeseries) + + if count == 0 { + c.String(200, "") + return + } + + var ( + now = time.Now().Unix() + ids = make(map[string]interface{}) + lst = make([]interface{}, 0, count) + ) + + for i := 0; i < count; i++ { + labels := req.Timeseries[i].GetLabels() + + ident := "" + for _, label := range labels { + if label.GetName() == "ident" { + ident = label.GetValue() + } + } + + if len(ident) > 0 { + // register host + ids[ident] = now + + // fill tags + target, has := memsto.TargetCache.Get(ident) + if has { + for key, value := range target.TagsMap { + req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, &prompb.Label{ + Name: key, + Value: value, + }) + } + } + } + + lst[i] = req.Timeseries[i] + } + + promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count)) + idents.Idents.MSet(ids) + if writer.Writers.PushQueue(lst) { + c.String(200, "") + } else { + c.String(http.StatusInternalServerError, "wirter queue full") + } +} + +// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling +// snappy decompression. +func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { + compressed, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +}