judge refactor

This commit is contained in:
710leo 2020-12-03 11:50:45 +08:00
parent 94eb306692
commit a8f7f6a04e
3 changed files with 156 additions and 100 deletions

View File

@ -4,10 +4,12 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"strings"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/judge/cache"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/didi/nightingale/src/toolkits/str"
@ -23,17 +25,18 @@ var (
// Query runs the given query requests and returns the resulting series.
// Requests are first handed to QueryFromMem; whatever it hands back as still
// outstanding is sent to "Transfer.Query" through TransferConnPools, attempted
// up to three times. For every endpoint/nid + counter pair in the outstanding
// requests that Transfer did not return, a placeholder response without Values
// is appended to the result.
// (The inherited comment said retries are off by default and should be added
// here if wanted; the loop below makes up to three attempts.)
// NOTE(review): expFunc is not referenced in the lines shown here.
func Query(reqs []*dataobj.QueryData, sid int64, expFunc string) []*dataobj.TsdbQueryResponse {
func Query(reqs []*dataobj.QueryData, stra *models.Stra, expFunc string) []*dataobj.TsdbQueryResponse {
stats.Counter.Set("query.data", 1)
var resp *dataobj.QueryDataResp
var respData []*dataobj.TsdbQueryResponse
var err error
respData, reqs = QueryFromMem(reqs, sid)
if len(reqs) > 0 {
// Keys ("endpoint/nid/counter") of the series that Transfer returned.
filterMap := make(map[string]struct{})
respData, newReqs := QueryFromMem(reqs, stra)
if len(newReqs) > 0 {
stats.Counter.Set("query.data.by.transfer", 1)
for i := 0; i < 3; i++ {
err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp)
err = TransferConnPools.Call("", "Transfer.Query", newReqs, &resp)
if err == nil {
break
}
@ -41,16 +44,73 @@ func Query(reqs []*dataobj.QueryData, sid int64, expFunc string) []*dataobj.Tsdb
}
if err != nil {
stats.Counter.Set("query.data.transfer.err", 1)
logger.Warning("get data err:%v msg:%+v, query data from mem", err, resp)
logger.Warningf("get data err:%v", err)
} else {
respData = append(respData, resp.Data...)
for i := 0; i < len(resp.Data); i++ {
var values dataobj.RRDValues
count := len(resp.Data[i].Values)
// Trim surplus points: walk backwards from the newest point and keep
// only those whose timestamp lies within stra.AlertDur of it.
// NOTE(review): the loop condition is j > 0, so Values[0] is never
// kept (a single-point series ends up empty) — confirm this is intended.
for j := count - 1; j > 0; j-- {
if resp.Data[i].Values[count-1].Timestamp-resp.Data[i].Values[j].Timestamp > int64(stra.AlertDur) {
break
}
values = append(values, resp.Data[i].Values[j])
}
// The points were appended newest-first; sort them again
// (presumably by timestamp — depends on RRDValues' ordering).
sort.Sort(values)
resp.Data[i].Values = values
respData = append(respData, resp.Data[i])
key := resp.Data[i].Endpoint + "/" + resp.Data[i].Nid + "/" + resp.Data[i].Counter
filterMap[key] = struct{}{}
}
}
}
// Fill in the curves that are missing from the query results.
for _, req := range newReqs {
if len(req.Endpoints) > 0 {
for _, endpoint := range req.Endpoints {
for _, counter := range req.Counters {
// Same key format as above with an empty Nid, hence the double slash.
key := endpoint + "//" + counter
if _, exists := filterMap[key]; exists {
continue
}
data := &dataobj.TsdbQueryResponse{
Start: req.Start,
End: req.End,
Endpoint: endpoint,
Counter: counter,
Step: req.Step,
}
respData = append(respData, data)
}
}
}
if len(req.Nids) > 0 {
for _, nid := range req.Nids {
for _, counter := range req.Counters {
// Same key format as above with an empty Endpoint.
key := "/" + nid + "/" + counter
if _, exists := filterMap[key]; exists {
continue
}
data := &dataobj.TsdbQueryResponse{
Start: req.Start,
End: req.End,
Nid: nid,
Counter: counter,
Step: req.Step,
}
respData = append(respData, data)
}
}
}
}
return respData
}
func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryResponse, []*dataobj.QueryData) {
func QueryFromMem(reqs []*dataobj.QueryData, stra *models.Stra) ([]*dataobj.TsdbQueryResponse, []*dataobj.QueryData) {
stats.Counter.Set("query.data.by.mem", 1)
var resps []*dataobj.TsdbQueryResponse
@ -79,7 +139,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes
Nid: nid,
Metric: metric,
TagsMap: tagsMap,
Sid: sid,
Sid: stra.Id,
}
pk := item.MD5()
@ -88,7 +148,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes
historyData := linkedList.QueryDataByTS(req.Start, req.End)
resp.Values = dataobj.HistoryData2RRDData(historyData)
}
if len(resp.Values) > 0 {
if len(resp.Values) > 0 && resp.Values[len(resp.Values)-1].Timestamp-resp.Values[0].Timestamp >= int64(stra.AlertDur) {
resps = append(resps, resp)
} else {
newReq.Nids = append(newReq.Nids, nid)
@ -112,7 +172,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes
Endpoint: endpoint,
Metric: metric,
TagsMap: tagsMap,
Sid: sid,
Sid: stra.Id,
}
pk := item.MD5()
@ -121,7 +181,7 @@ func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryRes
historyData := linkedList.QueryDataByTS(req.Start, req.End)
resp.Values = dataobj.HistoryData2RRDData(historyData)
}
if len(resp.Values) > 0 {
if len(resp.Values) > 0 && resp.Values[len(resp.Values)-1].Timestamp-resp.Values[0].Timestamp >= int64(stra.AlertDur) {
resps = append(resps, resp)
} else {
newReq.Endpoints = append(newReq.Endpoints, endpoint)

View File

@ -86,7 +86,7 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
}
} else { //与条件
for _, expr := range stra.Exprs {
respData, err := GetData(stra, expr, val, now, true)
respData, err := GetData(stra, expr, val, now)
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
@ -130,7 +130,7 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
Hashid: getHashId(stra.Id, val),
}
sendEventIfNeed(historyData, statusArr, event, stra)
sendEventIfNeed(statusArr, event, stra)
}
func Judge(stra *models.Stra, exp models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) (history dataobj.History, info string, lastValue string, status bool) {
@ -188,7 +188,7 @@ func judgeItemWithStrategy(stra *models.Stra, historyData []*dataobj.HistoryData
stra.AlertDur = 7 * firstItem.Step
}
respItems, err := GetData(stra, exp, firstItem, now-int64(exp.Params[0]), true)
respItems, err := GetData(stra, exp, firstItem, now-int64(exp.Params[0]))
if err != nil {
logger.Errorf("stra:%v %+v get compare data err:%v", stra.Id, exp, err)
return
@ -218,45 +218,30 @@ func judgeItemWithStrategy(stra *models.Stra, historyData []*dataobj.HistoryData
return fn.Compute(historyData)
}
// GetData builds one query request from firstItem's nid/endpoint, step and
// tags together with exp.Metric, runs it through query.Query and returns the
// series that come back. An error is returned when the request cannot be built
// or when query.Query yields no series at all.
// firstItem's fields are read directly, so callers must pass a non-nil item.
func GetData(stra *models.Stra, exp models.Exp, firstItem *dataobj.JudgeItem, now int64, sameTag bool) ([]*dataobj.TsdbQueryResponse, error) {
func GetData(stra *models.Stra, exp models.Exp, firstItem *dataobj.JudgeItem, now int64) ([]*dataobj.TsdbQueryResponse, error) {
var reqs []*dataobj.QueryData
var respData []*dataobj.TsdbQueryResponse
var err error
if sameTag { // AND-conditions require identical tags, so no index lookup is needed
if firstItem.Tags != "" && len(firstItem.TagsMap) == 0 {
firstItem.TagsMap = str.DictedTagstring(firstItem.Tags)
}
// +1 guards against ending up with too few points when the newest point cannot be queried yet
start := now - int64(stra.AlertDur) - int64(firstItem.Step) + 1
// Build the tag map from the tag string when only the string form is set.
if firstItem.Tags != "" && len(firstItem.TagsMap) == 0 {
firstItem.TagsMap = str.DictedTagstring(firstItem.Tags)
}
queryParam, err := query.NewQueryRequest(firstItem.Nid, firstItem.Endpoint, exp.Metric, firstItem.TagsMap, firstItem.Step, start, now)
if err != nil {
return respData, err
}
// Query a little extra data to guard against ending up with too few points
// when the newest point cannot be queried yet.
start := now - int64(stra.AlertDur) - int64(firstItem.Step) - 60
reqs = append(reqs, queryParam)
} /*else if firstItem != nil { // point-driven alert strategy case
var nids, endpoints []string
if firstItem.Nid != "" {
nids = []string{firstItem.Nid}
} else if firstItem.Endpoint != "" {
endpoints = []string{firstItem.Endpoint}
}
reqs = GetReqs(stra, exp.Metric, nids, endpoints, now)
//} else { // nodata case
// reqs = GetReqs(stra, exp.Metric, stra.Nids, stra.Endpoints, now)
}*/
queryParam, err := query.NewQueryRequest(firstItem.Nid, firstItem.Endpoint, exp.Metric, firstItem.TagsMap, firstItem.Step, start, now)
if err != nil {
return respData, err
}
reqs = append(reqs, queryParam)
if len(reqs) == 0 {
return respData, err
}
respData = query.Query(reqs, stra.Id, exp.Func)
respData = query.Query(reqs, stra, exp.Func)
// An empty result is counted and reported as an error to the caller.
if len(respData) < 1 {
stats.Counter.Set("get.data.null", 1)
err = fmt.Errorf("get query data is null")
}
return respData, err
}
@ -292,46 +277,31 @@ func GetReqs(stra *models.Stra, metric string, nids, endpoints []string, now int
lostSeries := []cache.Series{}
for _, index := range indexsData {
if index.Step == 0 {
//没有查到索引的 endpoint+metric 也要记录给nodata处理
if len(index.Tags) == 0 {
hash := getHash(index, "")
s := cache.Series{
Nid: index.Nid,
Endpoint: index.Endpoint,
Metric: index.Metric,
Tag: "",
Step: 10,
Dstype: "GAUGE",
Step: index.Step,
Dstype: index.Dstype,
TS: now,
}
lostSeries = append(lostSeries, s)
cache.SeriesMap.Set(stra.Id, hash, s)
} else {
if len(index.Tags) == 0 {
hash := getHash(index, "")
for _, tag := range index.Tags {
hash := getHash(index, tag)
s := cache.Series{
Nid: index.Nid,
Endpoint: index.Endpoint,
Metric: index.Metric,
Tag: "",
Tag: tag,
Step: index.Step,
Dstype: index.Dstype,
TS: now,
}
cache.SeriesMap.Set(stra.Id, hash, s)
} else {
for _, tag := range index.Tags {
hash := getHash(index, tag)
s := cache.Series{
Nid: index.Nid,
Endpoint: index.Endpoint,
Metric: index.Metric,
Tag: tag,
Step: index.Step,
Dstype: index.Dstype,
TS: now,
}
cache.SeriesMap.Set(stra.Id, hash, s)
}
}
}
}
@ -397,7 +367,7 @@ func GetReqs(stra *models.Stra, metric string, nids, endpoints []string, now int
return reqs
}
func sendEventIfNeed(historyData []*dataobj.HistoryData, status []bool, event *dataobj.Event, stra *models.Stra) {
func sendEventIfNeed(status []bool, event *dataobj.Event, stra *models.Stra) {
isTriggered := true
for _, s := range status {
isTriggered = isTriggered && s

View File

@ -46,46 +46,27 @@ func nodataJudge() {
logger.Debugf("stra:%+v endpoints or nids is null", stra)
continue
}
now := time.Now().Unix()
respData, err := GetData(stra, stra.Exprs[0], nil, now, false)
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
if len(stra.Exprs) == 0 {
logger.Debugf("stra:%+v exp or nids is null", stra)
continue
}
for _, data := range respData {
var metric, tag string
// 兼容格式disk.bytes.free/mount=/data/docker/overlay2/xxx/merged
arr := strings.SplitN(data.Counter, "/", 2)
if len(arr) == 2 {
metric = arr[0]
tag = arr[1]
} else {
metric = data.Counter
}
if data.Endpoint == "" && data.Nid == "" {
continue
}
judgeItem := &dataobj.JudgeItem{
Nid: data.Nid,
Endpoint: data.Endpoint,
Metric: metric,
Tags: tag,
TagsMap: dataobj.DictedTagstring(tag),
DsType: data.DsType,
Step: data.Step,
}
now := time.Now().Unix()
reqs := GetReqs(stra, stra.Exprs[0].Metric, stra.Nids, stra.Endpoints, now)
if len(reqs) == 0 {
logger.Errorf("stra:%+v get query data err:req is null", stra)
continue
}
items := getJudgeItems(reqs)
for _, item := range items {
nodataJob.Acquire()
go AsyncJudge(nodataJob, stra, stra.Exprs, dataobj.RRDData2HistoryData(data.Values), judgeItem, now)
go AsyncJudge(nodataJob, stra, stra.Exprs, item, now)
}
}
}
func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) {
func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, firstItem *dataobj.JudgeItem, now int64) {
defer sema.Release()
historyArr := []dataobj.History{}
@ -94,7 +75,7 @@ func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp,
value := ""
for _, expr := range exps {
respData, err := GetData(stra, expr, firstItem, now, true)
respData, err := GetData(stra, expr, firstItem, now)
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
@ -136,5 +117,50 @@ func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp,
Hashid: getHashId(stra.Id, firstItem),
}
sendEventIfNeed(historyData, statusArr, event, stra)
sendEventIfNeed(statusArr, event, stra)
}
// getJudgeItems flattens query requests into judge items, producing one item
// per counter and target. For each counter, a request that lists any Nids
// yields one item per nid (Endpoint left empty); otherwise it yields one item
// per endpoint (Nid left empty). DsType and Step are copied from the request.
func getJudgeItems(reqs []*dataobj.QueryData) []*dataobj.JudgeItem {
	var result []*dataobj.JudgeItem

	for _, q := range reqs {
		for _, c := range q.Counters {
			// A counter is either "metric" or "metric/tags". Only the first "/"
			// separates the two, so tag values may contain slashes themselves,
			// e.g. disk.bytes.free/mount=/data/docker/overlay2/xxx/merged
			name, tags := c, ""
			if cut := strings.Index(c, "/"); cut >= 0 {
				name, tags = c[:cut], c[cut+1:]
			}

			// DictedTagstring is called once per item, so every item gets its
			// own TagsMap value rather than a shared one.
			build := func(nid, endpoint string) *dataobj.JudgeItem {
				return &dataobj.JudgeItem{
					Nid:      nid,
					Endpoint: endpoint,
					Metric:   name,
					Tags:     tags,
					TagsMap:  dataobj.DictedTagstring(tags),
					DsType:   q.DsType,
					Step:     q.Step,
				}
			}

			if len(q.Nids) == 0 {
				for _, ep := range q.Endpoints {
					result = append(result, build("", ep))
				}
				continue
			}
			for _, nid := range q.Nids {
				result = append(result, build(nid, ""))
			}
		}
	}

	return result
}