Merge pull request 'support redis cluster' (#5) from main into master

This commit is contained in:
UlricQin 2022-05-30 08:41:27 +08:00
commit 86be3b5caf
21 changed files with 308 additions and 65 deletions

View File

@ -2,7 +2,7 @@
NOW = $(shell date -u '+%Y%m%d%I%M%S')
RELEASE_VERSION = 5.8.1
RELEASE_VERSION = 5.9.0
APP = n9e
SERVER_BIN = $(APP)

View File

@ -81,4 +81,4 @@ We welcome your participation in the Nightingale open source project and open so
- [ ] support pushgateway api
## License
Nightingale with [Apache License V2.0](https://github.com/didi/nightingale/blob/main/LICENSE) open source license.
Nightingale with [Apache License V2.0](https://github.com/didi/nightingale/blob/main/LICENSE) open source license.

View File

@ -14,6 +14,7 @@ CREATE TABLE `users` (
`portrait` varchar(255) not null default '' comment 'portrait image url',
`roles` varchar(255) not null comment 'Admin | Standard | Guest, split by space',
`contacts` varchar(1024) comment 'json e.g. {wecom:xx, dingtalk_robot_token:yy}',
`maintainer` tinyint(1) not null default 0,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,

View File

@ -29,7 +29,7 @@ type inter interface {
[Alerting.CallPlugin]
Enable = false
PluginPath = "./etc/script/notify.so"
# 注意此处caller必须在notify.so中作为变量暴露
Caller = "n9eCaller"
# 注意此处caller必须在notify.so中作为变量暴露,首字母必须大写才能暴露
Caller = "N9eCaller"
```

View File

@ -37,8 +37,8 @@ func (n *N9EPlugin) Notify(bs []byte) {
}
}
// will be loaded for alertingCall
var n9eCaller = N9EPlugin{
// N9eCaller will be loaded for alertingCall; its first letter must be capitalized so that it is exported
var N9eCaller = N9EPlugin{
Name: "n9e",
Description: "演示告警通过动态链接库方式通知",
BuildAt: time.Now().Local().Format("2006/01/02 15:04:05"),

View File

@ -29,7 +29,7 @@ type inter interface {
[Alerting.CallPlugin]
Enable = false
PluginPath = "./etc/script/notify.so"
# 注意此处caller必须在notify.so中作为变量暴露
Caller = "n9eCaller"
# 注意此处caller必须在notify.so中作为变量暴露,首字母必须大写才能暴露
Caller = "N9eCaller"
```

View File

@ -37,8 +37,8 @@ func (n *N9EPlugin) Notify(bs []byte) {
}
}
// will be loaded for alertingCall
var n9eCaller = N9EPlugin{
// N9eCaller will be loaded for alertingCall; its first letter must be capitalized so that it is exported
var N9eCaller = N9EPlugin{
Name: "n9e",
Description: "演示告警通过动态链接库方式通知",
BuildAt: time.Now().Local().Format("2006/01/02 15:04:05"),

View File

@ -85,7 +85,8 @@ ScriptPath = "./etc/script/notify.py"
Enable = false
# use a plugin via `go build -buildmode=plugin -o notify.so`
PluginPath = "./etc/script/notify.so"
Caller = "n9eCaller"
# The first letter must be capitalized to be exported
Caller = "N9eCaller"
[Alerting.RedisPub]
Enable = false
@ -115,13 +116,17 @@ BasicAuthPass = "ibex"
Timeout = 3000
[Redis]
# address, ip:port
# address: ip:port for standalone, or comma-separated ip1:port,ip2:port for cluster and sentinel (used as SentinelAddrs)
Address = "127.0.0.1:6379"
# Username = ""
# Password = ""
# DB = 0
# UseTLS = false
# TLSMinVersion = "1.2"
# RedisType: standalone, cluster or sentinel
RedisType = "standalone"
# MasterName is used only when RedisType is sentinel
# MasterName = "mymaster"
[DB]
# postgres: host=%s port=%s user=%s dbname=%s password=%s sslmode=%s

View File

@ -142,13 +142,17 @@ Phone = "phone_number"
Email = "email"
[Redis]
# address, ip:port
# address: ip:port for standalone, or comma-separated ip1:port,ip2:port for cluster and sentinel (used as SentinelAddrs)
Address = "127.0.0.1:6379"
# Username = ""
# Password = ""
# DB = 0
# UseTLS = false
# TLSMinVersion = "1.2"
# RedisType: standalone, cluster or sentinel
RedisType = "standalone"
# MasterName is used only when RedisType is sentinel
# MasterName = "mymaster"
[DB]
DSN="root:1234@tcp(127.0.0.1:3306)/n9e_v5?charset=utf8mb4&parseTime=True&loc=Local&allowNativePasswords=true"

View File

@ -245,8 +245,8 @@ func (e *AlertCurEvent) FillNotifyGroups(cache map[int64]*UserGroup) error {
return nil
}
func AlertCurEventTotal(bgid, stime, etime int64, severity int, clusters []string, query string) (int64, error) {
session := DB().Model(&AlertCurEvent{}).Where("trigger_time between ? and ?", stime, etime)
func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clusters []string, query string) (int64, error) {
session := DB().Model(&AlertCurEvent{}).Where("trigger_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
session = session.Where("group_id = ?", bgid)
@ -271,8 +271,8 @@ func AlertCurEventTotal(bgid, stime, etime int64, severity int, clusters []strin
return Count(session)
}
func AlertCurEventGets(bgid, stime, etime int64, severity int, clusters []string, query string, limit, offset int) ([]AlertCurEvent, error) {
session := DB().Where("trigger_time between ? and ?", stime, etime)
func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clusters []string, query string, limit, offset int) ([]AlertCurEvent, error) {
session := DB().Where("trigger_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
session = session.Where("group_id = ?", bgid)

View File

@ -89,8 +89,8 @@ func (e *AlertHisEvent) FillNotifyGroups(cache map[int64]*UserGroup) error {
return nil
}
func AlertHisEventTotal(bgid, stime, etime int64, severity int, recovered int, clusters []string, query string) (int64, error) {
session := DB().Model(&AlertHisEvent{}).Where("last_eval_time between ? and ?", stime, etime)
func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, recovered int, clusters []string, query string) (int64, error) {
session := DB().Model(&AlertHisEvent{}).Where("last_eval_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
session = session.Where("group_id = ?", bgid)
@ -119,8 +119,8 @@ func AlertHisEventTotal(bgid, stime, etime int64, severity int, recovered int, c
return Count(session)
}
func AlertHisEventGets(bgid, stime, etime int64, severity int, recovered int, clusters []string, query string, limit, offset int) ([]AlertHisEvent, error) {
session := DB().Where("last_eval_time between ? and ?", stime, etime)
func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, recovered int, clusters []string, query string, limit, offset int) ([]AlertHisEvent, error) {
session := DB().Where("last_eval_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
session = session.Where("group_id = ?", bgid)

View File

@ -36,13 +36,28 @@ func (m *AlertMute) TableName() string {
return "alert_mute"
}
func AlertMuteGets(groupId int64) (lst []AlertMute, err error) {
// AlertMuteGets returns the mute rules whose group_id equals bgid, ordered by
// id descending.
//
// When query is non-empty it is split on whitespace, and every resulting word
// must occur, wrapped in double quotes, somewhere in the tags column.
func AlertMuteGets(bgid int64, query string) (lst []AlertMute, err error) {
	session := DB().Where("group_id = ?", bgid)

	// strings.Fields yields no elements for an empty or blank query, so no
	// extra guard is required.
	for _, word := range strings.Fields(query) {
		// Exactly one bind value for the single "?" placeholder.
		session = session.Where("tags like ?", "%\""+word+"\"%")
	}

	err = session.Order("id desc").Find(&lst).Error
	return
}
// AlertMuteGetsByBG returns every mute rule whose group_id equals groupId,
// ordered by id descending.
func AlertMuteGetsByBG(groupId int64) (lst []AlertMute, err error) {
	session := DB().Where("group_id=?", groupId).Order("id desc")
	err = session.Find(&lst).Error
	return
}
func (m *AlertMute) Verify() error {
if m.GroupId <= 0 {
if m.GroupId < 0 {
return errors.New("group_id invalid")
}

View File

@ -17,21 +17,22 @@ import (
)
// User describes a user account; the struct tags give the JSON name of each
// field and, where present, its gorm mapping.
type User struct {
Id int64 `json:"id" gorm:"primaryKey"`
Username string `json:"username"`
Nickname string `json:"nickname"`
Password string `json:"-"`
Phone string `json:"phone"`
Email string `json:"email"`
Portrait string `json:"portrait"`
Roles string `json:"-"` // this field is written to the database
RolesLst []string `json:"roles" gorm:"-"` // this field is exchanged with the front end
Contacts ormx.JSONObj `json:"contacts"` // content is a map[string]string structure
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Admin bool `json:"admin" gorm:"-"` // convenience for the front end
Id int64 `json:"id" gorm:"primaryKey"`
Username string `json:"username"`
Nickname string `json:"nickname"`
Password string `json:"-"`
Phone string `json:"phone"`
Email string `json:"email"`
Portrait string `json:"portrait"`
Roles string `json:"-"` // this field is written to the database
RolesLst []string `json:"roles" gorm:"-"` // this field is exchanged with the front end
Contacts ormx.JSONObj `json:"contacts"` // content is a map[string]string structure
Maintainer int `json:"maintainer"` // whether to send maintainer messages to this user 0:not send 1:send
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Admin bool `json:"admin" gorm:"-"` // convenience for the front end
}
func (u *User) TableName() string {

View File

@ -0,0 +1,108 @@
package engine
import (
"time"
"github.com/didi/nightingale/v5/src/server/common/sender"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/tidwall/gjson"
"github.com/toolkits/pkg/logger"
)
// notifyToMaintainer reports an internal processing error to the users
// returned by memsto.UserCache.GetMaintainerUsers, once per channel listed in
// config.C.Alerting.NotifyBuiltinChannels.
//
// The error is always logged first. Nothing is sent when no builtin channel is
// configured or when there is no maintainer user. A channel is skipped when no
// maintainer has a receiver for it, and channel names other than "email",
// "dingtalk", "wecom" and "feishu" are ignored.
//
// e is the error to report; a nil e is rendered as "<nil>" instead of
// panicking. title is used as the message title and is repeated in the body.
func notifyToMaintainer(e error, title string) {
	logger.Errorf("notifyToMaintainer, title:%s, error:%v", title, e)

	if len(config.C.Alerting.NotifyBuiltinChannels) == 0 {
		return
	}

	maintainerUsers := memsto.UserCache.GetMaintainerUsers()
	if len(maintainerUsers) == 0 {
		return
	}

	// Calling Error() on a nil error would panic while building the message.
	errMsg := "<nil>"
	if e != nil {
		errMsg = e.Error()
	}

	// Sets deduplicate receivers that several maintainers share.
	emailset := make(map[string]struct{})
	phoneset := make(map[string]struct{})
	wecomset := make(map[string]struct{})
	dingtalkset := make(map[string]struct{})
	feishuset := make(map[string]struct{})

	for _, user := range maintainerUsers {
		if user.Email != "" {
			emailset[user.Email] = struct{}{}
		}

		if user.Phone != "" {
			phoneset[user.Phone] = struct{}{}
		}

		bs, err := user.Contacts.MarshalJSON()
		if err != nil {
			logger.Errorf("notifyToMaintainer: failed to marshal contacts: %v", err)
			continue
		}

		// A missing key yields an empty string; an empty token cannot address
		// any robot, so both cases are skipped.
		if token := gjson.GetBytes(bs, "dingtalk_robot_token").String(); token != "" {
			dingtalkset[token] = struct{}{}
		}

		if token := gjson.GetBytes(bs, "wecom_robot_token").String(); token != "" {
			wecomset[token] = struct{}{}
		}

		if token := gjson.GetBytes(bs, "feishu_robot_token").String(); token != "" {
			feishuset[token] = struct{}{}
		}
	}

	phones := StringSetKeys(phoneset)
	triggerTime := time.Now().Format("2006/01/02 - 15:04:05")

	// The same two bodies are shared by the channels: plain text for email and
	// feishu, bold-marked text for dingtalk and wecom.
	plainContent := "【内部处理错误】当前标题: " + title + "\n【内部处理错误】当前异常: " + errMsg + "\n【内部处理错误】发送时间: " + triggerTime
	markedContent := "**【内部处理错误】当前标题: **" + title + "\n**【内部处理错误】当前异常: **" + errMsg + "\n**【内部处理错误】发送时间: **" + triggerTime

	for _, ch := range config.C.Alerting.NotifyBuiltinChannels {
		switch ch {
		case "email":
			if len(emailset) == 0 {
				continue
			}
			sender.WriteEmail(title, plainContent, StringSetKeys(emailset))
		case "dingtalk":
			if len(dingtalkset) == 0 {
				continue
			}
			sender.SendDingtalk(sender.DingtalkMessage{
				Title:     title,
				Text:      markedContent,
				AtMobiles: phones,
				Tokens:    StringSetKeys(dingtalkset),
			})
		case "wecom":
			if len(wecomset) == 0 {
				continue
			}
			sender.SendWecom(sender.WecomMessage{
				Text:   markedContent,
				Tokens: StringSetKeys(wecomset),
			})
		case "feishu":
			if len(feishuset) == 0 {
				continue
			}
			sender.SendFeishu(sender.FeishuMessage{
				Text:      plainContent,
				AtMobiles: phones,
				Tokens:    StringSetKeys(feishuset),
			})
		}
	}
}

View File

@ -39,6 +39,7 @@ func loopFilterRules(ctx context.Context) {
func filterRules() {
ids := memsto.AlertRuleCache.GetRuleIds()
logger.Infof("AlertRuleCache.GetRuleIds successids.len: %d", len(ids))
count := len(ids)
mines := make([]int64, 0, count)
@ -83,6 +84,7 @@ func (r RuleEval) Start() {
return
default:
r.Work()
logger.Infof("rule executedrule_id=%d", r.RuleID())
interval := r.rule.PromEvalInterval
if interval <= 0 {
interval = 10
@ -111,6 +113,8 @@ func (r RuleEval) Work() {
value, warnings, err = reader.Reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
// 告警查询prometheus逻辑出错发告警信息给管理员
notifyToMaintainer(err, "查询prometheus出错")
return
}
@ -182,6 +186,7 @@ func (ws *WorkersType) Build(rids []int64) {
elst, err := models.AlertCurEventGetByRule(rules[hash].Id)
if err != nil {
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
notifyToMaintainer(err, "AlertCurEventGetByRule ErrorruleID="+fmt.Sprint(rules[hash].Id))
continue
}

View File

@ -78,6 +78,24 @@ func (uc *UserCacheType) GetByUserIds(ids []int64) []*models.User {
return users
}
// GetMaintainerUsers returns the entries of uc.users whose Maintainer field
// equals 1. The read lock is held for the duration of the scan. The result is
// never nil: an empty, non-nil slice is returned when no entry matches.
func (uc *UserCacheType) GetMaintainerUsers() []*models.User {
	uc.RLock()
	defer uc.RUnlock()

	maintainers := make([]*models.User, 0)
	for _, u := range uc.users {
		if u.Maintainer != 1 {
			continue
		}
		maintainers = append(maintainers, u)
	}

	return maintainers
}
func SyncUsers() {
err := syncUsers()
if err != nil {

View File

@ -4,10 +4,13 @@ import (
"context"
"fmt"
"os"
"github.com/go-redis/redis/v8"
"gorm.io/gorm"
"strings"
"time"
"github.com/didi/nightingale/v5/src/pkg/ormx"
"github.com/didi/nightingale/v5/src/pkg/tls"
"github.com/go-redis/redis/v8"
"gorm.io/gorm"
)
type RedisConfig struct {
@ -17,6 +20,8 @@ type RedisConfig struct {
DB int
UseTLS bool
tls.ClientConfig
RedisType string
MasterName string
}
var DB *gorm.DB
@ -29,26 +34,81 @@ func InitDB(cfg ormx.DBConfig) error {
return err
}
var Redis *redis.Client
// Redis is the package-level redis handle. It is declared as the set of
// commands listed below rather than as a concrete client type, so InitRedis
// can assign it the result of redis.NewClient, redis.NewClusterClient or
// redis.NewFailoverClient.
var Redis interface {
	// Connection lifecycle.
	Ping(ctx context.Context) *redis.StatusCmd
	Close() error

	// Plain keys.
	Get(ctx context.Context, key string) *redis.StringCmd
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
	Del(ctx context.Context, keys ...string) *redis.IntCmd

	// Hashes.
	HGetAll(ctx context.Context, key string) *redis.StringStringMapCmd
	HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
	HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd

	// Pub/sub.
	Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
}
func InitRedis(cfg RedisConfig) (func(), error) {
redisOptions := &redis.Options{
Addr: cfg.Address,
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.DB,
}
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
switch cfg.RedisType {
case "standalone", "":
redisOptions := &redis.Options{
Addr: cfg.Address,
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.DB,
}
redisOptions.TLSConfig = tlsConfig
}
Redis = redis.NewClient(redisOptions)
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
}
redisOptions.TLSConfig = tlsConfig
}
Redis = redis.NewClient(redisOptions)
case "cluster":
redisOptions := &redis.ClusterOptions{
Addrs: strings.Split(cfg.Address, ","),
Username: cfg.Username,
Password: cfg.Password,
}
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
}
redisOptions.TLSConfig = tlsConfig
}
Redis = redis.NewClusterClient(redisOptions)
case "sentinel":
redisOptions := &redis.FailoverOptions{
MasterName: cfg.MasterName,
SentinelAddrs: strings.Split(cfg.Address, ","),
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.DB,
}
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
}
redisOptions.TLSConfig = tlsConfig
}
Redis = redis.NewFailoverClient(redisOptions)
default:
fmt.Println("failed to init redis , redis type is illegal:", cfg.RedisType)
os.Exit(1)
}
err := Redis.Ping(context.Background()).Err()
if err != nil {

View File

@ -218,7 +218,7 @@ func configRoute(r *gin.Engine, version string) {
pages.PUT("/busi-group/:id/alert-rule/:arid", jwtAuth(), user(), perm("/alert-rules/put"), alertRulePutByFE)
pages.GET("/alert-rule/:arid", jwtAuth(), user(), perm("/alert-rules"), alertRuleGet)
pages.GET("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes"), bgro(), alertMuteGets)
pages.GET("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes"), bgro(), alertMuteGetsByBG)
pages.POST("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes/add"), bgrw(), alertMuteAdd)
pages.DELETE("/busi-group/:id/alert-mutes", jwtAuth(), user(), perm("/alert-mutes/del"), bgrw(), alertMuteDel)
@ -279,8 +279,15 @@ func configRoute(r *gin.Engine, version string) {
service.GET("/alert-rules", alertRuleGets)
service.POST("/alert-rules", alertRuleAddByService)
service.DELETE("/alert-rules", alertRuleDel)
service.PUT("/alert-rule", alertRulePutByService)
service.PUT("/alert-rule/:arid", alertRulePutByService)
service.GET("/alert-rule/:arid", alertRuleGet)
service.GET("/alert-rules-get-by-prod", alertRulesGetByProds)
service.GET("/alert-mutes", alertMuteGets)
service.POST("/alert-mutes", alertMuteAddByService)
service.DELETE("/alert-mutes", alertMuteDel)
pages.GET("/alert-cur-events", alertCurEventsList)
pages.GET("/alert-his-events", alertHisEventsList)
}
}

View File

@ -45,9 +45,10 @@ func alertCurEventsCard(c *gin.Context) {
busiGroupId := ginx.QueryInt64(c, "bgid", 0)
clusters := queryClusters(c)
rules := parseAggrRules(c)
prod := ginx.QueryStr(c, "prod", "")
// 最多获取50000个获取太多也没啥意义
list, err := models.AlertCurEventGets(busiGroupId, stime, etime, severity, clusters, query, 50000, 0)
list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, query, 50000, 0)
ginx.Dangerous(err)
cardmap := make(map[string]*AlertCard)
@ -121,11 +122,12 @@ func alertCurEventsList(c *gin.Context) {
limit := ginx.QueryInt(c, "limit", 20)
busiGroupId := ginx.QueryInt64(c, "bgid", 0)
clusters := queryClusters(c)
prod := ginx.QueryStr(c, "prod", "")
total, err := models.AlertCurEventTotal(busiGroupId, stime, etime, severity, clusters, query)
total, err := models.AlertCurEventTotal(prod, busiGroupId, stime, etime, severity, clusters, query)
ginx.Dangerous(err)
list, err := models.AlertCurEventGets(busiGroupId, stime, etime, severity, clusters, query, limit, ginx.Offset(c, limit))
list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)

View File

@ -34,11 +34,12 @@ func alertHisEventsList(c *gin.Context) {
limit := ginx.QueryInt(c, "limit", 20)
busiGroupId := ginx.QueryInt64(c, "bgid", 0)
clusters := queryClusters(c)
prod := ginx.QueryStr(c, "prod", "")
total, err := models.AlertHisEventTotal(busiGroupId, stime, etime, severity, recovered, clusters, query)
total, err := models.AlertHisEventTotal(prod, busiGroupId, stime, etime, severity, recovered, clusters, query)
ginx.Dangerous(err)
list, err := models.AlertHisEventGets(busiGroupId, stime, etime, severity, recovered, clusters, query, limit, ginx.Offset(c, limit))
list, err := models.AlertHisEventGets(prod, busiGroupId, stime, etime, severity, recovered, clusters, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)

View File

@ -8,9 +8,17 @@ import (
)
// Return all, front-end search and paging
func alertMuteGets(c *gin.Context) {
func alertMuteGetsByBG(c *gin.Context) {
bgid := ginx.UrlParamInt64(c, "id")
lst, err := models.AlertMuteGets(bgid)
lst, err := models.AlertMuteGetsByBG(bgid)
ginx.NewRender(c).Data(lst, err)
}
// alertMuteGets reads the "bgid" (default 0) and "query" (default "")
// query-string parameters, passes them to models.AlertMuteGets and renders the
// returned list together with any error.
func alertMuteGets(c *gin.Context) {
	lst, err := models.AlertMuteGets(
		ginx.QueryInt64(c, "bgid", 0),
		ginx.QueryStr(c, "query", ""),
	)
	ginx.NewRender(c).Data(lst, err)
}
@ -25,6 +33,14 @@ func alertMuteAdd(c *gin.Context) {
ginx.NewRender(c).Message(f.Add())
}
// alertMuteAddByService binds the JSON request body to an AlertMute, forces
// its GroupId to 0 and renders the result of Add as a message.
func alertMuteAddByService(c *gin.Context) {
	var mute models.AlertMute
	ginx.BindJSON(c, &mute)

	// Any group id supplied in the request body is discarded.
	mute.GroupId = 0

	ginx.NewRender(c).Message(mute.Add())
}
func alertMuteDel(c *gin.Context) {
var f idsForm
ginx.BindJSON(c, &f)