nightingale/judge/handler.go

539 lines
14 KiB
Go
Raw Normal View History

// Copyright 2017 Xiaomi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package judge
import (
"bytes"
"encoding/json"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/cache"
"github.com/didi/nightingale/v5/models"
"github.com/didi/nightingale/v5/vos"
)
var (
bufferPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
EVENT_ALERT = "alert"
EVENT_RECOVER = "recovery"
)
func Send(points []*vos.MetricPoint) {
for i := range points {
alertRules := getMatchAlertRules(points[i])
rulesCount := len(alertRules)
if rulesCount == 0 {
// 这个监控数据没有关联任何告警策略,省事了不用处理
continue
}
logger.Debugf("[point_match_alertRules][point:%+v][alertRuleNum:%+v]", points[i], rulesCount)
// 不同的告警规则alert_duration字段大小不同找到最大的按照最大的值来缓存历史数据
var maxAliveDuration = 0
for j := range alertRules {
if maxAliveDuration < alertRules[j].AlertDuration {
maxAliveDuration = alertRules[j].AlertDuration
}
}
if len(points[i].PK) < 2 {
logger.Debugf("[point:%+v] len(pk)<2", points[i])
continue
}
ll := PointCaches[points[i].PK[0:2]].PutPoint(points[i], int64(maxAliveDuration))
for j := range alertRules {
go ToJudge(ll, alertRules[j], points[i])
}
}
}
func getMatchAlertRules(point *vos.MetricPoint) []*models.AlertRule {
alertRules := cache.AlertRulesByMetric.GetBy(point.Metric)
matchRules := make([]*models.AlertRule, 0, len(alertRules))
for i := range alertRules {
if alertRules[i].Type == models.PULL {
continue
}
if matchAlertRule(point, alertRules[i]) {
matchRules = append(matchRules, alertRules[i])
}
}
return matchRules
}
func matchAlertRule(item *vos.MetricPoint, alertRule *models.AlertRule) bool {
//TODO 过滤方式待优化
for _, filter := range alertRule.PushExpr.ResFilters {
if !valueMatch(item.Ident, filter.Func, filter.Params) {
return false
}
}
for _, filter := range alertRule.PushExpr.TagFilters {
value, exists := item.TagsMap[filter.Key]
if !exists {
return false
}
if !valueMatch(value, filter.Func, filter.Params) {
return false
}
}
return true
}
func valueMatch(value, f string, params []string) bool {
switch f {
case "InClasspath":
for i := range params {
if cache.ResClasspath.Exists(value, params[i]) {
return true
}
}
return false
case "NotInClasspath":
for i := range params {
if cache.ResClasspath.Exists(value, params[i]) {
return false
}
}
return true
case "InClasspathPrefix":
classpaths := cache.ResClasspath.GetValues(value)
for _, classpath := range classpaths {
for i := range params {
if strings.HasPrefix(classpath, params[i]) {
return true
}
}
}
return false
case "NotInClasspathPrefix":
classpaths := cache.ResClasspath.GetValues(value)
for _, classpath := range classpaths {
for i := range params {
if strings.HasPrefix(classpath, params[i]) {
return false
}
}
}
return true
case "InList":
for i := range params {
if value == params[i] {
return true
}
}
return false
case "NotInList":
for i := range params {
if value == params[i] {
return false
}
}
return true
case "InResourceList":
for i := range params {
if value == params[i] {
return true
}
}
return false
case "NotInResourceList":
for i := range params {
if value == params[i] {
return false
}
}
return true
case "HasPrefixString":
for i := range params {
if strings.HasPrefix(value, params[i]) {
return true
}
}
return false
case "NoPrefixString":
for i := range params {
if strings.HasPrefix(value, params[i]) {
return false
}
}
return true
case "HasSuffixString":
for i := range params {
if strings.HasSuffix(value, params[i]) {
return true
}
}
return false
case "NoSuffixString":
for i := range params {
if strings.HasSuffix(value, params[i]) {
return false
}
}
return true
case "ContainsString":
for i := range params {
if strings.Contains(value, params[i]) {
return true
}
}
return false
case "NotContainsString":
for i := range params {
if strings.Contains(value, params[i]) {
return false
}
}
return true
case "MatchRegexp":
for i := range params {
r, _ := regexp.Compile(params[i])
if r.MatchString(value) {
return true
}
}
return false
case "NotMatchRegexp":
for i := range params {
r, _ := regexp.Compile(params[i])
if r.MatchString(value) {
return false
}
}
return true
}
return false
}
func ToJudge(linkedList *SafeLinkedList, stra *models.AlertRule, val *vos.MetricPoint) {
logger.Debugf("[ToJudge.start][stra:%+v][val:%+v]", stra, val)
now := val.Time
hps := linkedList.HistoryPoints(now - int64(stra.AlertDuration))
if len(hps) == 0 {
return
}
historyArr := []vos.HistoryPoints{}
statusArr := []bool{}
eventInfo := ""
value := ""
if len(stra.PushExpr.Exps) == 1 {
for _, expr := range stra.PushExpr.Exps {
history, info, lastValue, status := Judge(stra, expr, hps, val, now)
statusArr = append(statusArr, status)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
historyArr = append(historyArr, history)
eventInfo += info
}
} else { //多个条件
for _, expr := range stra.PushExpr.Exps {
respData, err := GetData(stra, expr, val, now)
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
}
if len(respData) <= 0 {
logger.Errorf("stra:%+v get query data respData:%v err", stra, respData)
return
}
history, info, lastValue, status := Judge(stra, expr, respData, val, now)
statusArr = append(statusArr, status)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
historyArr = append(historyArr, history)
if eventInfo == "" {
eventInfo = info
} else {
if stra.PushExpr.TogetherOrAny == 0 {
eventInfo += fmt.Sprintf(" & %s", info)
} else if stra.PushExpr.TogetherOrAny == 1 {
eventInfo += fmt.Sprintf(" || %s", info)
}
}
}
}
bs, err := json.Marshal(historyArr)
if err != nil {
logger.Errorf("Marshal history:%+v err:%v", historyArr, err)
}
event := &models.AlertEvent{
RuleId: stra.Id,
RuleName: stra.Name,
RuleNote: stra.Note,
HashId: str.MD5(fmt.Sprintf("%d_%s", stra.Id, val.PK)),
ResIdent: val.Ident,
Priority: stra.Priority,
HistoryPoints: bs,
TriggerTime: now,
Values: value,
NotifyChannels: stra.NotifyChannels,
NotifyGroups: stra.NotifyGroups,
NotifyUsers: stra.NotifyUsers,
RunbookUrl: stra.RunbookUrl,
ReadableExpression: eventInfo,
TagMap: val.TagsMap,
}
2021-07-10 02:32:28 +08:00
logger.Debugf("[ToJudge.event.create][statusArr:%v][type=push][stra:%+v][val:%+v][event:%+v]", statusArr, stra, val, event)
sendEventIfNeed(statusArr, event, stra)
}
func Judge(stra *models.AlertRule, exp models.Exp, historyData []*vos.HPoint, firstItem *vos.MetricPoint, now int64) (history vos.HistoryPoints, info string, lastValue string, status bool) {
var leftValue vos.JsonFloat
if exp.Func == "stddev" {
info = fmt.Sprintf(" %s (%s,%ds) %v", exp.Metric, exp.Func, stra.AlertDuration, exp.Params)
} else if exp.Func == "happen" {
info = fmt.Sprintf(" %s (%s,%ds) %v %s %v", exp.Metric, exp.Func, stra.AlertDuration, exp.Params, exp.Optr, exp.Threshold)
} else {
info = fmt.Sprintf(" %s(%s,%ds) %s %v", exp.Metric, exp.Func, stra.AlertDuration, exp.Optr, exp.Threshold)
}
leftValue, status = judgeItemWithStrategy(stra, historyData, exp, firstItem, now)
lastValue = "null"
if !math.IsNaN(float64(leftValue)) {
lastValue = strconv.FormatFloat(float64(leftValue), 'f', -1, 64)
}
history = vos.HistoryPoints{
Metric: exp.Metric,
Tags: firstItem.TagsMap,
Points: historyData,
}
return
}
func judgeItemWithStrategy(stra *models.AlertRule, historyData []*vos.HPoint, exp models.Exp, firstItem *vos.MetricPoint, now int64) (leftValue vos.JsonFloat, isTriggered bool) {
straFunc := exp.Func
var straParam []interface{}
straParam = append(straParam, stra.AlertDuration)
switch straFunc {
case "happen", "stddev":
if len(exp.Params) < 1 {
logger.Errorf("stra:%d exp:%+v stra param is null", stra.Id, exp)
return
}
straParam = append(straParam, exp.Params[0])
case "c_avg", "c_avg_abs", "c_avg_rate", "c_avg_rate_abs":
if len(exp.Params) < 1 {
logger.Errorf("stra:%d exp:%+v stra param is null", stra.Id, exp)
return
}
hisD, err := GetData(stra, exp, firstItem, now-int64(exp.Params[0]))
if err != nil {
logger.Errorf("stra:%v %+v get compare data err:%v", stra.Id, exp, err)
return
}
if len(hisD) != 1 {
logger.Errorf("stra:%d %+v get compare data err, respItems:%v", stra.Id, exp, hisD)
return
}
var sum float64
for _, i := range hisD {
sum += float64(i.Value)
}
//环比数据的平均值
straParam = append(straParam, sum/float64(len(hisD)))
}
fn, err := ParseFuncFromString(straFunc, straParam, exp.Optr, exp.Threshold)
if err != nil {
logger.Errorf("stra:%d %+v parse func fail: %v", stra.Id, exp, err)
return
}
return fn.Compute(historyData)
}
func GetData(stra *models.AlertRule, exp models.Exp, firstItem *vos.MetricPoint, now int64) ([]*vos.HPoint, error) {
var respData []*vos.HPoint
var err error
//多查一些数据,防止由于查询不到最新点,导致点数不够
start := now - int64(stra.AlertDuration) - 2
// 这里的参数肯定只有一个
queryParam, err := NewQueryRequest(firstItem.Ident, exp.Metric, firstItem.TagsMap, start, now)
if err != nil {
return respData, err
}
respData = Query(queryParam)
logger.Debugf("[exp:%+v][queryParam:%+v][respData:%+v]\n", exp, queryParam, respData)
return respData, err
}
// 虽然最近的数据确实产生了事件(产生事件很频繁),但是未必一定要发送,只有告警/恢复状态发生变化的时候才需发送
func sendEventIfNeed(status []bool, event *models.AlertEvent, stra *models.AlertRule) {
isTriggered := true
if stra.Type == 0 {
// 只判断push型的
switch stra.PushExpr.TogetherOrAny {
case 0:
// 全部触发
for _, s := range status {
isTriggered = isTriggered && s
}
case 1:
// 任意一个触发
isTriggered = false
for _, s := range status {
if s == true {
isTriggered = true
break
}
}
}
}
now := time.Now().Unix()
lastEvent, exists := LastEvents.Get(event.HashId)
switch event.IsPromePull {
case 0:
// push型的 && 与条件型的
if exists && lastEvent.IsPromePull == 1 {
// 之前内存中的事件是pull型的先清空内存中的事件
LastEvents.Del(event.HashId)
}
if isTriggered {
// 新告警或者上次是恢复事件,都需要立即发送
if !exists || lastEvent.IsRecov() {
event.MarkAlert()
SendEvent(event)
}
} else {
// 上次是告警事件,现在恢复了,自然需要通知
if exists && lastEvent.IsAlert() {
event.MarkRecov()
SendEvent(event)
}
}
case 1:
// pull型的产生的事件一定是触发了阈值的即这个case里不存在recovery的场景recovery的场景用resolve_timeout的cron来处理
if exists && lastEvent.IsPromePull == 0 {
// 之前内存中的事件是push型的先清空内存中的事件
LastEvents.Del(event.HashId)
}
// 1. 第一次来并且AlertDuration=0直接发送
// 2. 触发累计到AlertDuration时长后触发一条
if !exists {
// 这是个新事件,之前未曾产生过的
if stra.AlertDuration == 0 {
// 代表prometheus rule for 配置为0直接发送
event.LastSend = true
event.MarkAlert()
SendEvent(event)
} else {
// 只有一条事件显然无法满足for AlertDuration的时间放到内存里等待
LastEvents.Set(event.HashId, event)
}
return
}
// 内存里有事件虽然AlertDuration是0但是上次没有发过(可能是中间调整过AlertDuration比如从某个大于0的值调整为0)
if stra.AlertDuration == 0 && !lastEvent.LastSend {
event.LastSend = true
event.MarkAlert()
SendEvent(event)
return
}
// 内存里有事件AlertDuration也是大于0的需要判断Prometheus里的for的逻辑
if now-lastEvent.TriggerTime < int64(stra.AlertDuration) {
// 距离上次告警的时间小于告警统计周期即不满足for的条件不产生告警通知
return
}
logger.Debugf("[lastEvent.LastSend:%+v][event.LastSend:%+v][now:%+v][lastEvent.TriggerTime:%+v][stra.AlertDuration:%+v][now-lastEvent.TriggerTime:%+v]\n",
lastEvent.LastSend,
event.LastSend,
now,
lastEvent.TriggerTime,
stra.AlertDuration,
now-lastEvent.TriggerTime,
)
// 满足for的条件了应产生事件但是未必一定要发送上次没发送或者上次是恢复这次才发送即保证只发一条
if !lastEvent.LastSend || lastEvent.IsRecov() {
event.LastSend = true
event.MarkAlert()
SendEvent(event)
}
}
}
func SendEvent(event *models.AlertEvent) {
// update last event
LastEvents.Set(event.HashId, event)
ok := EventQueue.PushFront(event)
if !ok {
logger.Errorf("push event:%v err", event)
}
logger.Debugf("[SendEvent.event.success][type:%+v][event:%+v]", event.IsPromePull, event)
}