add nid to transfer query data/index (#411)

This commit is contained in:
yubo 2020-11-18 23:46:51 +08:00 committed by GitHub
parent a6b160caed
commit 299122f965
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 9 deletions

View File

@ -54,6 +54,7 @@ func (resp *TsdbQueryResponse) Key() string {
type EndpointsRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
}
type MetricResp struct {
@ -62,11 +63,13 @@ type MetricResp struct {
type EndpointMetricRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metrics []string `json:"metrics"`
}
type IndexTagkvResp struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metric string `json:"metric"`
Tagkv []*TagPair `json:"tagkv"`
}
@ -78,6 +81,7 @@ type TagPair struct {
type CludeRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metric string `json:"metric"`
Include []*TagPair `json:"include"`
Exclude []*TagPair `json:"exclude"`
@ -94,12 +98,14 @@ type XcludeResp struct {
type IndexByFullTagsRecv struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metric string `json:"metric"`
Tagkv []TagPair `json:"tagkv"`
}
type IndexByFullTagsResp struct {
Endpoints []string `json:"endpoints"`
Nids []string `json:"nids"`
Metric string `json:"metric"`
Tags []string `json:"tags"`
Step int `json:"step"`

View File

@ -75,6 +75,11 @@ func tagsIndexTagkvResp(tags *consolidators.CompleteTagsResult) *dataobj.IndexTa
for i, v := range tag.Values {
ret.Endpoints[i] = string(v)
}
case NID_NAME:
ret.Nids = make([]string, len(tag.Values))
for i, v := range tag.Values {
ret.Nids[i] = string(v)
}
default:
kv := &dataobj.TagPair{Key: string(tag.Name)}
kv.Values = make([]string, len(tag.Values))

View File

@ -292,6 +292,7 @@ func (p *Client) queryIndexByFullTags(session client.Session, input dataobj.Inde
}
ret.Endpoints = input.Endpoints
ret.Nids = input.Nids
tags := map[string]struct{}{}
for iter.Next() {
_, _, tagIter := iter.Current()
@ -446,7 +447,7 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
tagsIter := iter.Tags()
tags := map[string]string{}
var metric, endpoint string
var metric, endpoint, nid string
for tagsIter.Next() {
tag := tagsIter.Current()
@ -455,8 +456,10 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
switch k {
case METRIC_NAME:
metric = v
case ENDPOINT_NAME, NID_NAME:
case ENDPOINT_NAME:
endpoint = v
case NID_NAME:
nid = v
default:
tags[k] = v
}
@ -467,6 +470,7 @@ func seriesIterWalk(iter encoding.SeriesIterator) (out *dataobj.TsdbQueryRespons
Start: iter.Start().Unix(),
End: iter.End().Unix(),
Endpoint: endpoint,
Nid: nid,
Counter: counter,
Values: values,
}, nil

View File

@ -140,7 +140,7 @@ func metricTagsQuery(tags []string) idx.Query {
func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Query, index.AggregationOptions) {
nameByte := []byte(METRIC_NAME)
return index.Query{idx.NewConjunctionQuery(
endpointsQuery(nil, input.Endpoints),
endpointsQuery(input.Nids, input.Endpoints),
idx.NewFieldQuery(nameByte),
)},
index.AggregationOptions{
@ -158,7 +158,7 @@ func (cfg M3dbSection) queryMetricsOptions(input dataobj.EndpointsRecv) (index.Q
// QueryTagPairs
// (endpoint[0] || endpoint[1]...) && (metrics[0] || metrics[1] ... )
func (cfg M3dbSection) queryTagPairsOptions(input dataobj.EndpointMetricRecv) (index.Query, index.AggregationOptions) {
q1 := endpointsQuery(nil, input.Endpoints)
q1 := endpointsQuery(input.Nids, input.Endpoints)
q2 := metricsQuery(input.Metrics)
return index.Query{idx.NewConjunctionQuery(q1, q2)},
@ -179,8 +179,8 @@ func (cfg M3dbSection) queryIndexByCludeOptions(input dataobj.CludeRecv) (index.
query := index.Query{}
q := []idx.Query{}
if len(input.Endpoints) > 0 {
q = append(q, endpointsQuery(nil, input.Endpoints))
if len(input.Endpoints) > 0 || len(input.Nids) > 0 {
q = append(q, endpointsQuery(input.Nids, input.Endpoints))
}
if input.Metric != "" {
q = append(q, metricQuery(input.Metric))
@ -214,8 +214,8 @@ func (cfg M3dbSection) queryIndexByFullTagsOptions(input dataobj.IndexByFullTags
query := index.Query{}
q := []idx.Query{}
if len(input.Endpoints) > 0 {
q = append(q, endpointsQuery(nil, input.Endpoints))
if len(input.Endpoints) > 0 || len(input.Nids) > 0 {
q = append(q, endpointsQuery(input.Nids, input.Endpoints))
}
if input.Metric != "" {
q = append(q, metricQuery(input.Metric))

View File

@ -7,7 +7,7 @@
curl -X POST \
http://localhost:8008/api/index/metrics \
-d '{
"endpoints": ["10.178.24.116"]
"endpoints": ["10.178.24.120", "10.178.25.123"]
}'