bugfix: query index by clude only get last record (#401)

* support time limit for m3db query

* bugfix: query index by clude only get last record
This commit is contained in:
yubo 2020-11-16 13:25:35 +08:00 committed by GitHub
parent 7b1ccd956b
commit 86df27587e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 18 deletions

View File

@ -88,7 +88,8 @@ func tagsIndexTagkvResp(tags *consolidators.CompleteTagsResult) *dataobj.IndexTa
return ret return ret
} }
func xcludeResp(iter ident.TagIterator) (ret dataobj.XcludeResp) { func xcludeResp(iter ident.TagIterator) *dataobj.XcludeResp {
ret := &dataobj.XcludeResp{}
tags := map[string]string{} tags := map[string]string{}
for iter.Next() { for iter.Next() {
tag := iter.Current() tag := iter.Current()

View File

@ -221,20 +221,20 @@ func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRe
} }
// group by endpoint-metric // group by endpoint-metric
respMap := make(map[string]dataobj.XcludeResp) respMap := make(map[string]*dataobj.XcludeResp)
for iter.Next() { for iter.Next() {
_, _, tagIter := iter.Current() _, _, tagIter := iter.Current()
resp := xcludeResp(tagIter) resp := xcludeResp(tagIter)
if len(resp.Tags) > 0 && len(resp.Tags[0]) > 0 {
key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric) key := fmt.Sprintf("%s-%s", resp.Endpoint, resp.Metric)
if v, ok := respMap[key]; ok { if v, ok := respMap[key]; ok {
if len(resp.Tags) > 0 {
v.Tags = append(v.Tags, resp.Tags[0]) v.Tags = append(v.Tags, resp.Tags[0])
}
} else { } else {
respMap[key] = resp respMap[key] = resp
} }
} }
}
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
logger.Errorf("FetchTaggedIDs iter:", err) logger.Errorf("FetchTaggedIDs iter:", err)
@ -243,7 +243,7 @@ func (p *Client) queryIndexByClude(session client.Session, input dataobj.CludeRe
resp := make([]dataobj.XcludeResp, 0, len(respMap)) resp := make([]dataobj.XcludeResp, 0, len(respMap))
for _, v := range respMap { for _, v := range respMap {
resp = append(resp, v) resp = append(resp, *v)
} }
return resp return resp

View File

@ -184,7 +184,7 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.
if len(q) == 0 { if len(q) == 0 {
query = index.Query{idx.NewAllQuery()} query = index.Query{idx.NewAllQuery()}
} else { } else {
query = index.Query{idx.NewDisjunctionQuery(q...)} query = index.Query{idx.NewConjunctionQuery(q...)}
} }
return query, index.QueryOptions{ return query, index.QueryOptions{
@ -268,12 +268,8 @@ func includeTagsQuery2(in []dataobj.TagPair) idx.Query {
func excludeTagsQuery(in []*dataobj.TagPair) idx.Query { func excludeTagsQuery(in []*dataobj.TagPair) idx.Query {
q := []idx.Query{} q := []idx.Query{}
for _, kvs := range in { for _, kvs := range in {
q1 := []idx.Query{}
for _, v := range kvs.Values { for _, v := range kvs.Values {
q1 = append(q1, idx.NewNegationQuery(idx.NewTermQuery([]byte(kvs.Key), []byte(v)))) q = append(q, idx.NewNegationQuery(idx.NewTermQuery([]byte(kvs.Key), []byte(v))))
}
if len(q1) > 0 {
q = append(q, idx.NewConjunctionQuery(q1...))
} }
} }

View File

@ -1,5 +1,6 @@
#!/bin/bash #!/bin/bash
# data:[{Nid: Endpoint:10.86.76.13 Metric:disk.bytes.used.percent Tags:[mount=/tmp] Step:0 Dstype:}]
# type CludeRecv struct { # type CludeRecv struct {
# Endpoints []string `json:"endpoints"` # Endpoints []string `json:"endpoints"`
@ -12,8 +13,8 @@
curl -X POST \ curl -X POST \
http://localhost:8008/api/index/counter/clude \ http://localhost:8008/api/index/counter/clude \
-d '[{ -d '[{
"endpoints": [], "endpoints": ["10.86.76.13"],
"metric": "test", "metric": "disk.bytes.used.percent",
"include": [], "exclude": [{"tagk":"mount", "tagv": ["/boot"]}],
"exclude": [{"tagk":"city", "tagv": ["bjo"]}] "include": [{"tagk":"mount", "tagv": ["/", "/home"]}]
}]' | jq . }]' | jq .