support time limit for m3db query (#396)

This commit is contained in:
yubo 2020-11-12 15:50:31 +08:00 committed by GitHub
parent 033383eea4
commit dd67efe0f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 19 deletions

2
.gitignore vendored
View File

@ -54,4 +54,4 @@ _test
/src/modules/transfer/transfer /src/modules/transfer/transfer
/src/modules/tsdb/tsdb /src/modules/tsdb/tsdb
tmp/

View File

@ -10,9 +10,8 @@ http:
sso: sso:
enable: false enable: false
ssoAddr: "http://10.1.2.3:8071" ssoAddr: "http://{sso-host}"
# TODO: redirectURL: "http://10.1.2.3:8072/auth-callback" redirectURL: "http://{rdb-host}/auth-callback"
redirectURL: "http://10.1.2.3:8072/api/rdb/auth/callback"
clientId: "" clientId: ""
clientSecret: "" clientSecret: ""
apiKey: "" apiKey: ""

View File

@ -25,6 +25,7 @@ const (
METRIC_NAME = "__name__" METRIC_NAME = "__name__"
SERIES_LIMIT = 1000 SERIES_LIMIT = 1000
DOCS_LIMIT = 100 DOCS_LIMIT = 100
MAX_PONINTS = 720
) )
type M3dbSection struct { type M3dbSection struct {
@ -51,6 +52,10 @@ type Client struct {
namespaceID ident.ID namespaceID ident.ID
} }
func indexStartTime() time.Time {
return time.Now().Add(-time.Hour * 25)
}
func NewClient(cfg M3dbSection) (*Client, error) { func NewClient(cfg M3dbSection) (*Client, error) {
client, err := cfg.Config.NewClient(client.ConfigurationParameters{}) client, err := cfg.Config.NewClient(client.ConfigurationParameters{})
if err != nil { if err != nil {
@ -267,7 +272,6 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde
ret = dataobj.IndexByFullTagsResp{ ret = dataobj.IndexByFullTagsResp{
Metric: input.Metric, Metric: input.Metric,
Tags: []string{}, Tags: []string{},
Step: 10,
DsType: "GAUGE", DsType: "GAUGE",
} }
@ -285,20 +289,22 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde
} }
ret.Endpoints = input.Endpoints ret.Endpoints = input.Endpoints
tags := map[string]struct{}{}
for iter.Next() { for iter.Next() {
log.Printf("iter.next() ")
_, _, tagIter := iter.Current() _, _, tagIter := iter.Current()
resp := xcludeResp(tagIter) resp := xcludeResp(tagIter)
if len(resp.Tags) > 0 { if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 {
ret.Tags = append(ret.Tags, resp.Tags[0]) tags[resp.Tags[0]] = struct{}{}
} }
} }
for k, _ := range tags {
ret.Tags = append(ret.Tags, k)
}
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
logger.Errorf("FetchTaggedIDs iter:", err) logger.Errorf("FetchTaggedIDs iter:", err)
} }
return ret return ret
} }
// GetInstance: && (metric) (endpoint) (&& tags...) // GetInstance: && (metric) (endpoint) (&& tags...)
@ -438,12 +444,21 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
tagsIter := iter.Tags() tagsIter := iter.Tags()
tags := map[string]string{} tags := map[string]string{}
var metric, endpoint string
for tagsIter.Next() { for tagsIter.Next() {
tag := tagsIter.Current() tag := tagsIter.Current()
tags[tag.Name.String()] = tag.Value.String() k := tag.Name.String()
v := tag.Value.String()
switch k {
case METRIC_NAME:
metric = v
case ENDPOINT_NAME, NID_NAME:
endpoint = v
default:
tags[k] = v
}
} }
metric := tags[METRIC_NAME]
endpoint := tags[ENDPOINT_NAME]
counter, err := dataobj.GetCounter(metric, "", tags) counter, err := dataobj.GetCounter(metric, "", tags)
return &dataobj.TsdbQueryResponse{ return &dataobj.TsdbQueryResponse{
@ -483,7 +498,7 @@ func (cfg M3dbSection) validateTime(start, end int64, step *int) error {
} }
if *step == 0 { if *step == 0 {
*step = int((end - start) / 720) *step = int((end - start) / MAX_PONINTS)
} }
if *step > cfg.MinStep { if *step > cfg.MinStep {

View File

@ -134,7 +134,7 @@ func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Q
)}, )},
index.AggregationOptions{ index.AggregationOptions{
QueryOptions: index.QueryOptions{ QueryOptions: index.QueryOptions{
StartInclusive: time.Time{}, StartInclusive: indexStartTime(),
EndExclusive: time.Now(), EndExclusive: time.Now(),
SeriesLimit: cfg.SeriesLimit, SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit, DocsLimit: cfg.DocsLimit,
@ -153,7 +153,7 @@ func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (i
return index.Query{idx.NewConjunctionQuery(q1, q2)}, return index.Query{idx.NewConjunctionQuery(q1, q2)},
index.AggregationOptions{ index.AggregationOptions{
QueryOptions: index.QueryOptions{ QueryOptions: index.QueryOptions{
StartInclusive: time.Time{}, StartInclusive: indexStartTime(),
EndExclusive: time.Now(), EndExclusive: time.Now(),
SeriesLimit: cfg.SeriesLimit, SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit, DocsLimit: cfg.DocsLimit,
@ -188,7 +188,7 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.
} }
return query, index.QueryOptions{ return query, index.QueryOptions{
StartInclusive: time.Time{}, StartInclusive: indexStartTime(),
EndExclusive: time.Now(), EndExclusive: time.Now(),
SeriesLimit: cfg.SeriesLimit, SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit, DocsLimit: cfg.DocsLimit,
@ -218,7 +218,7 @@ func (cfg M3dbSection) queryIndexByFullTagsOptions(input dataobj.IndexByFullTags
} }
return query, index.QueryOptions{ return query, index.QueryOptions{
StartInclusive: time.Time{}, StartInclusive: indexStartTime(),
EndExclusive: time.Now(), EndExclusive: time.Now(),
SeriesLimit: cfg.SeriesLimit, SeriesLimit: cfg.SeriesLimit,
DocsLimit: cfg.DocsLimit, DocsLimit: cfg.DocsLimit,

View File

@ -7,7 +7,7 @@
curl -X POST \ curl -X POST \
http://localhost:8008/api/index/metrics \ http://localhost:8008/api/index/metrics \
-d '{ -d '{
"endpoints": [] "endpoints": ["10.178.24.116"]
}' }'

View File

@ -148,5 +148,10 @@ func GetIndexByFullTags(c *gin.Context) {
} }
resp := dataSource.QueryIndexByFullTags(recvs) resp := dataSource.QueryIndexByFullTags(recvs)
render.Data(c, resp, nil) render.Data(c, &listResp{List: resp, Count: len(resp)}, nil)
}
type listResp struct {
List interface{} `json:"list"`
Count int `json:"count"`
} }