Query batch feature (#1052)

* batch query prom for single panel

* make code better:

1.extract server/api.go

2.make webapi reading prom with reusing server's API,not a new prom client

* clear code

* clear code

* format code
clear code

* move reader.go,reuse webapi/prom/prom.go clusterTypes clients cache

* clear code,extract common method
This commit is contained in:
SunnyBoy-WYH 2022-07-17 12:52:33 +08:00 committed by GitHub
parent b7ff82d722
commit 05651ad744
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 139 additions and 39 deletions

View File

@ -13,7 +13,7 @@
// Package v1 provides bindings to the Prometheus HTTP API v1:
// http://prometheus.io/docs/querying/api/
package reader
package prom
import (
"context"
@ -558,10 +558,11 @@ func (qr *queryResult) UnmarshalJSON(b []byte) error {
// NewAPI returns a new API for the client.
//
// It is safe to use the returned API from multiple goroutines.
func NewAPI(c api.Client) API {
func NewAPI(c api.Client, opt ClientOptions) API {
return &httpAPI{
client: &apiClientImpl{
client: c,
opt: opt,
},
}
}
@ -891,6 +892,7 @@ type apiClient interface {
type apiClientImpl struct {
client api.Client
opt ClientOptions
}
type apiResponse struct {
@ -921,16 +923,16 @@ func (h *apiClientImpl) URL(ep string, args map[string]string) *url.URL {
}
func (h *apiClientImpl) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, Warnings, error) {
if Reader.Opts.BasicAuthUser != "" && Reader.Opts.BasicAuthPass != "" {
req.SetBasicAuth(Reader.Opts.BasicAuthUser, Reader.Opts.BasicAuthPass)
if h.opt.BasicAuthUser != "" && h.opt.BasicAuthPass != "" {
req.SetBasicAuth(h.opt.BasicAuthUser, h.opt.BasicAuthPass)
}
headerCount := len(Reader.Opts.Headers)
headerCount := len(h.opt.Headers)
if headerCount > 0 && headerCount%2 == 0 {
for i := 0; i < len(Reader.Opts.Headers); i += 2 {
req.Header.Add(Reader.Opts.Headers[i], Reader.Opts.Headers[i+1])
if Reader.Opts.Headers[i] == "Host" {
req.Host = Reader.Opts.Headers[i+1]
for i := 0; i < len(h.opt.Headers); i += 2 {
req.Header.Add(h.opt.Headers[i], h.opt.Headers[i+1])
if h.opt.Headers[i] == "Host" {
req.Host = h.opt.Headers[i+1]
}
}
}

View File

@ -0,0 +1,9 @@
package prom
type ClientOptions struct {
BasicAuthUser string
BasicAuthPass string
Headers []string
}

View File

@ -15,6 +15,7 @@ import (
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
@ -111,7 +112,7 @@ func (r RuleEval) Work() {
var value model.Value
var err error
if r.rule.Algorithm == "" {
var warnings reader.Warnings
var warnings prom.Warnings
value, warnings, err = reader.Reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)

View File

@ -5,13 +5,14 @@ import (
"net/http"
"time"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/prometheus/client_golang/api"
)
type ReaderType struct {
Opts config.ReaderOptions
Client API
Client prom.API
}
var Reader ReaderType
@ -41,8 +42,12 @@ func Init(opts config.ReaderOptions) error {
}
Reader = ReaderType{
Opts: opts,
Client: NewAPI(cli),
Opts: opts,
Client: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opts.BasicAuthUser,
BasicAuthPass: opts.BasicAuthPass,
Headers: opts.Headers,
}),
}
return nil

View File

@ -11,14 +11,17 @@ import (
"sync"
"time"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
)
type ClusterType struct {
Opts config.ClusterOptions
Transport *http.Transport
Opts config.ClusterOptions
Transport *http.Transport
PromClient prom.API
}
type ClustersType struct {
@ -61,18 +64,7 @@ func initClustersFromConfig() error {
opts := config.C.Clusters
for i := 0; i < len(opts); i++ {
cluster := &ClusterType{
Opts: opts[i],
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
},
}
cluster := newClusterTypeByOption(opts[i])
Clusters.Put(opts[i].Name, cluster)
}
@ -173,21 +165,42 @@ func loadClustersFromAPI() {
MaxIdleConnsPerHost: 32,
}
cluster := &ClusterType{
Opts: opt,
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
},
}
cluster := newClusterTypeByOption(opt)
Clusters.Put(item.Name, cluster)
continue
}
}
}
func newClusterTypeByOption(opt config.ClusterOptions) *ClusterType {
transport := &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
}
cli, err := api.NewClient(api.Config{
Address: opt.Prom,
RoundTripper: transport,
})
if err != nil {
logger.Errorf("new client fail: %v", err)
}
cluster := &ClusterType{
Opts: opt,
Transport: transport,
PromClient: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opt.BasicAuthUser,
BasicAuthPass: opt.BasicAuthPass,
Headers: opt.Headers,
}),
}
return cluster
}

View File

@ -98,10 +98,15 @@ func configRoute(r *gin.Engine, version string) {
pages := r.Group(pagesPrefix)
{
if config.C.AnonymousAccess.PromQuerier {
pages.Any("/prometheus/*url", prometheusProxy)
pages.POST("/query-range-batch", promBatchQueryRange)
} else {
pages.Any("/prometheus/*url", auth(), prometheusProxy)
pages.POST("/query-range-batch", auth(), promBatchQueryRange)
}
pages.GET("/version", func(c *gin.Context) {

View File

@ -1,18 +1,59 @@
package router
import (
"context"
"errors"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
. "github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/didi/nightingale/v5/src/webapi/prom"
"github.com/prometheus/common/model"
)
type queryFormItem struct {
Start int64 `json:"start" binding:"required"`
End int64 `json:"end" binding:"required"`
Step int64 `json:"step" binding:"required"`
Query string `json:"query" binding:"required"`
}
type batchQueryForm struct {
Queries []queryFormItem `json:"queries" binding:"required"`
}
type batchQueryRes struct {
Data []model.Value `json:"data"`
}
func promBatchQueryRange(c *gin.Context) {
xcluster := c.GetHeader("X-Cluster")
if xcluster == "" {
c.String(500, "X-Cluster is blank")
return
}
var f batchQueryForm
err := c.BindJSON(&f)
if err != nil {
c.String(500, "%s", err.Error())
return
}
res, err := batchQueryRange(xcluster, f.Queries)
ginx.NewRender(c).Data(res, err)
}
func prometheusProxy(c *gin.Context) {
xcluster := c.GetHeader("X-Cluster")
if xcluster == "" {
@ -93,3 +134,27 @@ func clustersGets(c *gin.Context) {
}
ginx.NewRender(c).Data(names, nil)
}
func batchQueryRange(clusterName string, data []queryFormItem) (batchQueryRes, error) {
var res batchQueryRes
clusterType, exist := prom.Clusters.Get(clusterName)
if !exist {
return batchQueryRes{}, errors.New("cluster client not exist")
}
for _, item := range data {
r := Range{
Start: time.Unix(item.Start, 0),
End: time.Unix(item.End, 0),
Step: time.Duration(item.Step) * time.Second,
}
resp, _, err := clusterType.PromClient.QueryRange(context.Background(), item.Query, r)
if err != nil {
return res, err
}
res.Data = append(res.Data, resp)
}
return res, nil
}