validate ui query, add aggrFun support for resample (#392)

This commit is contained in:
yubo 2020-11-12 11:50:40 +08:00 committed by GitHub
parent ee873a4ae2
commit 033383eea4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 162 additions and 56 deletions

View File

@ -4,6 +4,9 @@ backend:
enabled: false
name: "m3db"
namespace: "test"
seriesLimit: 0
docsLimit: 0
daysLimit: 7 # max query time
# https://m3db.github.io/m3/m3db/architecture/consistencylevels/
writeConsistencyLevel: "majority" # one|majority|all
readConsistencyLevel: "unstrict_majority" # one|unstrict_majority|majority|all

View File

@ -21,8 +21,8 @@ type QueryDataForUI struct {
Step int `json:"step"`
DsType string `json:"dstype"`
GroupKey []string `json:"groupKey"` //聚合维度
AggrFunc string `json:"aggrFunc"` //聚合计算
ConsolFunc string `json:"consolFunc"`
AggrFunc string `json:"aggrFunc" description:"sum,avg,max,min"` //聚合计算
ConsolFunc string `json:"consolFunc" description:"AVERAGE,MIN,MAX,LAST"`
Comparisons []int64 `json:"comparisons"` //环比多少时间
}

View File

@ -81,7 +81,7 @@ func Init(cfg BackendSection) {
// init m3db
if cfg.M3db.Enabled {
var err error
m3dbDataSource, err = m3db.NewClient(cfg.M3db.Namespace, &cfg.M3db.Config)
m3dbDataSource, err = m3db.NewClient(cfg.M3db)
if err != nil {
log.Fatalf("unable to new m3db client: %v", err)
}

View File

@ -116,9 +116,9 @@ func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForU
return data
}
// Adjust the data
// resample the data
for _, v := range data {
v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step))
v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step), opts.AggrFunc)
}
// 没有聚合 tag, 或者曲线没有其他 tags, 直接所有曲线进行计算
@ -179,11 +179,7 @@ func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForU
return aggrDatas
}
var (
nanFloat = dataobj.JsonFloat(math.NaN())
)
func resample(data []*dataobj.RRDData, start, end, step int64) []*dataobj.RRDData {
func resample(data []*dataobj.RRDData, start, end, step int64, aggrFunc string) []*dataobj.RRDData {
l := int((end - start) / step)
if l <= 0 {
return []*dataobj.RRDData{}
@ -198,31 +194,93 @@ func resample(data []*dataobj.RRDData, start, end, step int64) []*dataobj.RRDDat
j := 0
for ; ts < end; ts += step {
get := func() *dataobj.RRDData {
get := func() (ret []dataobj.JsonFloat) {
if j == len(data) {
return nil
return
}
for {
if j == len(data) {
return &dataobj.RRDData{Timestamp: ts, Value: nanFloat}
return
}
if d := data[j]; d.Timestamp < ts {
ret = append(ret, d.Value)
j++
continue
} else if d.Timestamp >= ts+step {
return &dataobj.RRDData{Timestamp: ts, Value: nanFloat}
return
} else {
ret = append(ret, d.Value)
j++
return d
return
}
}
}
ret = append(ret, get())
ret = append(ret, &dataobj.RRDData{
Timestamp: ts,
Value: aggrData(aggrFunc, get()),
})
}
return ret
}
func aggrData(fn string, data []dataobj.JsonFloat) dataobj.JsonFloat {
if len(data) == 0 {
return dataobj.JsonFloat(math.NaN())
}
switch fn {
case "sum":
return sum(data)
case "avg":
return avg(data)
case "max":
return max(data)
case "min":
return min(data)
// case "last":
default:
return last(data)
}
}
func sum(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) {
for _, v := range data {
ret += v
}
return ret
}
func avg(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) {
for _, v := range data {
ret += v
}
return ret / dataobj.JsonFloat(len(data))
}
func max(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) {
ret = data[0]
for i := 1; i < len(data); i++ {
if data[i] > ret {
ret = data[i]
}
}
return ret
}
func min(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) {
ret = data[0]
for i := 1; i < len(data); i++ {
if data[i] < ret {
ret = data[i]
}
}
return ret
}
func last(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) {
return data[len(data)-1]
}
func getTags(counter string) (tags string) {
idx := strings.IndexAny(counter, "/")
if idx == -1 {

View File

@ -31,6 +31,10 @@ type M3dbSection struct {
Name string `yaml:"name"`
Enabled bool `yaml:"enabled"`
Namespace string `yaml:"namespace"`
DaysLimit int `yaml:"daysLimit"`
SeriesLimit int `yaml:"seriesLimit"`
DocsLimit int `yaml:"docsLimit"`
MinStep int `yaml:"minStep"`
Config client.Configuration `yaml:",inline"`
}
@ -42,22 +46,26 @@ type Client struct {
opts client.Options
namespace string
config *client.Configuration
config *M3dbSection
namespaceID ident.ID
}
func NewClient(namespace string, cfg *client.Configuration) (*Client, error) {
client, err := cfg.NewClient(client.ConfigurationParameters{})
func NewClient(cfg M3dbSection) (*Client, error) {
client, err := cfg.Config.NewClient(client.ConfigurationParameters{})
if err != nil {
return nil, fmt.Errorf("unable to get new M3DB client: %v", err)
}
if cfg.MinStep == 0 {
cfg.MinStep = 1
}
ret := &Client{
namespace: namespace,
config: cfg,
namespace: cfg.Namespace,
config: &cfg,
client: client,
namespaceID: ident.StringID(namespace),
namespaceID: ident.StringID(cfg.Namespace),
}
if _, err := ret.session(); err != nil {
@ -107,7 +115,7 @@ func (p *Client) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryRespo
return nil
}
query, opts := queryDataOptions(inputs)
query, opts := p.config.queryDataOptions(inputs)
ret, err := fetchTagged(session, p.namespaceID, query, opts)
if err != nil {
logger.Errorf("unable to query data: ", err)
@ -118,17 +126,21 @@ func (p *Client) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryRespo
}
// QueryDataForUi: && (metric) (|| endpoints...) (&& tags...)
// get kv
func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
logger.Debugf("query data for ui, input: %+v", input)
if err := p.config.validateQueryDataForUI(&input); err != nil {
logger.Errorf("input is invalid %s", err)
return nil
}
session, err := p.session()
if err != nil {
logger.Errorf("unable to get m3db session: %s", err)
return nil
}
query, opts := queryDataUIOptions(input)
query, opts := p.config.queryDataUIOptions(input)
ret, err := fetchTagged(session, p.namespaceID, query, opts)
if err != nil {
@ -136,10 +148,6 @@ func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQue
return nil
}
if input.Step == 0 {
input.Step = 60
}
return aggregateResp(ret, input)
}
@ -152,7 +160,7 @@ func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp {
return nil
}
query, opts := queryMetricsOptions(input)
query, opts := p.config.queryMetricsOptions(input)
tags, err := completeTags(session, p.namespaceID, query, opts)
if err != nil {
@ -171,7 +179,7 @@ func (p *Client) QueryTagPairs(input dataobj.EndpointMetricRecv) []dataobj.Index
return nil
}
query, opts := queryTagPairsOptions(input)
query, opts := p.config.queryTagPairsOptions(input)
tags, err := completeTags(session, p.namespaceID, query, opts)
if err != nil {
@ -199,7 +207,7 @@ func (p *Client) QueryIndexByClude(inputs []dataobj.CludeRecv) (ret []dataobj.Xc
}
func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRecv) []dataobj.XcludeResp {
query, opts := queryIndexByCludeOptions(input)
query, opts := p.config.queryIndexByCludeOptions(input)
iter, _, err := session.FetchTaggedIDs(p.namespaceID, query, opts)
if err != nil {
@ -263,7 +271,7 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde
DsType: "GAUGE",
}
query, opts := queryIndexByFullTagsOptions(input)
query, opts := p.config.queryIndexByFullTagsOptions(input)
if query.Query.Equal(idx.NewAllQuery()) {
ret.Endpoints = input.Endpoints
log.Printf("all query")
@ -446,3 +454,40 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
Values: values,
}, nil
}
func (cfg M3dbSection) validateQueryDataForUI(in *dataobj.QueryDataForUI) (err error) {
if in.AggrFunc != "" &&
in.AggrFunc != "sum" &&
in.AggrFunc != "avg" &&
in.AggrFunc != "max" &&
in.AggrFunc != "min" {
return fmt.Errorf("%s is invalid aggrfunc", in.AggrFunc)
}
if err := cfg.validateTime(in.Start, in.End, &in.Step); err != nil {
return err
}
return nil
}
func (cfg M3dbSection) validateTime(start, end int64, step *int) error {
if end <= start {
return fmt.Errorf("query time range is invalid end %d <= start %d", end, start)
}
if cfg.DaysLimit > 0 {
if days := int((end - start) / 86400); days > cfg.DaysLimit {
return fmt.Errorf("query time reange in invalid, daysLimit(%d/%d)", days, cfg.DaysLimit)
}
}
if *step == 0 {
*step = int((end - start) / 720)
}
if *step > cfg.MinStep {
*step = cfg.MinStep
}
return nil
}

View File

@ -10,7 +10,7 @@ import (
)
// QueryData
func queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptions) {
func (cfg M3dbSection) queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptions) {
q := []idx.Query{}
for _, input := range inputs {
@ -23,14 +23,14 @@ func queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptio
index.QueryOptions{
StartInclusive: time.Unix(inputs[0].Start, 0),
EndExclusive: time.Unix(inputs[0].End, 0),
DocsLimit: DOCS_LIMIT,
SeriesLimit: SERIES_LIMIT,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
}
}
// QueryDataForUI
// metric && (endpoints[0] || endporint[1] ...) && (tags[0] || tags[1] ...)
func queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryOptions) {
func (cfg M3dbSection) queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryOptions) {
q1 := idx.NewTermQuery([]byte(METRIC_NAME), []byte(input.Metric))
q2 := endpointsQuery(input.Nids, input.Endpoints)
q3 := metricTagsQuery(input.Tags)
@ -39,8 +39,8 @@ func queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryO
index.QueryOptions{
StartInclusive: time.Unix(input.Start, 0),
EndExclusive: time.Unix(input.End, 0),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
}
}
@ -126,7 +126,7 @@ func metricTagsQuery(tags []string) idx.Query {
// QueryMetrics
// (endpoint[0] || endpoint[1] ... )
func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) {
func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) {
nameByte := []byte(METRIC_NAME)
return index.Query{idx.NewConjunctionQuery(
endpointsQuery(nil, input.Endpoints),
@ -136,8 +136,8 @@ func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.Aggreg
QueryOptions: index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
},
FieldFilter: [][]byte{nameByte},
Type: index.AggregateTagNamesAndValues,
@ -146,7 +146,7 @@ func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.Aggreg
// QueryTagPairs
// (endpoint[0] || endpoint[1]...) && (metrics[0] || metrics[1] ... )
func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) {
func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) {
q1 := endpointsQuery(nil, input.Endpoints)
q2 := metricsQuery(input.Metrics)
@ -155,8 +155,8 @@ func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.
QueryOptions: index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
},
FieldFilter: index.AggregateFieldFilter(nil),
Type: index.AggregateTagNamesAndValues,
@ -164,7 +164,7 @@ func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.
}
// QueryIndexByClude: || (&& (|| endpoints...) (metric) (|| include...) (&& exclude..))
func queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.QueryOptions) {
func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.QueryOptions) {
query := index.Query{}
q := []idx.Query{}
@ -190,14 +190,14 @@ func queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.Query
return query, index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
}
}
// QueryIndexByFullTags: && (|| endpoints) (metric) (&& tagkv)
func queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query, index.QueryOptions) {
func (cfg M3dbSection) queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query, index.QueryOptions) {
query := index.Query{}
q := []idx.Query{}
@ -220,8 +220,8 @@ func queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query
return query, index.QueryOptions{
StartInclusive: time.Time{},
EndExclusive: time.Now(),
SeriesLimit: SERIES_LIMIT,
DocsLimit: DOCS_LIMIT,
SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit,
}
}