From 1ff6d0a2dc9ef94a460290e42b5944f06bd1fa79 Mon Sep 17 00:00:00 2001 From: yubo Date: Tue, 30 Mar 2021 18:10:14 +0800 Subject: [PATCH] feature: add [start,end) param for clude, endpointMetric, endpoints api (#639) --- src/common/dataobj/query_item.go | 88 +++++++++++++++------- src/modules/transfer/backend/m3db/m3db.go | 18 ++++- src/modules/transfer/backend/m3db/query.go | 12 +-- 3 files changed, 80 insertions(+), 38 deletions(-) diff --git a/src/common/dataobj/query_item.go b/src/common/dataobj/query_item.go index dc08a58f..52832fa3 100644 --- a/src/common/dataobj/query_item.go +++ b/src/common/dataobj/query_item.go @@ -58,8 +58,17 @@ func (resp *TsdbQueryResponse) Key() string { } type EndpointsRecv struct { - Endpoints []string `json:"endpoints"` - Nids []string `json:"nids"` + Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` + Start int64 `json:"start" description:"inclusive"` + End int64 `json:"end" description:"exclusive"` + StartInclusive time.Time `json:"-"` + EndExclusive time.Time `json:"-"` +} + +func (p *EndpointsRecv) Validate() (err error) { + p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End) + return } type MetricResp struct { @@ -67,9 +76,18 @@ type MetricResp struct { } type EndpointMetricRecv struct { - Endpoints []string `json:"endpoints"` - Nids []string `json:"nids"` - Metrics []string `json:"metrics"` + Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` + Metrics []string `json:"metrics"` + Start int64 `json:"start" description:"inclusive"` + End int64 `json:"end" description:"exclusive"` + StartInclusive time.Time `json:"-"` + EndExclusive time.Time `json:"-"` +} + +func (p *EndpointMetricRecv) Validate() (err error) { + p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End) + return } type IndexTagkvResp struct { @@ -85,11 +103,20 @@ type TagPair struct { } type CludeRecv struct { - Endpoints []string `json:"endpoints"` - Nids []string `json:"nids"` - Metric string `json:"metric"` - Include []*TagPair `json:"include"` - Exclude []*TagPair `json:"exclude"` + Endpoints []string `json:"endpoints"` + Nids []string `json:"nids"` + Metric string `json:"metric"` + Include []*TagPair `json:"include"` + Exclude []*TagPair `json:"exclude"` + Start int64 `json:"start" description:"inclusive"` + End int64 `json:"end" description:"exclusive"` + StartInclusive time.Time `json:"-"` + EndExclusive time.Time `json:"-"` +} + +func (p *CludeRecv) Validate() (err error) { + p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End) + return } type XcludeResp struct { @@ -112,24 +139,9 @@ type IndexByFullTagsRecv struct { EndExclusive time.Time `json:"-"` } -func (p *IndexByFullTagsRecv) Validate() error { - if p.End == 0 { - p.EndExclusive = time.Now() - } else { - p.EndExclusive = time.Unix(p.End, 0) - } - - if p.Start == 0 { - p.StartInclusive = p.EndExclusive.Add(-time.Hour * 25) - } else { - p.StartInclusive = time.Unix(p.Start, 0) - } - - if p.StartInclusive.After(p.EndExclusive) { - return fmt.Errorf("start is after end") - } - - return nil +func (p *IndexByFullTagsRecv) Validate() (err error) { + p.StartInclusive, p.EndExclusive, err = timeRangeValidate(p.Start, p.End) + return } type IndexByFullTagsResp struct { @@ -141,3 +153,23 @@ type IndexByFullTagsResp struct { DsType string `json:"dstype"` Count int `json:"count"` } + +func timeRangeValidate(start, end int64) (startInclusive, endExclusive time.Time, err error) { + if end == 0 { + endExclusive = time.Now() + } else { + endExclusive = time.Unix(end, 0) + } + + if start == 0 { + startInclusive = endExclusive.Add(-time.Hour * 25) + } else { + startInclusive = time.Unix(start, 0) + } + + if startInclusive.After(endExclusive) { + err = fmt.Errorf("start is after end") + } + + return +} diff --git a/src/modules/transfer/backend/m3db/m3db.go b/src/modules/transfer/backend/m3db/m3db.go index 1f361c93..da14d317 100644 --- a/src/modules/transfer/backend/m3db/m3db.go +++ b/src/modules/transfer/backend/m3db/m3db.go @@ -58,10 +58,6 @@ type Client struct { namespaceID ident.ID } -func indexStartTime() time.Time { - return time.Now().Add(-time.Hour * 25) -} - func NewClient(cfg M3dbSection) (*Client, error) { client, err := cfg.Config.NewClient(client.ConfigurationParameters{}) if err != nil { @@ -186,6 +182,11 @@ func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQue // QueryMetrics: || (&& (endpoint)) (counter)... // return all the values that tag == __name__ func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp { + if err := input.Validate(); err != nil { + logger.Errorf("input validate err %s", err) + return nil + } + session, err := p.session() if err != nil { logger.Errorf("unable to get m3db session: %s", err) @@ -205,6 +206,11 @@ func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp { // QueryTagPairs: && (|| endpoints...) (|| metrics...) // return all the tags that matches func (p *Client) QueryTagPairs(input dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp { + if err := input.Validate(); err != nil { + logger.Errorf("input validate err %s", err) + return nil + } + session, err := p.session() if err != nil { logger.Errorf("unable to get m3db session: %s", err) @@ -232,6 +238,10 @@ func (p *Client) QueryIndexByClude(inputs []dataobj.CludeRecv) (ret []dataobj.Xc } for _, input := range inputs { + if err := input.Validate(); err != nil { + logger.Errorf("input validate err %s", err) + continue + } ret = append(ret, p.queryIndexByClude(session, input)...) } diff --git a/src/modules/transfer/backend/m3db/query.go b/src/modules/transfer/backend/m3db/query.go index 9bc80985..b91e841d 100644 --- a/src/modules/transfer/backend/m3db/query.go +++ b/src/modules/transfer/backend/m3db/query.go @@ -145,8 +145,8 @@ func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Q )}, index.AggregationOptions{ QueryOptions: index.QueryOptions{ - StartInclusive: indexStartTime(), - EndExclusive: time.Now(), + StartInclusive: input.StartInclusive, + EndExclusive: input.EndExclusive, SeriesLimit: cfg.SeriesLimit, DocsLimit: cfg.DocsLimit, }, @@ -164,8 +164,8 @@ func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (i return index.Query{idx.NewConjunctionQuery(q1, q2)}, index.AggregationOptions{ QueryOptions: index.QueryOptions{ - StartInclusive: indexStartTime(), - EndExclusive: time.Now(), + StartInclusive: input.StartInclusive, + EndExclusive: input.EndExclusive, SeriesLimit: cfg.SeriesLimit, DocsLimit: cfg.DocsLimit, }, @@ -201,8 +201,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index. } return query, index.QueryOptions{ - StartInclusive: indexStartTime(), - EndExclusive: time.Now(), + StartInclusive: input.StartInclusive, + EndExclusive: input.EndExclusive, SeriesLimit: cfg.SeriesLimit, DocsLimit: cfg.DocsLimit, }