diff --git a/src/modules/judge/backend/query/query.go b/src/modules/judge/backend/query/query.go index c9862374..b789e8af 100644 --- a/src/modules/judge/backend/query/query.go +++ b/src/modules/judge/backend/query/query.go @@ -4,10 +4,12 @@ import ( "errors" "fmt" "math/rand" + "sort" "strings" "time" "github.com/didi/nightingale/src/common/dataobj" + "github.com/didi/nightingale/src/models" "github.com/didi/nightingale/src/modules/judge/cache" "github.com/didi/nightingale/src/toolkits/stats" "github.com/didi/nightingale/src/toolkits/str" @@ -23,17 +25,18 @@ var ( // 执行Query操作 // 默认不重试, 如果要做重试, 在这里完成 -func Query(reqs []*dataobj.QueryData, sid int64, expFunc string) []*dataobj.TsdbQueryResponse { +func Query(reqs []*dataobj.QueryData, stra *models.Stra, expFunc string) []*dataobj.TsdbQueryResponse { stats.Counter.Set("query.data", 1) var resp *dataobj.QueryDataResp - var respData []*dataobj.TsdbQueryResponse var err error - respData, reqs = QueryFromMem(reqs, sid) - if len(reqs) > 0 { + filterMap := make(map[string]struct{}) + + respData, newReqs := QueryFromMem(reqs, stra) + if len(newReqs) > 0 { stats.Counter.Set("query.data.by.transfer", 1) for i := 0; i < 3; i++ { - err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp) + err = TransferConnPools.Call("", "Transfer.Query", newReqs, &resp) if err == nil { break } @@ -41,16 +44,73 @@ func Query(reqs []*dataobj.QueryData, sid int64, expFunc string) []*dataobj.Tsdb } if err != nil { stats.Counter.Set("query.data.transfer.err", 1) - logger.Warning("get data err:%v msg:%+v, query data from mem", err, resp) + logger.Warningf("get data err:%v", err) } else { - respData = append(respData, resp.Data...) + for i := 0; i < len(resp.Data); i++ { + var values dataobj.RRDValues + count := len(resp.Data[i].Values) + //裁剪掉多余的点 + for j := count - 1; j > 0; j-- { + if resp.Data[i].Values[count-1].Timestamp-resp.Data[i].Values[j].Timestamp > int64(stra.AlertDur) { + break + } + values = append(values, resp.Data[i].Values[j]) + } + sort.Sort(values) + + resp.Data[i].Values = values + respData = append(respData, resp.Data[i]) + key := resp.Data[i].Endpoint + "/" + resp.Data[i].Nid + "/" + resp.Data[i].Counter + filterMap[key] = struct{}{} + } + } + } + + //补全查询数据丢失的曲线 + for _, req := range newReqs { + if len(req.Endpoints) > 0 { + for _, endpoint := range req.Endpoints { + for _, counter := range req.Counters { + key := endpoint + "//" + counter + if _, exists := filterMap[key]; exists { + continue + } + data := &dataobj.TsdbQueryResponse{ + Start: req.Start, + End: req.End, + Endpoint: endpoint, + Counter: counter, + Step: req.Step, + } + respData = append(respData, data) + } + } + } + + if len(req.Nids) > 0 { + for _, nid := range req.Nids { + for _, counter := range req.Counters { + key := "/" + nid + "/" + counter + if _, exists := filterMap[key]; exists { + continue + } + data := &dataobj.TsdbQueryResponse{ + Start: req.Start, + End: req.End, + Nid: nid, + Counter: counter, + Step: req.Step, + } + respData = append(respData, data) + } + } } } return respData } -func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryResponse, []*dataobj.QueryData) { +func QueryFromMem(reqs []*dataobj.QueryData, stra *models.Stra) ([]*dataobj.TsdbQueryResponse, []*dataobj.QueryData) { stats.Counter.Set("query.data.by.mem", 1) var resps []*dataobj.TsdbQueryResponse @@ -79,7 +139,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes Nid: nid, Metric: metric, TagsMap: tagsMap, - Sid: sid, + Sid: stra.Id, } pk := item.MD5() @@ -88,7 +148,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes historyData := linkedList.QueryDataByTS(req.Start, req.End) resp.Values = dataobj.HistoryData2RRDData(historyData) } - if len(resp.Values) > 0 { + if len(resp.Values) > 0 && resp.Values[len(resp.Values)-1].Timestamp-resp.Values[0].Timestamp >= int64(stra.AlertDur) { resps = append(resps, resp) } else { newReq.Nids = append(newReq.Nids, nid) @@ -112,7 +172,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes Endpoint: endpoint, Metric: metric, TagsMap: tagsMap, - Sid: sid, + Sid: stra.Id, } pk := item.MD5() @@ -121,7 +181,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes historyData := linkedList.QueryDataByTS(req.Start, req.End) resp.Values = dataobj.HistoryData2RRDData(historyData) } - if len(resp.Values) > 0 { + if len(resp.Values) > 0 && resp.Values[len(resp.Values)-1].Timestamp-resp.Values[0].Timestamp >= int64(stra.AlertDur) { resps = append(resps, resp) } else { newReq.Endpoints = append(newReq.Endpoints, endpoint) diff --git a/src/modules/judge/judge/judge.go b/src/modules/judge/judge/judge.go index d0901ae6..dfb56fd9 100644 --- a/src/modules/judge/judge/judge.go +++ b/src/modules/judge/judge/judge.go @@ -86,7 +86,7 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem, } } else { //与条件 for _, expr := range stra.Exprs { - respData, err := GetData(stra, expr, val, now, true) + respData, err := GetData(stra, expr, val, now) if err != nil { logger.Errorf("stra:%+v get query data err:%v", stra, err) return @@ -130,7 +130,7 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem, Hashid: getHashId(stra.Id, val), } - sendEventIfNeed(historyData, statusArr, event, stra) + sendEventIfNeed(statusArr, event, stra) } func Judge(stra *models.Stra, exp models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) (history dataobj.History, info string, lastValue string, status bool) { @@ -188,7 +188,7 @@ func judgeItemWithStrategy(stra *models.Stra, historyData []*dataobj.HistoryData stra.AlertDur = 7 * firstItem.Step } - respItems, err := GetData(stra, exp, firstItem, now-int64(exp.Params[0]), true) + respItems, err := GetData(stra, exp, firstItem, now-int64(exp.Params[0])) if err != nil { logger.Errorf("stra:%v %+v get compare data err:%v", stra.Id, exp, err) return @@ -218,45 +218,30 @@ func judgeItemWithStrategy(stra *models.Stra, historyData []*dataobj.HistoryData return fn.Compute(historyData) } -func GetData(stra *models.Stra, exp models.Exp, firstItem *dataobj.JudgeItem, now int64, sameTag bool) ([]*dataobj.TsdbQueryResponse, error) { +func GetData(stra *models.Stra, exp models.Exp, firstItem *dataobj.JudgeItem, now int64) ([]*dataobj.TsdbQueryResponse, error) { var reqs []*dataobj.QueryData var respData []*dataobj.TsdbQueryResponse var err error - if sameTag { //与条件要求是相同tag的场景,不需要查询索引 - if firstItem.Tags != "" && len(firstItem.TagsMap) == 0 { - firstItem.TagsMap = str.DictedTagstring(firstItem.Tags) - } - //+1 防止由于查询不到最新点,导致点数不够 - start := now - int64(stra.AlertDur) - int64(firstItem.Step) + 1 + if firstItem.Tags != "" && len(firstItem.TagsMap) == 0 { + firstItem.TagsMap = str.DictedTagstring(firstItem.Tags) + } - queryParam, err := query.NewQueryRequest(firstItem.Nid, firstItem.Endpoint, exp.Metric, firstItem.TagsMap, firstItem.Step, start, now) - if err != nil { - return respData, err - } + //多查一些数据,防止由于查询不到最新点,导致点数不够 + start := now - int64(stra.AlertDur) - int64(firstItem.Step) - 60 - reqs = append(reqs, queryParam) - } /*else if firstItem != nil { //点驱动告警策略的场景 - var nids, endpoints []string - if firstItem.Nid != "" { - nids = []string{firstItem.Nid} - } else if firstItem.Endpoint != "" { - endpoints = []string{firstItem.Endpoint} - } - reqs = GetReqs(stra, exp.Metric, nids, endpoints, now) - //} else { //nodata的场景 - // reqs = GetReqs(stra, exp.Metric, stra.Nids, stra.Endpoints, now) - }*/ + queryParam, err := query.NewQueryRequest(firstItem.Nid, firstItem.Endpoint, exp.Metric, firstItem.TagsMap, firstItem.Step, start, now) + if err != nil { + return respData, err + } + + reqs = append(reqs, queryParam) if len(reqs) == 0 { return respData, err } - respData = query.Query(reqs, stra.Id, exp.Func) + respData = query.Query(reqs, stra, exp.Func) - if len(respData) < 1 { - stats.Counter.Set("get.data.null", 1) - err = fmt.Errorf("get query data is null") - } return respData, err } @@ -292,46 +277,31 @@ func GetReqs(stra *models.Stra, metric string, nids, endpoints []string, now int lostSeries := []cache.Series{} for _, index := range indexsData { - if index.Step == 0 { - //没有查到索引的 endpoint+metric 也要记录,给nodata处理 + if len(index.Tags) == 0 { + hash := getHash(index, "") s := cache.Series{ Nid: index.Nid, Endpoint: index.Endpoint, Metric: index.Metric, Tag: "", - Step: 10, - Dstype: "GAUGE", + Step: index.Step, + Dstype: index.Dstype, TS: now, } - lostSeries = append(lostSeries, s) + cache.SeriesMap.Set(stra.Id, hash, s) } else { - if len(index.Tags) == 0 { - hash := getHash(index, "") + for _, tag := range index.Tags { + hash := getHash(index, tag) s := cache.Series{ Nid: index.Nid, Endpoint: index.Endpoint, Metric: index.Metric, - Tag: "", + Tag: tag, Step: index.Step, Dstype: index.Dstype, TS: now, } cache.SeriesMap.Set(stra.Id, hash, s) - } else { - for _, tag := range index.Tags { - hash := getHash(index, tag) - s := cache.Series{ - Nid: index.Nid, - Endpoint: index.Endpoint, - Metric: index.Metric, - Tag: tag, - Step: index.Step, - Dstype: index.Dstype, - TS: now, - } - cache.SeriesMap.Set(stra.Id, hash, s) - } - } } } @@ -397,7 +367,7 @@ func GetReqs(stra *models.Stra, metric string, nids, endpoints []string, now int return reqs } -func sendEventIfNeed(historyData []*dataobj.HistoryData, status []bool, event *dataobj.Event, stra *models.Stra) { +func sendEventIfNeed(status []bool, event *dataobj.Event, stra *models.Stra) { isTriggered := true for _, s := range status { isTriggered = isTriggered && s diff --git a/src/modules/judge/judge/nodata.go b/src/modules/judge/judge/nodata.go index 48066f48..9c0ece89 100644 --- a/src/modules/judge/judge/nodata.go +++ b/src/modules/judge/judge/nodata.go @@ -46,46 +46,27 @@ func nodataJudge() { logger.Debugf("stra:%+v endpoints or nids is null", stra) continue } - - now := time.Now().Unix() - respData, err := GetData(stra, stra.Exprs[0], nil, now, false) - if err != nil { - logger.Errorf("stra:%+v get query data err:%v", stra, err) + if len(stra.Exprs) == 0 { + logger.Debugf("stra:%+v exp or nids is null", stra) continue } - for _, data := range respData { - var metric, tag string - // 兼容格式disk.bytes.free/mount=/data/docker/overlay2/xxx/merged - arr := strings.SplitN(data.Counter, "/", 2) - if len(arr) == 2 { - metric = arr[0] - tag = arr[1] - } else { - metric = data.Counter - } - - if data.Endpoint == "" && data.Nid == "" { - continue - } - - judgeItem := &dataobj.JudgeItem{ - Nid: data.Nid, - Endpoint: data.Endpoint, - Metric: metric, - Tags: tag, - TagsMap: dataobj.DictedTagstring(tag), - DsType: data.DsType, - Step: data.Step, - } + now := time.Now().Unix() + reqs := GetReqs(stra, stra.Exprs[0].Metric, stra.Nids, stra.Endpoints, now) + if len(reqs) == 0 { + logger.Errorf("stra:%+v get query data err:req is null", stra) + continue + } + items := getJudgeItems(reqs) + for _, item := range items { nodataJob.Acquire() - go AsyncJudge(nodataJob, stra, stra.Exprs, dataobj.RRDData2HistoryData(data.Values), judgeItem, now) + go AsyncJudge(nodataJob, stra, stra.Exprs, item, now) } } } -func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) { +func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, firstItem *dataobj.JudgeItem, now int64) { defer sema.Release() historyArr := []dataobj.History{} @@ -94,7 +75,7 @@ func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, value := "" for _, expr := range exps { - respData, err := GetData(stra, expr, firstItem, now, true) + respData, err := GetData(stra, expr, firstItem, now) if err != nil { logger.Errorf("stra:%+v get query data err:%v", stra, err) return @@ -136,5 +117,50 @@ func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, Hashid: getHashId(stra.Id, firstItem), } - sendEventIfNeed(historyData, statusArr, event, stra) + sendEventIfNeed(statusArr, event, stra) +} + +func getJudgeItems(reqs []*dataobj.QueryData) []*dataobj.JudgeItem { + var items []*dataobj.JudgeItem + for _, req := range reqs { + for _, counter := range req.Counters { + var metric, tag string + // 兼容格式disk.bytes.free/mount=/data/docker/overlay2/xxx/merged + arr := strings.SplitN(counter, "/", 2) + if len(arr) == 2 { + metric = arr[0] + tag = arr[1] + } else { + metric = counter + } + + if len(req.Nids) != 0 { + for _, nid := range req.Nids { + judgeItem := &dataobj.JudgeItem{ + Nid: nid, + Endpoint: "", + Metric: metric, + Tags: tag, + TagsMap: dataobj.DictedTagstring(tag), + DsType: req.DsType, + Step: req.Step, + } + items = append(items, judgeItem) + } + } else { + for _, endpoint := range req.Endpoints { + judgeItem := &dataobj.JudgeItem{ + Endpoint: endpoint, + Metric: metric, + Tags: tag, + TagsMap: dataobj.DictedTagstring(tag), + DsType: req.DsType, + Step: req.Step, + } + items = append(items, judgeItem) + } + } + } + } + return items }