diff --git a/etc/transfer.yml b/etc/transfer.yml index a04f103c..d0753cd6 100644 --- a/etc/transfer.yml +++ b/etc/transfer.yml @@ -4,6 +4,9 @@ backend: enabled: false name: "m3db" namespace: "test" + seriesLimit: 0 + docsLimit: 0 + daysLimit: 7 # max query time # https://m3db.github.io/m3/m3db/architecture/consistencylevels/ writeConsistencyLevel: "majority" # one|majority|all readConsistencyLevel: "unstrict_majority" # one|unstrict_majority|majority|all diff --git a/src/common/dataobj/query_item.go b/src/common/dataobj/query_item.go index 9ea11cb4..5230c335 100644 --- a/src/common/dataobj/query_item.go +++ b/src/common/dataobj/query_item.go @@ -20,9 +20,9 @@ type QueryDataForUI struct { Tags []string `json:"tags"` Step int `json:"step"` DsType string `json:"dstype"` - GroupKey []string `json:"groupKey"` //聚合维度 - AggrFunc string `json:"aggrFunc"` //聚合计算 - ConsolFunc string `json:"consolFunc"` + GroupKey []string `json:"groupKey"` //聚合维度 + AggrFunc string `json:"aggrFunc" description:"sum,avg,max,min"` //聚合计算 + ConsolFunc string `json:"consolFunc" description:"AVERAGE,MIN,MAX,LAST"` Comparisons []int64 `json:"comparisons"` //环比多少时间 } diff --git a/src/modules/transfer/backend/init.go b/src/modules/transfer/backend/init.go index 850a2bcb..5c8f33dd 100644 --- a/src/modules/transfer/backend/init.go +++ b/src/modules/transfer/backend/init.go @@ -81,7 +81,7 @@ func Init(cfg BackendSection) { // init m3db if cfg.M3db.Enabled { var err error - m3dbDataSource, err = m3db.NewClient(cfg.M3db.Namespace, &cfg.M3db.Config) + m3dbDataSource, err = m3db.NewClient(cfg.M3db) if err != nil { log.Fatalf("unable to new m3db client: %v", err) } diff --git a/src/modules/transfer/backend/m3db/convert.go b/src/modules/transfer/backend/m3db/convert.go index c03defb3..be6d30f1 100644 --- a/src/modules/transfer/backend/m3db/convert.go +++ b/src/modules/transfer/backend/m3db/convert.go @@ -116,9 +116,9 @@ func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForU return 
data } - // Adjust the data + // resample the data for _, v := range data { - v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step)) + v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step), opts.AggrFunc) } // 没有聚合 tag, 或者曲线没有其他 tags, 直接所有曲线进行计算 @@ -179,11 +179,7 @@ func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForU return aggrDatas } -var ( - nanFloat = dataobj.JsonFloat(math.NaN()) -) - -func resample(data []*dataobj.RRDData, start, end, step int64) []*dataobj.RRDData { +func resample(data []*dataobj.RRDData, start, end, step int64, aggrFunc string) []*dataobj.RRDData { l := int((end - start) / step) if l <= 0 { return []*dataobj.RRDData{} @@ -198,31 +194,93 @@ func resample(data []*dataobj.RRDData, start, end, step int64) []*dataobj.RRDDat j := 0 for ; ts < end; ts += step { - get := func() *dataobj.RRDData { + get := func() (ret []dataobj.JsonFloat) { if j == len(data) { - return nil + return } for { if j == len(data) { - return &dataobj.RRDData{Timestamp: ts, Value: nanFloat} + return } if d := data[j]; d.Timestamp < ts { + ret = append(ret, d.Value) j++ continue } else if d.Timestamp >= ts+step { - return &dataobj.RRDData{Timestamp: ts, Value: nanFloat} + return } else { + ret = append(ret, d.Value) j++ - return d + return } } } - ret = append(ret, get()) + ret = append(ret, &dataobj.RRDData{ + Timestamp: ts, + Value: aggrData(aggrFunc, get()), + }) } return ret } +func aggrData(fn string, data []dataobj.JsonFloat) dataobj.JsonFloat { + if len(data) == 0 { + return dataobj.JsonFloat(math.NaN()) + } + switch fn { + case "sum": + return sum(data) + case "avg": + return avg(data) + case "max": + return max(data) + case "min": + return min(data) + // case "last": + default: + return last(data) + } +} + +func sum(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) { + for _, v := range data { + ret += v + } + return ret +} + +func avg(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) { + for _, v := range 
data { + ret += v + } + return ret / dataobj.JsonFloat(len(data)) +} + +func max(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) { + ret = data[0] + for i := 1; i < len(data); i++ { + if data[i] > ret { + ret = data[i] + } + } + return ret +} + +func min(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) { + ret = data[0] + for i := 1; i < len(data); i++ { + if data[i] < ret { + ret = data[i] + } + } + return ret +} + +func last(data []dataobj.JsonFloat) (ret dataobj.JsonFloat) { + return data[len(data)-1] +} + func getTags(counter string) (tags string) { idx := strings.IndexAny(counter, "/") if idx == -1 { diff --git a/src/modules/transfer/backend/m3db/m3db.go b/src/modules/transfer/backend/m3db/m3db.go index e8ddad1a..a9dca336 100644 --- a/src/modules/transfer/backend/m3db/m3db.go +++ b/src/modules/transfer/backend/m3db/m3db.go @@ -28,10 +28,14 @@ const ( ) type M3dbSection struct { - Name string `yaml:"name"` - Enabled bool `yaml:"enabled"` - Namespace string `yaml:"namespace"` - Config client.Configuration `yaml:",inline"` + Name string `yaml:"name"` + Enabled bool `yaml:"enabled"` + Namespace string `yaml:"namespace"` + DaysLimit int `yaml:"daysLimit"` + SeriesLimit int `yaml:"seriesLimit"` + DocsLimit int `yaml:"docsLimit"` + MinStep int `yaml:"minStep"` + Config client.Configuration `yaml:",inline"` } type Client struct { @@ -42,22 +46,26 @@ type Client struct { opts client.Options namespace string - config *client.Configuration + config *M3dbSection namespaceID ident.ID } -func NewClient(namespace string, cfg *client.Configuration) (*Client, error) { - client, err := cfg.NewClient(client.ConfigurationParameters{}) +func NewClient(cfg M3dbSection) (*Client, error) { + client, err := cfg.Config.NewClient(client.ConfigurationParameters{}) if err != nil { return nil, fmt.Errorf("unable to get new M3DB client: %v", err) } + if cfg.MinStep == 0 { + cfg.MinStep = 1 + } + ret := &Client{ - namespace: namespace, - config: cfg, + namespace: cfg.Namespace, + config: 
&cfg, client: client, - namespaceID: ident.StringID(namespace), + namespaceID: ident.StringID(cfg.Namespace), } if _, err := ret.session(); err != nil { @@ -107,7 +115,7 @@ func (p *Client) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryRespo return nil } - query, opts := queryDataOptions(inputs) + query, opts := p.config.queryDataOptions(inputs) ret, err := fetchTagged(session, p.namespaceID, query, opts) if err != nil { logger.Errorf("unable to query data: ", err) @@ -118,17 +126,21 @@ func (p *Client) QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryRespo } // QueryDataForUi: && (metric) (|| endpoints...) (&& tags...) -// get kv func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse { logger.Debugf("query data for ui, input: %+v", input) + if err := p.config.validateQueryDataForUI(&input); err != nil { + logger.Errorf("input is invalid %s", err) + return nil + } + session, err := p.session() if err != nil { logger.Errorf("unable to get m3db session: %s", err) return nil } - query, opts := queryDataUIOptions(input) + query, opts := p.config.queryDataUIOptions(input) ret, err := fetchTagged(session, p.namespaceID, query, opts) if err != nil { @@ -136,10 +148,6 @@ func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQue return nil } - if input.Step == 0 { - input.Step = 60 - } - return aggregateResp(ret, input) } @@ -152,7 +160,7 @@ func (p *Client) QueryMetrics(input dataobj.EndpointsRecv) *dataobj.MetricResp { return nil } - query, opts := queryMetricsOptions(input) + query, opts := p.config.queryMetricsOptions(input) tags, err := completeTags(session, p.namespaceID, query, opts) if err != nil { @@ -171,7 +179,7 @@ func (p *Client) QueryTagPairs(input dataobj.EndpointMetricRecv) []dataobj.Index return nil } - query, opts := queryTagPairsOptions(input) + query, opts := p.config.queryTagPairsOptions(input) tags, err := completeTags(session, p.namespaceID, query, opts) if err != nil { @@ 
-199,7 +207,7 @@ func (p *Client) QueryIndexByClude(inputs []dataobj.CludeRecv) (ret []dataobj.Xc } func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRecv) []dataobj.XcludeResp { - query, opts := queryIndexByCludeOptions(input) + query, opts := p.config.queryIndexByCludeOptions(input) iter, _, err := session.FetchTaggedIDs(p.namespaceID, query, opts) if err != nil { @@ -263,7 +271,7 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde DsType: "GAUGE", } - query, opts := queryIndexByFullTagsOptions(input) + query, opts := p.config.queryIndexByFullTagsOptions(input) if query.Query.Equal(idx.NewAllQuery()) { ret.Endpoints = input.Endpoints log.Printf("all query") @@ -446,3 +454,40 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons Values: values, }, nil } + +func (cfg M3dbSection) validateQueryDataForUI(in *dataobj.QueryDataForUI) (err error) { + if in.AggrFunc != "" && + in.AggrFunc != "sum" && + in.AggrFunc != "avg" && + in.AggrFunc != "max" && + in.AggrFunc != "min" { + return fmt.Errorf("%s is invalid aggrfunc", in.AggrFunc) + } + + if err := cfg.validateTime(in.Start, in.End, &in.Step); err != nil { + return err + } + + return nil +} + +func (cfg M3dbSection) validateTime(start, end int64, step *int) error { + if end <= start { + return fmt.Errorf("query time range is invalid end %d <= start %d", end, start) + } + + if cfg.DaysLimit > 0 { + if days := int((end - start) / 86400); days > cfg.DaysLimit { + return fmt.Errorf("query time range is invalid, daysLimit(%d/%d)", days, cfg.DaysLimit) + } + } + + if *step == 0 { + *step = int((end - start) / 720) + } + + if *step < cfg.MinStep { + *step = cfg.MinStep + } + return nil +} diff --git a/src/modules/transfer/backend/m3db/query.go b/src/modules/transfer/backend/m3db/query.go index 49855d46..9adb185c 100644 --- a/src/modules/transfer/backend/m3db/query.go +++ b/src/modules/transfer/backend/m3db/query.go @@ -10,7 +10,7 @@
import ( ) // QueryData -func queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptions) { +func (cfg M3dbSection) queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptions) { q := []idx.Query{} for _, input := range inputs { @@ -23,14 +23,14 @@ func queryDataOptions(inputs []dataobj.QueryData) (index.Query, index.QueryOptio index.QueryOptions{ StartInclusive: time.Unix(inputs[0].Start, 0), EndExclusive: time.Unix(inputs[0].End, 0), - DocsLimit: DOCS_LIMIT, - SeriesLimit: SERIES_LIMIT, + SeriesLimit: cfg.SeriesLimit, + DocsLimit: cfg.DocsLimit, } } // QueryDataForUI // metric && (endpoints[0] || endporint[1] ...) && (tags[0] || tags[1] ...) -func queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryOptions) { +func (cfg M3dbSection) queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryOptions) { q1 := idx.NewTermQuery([]byte(METRIC_NAME), []byte(input.Metric)) q2 := endpointsQuery(input.Nids, input.Endpoints) q3 := metricTagsQuery(input.Tags) @@ -39,8 +39,8 @@ func queryDataUIOptions(input dataobj.QueryDataForUI) (index.Query, index.QueryO index.QueryOptions{ StartInclusive: time.Unix(input.Start, 0), EndExclusive: time.Unix(input.End, 0), - SeriesLimit: SERIES_LIMIT, - DocsLimit: DOCS_LIMIT, + SeriesLimit: cfg.SeriesLimit, + DocsLimit: cfg.DocsLimit, } } @@ -126,7 +126,7 @@ func metricTagsQuery(tags []string) idx.Query { // QueryMetrics // (endpoint[0] || endpoint[1] ... 
) -func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) { +func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) { nameByte := []byte(METRIC_NAME) return index.Query{idx.NewConjunctionQuery( endpointsQuery(nil, input.Endpoints), @@ -136,8 +136,8 @@ func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.Aggreg QueryOptions: index.QueryOptions{ StartInclusive: time.Time{}, EndExclusive: time.Now(), - SeriesLimit: SERIES_LIMIT, - DocsLimit: DOCS_LIMIT, + SeriesLimit: cfg.SeriesLimit, + DocsLimit: cfg.DocsLimit, }, FieldFilter: [][]byte{nameByte}, Type: index.AggregateTagNamesAndValues, @@ -146,7 +146,7 @@ func queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.Aggreg // QueryTagPairs // (endpoint[0] || endpoint[1]...) && (metrics[0] || metrics[1] ... ) -func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) { +func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) { q1 := endpointsQuery(nil, input.Endpoints) q2 := metricsQuery(input.Metrics) @@ -155,8 +155,8 @@ func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index. QueryOptions: index.QueryOptions{ StartInclusive: time.Time{}, EndExclusive: time.Now(), - SeriesLimit: SERIES_LIMIT, - DocsLimit: DOCS_LIMIT, + SeriesLimit: cfg.SeriesLimit, + DocsLimit: cfg.DocsLimit, }, FieldFilter: index.AggregateFieldFilter(nil), Type: index.AggregateTagNamesAndValues, @@ -164,7 +164,7 @@ func queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index. } // QueryIndexByClude: || (&& (|| endpoints...) (metric) (|| include...) 
(&& exclude..)) -func queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.QueryOptions) { +func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.QueryOptions) { query := index.Query{} q := []idx.Query{} @@ -190,14 +190,14 @@ func queryIndexByCludeOptions(input dataobj.CludeRecv) (index.Query, index.Query return query, index.QueryOptions{ StartInclusive: time.Time{}, EndExclusive: time.Now(), - SeriesLimit: SERIES_LIMIT, - DocsLimit: DOCS_LIMIT, + SeriesLimit: cfg.SeriesLimit, + DocsLimit: cfg.DocsLimit, } } // QueryIndexByFullTags: && (|| endpoints) (metric) (&& tagkv) -func queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query, index.QueryOptions) { +func (cfg M3dbSection) queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query, index.QueryOptions) { query := index.Query{} q := []idx.Query{} @@ -220,8 +220,8 @@ func queryIndexByFullTagsOptions(input dataobj.IndexByFullTagsRecv) (index.Query return query, index.QueryOptions{ StartInclusive: time.Time{}, EndExclusive: time.Now(), - SeriesLimit: SERIES_LIMIT, - DocsLimit: DOCS_LIMIT, + SeriesLimit: cfg.SeriesLimit, + DocsLimit: cfg.DocsLimit, } }