Add recording rule (#1015)

* add prometheus recording rules

* fix recording rule sql

* add record rule note

* fix copy error

* add some regex

Co-authored-by: 尚承志 <chengzhi.shang@longbridge.sg>
Tripitakav 2022-07-06 15:58:08 +08:00 committed by GitHub
parent bfe340d24d
commit 1304a4630b
11 changed files with 726 additions and 11 deletions

View File

@@ -123,7 +123,10 @@ insert into `role_operation`(role_name, operation) values('Standard', '/job-tpls
insert into `role_operation`(role_name, operation) values('Standard', '/job-tasks');
insert into `role_operation`(role_name, operation) values('Standard', '/job-tasks/add');
insert into `role_operation`(role_name, operation) values('Standard', '/job-tasks/put');
insert into `role_operation`(role_name, operation) values('Standard', '/recording-rules');
insert into `role_operation`(role_name, operation) values('Standard', '/recording-rules/add');
insert into `role_operation`(role_name, operation) values('Standard', '/recording-rules/put');
insert into `role_operation`(role_name, operation) values('Standard', '/recording-rules/del');
-- for alert_rule | collect_rule | mute | dashboard grouping
CREATE TABLE `busi_group` (
@@ -341,6 +344,29 @@ CREATE TABLE `metric_view` (
insert into metric_view(name, cate, configs) values('Host View', 0, '{"filters":[{"oper":"=","label":"__name__","value":"cpu_usage_idle"}],"dynamicLabels":[],"dimensionLabels":[{"label":"ident","value":""}]}');
-- ----------------------------
-- Table structure for recording_rule
-- ----------------------------
DROP TABLE IF EXISTS `recording_rule`;
CREATE TABLE `recording_rule` (
`id` bigint unsigned not null auto_increment,
`group_id` bigint not null default '0' comment 'busi group id',
`cluster` varchar(128) not null,
`name` varchar(255) not null comment 'new metric name',
`note` varchar(255) not null comment 'rule note',
`prom_ql` varchar(8192) not null comment 'promql',
`prom_eval_interval` int not null comment 'evaluate interval',
`append_tags` varchar(255) default '' comment 'split by space: service=n9e mod=api',
`create_at` bigint default '0',
`create_by` varchar(64) default '',
`update_at` bigint default '0',
`update_by` varchar(64) default '',
PRIMARY KEY (`id`),
KEY `group_id` (`group_id`),
KEY `update_at` (`update_at`)
) ENGINE=InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE `alert_aggr_view` (
`id` bigint unsigned not null auto_increment,
`name` varchar(191) not null default '',

View File

@@ -165,6 +165,7 @@ MaxConnsPerHost = 0
MaxIdleConns = 100
MaxIdleConnsPerHost = 10
[WriterOpt]
# queue channel count
QueueCount = 100

View File

@@ -0,0 +1,201 @@
package models
import (
"fmt"
"strings"
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// A RecordingRule records its vector expression into new timeseries.
type RecordingRule struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
Cluster string `json:"cluster"` // take effect by cluster
Name string `json:"name"` // new metric name
Note string `json:"note"` // note
PromQl string `json:"prom_ql"` // just one ql for promql
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
}
func (re *RecordingRule) TableName() string {
return "recording_rule"
}
func (re *RecordingRule) FE2DB() {
//re.Cluster = strings.Join(re.ClusterJSON, " ")
re.AppendTags = strings.Join(re.AppendTagsJSON, " ")
}
func (re *RecordingRule) DB2FE() {
//re.ClusterJSON = strings.Fields(re.Cluster)
re.AppendTagsJSON = strings.Fields(re.AppendTags)
}
func (re *RecordingRule) Verify() error {
if re.GroupId < 0 {
return fmt.Errorf("GroupId(%d) invalid", re.GroupId)
}
if re.Cluster == "" {
return errors.New("cluster is blank")
}
if re.Name == "" {
return errors.New("name is blank")
}
if !model.MetricNameRE.MatchString(re.Name) {
return errors.New("name has invalid characters")
}
if re.PromEvalInterval <= 0 {
re.PromEvalInterval = 60
}
re.AppendTags = strings.TrimSpace(re.AppendTags)
rer := strings.Fields(re.AppendTags)
for i := 0; i < len(rer); i++ {
pair := strings.Split(rer[i], "=")
if len(pair) != 2 || !model.LabelNameRE.MatchString(pair[0]) {
return fmt.Errorf("AppendTags(%s) invalid", rer[i])
}
}
return nil
}
func (re *RecordingRule) Add() error {
if err := re.Verify(); err != nil {
return err
}
exists, err := RecordingRuleExists("group_id=? and cluster=? and name=?", re.GroupId, re.Cluster, re.Name)
if err != nil {
return err
}
if exists {
return errors.New("RecordingRule already exists")
}
now := time.Now().Unix()
re.CreateAt = now
re.UpdateAt = now
return Insert(re)
}
func (re *RecordingRule) Update(ref RecordingRule) error {
if re.Name != ref.Name {
exists, err := RecordingRuleExists("group_id=? and cluster=? and name=? and id <> ?", re.GroupId, re.Cluster, ref.Name, re.Id)
if err != nil {
return err
}
if exists {
return errors.New("RecordingRule already exists")
}
}
ref.FE2DB()
ref.Id = re.Id
ref.GroupId = re.GroupId
ref.CreateAt = re.CreateAt
ref.CreateBy = re.CreateBy
ref.UpdateAt = time.Now().Unix()
err := ref.Verify()
if err != nil {
return err
}
return DB().Model(re).Select("*").Updates(ref).Error
}
func (re *RecordingRule) UpdateFieldsMap(fields map[string]interface{}) error {
return DB().Model(re).Updates(fields).Error
}
func RecordingRuleDels(ids []int64, groupId int64) error {
for i := 0; i < len(ids); i++ {
ret := DB().Where("id = ? and group_id=?", ids[i], groupId).Delete(&RecordingRule{})
if ret.Error != nil {
return ret.Error
}
}
return nil
}
func RecordingRuleExists(where string, regs ...interface{}) (bool, error) {
return Exists(DB().Model(&RecordingRule{}).Where(where, regs...))
}
func RecordingRuleGets(groupId int64) ([]RecordingRule, error) {
session := DB().Where("group_id=?", groupId).Order("name")
var lst []RecordingRule
err := session.Find(&lst).Error
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE()
}
}
return lst, err
}
func RecordingRuleGet(where string, regs ...interface{}) (*RecordingRule, error) {
var lst []*RecordingRule
err := DB().Where(where, regs...).Find(&lst).Error
if err != nil {
return nil, err
}
if len(lst) == 0 {
return nil, nil
}
lst[0].DB2FE()
return lst[0], nil
}
func RecordingRuleGetById(id int64) (*RecordingRule, error) {
return RecordingRuleGet("id=?", id)
}
func RecordingRuleGetsByCluster(cluster string) ([]*RecordingRule, error) {
session := DB()
if cluster != "" {
session = DB().Where("cluster = ?", cluster)
}
var lst []*RecordingRule
err := session.Find(&lst).Error
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE()
}
}
return lst, err
}
func RecordingRuleStatistics(cluster string) (*Statistics, error) {
session := DB().Model(&RecordingRule{}).Select("count(*) as total", "max(update_at) as last_updated")
if cluster != "" {
session = session.Where("cluster = ?", cluster)
}
var stats []*Statistics
err := session.Find(&stats).Error
if err != nil {
return nil, err
}
return stats[0], nil
}
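For reference, a minimal sketch (not part of this commit) of driving the model layer above; it assumes the ORM handle behind DB() was already initialized by the server bootstrap, and the group/cluster/rule values are hypothetical:

package main

import (
	"log"

	"github.com/didi/nightingale/v5/src/models"
)

func main() {
	rule := models.RecordingRule{
		GroupId:          1,
		Cluster:          "Default",
		Name:             "node:cpu_idle:avg", // must match model.MetricNameRE
		Note:             "cpu idle rollup",
		PromQl:           "avg(cpu_usage_idle) by (ident)",
		PromEvalInterval: 30, // seconds; Verify() falls back to 60 when <= 0
		AppendTagsJSON:   []string{"service=n9e", "mod=api"},
		CreateBy:         "root",
		UpdateBy:         "root",
	}
	rule.FE2DB() // join AppendTagsJSON into the space-separated AppendTags column
	if err := rule.Add(); err != nil { // Verify + duplicate check + insert
		log.Fatalln(err)
	}
}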

View File

@@ -1,9 +1,10 @@
package ormx
import (
"fmt"
"strings"
"time"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"

View File

@@ -15,13 +15,13 @@ func (j *JSONObj) Scan(value interface{}) error {
// check whether the value is a []byte
bytes, ok := value.([]byte)
if !ok {
// check whether the value is a string
strings, ok := value.(string)
if !ok {
return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
}
// convert the string to a []byte
bytes = []byte(strings)
}
result := json.RawMessage{}

View File

@@ -0,0 +1,118 @@
package conv
import (
"math"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
const (
LabelName = "__name__"
)
func ConvertToTimeSeries(value model.Value, rule *models.RecordingRule) (lst []*prompb.TimeSeries) {
switch value.Type() {
case model.ValVector:
items, ok := value.(model.Vector)
if !ok {
return
}
for _, item := range items {
if math.IsNaN(float64(item.Value)) {
continue
}
s := prompb.Sample{}
s.Timestamp = time.Unix(item.Timestamp.Unix(), 0).UnixNano() / 1e6
s.Value = float64(item.Value)
l := labelsToLabelsProto(item.Metric, rule)
lst = append(lst, &prompb.TimeSeries{
Labels: l,
Samples: []prompb.Sample{s},
})
}
case model.ValMatrix:
items, ok := value.(model.Matrix)
if !ok {
return
}
for _, item := range items {
if len(item.Values) == 0 {
return
}
last := item.Values[len(item.Values)-1]
if math.IsNaN(float64(last.Value)) {
continue
}
l := labelsToLabelsProto(item.Metric, rule)
var slst []prompb.Sample
for _, v := range item.Values {
if math.IsNaN(float64(v.Value)) {
continue
}
slst = append(slst, prompb.Sample{
Timestamp: time.Unix(v.Timestamp.Unix(), 0).UnixNano() / 1e6,
Value: float64(v.Value),
})
}
lst = append(lst, &prompb.TimeSeries{
Labels: l,
Samples: slst,
})
}
case model.ValScalar:
item, ok := value.(*model.Scalar)
if !ok {
return
}
if math.IsNaN(float64(item.Value)) {
return
}
lst = append(lst, &prompb.TimeSeries{
Labels: nil,
Samples: []prompb.Sample{{Value: float64(item.Value), Timestamp: time.Unix(item.Timestamp.Unix(), 0).UnixNano() / 1e6}},
})
default:
return
}
return
}
func labelsToLabelsProto(labels model.Metric, rule *models.RecordingRule) (result []*prompb.Label) {
//name
nameLs := &prompb.Label{
Name: LabelName,
Value: rule.Name,
}
result = append(result, nameLs)
for k, v := range labels {
// skip the source __name__; the rule's Name already fills that slot above
if string(k) == LabelName {
continue
}
if model.LabelNameRE.MatchString(string(k)) {
result = append(result, &prompb.Label{
Name: string(k),
Value: string(v),
})
}
}
if len(rule.AppendTagsJSON) != 0 {
for _, v := range rule.AppendTagsJSON {
index := strings.Index(v, "=")
// guard against malformed tags so v[:index] cannot panic
if index <= 0 {
continue
}
if model.LabelNameRE.MatchString(v[:index]) {
result = append(result, &prompb.Label{
Name: v[:index],
Value: v[index+1:],
})
}
}
}
return result
}
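A small usage sketch (not from this commit; the conv import path is an assumption) showing how an instant-vector result becomes remote-write series, with the rule's Name replacing __name__ and AppendTagsJSON appended:

package main

import (
	"fmt"

	"github.com/didi/nightingale/v5/src/models"
	"github.com/didi/nightingale/v5/src/server/common/conv" // assumed path of the conv package above
	"github.com/prometheus/common/model"
)

func main() {
	rule := &models.RecordingRule{
		Name:           "node:cpu_idle:avg",
		AppendTagsJSON: []string{"service=n9e"},
	}
	vec := model.Vector{
		&model.Sample{
			Metric:    model.Metric{"ident": "host-1"},
			Value:     97.5,
			Timestamp: model.Now(),
		},
	}
	// expect one series: __name__=node:cpu_idle:avg, ident=host-1, service=n9e
	for _, ts := range conv.ConvertToTimeSeries(vec, rule) {
		fmt.Println(ts.Labels, ts.Samples)
	}
}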

View File

@@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
@@ -33,6 +34,7 @@ func loopFilterRules(ctx context.Context) {
return
case <-time.After(duration):
filterRules()
filterRecordingRules()
}
}
}
@@ -145,10 +147,11 @@ func (r RuleEval) Work() {
}
type WorkersType struct {
rules       map[string]RuleEval
recordRules map[string]RecordingRuleEval
}
var Workers = &WorkersType{rules: make(map[string]RuleEval), recordRules: make(map[string]RecordingRuleEval)}
func (ws *WorkersType) Build(rids []int64) {
rules := make(map[string]*models.AlertRule)
@@ -208,6 +211,49 @@ func (ws *WorkersType) Build(rids []int64) {
}
}
func (ws *WorkersType) BuildRe(rids []int64) {
rules := make(map[string]*models.RecordingRule)
for i := 0; i < len(rids); i++ {
rule := memsto.RecordingRuleCache.Get(rids[i])
if rule == nil {
continue
}
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
rule.Id,
rule.PromEvalInterval,
rule.PromQl,
rule.AppendTags,
))
rules[hash] = rule
}
// stop old
for hash := range Workers.recordRules {
if _, has := rules[hash]; !has {
Workers.recordRules[hash].Stop()
delete(Workers.recordRules, hash)
}
}
// start new
for hash := range rules {
if _, has := Workers.recordRules[hash]; has {
// already exists
continue
}
re := RecordingRuleEval{
rule: rules[hash],
quit: make(chan struct{}),
}
go re.Start()
Workers.recordRules[hash] = re
}
}
func (r RuleEval) judge(vectors []conv.Vector) {
// Some of the rule's settings may have changed, e.g. notify receivers, callbacks, etc.
// Such changes do not restart the worker, but they do affect how alerts are handled.
@@ -444,3 +490,80 @@ func (r RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
logger.Warningf("event_push_queue: queue is full")
}
}
func filterRecordingRules() {
ids := memsto.RecordingRuleCache.GetRuleIds()
count := len(ids)
mines := make([]int64, 0, count)
for i := 0; i < count; i++ {
node, err := naming.HashRing.GetNode(fmt.Sprint(ids[i]))
if err != nil {
logger.Warning("failed to get node from hashring:", err)
continue
}
if node == config.C.Heartbeat.Endpoint {
mines = append(mines, ids[i])
}
}
Workers.BuildRe(mines)
}
type RecordingRuleEval struct {
rule *models.RecordingRule
quit chan struct{}
}
func (r RecordingRuleEval) Stop() {
logger.Infof("recording_rule_eval:%d stopping", r.RuleID())
close(r.quit)
}
func (r RecordingRuleEval) RuleID() int64 {
return r.rule.Id
}
func (r RecordingRuleEval) Start() {
logger.Infof("recording_rule_eval:%d started", r.RuleID())
for {
select {
case <-r.quit:
// logger.Infof("recording_rule_eval:%d stopped", r.RuleID())
return
default:
r.Work()
interval := r.rule.PromEvalInterval
if interval <= 0 {
interval = 10
}
time.Sleep(time.Duration(interval) * time.Second)
}
}
}
func (r RecordingRuleEval) Work() {
promql := strings.TrimSpace(r.rule.PromQl)
if promql == "" {
logger.Errorf("recording_rule_eval:%d promql is blank", r.RuleID())
return
}
value, warnings, err := reader.Reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
return
}
if len(warnings) > 0 {
logger.Errorf("recording_rule_eval:%d promql:%s, warnings:%v", r.RuleID(), promql, warnings)
return
}
ts := conv.ConvertToTimeSeries(value, r.rule)
if len(ts) != 0 {
for _, v := range ts {
writer.Writers.PushSample(r.rule.Name, v)
}
}
}
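Note on the lifecycle above: filterRecordingRules shards rule ids across servers through the consistent hash ring, so each server only evaluates rules mapped to its heartbeat endpoint, and BuildRe reconciles workers by an MD5 of id, eval interval, PromQL and tags; editing any of those fields therefore stops the old goroutine and starts a fresh one.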

View File

@@ -19,4 +19,5 @@ func Sync() {
SyncAlertSubscribes()
SyncAlertRules()
SyncTargets()
SyncRecordingRules()
}

View File

@@ -0,0 +1,119 @@
package memsto
import (
"fmt"
"sync"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)
type RecordingRuleCacheType struct {
statTotal int64
statLastUpdated int64
sync.RWMutex
rules map[int64]*models.RecordingRule // key: rule id
}
var RecordingRuleCache = RecordingRuleCacheType{
statTotal: -1,
statLastUpdated: -1,
rules: make(map[int64]*models.RecordingRule),
}
func (rrc *RecordingRuleCacheType) StatChanged(total, lastUpdated int64) bool {
if rrc.statTotal == total && rrc.statLastUpdated == lastUpdated {
return false
}
return true
}
func (rrc *RecordingRuleCacheType) Set(m map[int64]*models.RecordingRule, total, lastUpdated int64) {
rrc.Lock()
rrc.rules = m
rrc.Unlock()
// only one goroutine used, so no need lock
rrc.statTotal = total
rrc.statLastUpdated = lastUpdated
}
func (rrc *RecordingRuleCacheType) Get(ruleId int64) *models.RecordingRule {
rrc.RLock()
defer rrc.RUnlock()
return rrc.rules[ruleId]
}
func (rrc *RecordingRuleCacheType) GetRuleIds() []int64 {
rrc.RLock()
defer rrc.RUnlock()
count := len(rrc.rules)
list := make([]int64, 0, count)
for ruleId := range rrc.rules {
list = append(list, ruleId)
}
return list
}
func SyncRecordingRules() {
err := syncRecordingRules()
if err != nil {
fmt.Println("failed to sync recording rules:", err)
exit(1)
}
go loopSyncRecordingRules()
}
func loopSyncRecordingRules() {
duration := time.Duration(9000) * time.Millisecond
for {
time.Sleep(duration)
if err := syncRecordingRules(); err != nil {
logger.Warning("failed to sync recording rules:", err)
}
}
}
func syncRecordingRules() error {
start := time.Now()
stat, err := models.RecordingRuleStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleStatistics")
}
if !RecordingRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
logger.Debug("recoding rules not changed")
return nil
}
lst, err := models.RecordingRuleGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleGetsByCluster")
}
m := make(map[int64]*models.RecordingRule)
for i := 0; i < len(lst); i++ {
m[lst[i].Id] = lst[i]
}
RecordingRuleCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(len(m)))
logger.Infof("timer: sync recording rules done, cost: %dms, number: %d", ms, len(m))
return nil
}
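The StatChanged gate keeps the 9-second loop cheap: normally it costs a single count(*)/max(update_at) query, and the full rule table is re-read only when a rule is added, deleted, or updated, which is why every write path maintains update_at.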

View File

@@ -218,6 +218,13 @@ func configRoute(r *gin.Engine, version string) {
pages.PUT("/busi-group/:id/alert-rule/:arid", jwtAuth(), user(), perm("/alert-rules/put"), alertRulePutByFE)
pages.GET("/alert-rule/:arid", jwtAuth(), user(), perm("/alert-rules"), alertRuleGet)
pages.GET("/busi-group/:id/recording-rules", jwtAuth(), user(), perm("/recording-rules"), recordingRuleGets)
pages.POST("/busi-group/:id/recording-rules", jwtAuth(), user(), perm("/recording-rules/add"), bgrw(), recordingRuleAddByFE)
pages.DELETE("/busi-group/:id/recording-rules", jwtAuth(), user(), perm("/recording-rules/del"), bgrw(), recordingRuleDel)
pages.PUT("/busi-group/:id/recording-rule/:rrid", jwtAuth(), user(), perm("/recording-rules/put"), bgrw(), recordingRulePutByFE)
pages.GET("/recording-rule/:rrid", jwtAuth(), user(), perm("/recording-rules"), recordingRuleGet)
pages.PUT("/busi-group/:id/recording-rules/fields", jwtAuth(), user(), perm("/recording-rules/put"), recordingRulePutFields)
pages.GET("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes"), bgro(), alertMuteGetsByBG)
pages.POST("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes/add"), bgrw(), alertMuteAdd)
pages.DELETE("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes/del"), bgrw(), alertMuteDel)

View File

@@ -0,0 +1,118 @@
package router
import (
"net/http"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
func recordingRuleGets(c *gin.Context) {
busiGroupId := ginx.UrlParamInt64(c, "id")
ars, err := models.RecordingRuleGets(busiGroupId)
ginx.NewRender(c).Data(ars, err)
}
func recordingRuleGet(c *gin.Context) {
rrid := ginx.UrlParamInt64(c, "rrid")
ar, err := models.RecordingRuleGetById(rrid)
ginx.Dangerous(err)
if ar == nil {
ginx.NewRender(c, http.StatusNotFound).Message("No such recording rule")
return
}
ginx.NewRender(c).Data(ar, err)
}
func recordingRuleAddByFE(c *gin.Context) {
username := c.MustGet("username").(string)
var lst []models.RecordingRule
ginx.BindJSON(c, &lst)
count := len(lst)
if count == 0 {
ginx.Bomb(http.StatusBadRequest, "input json is empty")
}
bgid := ginx.UrlParamInt64(c, "id")
reterr := make(map[string]string)
for i := 0; i < count; i++ {
lst[i].Id = 0
lst[i].GroupId = bgid
lst[i].CreateBy = username
lst[i].UpdateBy = username
lst[i].FE2DB()
if err := lst[i].Add(); err != nil {
reterr[lst[i].Name] = err.Error()
} else {
reterr[lst[i].Name] = ""
}
}
ginx.NewRender(c).Data(reterr, nil)
}
func recordingRulePutByFE(c *gin.Context) {
var f models.RecordingRule
ginx.BindJSON(c, &f)
rrid := ginx.UrlParamInt64(c, "rrid")
ar, err := models.RecordingRuleGetById(rrid)
ginx.Dangerous(err)
if ar == nil {
ginx.NewRender(c, http.StatusNotFound).Message("No such recording rule")
return
}
bgrwCheck(c, ar.GroupId)
f.UpdateBy = c.MustGet("username").(string)
ginx.NewRender(c).Message(ar.Update(f))
}
func recordingRuleDel(c *gin.Context) {
var f idsForm
ginx.BindJSON(c, &f)
f.Verify()
ginx.NewRender(c).Message(models.RecordingRuleDels(f.Ids, ginx.UrlParamInt64(c, "id")))
}
type recordRuleFieldForm struct {
Ids []int64 `json:"ids"`
Fields map[string]interface{} `json:"fields"`
}
func recordingRulePutFields(c *gin.Context) {
var f recordRuleFieldForm
ginx.BindJSON(c, &f)
if len(f.Fields) == 0 {
ginx.Bomb(http.StatusBadRequest, "fields empty")
}
f.Fields["update_by"] = c.MustGet("username").(string)
f.Fields["update_at"] = time.Now().Unix()
for i := 0; i < len(f.Ids); i++ {
ar, err := models.RecordingRuleGetById(f.Ids[i])
ginx.Dangerous(err)
if ar == nil {
continue
}
ginx.Dangerous(ar.UpdateFieldsMap(f.Fields))
}
ginx.NewRender(c).Message(nil)
}
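For illustration, a sketch (not part of this commit) of building the JSON body that recordingRulePutFields expects, mirroring the recordRuleFieldForm above; the ids and field values are hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

// same shape as recordRuleFieldForm in the router above
type recordRuleFieldForm struct {
	Ids    []int64                `json:"ids"`
	Fields map[string]interface{} `json:"fields"`
}

func main() {
	// bulk update: set the eval interval on two rules
	body, _ := json.Marshal(recordRuleFieldForm{
		Ids:    []int64{3, 4},
		Fields: map[string]interface{}{"prom_eval_interval": 30},
	})
	fmt.Println(string(body)) // {"ids":[3,4],"fields":{"prom_eval_interval":30}}
}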