From 68b213b73726d59631dc5603a3518453a6bc735f Mon Sep 17 00:00:00 2001 From: litianshun Date: Mon, 28 Sep 2020 11:10:16 +0800 Subject: [PATCH] influxdb query sql bug fix (#313) * bug fix influxdb query data, query index Co-authored-by: litianshun --- src/common/dataobj/query_item.go | 10 +- .../transfer/backend/influxdb/model.go | 24 ++- .../transfer/backend/influxdb/query.go | 142 ++++++++++++++---- 3 files changed, 130 insertions(+), 46 deletions(-) diff --git a/src/common/dataobj/query_item.go b/src/common/dataobj/query_item.go index 5e2205d4..f76b2c5c 100644 --- a/src/common/dataobj/query_item.go +++ b/src/common/dataobj/query_item.go @@ -84,11 +84,11 @@ type CludeRecv struct { } type XcludeResp struct { - Endpoints []string `json:"endpoints"` - Metric string `json:"metric"` - Tags []string `json:"tags"` - Step int `json:"step"` - DsType string `json:"dstype"` + Endpoint string `json:"endpoint"` + Metric string `json:"metric"` + Tags []string `json:"tags"` + Step int `json:"step"` + DsType string `json:"dstype"` } type IndexByFullTagsRecv struct { diff --git a/src/modules/transfer/backend/influxdb/model.go b/src/modules/transfer/backend/influxdb/model.go index 22a8ad76..9452923a 100644 --- a/src/modules/transfer/backend/influxdb/model.go +++ b/src/modules/transfer/backend/influxdb/model.go @@ -36,43 +36,51 @@ func (query *ShowSeries) renderEndpoints() { } endpointPart = endpointPart[:len(endpointPart)-len("OR")] endpointPart += ")" - query.RawQuery = fmt.Sprintf("\"%s\" WHERE \"%s\"", query.RawQuery, endpointPart) + query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart) } } func (query *ShowSeries) renderInclude() { if len(query.Include) > 0 { // include + if len(query.Include) == 1 && query.Include[0] == nil { + return + } includePart := "(" for _, include := range query.Include { for _, value := range include.Values { includePart += fmt.Sprintf(" \"%s\"='%s' OR", include.Key, value) } } - includePart = includePart[:len(includePart)-len("OR")] + includePart = includePart[:len(includePart)-len("AND")] includePart += ")" if !strings.Contains(query.RawQuery, "WHERE") { - query.RawQuery += " WHERE" + query.RawQuery = fmt.Sprintf(" %s WHERE %s", query.RawQuery, includePart) + } else { + query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, includePart) } - query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, includePart) } } func (query *ShowSeries) renderExclude() { if len(query.Exclude) > 0 { // exclude + if len(query.Exclude) == 1 && query.Exclude[0] == nil { + return + } excludePart := "(" for _, exclude := range query.Exclude { for _, value := range exclude.Values { - excludePart += fmt.Sprintf(" \"%s\"='%s' OR", exclude.Key, value) + excludePart += fmt.Sprintf(" \"%s\"!='%s' AND", exclude.Key, value) } } - excludePart = excludePart[:len(excludePart)-len("OR")] + excludePart = excludePart[:len(excludePart)-len("AND")] excludePart += ")" if !strings.Contains(query.RawQuery, "WHERE") { - query.RawQuery += " WHERE" + query.RawQuery = fmt.Sprintf(" %s WHERE %s", query.RawQuery, excludePart) + } else { + query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, excludePart) } - query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, excludePart) } } diff --git a/src/modules/transfer/backend/influxdb/query.go b/src/modules/transfer/backend/influxdb/query.go index 0ddfbc9c..7c343dc7 100644 --- a/src/modules/transfer/backend/influxdb/query.go +++ b/src/modules/transfer/backend/influxdb/query.go @@ -25,15 +25,34 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat return nil } + respMap := make(map[string]*dataobj.TsdbQueryResponse) queryResponse := make([]*dataobj.TsdbQueryResponse, 0) for _, input := range inputs { for _, counter := range input.Counters { - items := strings.Split(counter, "/") + items := strings.SplitN(counter, "/", 2) metric := items[0] - var tags = make([]string, 0) - if len(items) > 1 && len(items[1]) > 0 { + tags := make([]string, 0) + if len(items) > 1 { tags = strings.Split(items[1], ",") + tagMap := dataobj.DictedTagstring(items[1]) + if counter, err = dataobj.GetCounter(metric, "", tagMap); err != nil { + logger.Warningf("get counter error: %+v", err) + continue + } } + + for _, endpoint := range input.Endpoints { + key := fmt.Sprintf("%s%s", endpoint, counter) + respMap[key] = &dataobj.TsdbQueryResponse{ + Start: input.Start, + End: input.End, + Endpoint: endpoint, + Counter: counter, + DsType: input.DsType, + Step: input.Step, + } + } + influxdbQuery := QueryData{ Start: input.Start, End: input.End, @@ -42,11 +61,13 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat Tags: tags, Step: input.Step, DsType: input.DsType, + GroupKey: []string{"*"}, } influxdbQuery.renderSelect() influxdbQuery.renderEndpoints() influxdbQuery.renderTags() influxdbQuery.renderTimeRange() + influxdbQuery.renderGroupBy() logger.Debugf("query influxql %s", influxdbQuery.RawQuery) query := client.NewQuery(influxdbQuery.RawQuery, c.Database, c.Precision) @@ -54,31 +75,34 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat for _, result := range response.Results { for _, series := range result.Series { - // fixme : influx client get series.Tags is nil endpoint := series.Tags["endpoint"] - delete(series.Tags, endpoint) - counter, err := dataobj.GetCounter(series.Name, "", series.Tags) + delete(series.Tags, "endpoint") + + influxCounter, err := dataobj.GetCounter(series.Name, "", series.Tags) if err != nil { logger.Warningf("get counter error: %+v", err) continue } - values := convertValues(series) - resp := &dataobj.TsdbQueryResponse{ - Start: influxdbQuery.Start, - End: influxdbQuery.End, - Endpoint: endpoint, - Counter: counter, - DsType: influxdbQuery.DsType, - Step: influxdbQuery.Step, - Values: values, + key := fmt.Sprintf("%s%s", endpoint, influxCounter) + if _, exists := respMap[key]; exists { + respMap[key].Values = convertValues(series) } - queryResponse = append(queryResponse, resp) + } } + } else { + if err != nil { + logger.Warningf("query data point on influxdb error %v.", err) + } else if response.Error() != nil { + logger.Warningf("query data point on influxdb, resp error: %v.", response.Error()) + } } } } + for _, resp := range respMap { + queryResponse = append(queryResponse, resp) + } return queryResponse } @@ -174,6 +198,12 @@ func (influxdb *InfluxdbDataSource) QueryMetrics(recv dataobj.EndpointsRecv) *da } } return resp + } else { + if err != nil { + logger.Warningf("query metrics on influxdb error %v.", err) + } else if response.Error() != nil { + logger.Warningf("query metrics on influxdb, resp error: %v.", response.Error()) + } } return nil } @@ -198,7 +228,7 @@ func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRec Tagkv: make([]*dataobj.TagPair, 0), } // show tag keys - keys := showTagKeys(c, metric, influxdb.Section.Database) + keys := showTagKeys(c, metric, influxdb.Section.Database, recv.Endpoints) if len(keys) > 0 { // show tag values tagkvResp.Tagkv = showTagValues(c, keys, metric, influxdb.Section.Database) @@ -211,9 +241,17 @@ func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRec // show tag keys on n9e from metric where ... // (exclude default endpoint tag) -func showTagKeys(c *InfluxClient, metric, database string) []string { +func showTagKeys(c *InfluxClient, metric, database string, endpoints []string) []string { keys := make([]string, 0) influxql := fmt.Sprintf("SHOW TAG KEYS ON \"%s\" FROM \"%s\"", database, metric) + if len(endpoints) > 0 { + endpointPart := "" + for _, endpoint := range endpoints { + endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint) + } + endpointPart = endpointPart[:len(endpointPart)-len("OR")] + influxql = fmt.Sprintf("%s WHERE %s", influxql, endpointPart) + } query := client.NewQuery(influxql, c.Database, c.Precision) if response, err := c.Client.Query(query); err == nil && response.Error() == nil { for _, result := range response.Results { @@ -227,6 +265,12 @@ func showTagKeys(c *InfluxClient, metric, database string) []string { } } } + } else { + if err != nil { + logger.Warningf("query tag keys on influxdb error %v.", err) + } else if response.Error() != nil { + logger.Warningf("query tag keys on influxdb, resp error: %v.", response.Error()) + } } return keys } @@ -258,6 +302,12 @@ func showTagValues(c *InfluxClient, keys []string, metric, database string) []*d } } } + } else { + if err != nil { + logger.Warningf("query tag values on influxdb error %v.", err) + } else if response.Error() != nil { + logger.Warningf("query tag values on influxdb, resp error: %v.", response.Error()) + } } return tagkv } @@ -275,19 +325,23 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv) } resp := make([]dataobj.XcludeResp, 0) for _, recv := range recvs { - xcludeResp := dataobj.XcludeResp{ - Endpoints: recv.Endpoints, - Metric: recv.Metric, - Tags: make([]string, 0), - Step: -1, // fixme - DsType: "GAUGE", - } if len(recv.Endpoints) == 0 { - resp = append(resp, xcludeResp) continue } + xcludeRespMap := make(map[string]*dataobj.XcludeResp) + for _, endpoint := range recv.Endpoints { + key := fmt.Sprintf("endpoint=%s", endpoint) + xcludeRespMap[key] = &dataobj.XcludeResp{ + Endpoint: endpoint, + Metric: recv.Metric, + Tags: make([]string, 0), + Step: 10, + DsType: "GAUGE", + } + } + showSeries := ShowSeries{ Database: influxdb.Section.Database, Metric: recv.Metric, @@ -307,7 +361,7 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv) for _, result := range response.Results { for _, series := range result.Series { for _, valuePair := range series.Values { - + var curItem string // proc.port.listen,endpoint=localhost,port=22,service=sshd tagKey := valuePair[0].(string) @@ -315,29 +369,45 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv) items := strings.Split(tagKey, ",") newItems := make([]string, 0) for _, item := range items { - if item != recv.Metric && !strings.Contains(item, "endpoint") { + if strings.HasPrefix(item, "endpoint=") { + curItem = item + continue + } + if item != recv.Metric { newItems = append(newItems, item) } } + if curItem == "" { + continue + } + if len(newItems) > 0 { if tags, err := dataobj.SplitTagsString(strings.Join(newItems, ",")); err == nil { - xcludeResp.Tags = append(xcludeResp.Tags, dataobj.SortedTags(tags)) + xcludeRespMap[curItem].Tags = append(xcludeRespMap[curItem].Tags, dataobj.SortedTags(tags)) + xcludeRespMap[curItem].Step = 10 } } } } } + } else { + if err != nil { + logger.Warningf("query index by clude on influxdb error: %v.", err) + } else if response.Error() != nil { + logger.Warningf("query index by clude on influxdb, resp error: %v.", response.Error()) + } + } + for _, xcludeResp := range xcludeRespMap { + resp = append(resp, *xcludeResp) } - resp = append(resp, xcludeResp) } return resp } // show series from metric where ... -func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj. - IndexByFullTagsResp { +func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp { logger.Debugf("query IndexByFullTags , recv: %+v", recvs) c, err := NewInfluxdbClient(influxdb.Section) @@ -354,7 +424,7 @@ func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexBy Endpoints: recv.Endpoints, Metric: recv.Metric, Tags: make([]string, 0), - Step: -1, // FIXME + Step: 10, DsType: "GAUGE", } @@ -403,6 +473,12 @@ func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexBy } } } + } else { + if err != nil { + logger.Warningf("query index by full tags on influxdb error %v.", err) + } else if response.Error() != nil { + logger.Warningf("query index by full tags on influxdb error %v.", response.Error()) + } } resp = append(resp, fullTagResp) }