influxdb query sql bug fix (#313)

* bug fix influxdb query data, query index

Co-authored-by: litianshun <litianshun@meicai.cn>
This commit is contained in:
litianshun 2020-09-28 11:10:16 +08:00 committed by GitHub
parent ceb86a2d5f
commit 68b213b737
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 130 additions and 46 deletions

View File

@ -84,11 +84,11 @@ type CludeRecv struct {
}
type XcludeResp struct {
Endpoints []string `json:"endpoints"`
Metric string `json:"metric"`
Tags []string `json:"tags"`
Step int `json:"step"`
DsType string `json:"dstype"`
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Tags []string `json:"tags"`
Step int `json:"step"`
DsType string `json:"dstype"`
}
type IndexByFullTagsRecv struct {

View File

@ -36,43 +36,51 @@ func (query *ShowSeries) renderEndpoints() {
}
endpointPart = endpointPart[:len(endpointPart)-len("OR")]
endpointPart += ")"
query.RawQuery = fmt.Sprintf("\"%s\" WHERE \"%s\"", query.RawQuery, endpointPart)
query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart)
}
}
func (query *ShowSeries) renderInclude() {
if len(query.Include) > 0 {
// include
if len(query.Include) == 1 && query.Include[0] == nil {
return
}
includePart := "("
for _, include := range query.Include {
for _, value := range include.Values {
includePart += fmt.Sprintf(" \"%s\"='%s' OR", include.Key, value)
}
}
includePart = includePart[:len(includePart)-len("OR")]
includePart = includePart[:len(includePart)-len("AND")]
includePart += ")"
if !strings.Contains(query.RawQuery, "WHERE") {
query.RawQuery += " WHERE"
query.RawQuery = fmt.Sprintf(" %s WHERE %s", query.RawQuery, includePart)
} else {
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, includePart)
}
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, includePart)
}
}
func (query *ShowSeries) renderExclude() {
if len(query.Exclude) > 0 {
// exclude
if len(query.Exclude) == 1 && query.Exclude[0] == nil {
return
}
excludePart := "("
for _, exclude := range query.Exclude {
for _, value := range exclude.Values {
excludePart += fmt.Sprintf(" \"%s\"='%s' OR", exclude.Key, value)
excludePart += fmt.Sprintf(" \"%s\"!='%s' AND", exclude.Key, value)
}
}
excludePart = excludePart[:len(excludePart)-len("OR")]
excludePart = excludePart[:len(excludePart)-len("AND")]
excludePart += ")"
if !strings.Contains(query.RawQuery, "WHERE") {
query.RawQuery += " WHERE"
query.RawQuery = fmt.Sprintf(" %s WHERE %s", query.RawQuery, excludePart)
} else {
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, excludePart)
}
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, excludePart)
}
}

View File

@ -25,15 +25,34 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat
return nil
}
respMap := make(map[string]*dataobj.TsdbQueryResponse)
queryResponse := make([]*dataobj.TsdbQueryResponse, 0)
for _, input := range inputs {
for _, counter := range input.Counters {
items := strings.Split(counter, "/")
items := strings.SplitN(counter, "/", 2)
metric := items[0]
var tags = make([]string, 0)
if len(items) > 1 && len(items[1]) > 0 {
tags := make([]string, 0)
if len(items) > 1 {
tags = strings.Split(items[1], ",")
tagMap := dataobj.DictedTagstring(items[1])
if counter, err = dataobj.GetCounter(metric, "", tagMap); err != nil {
logger.Warningf("get counter error: %+v", err)
continue
}
}
for _, endpoint := range input.Endpoints {
key := fmt.Sprintf("%s%s", endpoint, counter)
respMap[key] = &dataobj.TsdbQueryResponse{
Start: input.Start,
End: input.End,
Endpoint: endpoint,
Counter: counter,
DsType: input.DsType,
Step: input.Step,
}
}
influxdbQuery := QueryData{
Start: input.Start,
End: input.End,
@ -42,11 +61,13 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat
Tags: tags,
Step: input.Step,
DsType: input.DsType,
GroupKey: []string{"*"},
}
influxdbQuery.renderSelect()
influxdbQuery.renderEndpoints()
influxdbQuery.renderTags()
influxdbQuery.renderTimeRange()
influxdbQuery.renderGroupBy()
logger.Debugf("query influxql %s", influxdbQuery.RawQuery)
query := client.NewQuery(influxdbQuery.RawQuery, c.Database, c.Precision)
@ -54,31 +75,34 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat
for _, result := range response.Results {
for _, series := range result.Series {
// fixme : influx client get series.Tags is nil
endpoint := series.Tags["endpoint"]
delete(series.Tags, endpoint)
counter, err := dataobj.GetCounter(series.Name, "", series.Tags)
delete(series.Tags, "endpoint")
influxCounter, err := dataobj.GetCounter(series.Name, "", series.Tags)
if err != nil {
logger.Warningf("get counter error: %+v", err)
continue
}
values := convertValues(series)
resp := &dataobj.TsdbQueryResponse{
Start: influxdbQuery.Start,
End: influxdbQuery.End,
Endpoint: endpoint,
Counter: counter,
DsType: influxdbQuery.DsType,
Step: influxdbQuery.Step,
Values: values,
key := fmt.Sprintf("%s%s", endpoint, influxCounter)
if _, exists := respMap[key]; exists {
respMap[key].Values = convertValues(series)
}
queryResponse = append(queryResponse, resp)
}
}
} else {
if err != nil {
logger.Warningf("query data point on influxdb error %v.", err)
} else if response.Error() != nil {
logger.Warningf("query data point on influxdb, resp error: %v.", response.Error())
}
}
}
}
for _, resp := range respMap {
queryResponse = append(queryResponse, resp)
}
return queryResponse
}
@ -174,6 +198,12 @@ func (influxdb *InfluxdbDataSource) QueryMetrics(recv dataobj.EndpointsRecv) *da
}
}
return resp
} else {
if err != nil {
logger.Warningf("query metrics on influxdb error %v.", err)
} else if response.Error() != nil {
logger.Warningf("query metrics on influxdb, resp error: %v.", response.Error())
}
}
return nil
}
@ -198,7 +228,7 @@ func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRec
Tagkv: make([]*dataobj.TagPair, 0),
}
// show tag keys
keys := showTagKeys(c, metric, influxdb.Section.Database)
keys := showTagKeys(c, metric, influxdb.Section.Database, recv.Endpoints)
if len(keys) > 0 {
// show tag values
tagkvResp.Tagkv = showTagValues(c, keys, metric, influxdb.Section.Database)
@ -211,9 +241,17 @@ func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRec
// show tag keys on n9e from metric where ...
// (exclude default endpoint tag)
func showTagKeys(c *InfluxClient, metric, database string) []string {
func showTagKeys(c *InfluxClient, metric, database string, endpoints []string) []string {
keys := make([]string, 0)
influxql := fmt.Sprintf("SHOW TAG KEYS ON \"%s\" FROM \"%s\"", database, metric)
if len(endpoints) > 0 {
endpointPart := ""
for _, endpoint := range endpoints {
endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint)
}
endpointPart = endpointPart[:len(endpointPart)-len("OR")]
influxql = fmt.Sprintf("%s WHERE %s", influxql, endpointPart)
}
query := client.NewQuery(influxql, c.Database, c.Precision)
if response, err := c.Client.Query(query); err == nil && response.Error() == nil {
for _, result := range response.Results {
@ -227,6 +265,12 @@ func showTagKeys(c *InfluxClient, metric, database string) []string {
}
}
}
} else {
if err != nil {
logger.Warningf("query tag keys on influxdb error %v.", err)
} else if response.Error() != nil {
logger.Warningf("query tag keys on influxdb, resp error: %v.", response.Error())
}
}
return keys
}
@ -258,6 +302,12 @@ func showTagValues(c *InfluxClient, keys []string, metric, database string) []*d
}
}
}
} else {
if err != nil {
logger.Warningf("query tag values on influxdb error %v.", err)
} else if response.Error() != nil {
logger.Warningf("query tag values on influxdb, resp error: %v.", response.Error())
}
}
return tagkv
}
@ -275,19 +325,23 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv)
}
resp := make([]dataobj.XcludeResp, 0)
for _, recv := range recvs {
xcludeResp := dataobj.XcludeResp{
Endpoints: recv.Endpoints,
Metric: recv.Metric,
Tags: make([]string, 0),
Step: -1, // fixme
DsType: "GAUGE",
}
if len(recv.Endpoints) == 0 {
resp = append(resp, xcludeResp)
continue
}
xcludeRespMap := make(map[string]*dataobj.XcludeResp)
for _, endpoint := range recv.Endpoints {
key := fmt.Sprintf("endpoint=%s", endpoint)
xcludeRespMap[key] = &dataobj.XcludeResp{
Endpoint: endpoint,
Metric: recv.Metric,
Tags: make([]string, 0),
Step: 10,
DsType: "GAUGE",
}
}
showSeries := ShowSeries{
Database: influxdb.Section.Database,
Metric: recv.Metric,
@ -307,7 +361,7 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv)
for _, result := range response.Results {
for _, series := range result.Series {
for _, valuePair := range series.Values {
var curItem string
// proc.port.listen,endpoint=localhost,port=22,service=sshd
tagKey := valuePair[0].(string)
@ -315,29 +369,45 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv)
items := strings.Split(tagKey, ",")
newItems := make([]string, 0)
for _, item := range items {
if item != recv.Metric && !strings.Contains(item, "endpoint") {
if strings.HasPrefix(item, "endpoint=") {
curItem = item
continue
}
if item != recv.Metric {
newItems = append(newItems, item)
}
}
if curItem == "" {
continue
}
if len(newItems) > 0 {
if tags, err := dataobj.SplitTagsString(strings.Join(newItems, ",")); err == nil {
xcludeResp.Tags = append(xcludeResp.Tags, dataobj.SortedTags(tags))
xcludeRespMap[curItem].Tags = append(xcludeRespMap[curItem].Tags, dataobj.SortedTags(tags))
xcludeRespMap[curItem].Step = 10
}
}
}
}
}
} else {
if err != nil {
logger.Warningf("query index by clude on influxdb error: %v.", err)
} else if response.Error() != nil {
logger.Warningf("query index by clude on influxdb, resp error: %v.", response.Error())
}
}
for _, xcludeResp := range xcludeRespMap {
resp = append(resp, *xcludeResp)
}
resp = append(resp, xcludeResp)
}
return resp
}
// show series from metric where ...
func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj.
IndexByFullTagsResp {
func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp {
logger.Debugf("query IndexByFullTags , recv: %+v", recvs)
c, err := NewInfluxdbClient(influxdb.Section)
@ -354,7 +424,7 @@ func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexBy
Endpoints: recv.Endpoints,
Metric: recv.Metric,
Tags: make([]string, 0),
Step: -1, // FIXME
Step: 10,
DsType: "GAUGE",
}
@ -403,6 +473,12 @@ func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexBy
}
}
}
} else {
if err != nil {
logger.Warningf("query index by full tags on influxdb error %v.", err)
} else if response.Error() != nil {
logger.Warningf("query index by full tags on influxdb error %v.", response.Error())
}
}
resp = append(resp, fullTagResp)
}