Merge branch 'main' of github.com:didi/nightingale

This commit is contained in:
UlricQin 2022-01-04 16:47:15 +08:00
commit 40e7ede5e3
41 changed files with 632 additions and 201 deletions

View File

@ -2,7 +2,7 @@
NOW = $(shell date -u '+%Y%m%d%I%M%S')
RELEASE_VERSION = 5.0.0-ga-05
RELEASE_VERSION = 5.2.0
APP = n9e
SERVER_BIN = $(APP)

View File

@ -1,25 +1,28 @@
## 简介
## Introduction
Nightingale, Prometheus enterprise edition
💡 A Distributed and High-Performance Monitoring System. Prometheus enterprise edition.
## Architecture
## 文档
![n9e-architecture](doc/img/arch.png)
- 国外:[https://n9e.github.io/](https://n9e.github.io/)
- 国内:[https://n9e.gitee.io/](https://n9e.gitee.io/)
- [v4老文档](https://gitee.com/n9e/book/tree/master/content/v4/docs)
## Docs
- github: [https://n9e.github.io/](https://n9e.github.io/)
- gitee: [https://n9e.gitee.io/](https://n9e.gitee.io/)
- v4(old version): [https://n9e.didiyun.com/](https://n9e.didiyun.com/)
## TODO
- [x] deploy nightingale in docker
- [x] export /metrics endpoint
- [ ] notify.py support feishu
- [x] notify.py support feishu
- [ ] notify.py support sms
- [ ] notify.py support voice
- [ ] support remote write api
- [x] support remote write api
- [ ] support pushgateway api
## 大本营
## Any questions?
微信公众号:`__n9e__`(夜莺监控),回复“加群”可以加入交流群,回复“星球”可加入知识星球提问题
[Click me](https://s3-gz01.didistatic.com/n9e-pub/image/n9e-wx.png)

BIN
doc/img/arch.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 198 KiB

View File

@ -79,7 +79,7 @@ services:
- "server"
nwebapi:
image: ulric2019/nightingale:5.0.0-ga-05
image: ulric2019/nightingale:5.1.0
container_name: nwebapi
hostname: nwebapi
restart: always
@ -107,7 +107,7 @@ services:
- "webapi"
nserver:
image: ulric2019/nightingale:5.0.0-ga-05
image: ulric2019/nightingale:5.1.0
container_name: nserver
hostname: nserver
restart: always

View File

@ -155,7 +155,7 @@ CREATE TABLE `dashboard` (
`group_id` bigint not null default 0 comment 'busi group id',
`name` varchar(191) not null,
`tags` varchar(255) not null comment 'split by space',
`configs` varchar(4096) comment 'dashboard variables',
`configs` varchar(8192) comment 'dashboard variables',
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,
@ -186,7 +186,7 @@ CREATE TABLE `chart` (
CREATE TABLE `chart_share` (
`id` bigint unsigned not null auto_increment,
`cluster` varchar(128) not null,
`configs` varchar(8192),
`configs` text,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
primary key (`id`),
@ -202,7 +202,7 @@ CREATE TABLE `alert_rule` (
`severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice',
`disabled` tinyint(1) not null comment '0:enabled 1:disabled',
`prom_for_duration` int not null comment 'prometheus for, unit:s',
`prom_ql` varchar(4096) not null comment 'promql',
`prom_ql` varchar(8192) not null comment 'promql',
`prom_eval_interval` int not null comment 'evaluate interval',
`enable_stime` char(5) not null default '00:00',
`enable_etime` char(5) not null default '23:59',
@ -227,7 +227,7 @@ CREATE TABLE `alert_mute` (
`id` bigint unsigned not null auto_increment,
`group_id` bigint not null default 0 comment 'busi group id',
`cluster` varchar(128) not null,
`tags` varchar(2048) not null default '' comment 'json,map,tagkey->regexp|value',
`tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value',
`cause` varchar(255) not null default '',
`btime` bigint not null default 0 comment 'begin time',
`etime` bigint not null default 0 comment 'end time',
@ -243,7 +243,7 @@ CREATE TABLE `alert_subscribe` (
`group_id` bigint not null default 0 comment 'busi group id',
`cluster` varchar(128) not null,
`rule_id` bigint not null default 0,
`tags` varchar(2048) not null default '' comment 'json,map,tagkey->regexp|value',
`tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value',
`redefine_severity` tinyint(1) default 0 comment 'is redefine severity?',
`new_severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice',
`redefine_channels` tinyint(1) default 0 comment 'is redefine channels?',
@ -302,7 +302,7 @@ CREATE TABLE `alert_cur_event` (
`rule_note` varchar(512) not null default 'alert rule note',
`severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice',
`prom_for_duration` int not null comment 'prometheus for, unit:s',
`prom_ql` varchar(4096) not null comment 'promql',
`prom_ql` varchar(8192) not null comment 'promql',
`prom_eval_interval` int not null comment 'evaluate interval',
`callbacks` varchar(255) not null default '' comment 'split by space: http://a.com/api/x http://a.com/api/y',
`runbook_url` varchar(255),
@ -333,7 +333,7 @@ CREATE TABLE `alert_his_event` (
`rule_note` varchar(512) not null default 'alert rule note',
`severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice',
`prom_for_duration` int not null comment 'prometheus for, unit:s',
`prom_ql` varchar(4096) not null comment 'promql',
`prom_ql` varchar(8192) not null comment 'promql',
`prom_eval_interval` int not null comment 'evaluate interval',
`callbacks` varchar(255) not null default '' comment 'split by space: http://a.com/api/x http://a.com/api/y',
`runbook_url` varchar(255),

View File

@ -24,6 +24,10 @@ mail_from = "ulricqin@163.com"
class Sender(object):
@classmethod
def send_email(cls, payload):
if mail_user == "ulricqin" and mail_pass == "password":
print("invalid smtp configuration")
return
users = payload.get('event').get("notify_users_obj")
emails = {}

View File

@ -54,6 +54,7 @@ Interval = 1000
[Alerting]
NotifyScriptPath = "./etc/script/notify.py"
NotifyConcurrency = 100
TemplatesDir = "./etc/template"
[Alerting.RedisPub]
Enable = false

View File

@ -7,6 +7,9 @@ RunMode = "release"
# do not change
AdminRole = "Admin"
# metrics descriptions
MetricsYamlFile = "./etc/metrics.yaml"
# Linkage with notify.py script
NotifyChannels = [ "email", "dingtalk", "wecom", "feishu" ]

View File

@ -27,3 +27,8 @@ scrape_configs:
static_configs:
- targets: ['localhost:9090']
- job_name: 'n9e'
file_sd_configs:
- files:
- targets.json

View File

@ -0,0 +1,7 @@
[
{
"targets": [
"nwebapi:18000","nserver:19000"
]
}
]

View File

@ -6,6 +6,9 @@ import urllib2
import smtplib
from email.mime.text import MIMEText
reload(sys)
sys.setdefaultencoding('utf8')
notify_channel_funcs = {
"email":"email",
"sms":"sms",
@ -24,6 +27,10 @@ mail_from = "ulricqin@163.com"
class Sender(object):
@classmethod
def send_email(cls, payload):
if mail_user == "ulricqin" and mail_pass == "password":
print("invalid smtp configuration")
return
users = payload.get('event').get("notify_users_obj")
emails = {}
@ -82,7 +89,13 @@ class Sender(object):
@classmethod
def send_dingtalk(cls, payload):
users = payload.get('event').get("notify_users_obj")
event = payload.get('event')
users = event.get("notify_users_obj")
rule_name = event.get("rule_name")
event_state = "Triggered"
if event.get("is_recovered"):
event_state = "Recovered"
tokens = {}
phones = {}
@ -101,9 +114,10 @@ class Sender(object):
for t in tokens:
url = "https://oapi.dingtalk.com/robot/send?access_token={}".format(t)
body = {
"msgtype": "text",
"text": {
"content": payload.get('tpls').get("dingtalk.tpl", "dingtalk.tpl not found")
"msgtype": "markdown",
"markdown": {
"title": "{} - {}".format(event_state, rule_name),
"text": payload.get('tpls').get("dingtalk.tpl", "dingtalk.tpl not found") + ' '.join(["@"+i for i in phones.keys()])
},
"at": {
"atMobiles": phones.keys(),

View File

@ -57,6 +57,7 @@ Interval = 1000
[Alerting]
NotifyScriptPath = "./etc/script/notify.py"
NotifyConcurrency = 100
TemplatesDir = "./etc/template"
[Alerting.RedisPub]
Enable = false

View File

@ -1,6 +1,11 @@
级别状态: S{{.Severity}} {{if .IsRecovered}}Recovered{{else}}Triggered{{end}}
规则名称: {{.RuleName}}{{if .RuleNote}}
规则备注: {{.RuleNote}}{{end}}
监控指标: {{.TagsJSON}}
{{if .IsRecovered}}恢复时间:{{timeformat .LastEvalTime}}{{else}}触发时间: {{timeformat .TriggerTime}}
触发时值: {{.TriggerValue}}{{end}}
#### {{if .IsRecovered}}<font color="#008800">S{{.Severity}} - Recovered - {{.RuleName}}</font>{{else}}<font color="#FF0000">S{{.Severity}} - Triggered - {{.RuleName}}</font>{{end}}
---
- **规则标题**: {{.RuleName}}{{if .RuleNote}}
- **规则备注**: {{.RuleNote}}{{end}}
- **监控指标**: {{.TagsJSON}}
- {{if .IsRecovered}}**恢复时间**{{timeformat .LastEvalTime}}{{else}}**触发时间**: {{timeformat .TriggerTime}}
- **触发时值**: {{.TriggerValue}}{{end}}
- **发送时间**: {{timestamp}}

View File

@ -3,4 +3,5 @@
规则备注: {{.RuleNote}}{{end}}
监控指标: {{.TagsJSON}}
{{if .IsRecovered}}恢复时间:{{timeformat .LastEvalTime}}{{else}}触发时间: {{timeformat .TriggerTime}}
触发时值: {{.TriggerValue}}{{end}}
触发时值: {{.TriggerValue}}{{end}}
发送时间: {{timestamp}}

View File

@ -181,6 +181,13 @@
</tr>
{{end}}
<tr>
<th>发送时间:</th>
<td>
{{timestamp}}
</td>
</tr>
<tr>
<th>PromQL</th>
<td>

View File

@ -3,4 +3,5 @@
**规则备注**: {{.RuleNote}}{{end}}
**监控指标**: {{.TagsJSON}}
{{if .IsRecovered}}**恢复时间**{{timeformat .LastEvalTime}}{{else}}**触发时间**: {{timeformat .TriggerTime}}
**触发时值**: {{.TriggerValue}}{{end}}
**触发时值**: {{.TriggerValue}}{{end}}
**发送时间**: {{timestamp}}

View File

@ -7,6 +7,9 @@ RunMode = "release"
# do not change
AdminRole = "Admin"
# metrics descriptions
MetricsYamlFile = "./etc/metrics.yaml"
# Linkage with notify.py script
NotifyChannels = [ "email", "dingtalk", "wecom", "feishu" ]

1
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/gin-gonic/gin v1.7.4
github.com/go-ldap/ldap/v3 v3.4.1
github.com/go-redis/redis/v8 v8.11.3
github.com/gogo/protobuf v1.1.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4

View File

@ -104,9 +104,8 @@ func AlertMuteDel(ids []int64) error {
return DB().Where("id in ?", ids).Delete(new(AlertMute)).Error
}
func AlertMuteStatistics(cluster string, btime int64) (*Statistics, error) {
session := DB().Model(&AlertMute{}).Select("count(*) as total", "max(create_at) as last_updated").Where("btime <= ?", btime)
func AlertMuteStatistics(cluster string) (*Statistics, error) {
session := DB().Model(&AlertMute{}).Select("count(*) as total", "max(create_at) as last_updated")
if cluster != "" {
session = session.Where("cluster = ?", cluster)
}
@ -120,7 +119,7 @@ func AlertMuteStatistics(cluster string, btime int64) (*Statistics, error) {
return stats[0], nil
}
func AlertMuteGetsByCluster(cluster string, btime int64) ([]*AlertMute, error) {
func AlertMuteGetsByCluster(cluster string) ([]*AlertMute, error) {
// clean expired first
buf := int64(30)
err := DB().Where("etime < ?", time.Now().Unix()+buf).Delete(new(AlertMute)).Error
@ -129,7 +128,7 @@ func AlertMuteGetsByCluster(cluster string, btime int64) ([]*AlertMute, error) {
}
// get my cluster's mutes
session := DB().Model(&AlertMute{}).Where("btime <= ?", btime)
session := DB().Model(&AlertMute{})
if cluster != "" {
session = session.Where("cluster = ?", cluster)
}

View File

@ -12,7 +12,6 @@ import (
"github.com/didi/nightingale/v5/src/pkg/httpx"
"github.com/didi/nightingale/v5/src/pkg/logx"
"github.com/didi/nightingale/v5/src/server/naming"
"github.com/didi/nightingale/v5/src/server/reader"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/didi/nightingale/v5/src/storage"
@ -77,7 +76,6 @@ func MustLoad(fpaths ...string) {
}
C.Heartbeat.Endpoint = fmt.Sprintf("%s:%d", C.Heartbeat.IP, C.HTTP.Port)
C.Heartbeat.Cluster = C.ClusterName
C.Alerting.RedisPub.ChannelKey = C.Alerting.RedisPub.ChannelPrefix + C.ClusterName
@ -93,7 +91,7 @@ type Config struct {
Log logx.Config
HTTP httpx.Config
BasicAuth gin.Accounts
Heartbeat naming.HeartbeatConfig
Heartbeat HeartbeatConfig
Alerting Alerting
NoData NoData
Redis storage.RedisConfig
@ -106,9 +104,16 @@ type Config struct {
Ibex Ibex
}
type HeartbeatConfig struct {
IP string
Interval int64
Endpoint string
}
type Alerting struct {
NotifyScriptPath string
NotifyConcurrency int
TemplatesDir string
RedisPub RedisPub
}

View File

@ -58,36 +58,7 @@ func persist(event *models.AlertCurEvent) {
his := event.ToHis()
if has {
// 数据库里有这个事件,说明之前触发过了
if event.IsRecovered {
// 本次恢复了,把未恢复的事件删除,在全量告警里添加记录
err := models.AlertCurEventDelByHash(event.Hash)
if err != nil {
logger.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
}
if err := his.Add(); err != nil {
logger.Errorf(
"event_persist_his_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
}
return
}
if event.IsRecovered {
// alert_cur_event表里没有数据表示之前没告警结果现在报了恢复神奇....理论上不应该出现的
return
}
// 本次是告警alert_cur_event表里也没有数据
// 不管是告警还是恢复,全量告警里都要记录
if err := his.Add(); err != nil {
logger.Errorf(
"event_persist_his_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
@ -100,6 +71,41 @@ func persist(event *models.AlertCurEvent) {
)
}
if has {
// 活跃告警表中有记录,删之
err = models.AlertCurEventDelByHash(event.Hash)
if err != nil {
logger.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
return
}
if !event.IsRecovered {
// 恢复事件从活跃告警列表彻底删掉告警事件要重新加进来新的event
// use his id as cur id
event.Id = his.Id
if event.Id > 0 {
if err := event.Add(); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
}
}
return
}
if event.IsRecovered {
// alert_cur_event表里没有数据表示之前没告警结果现在报了恢复神奇....理论上不应该出现的
return
}
// use his id as cur id
event.Id = his.Id
if event.Id > 0 {

View File

@ -20,9 +20,6 @@ func Start(ctx context.Context) error {
// filter my rules and start worker
go loopFilterRules(ctx)
// repeat notifier
go loopRepeat(ctx)
go reportQueueSize()
return nil

View File

@ -5,14 +5,15 @@ import (
"github.com/didi/nightingale/v5/src/server/memsto"
)
func isMuted(event *models.AlertCurEvent) bool {
// 如果传入了clock这个可选参数就表示使用这个clock表示的时间否则就从event的字段中取TriggerTime
func isMuted(event *models.AlertCurEvent, clock ...int64) bool {
mutes, has := memsto.AlertMuteCache.Gets(event.GroupId)
if !has || len(mutes) == 0 {
return false
}
for i := 0; i < len(mutes); i++ {
if matchMute(event, mutes[i]) {
if matchMute(event, mutes[i], clock...) {
return true
}
}
@ -20,8 +21,13 @@ func isMuted(event *models.AlertCurEvent) bool {
return false
}
func matchMute(event *models.AlertCurEvent, mute *models.AlertMute) bool {
if event.TriggerTime < mute.Btime || event.TriggerTime > mute.Etime {
func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) bool {
ts := event.TriggerTime
if len(clock) > 0 {
ts = clock[0]
}
if ts < mute.Btime || ts > mute.Etime {
return false
}

View File

@ -44,15 +44,17 @@ var fns = template.FuncMap{
}
func initTpls() error {
tplDir := path.Join(runner.Cwd, "etc", "template")
if config.C.Alerting.TemplatesDir == "" {
config.C.Alerting.TemplatesDir = path.Join(runner.Cwd, "etc", "template")
}
filenames, err := file.FilesUnder(tplDir)
filenames, err := file.FilesUnder(config.C.Alerting.TemplatesDir)
if err != nil {
return errors.WithMessage(err, "failed to exec FilesUnder")
}
if len(filenames) == 0 {
return errors.New("no tpl files under " + tplDir)
return errors.New("no tpl files under " + config.C.Alerting.TemplatesDir)
}
tplFiles := make([]string, 0, len(filenames))
@ -63,11 +65,11 @@ func initTpls() error {
}
if len(tplFiles) == 0 {
return errors.New("no tpl files under " + tplDir)
return errors.New("no tpl files under " + config.C.Alerting.TemplatesDir)
}
for i := 0; i < len(tplFiles); i++ {
tplpath := path.Join(tplDir, tplFiles[i])
tplpath := path.Join(config.C.Alerting.TemplatesDir, tplFiles[i])
tpl, err := template.New(tplFiles[i]).Funcs(fns).ParseFiles(tplpath)
if err != nil {

View File

@ -1,66 +0,0 @@
package engine
import (
"context"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/toolkits/pkg/logger"
)
func loopRepeat(ctx context.Context) {
duration := time.Duration(9000) * time.Millisecond
for {
select {
case <-ctx.Done():
return
case <-time.After(duration):
repeat()
}
}
}
// 拉取未恢复的告警表中需要重复通知的数据
func repeat() {
events, err := models.AlertCurEventNeedRepeat(config.C.ClusterName)
if err != nil {
logger.Errorf("repeat: AlertCurEventNeedRepeat: %v", err)
return
}
if len(events) == 0 {
return
}
for i := 0; i < len(events); i++ {
event := events[i]
rule := memsto.AlertRuleCache.Get(event.RuleId)
if rule == nil {
continue
}
if rule.NotifyRepeatStep == 0 {
// 用户后来调整了这个字段,不让继续发送了
continue
}
event.DB2Mem()
if isNoneffective(event.TriggerTime, rule) {
continue
}
if isMuted(event) {
continue
}
fillUsers(event)
notify(event)
if err = event.IncRepeatStep(int64(rule.NotifyRepeatStep * 60)); err != nil {
logger.Errorf("repeat: IncRepeatStep: %v", err)
}
}
}

View File

@ -187,6 +187,10 @@ func (r RuleEval) judge(vectors []Vector) {
alertingKeys := make(map[string]struct{})
now := time.Now().Unix()
for i := 0; i < count; i++ {
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
alertingKeys[hash] = struct{}{}
// rule disabled in this time span?
if isNoneffective(vectors[i].Timestamp, r.rule) {
continue
@ -226,10 +230,6 @@ func (r RuleEval) judge(vectors []Vector) {
continue
}
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
alertingKeys[hash] = struct{}{}
tagsArr := labelMapToArr(tagsMap)
sort.Strings(tagsArr)
@ -288,14 +288,8 @@ func labelMapToArr(m map[string]string) []string {
}
func (r RuleEval) handleNewEvent(event *models.AlertCurEvent) {
if _, has := r.fires[event.Hash]; has {
// fired before, nothing to do
return
}
if event.PromForDuration == 0 {
r.fires[event.Hash] = event
pushEventToQueue(event)
r.fireEvent(event)
return
}
@ -307,6 +301,23 @@ func (r RuleEval) handleNewEvent(event *models.AlertCurEvent) {
}
if r.pendings[event.Hash].LastEvalTime-r.pendings[event.Hash].TriggerTime > int64(event.PromForDuration) {
r.fireEvent(event)
}
}
func (r RuleEval) fireEvent(event *models.AlertCurEvent) {
if fired, has := r.fires[event.Hash]; has {
if r.rule.NotifyRepeatStep == 0 {
// 说明不想重复通知那就直接返回了nothing to do
return
}
// 之前发送过告警了,这次是否要继续发送,要看是否过了通道静默时间
if event.LastEvalTime > fired.LastEvalTime+int64(r.rule.NotifyRepeatStep)*60 {
r.fires[event.Hash] = event
pushEventToQueue(event)
}
} else {
r.fires[event.Hash] = event
pushEventToQueue(event)
}

View File

@ -3,7 +3,6 @@ package idents
import (
"context"
"fmt"
"sort"
"strconv"
"time"
@ -92,20 +91,13 @@ func loopPushMetrics(ctx context.Context) {
}
func pushMetrics() {
servers, err := naming.ActiveServers(context.Background(), config.C.ClusterName)
isLeader, err := naming.IamLeader()
if err != nil {
logger.Errorf("handle_idents: failed to get active servers: %v", err)
logger.Errorf("handle_idents: %v", err)
return
}
if len(servers) == 0 {
logger.Errorf("handle_idents: active servers empty")
return
}
sort.Strings(servers)
if config.C.Heartbeat.Endpoint != servers[0] {
if !isLeader {
logger.Info("handle_idents: i am not leader")
return
}

View File

@ -89,9 +89,8 @@ func loopSyncAlertMutes() {
func syncAlertMutes() error {
start := time.Now()
btime := start.Unix() - int64(30)
stat, err := models.AlertMuteStatistics(config.C.ClusterName, btime)
stat, err := models.AlertMuteStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteStatistics")
}
@ -103,7 +102,7 @@ func syncAlertMutes() error {
return nil
}
lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName, btime)
lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster")
}

View File

@ -10,34 +10,28 @@ import (
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/storage"
)
// local servers
var localss string
type HeartbeatConfig struct {
IP string
Interval int64
Endpoint string
Cluster string
}
func Heartbeat(ctx context.Context, cfg HeartbeatConfig) error {
if err := heartbeat(ctx, cfg); err != nil {
func Heartbeat(ctx context.Context) error {
if err := heartbeat(ctx); err != nil {
fmt.Println("failed to heartbeat:", err)
return err
}
go loopHeartbeat(ctx, cfg)
go loopHeartbeat(ctx)
return nil
}
func loopHeartbeat(ctx context.Context, cfg HeartbeatConfig) {
interval := time.Duration(cfg.Interval) * time.Millisecond
func loopHeartbeat(ctx context.Context) {
interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := heartbeat(ctx, cfg); err != nil {
if err := heartbeat(ctx); err != nil {
logger.Warning(err)
}
}
@ -52,15 +46,15 @@ func redisKey(cluster string) string {
return fmt.Sprintf("/server/heartbeat/%s", cluster)
}
func heartbeat(ctx context.Context, cfg HeartbeatConfig) error {
func heartbeat(ctx context.Context) error {
now := time.Now().Unix()
key := redisKey(cfg.Cluster)
err := storage.Redis.HSet(ctx, key, cfg.Endpoint, now).Err()
key := redisKey(config.C.ClusterName)
err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()
if err != nil {
return err
}
servers, err := ActiveServers(ctx, cfg.Cluster)
servers, err := ActiveServers(ctx, config.C.ClusterName)
if err != nil {
return err
}

View File

@ -0,0 +1,26 @@
package naming
import (
"context"
"sort"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/toolkits/pkg/logger"
)
// IamLeader reports whether this server instance is the leader of the
// configured cluster. Leadership is determined deterministically: the
// active server endpoints are sorted and the lexicographically smallest
// one is the leader; every instance can compute this locally.
func IamLeader() (bool, error) {
	servers, err := ActiveServers(context.Background(), config.C.ClusterName)
	if err != nil {
		logger.Errorf("failed to get active servers: %v", err)
		return false, err
	}

	if len(servers) == 0 {
		// Bug fix: this previously returned `false, err` with a nil err,
		// so callers treated "no active servers" (an error condition we
		// just logged) as a successful "not leader" answer.
		logger.Errorf("active servers empty")
		return false, errors.New("active servers empty")
	}

	sort.Strings(servers)

	return config.C.Heartbeat.Endpoint == servers[0], nil
}

View File

@ -65,12 +65,17 @@ func configRoute(r *gin.Engine, version string) {
ginx.NewRender(c).Data(lst, err)
})
// use apiKey not basic auth
r.POST("/datadog/api/v1/series", datadogSeries)
if len(config.C.BasicAuth) > 0 {
auth := gin.BasicAuth(config.C.BasicAuth)
r.Use(auth)
}
r.POST("/opentsdb/put", handleOpenTSDB)
r.POST("/prometheus/v1/write", remoteWrite)
r.POST("/prometheus/v1/query", queryPromql)
r.GET("/memory/alert-rule", alertRuleGet)
r.GET("/memory/idents", identsGets)
@ -80,7 +85,5 @@ func configRoute(r *gin.Engine, version string) {
r.GET("/memory/user", userGet)
r.GET("/memory/user-group", userGroupGet)
r.POST("/prom/vectors", vectorsPost)
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
}

View File

@ -0,0 +1,268 @@
package router
import (
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
// TimeSeries is the body of a datadog series submission: a batch of
// metrics posted in one request to /datadog/api/v1/series.
type TimeSeries struct {
	Series []*DatadogMetric `json:"series"`
}

// DatadogMetric is one metric in a datadog series submission.
type DatadogMetric struct {
	Metric string         `json:"metric"`
	Points []DatadogPoint `json:"points"`
	Host   string         `json:"host"`
	Tags   []string       `json:"tags,omitempty"`
}

// DatadogPoint is a [timestamp, value] pair; index 0 is a unix
// timestamp in seconds (converted to milliseconds in ToProm) and
// index 1 is the sample value.
type DatadogPoint [2]float64
// Clean validates the metric before conversion; currently the only
// rejected condition is an empty metric name.
func (m *DatadogMetric) Clean() error {
	if len(m.Metric) > 0 {
		return nil
	}
	return fmt.Errorf("metric is blank")
}
// ToProm converts the datadog metric into a prometheus time series.
// It also returns the ident (host identity) derived from the host
// field / tags so the caller can register the host, or an error when
// the metric or a tag key cannot be normalized into a valid prometheus
// name. Note: normalization mutates m.Metric in place.
func (m *DatadogMetric) ToProm() (*prompb.TimeSeries, string, error) {
	pt := &prompb.TimeSeries{}

	for i := 0; i < len(m.Points); i++ {
		pt.Samples = append(pt.Samples, prompb.Sample{
			// use ms: datadog points carry unix seconds, prometheus
			// samples carry unix milliseconds
			Timestamp: int64(m.Points[i][0]) * 1000,
			Value:     m.Points[i][1],
		})
	}

	// normalize the metric name: '.' and '-' are common in datadog
	// metric names but invalid in prometheus metric names
	if strings.IndexByte(m.Metric, '.') != -1 {
		m.Metric = strings.ReplaceAll(m.Metric, ".", "_")
	}

	if strings.IndexByte(m.Metric, '-') != -1 {
		m.Metric = strings.ReplaceAll(m.Metric, "-", "_")
	}

	if !model.MetricNameRE.MatchString(m.Metric) {
		return nil, "", fmt.Errorf("invalid metric name: %s", m.Metric)
	}

	pt.Labels = append(pt.Labels, &prompb.Label{
		Name:  model.MetricNameLabel,
		Value: m.Metric,
	})

	identInTag := ""
	hostInTag := ""

	for i := 0; i < len(m.Tags); i++ {
		// tags are expected in "key:value" form; anything else is dropped
		arr := strings.SplitN(m.Tags[i], ":", 2)
		if len(arr) != 2 {
			continue
		}

		key := arr[0]

		if key == "ident" {
			// if the tags contain an ident, use it directly
			identInTag = arr[1]
			pt.Labels = append(pt.Labels, &prompb.Label{
				Name:  key,
				Value: arr[1],
			})
			continue
		}

		if key == "host" {
			// remember the host tag; whether it becomes a "host" or an
			// "ident" label is decided after all tags are scanned
			hostInTag = arr[1]
			continue
		}

		// normalize tag keys the same way as the metric name
		if strings.IndexByte(key, '.') != -1 {
			key = strings.ReplaceAll(key, ".", "_")
		}

		if strings.IndexByte(key, '-') != -1 {
			key = strings.ReplaceAll(key, "-", "_")
		}

		if !model.LabelNameRE.MatchString(key) {
			return nil, "", fmt.Errorf("invalid tag name: %s", key)
		}

		pt.Labels = append(pt.Labels, &prompb.Label{
			Name:  key,
			Value: arr[1],
		})
	}

	if m.Host != "" {
		// the top-level host field wins over a host tag
		hostInTag = m.Host
	}

	if hostInTag != "" {
		if identInTag != "" {
			// ident already set from tags: keep host as a plain label
			pt.Labels = append(pt.Labels, &prompb.Label{
				Name:  "host",
				Value: hostInTag,
			})
		} else {
			// no ident tag: promote the host to the ident label
			pt.Labels = append(pt.Labels, &prompb.Label{
				Name:  "ident",
				Value: hostInTag,
			})
		}
	}

	// the returned ident prefers an explicit ident tag over the host
	ident := hostInTag
	if identInTag != "" {
		ident = identInTag
	}

	return pt, ident, nil
}
// datadogSeries handles the datadog agent series endpoint
// (/datadog/api/v1/series): it authenticates via the api_key query
// parameter (matched against the configured basic-auth passwords),
// decompresses the request body according to Content-Encoding,
// converts every datadog metric to a prometheus time series, registers
// the idents and pushes the batch onto the writer queue.
func datadogSeries(c *gin.Context) {
	// GetQuery already returns "" when the parameter is absent
	apiKey, _ := c.GetQuery("api_key")

	if len(config.C.BasicAuth) > 0 {
		// n9e-server needs basic auth: accept the request only when the
		// api_key equals one of the configured passwords
		ok := false
		for _, v := range config.C.BasicAuth {
			if apiKey == v {
				ok = true
				break
			}
		}

		if !ok {
			c.String(http.StatusUnauthorized, "unauthorized")
			return
		}
	}

	var bs []byte
	var err error

	enc := c.GetHeader("Content-Encoding")
	if enc == "gzip" {
		// Bug fix: the reader-construction error gets its own name
		// (rerr). Previously `r, err := gzip.NewReader(...)` shadowed
		// the outer err, so ReadAll failures inside this branch were
		// assigned to the shadowed variable and silently ignored by
		// the outer error check below.
		r, rerr := gzip.NewReader(c.Request.Body)
		if rerr != nil {
			c.String(400, rerr.Error())
			return
		}
		defer r.Close()
		bs, err = ioutil.ReadAll(r)
	} else if enc == "deflate" {
		r, rerr := zlib.NewReader(c.Request.Body)
		if rerr != nil {
			c.String(400, rerr.Error())
			return
		}
		defer r.Close()
		bs, err = ioutil.ReadAll(r)
	} else {
		defer c.Request.Body.Close()
		bs, err = ioutil.ReadAll(c.Request.Body)
	}

	if err != nil {
		c.String(400, err.Error())
		return
	}

	var series TimeSeries
	if err = json.Unmarshal(bs, &series); err != nil {
		c.String(400, err.Error())
		return
	}

	cnt := len(series.Series)
	if cnt == 0 {
		c.String(400, "series empty")
		return
	}

	var (
		succ int
		fail int
		msg  = "data pushed to queue"
		list []interface{}
		ts   = time.Now().Unix()
		ids  = make(map[string]interface{})
	)

	for i := 0; i < cnt; i++ {
		item := series.Series[i]
		if item == nil {
			continue
		}

		if err := item.Clean(); err != nil {
			fail++
			continue
		}

		pt, ident, err := item.ToProm()
		if err != nil {
			fail++
			continue
		}

		if ident != "" {
			// register host
			ids[ident] = ts

			// fill tags configured for this target
			target, has := memsto.TargetCache.Get(ident)
			if has {
				for key, value := range target.TagsMap {
					pt.Labels = append(pt.Labels, &prompb.Label{
						Name:  key,
						Value: value,
					})
				}
			}
		}

		list = append(list, pt)
		succ++
	}

	if len(list) > 0 {
		promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(len(list)))

		if !writer.Writers.PushQueue(list) {
			msg = "writer queue full"
		}

		idents.Idents.MSet(ids)
	}

	c.JSON(200, gin.H{
		"succ": succ,
		"fail": fail,
		"msg":  msg,
	})
}

View File

@ -148,6 +148,11 @@ func handleOpenTSDB(c *gin.Context) {
arr = []HTTPMetric{one}
}
if err != nil {
c.String(400, err.Error())
return
}
var (
succ int
fail int
@ -193,7 +198,7 @@ func handleOpenTSDB(c *gin.Context) {
if len(list) > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(len(list)))
if !writer.Writers.PushQueue(list) {
msg = "wirter queue full"
msg = "writer queue full"
}
idents.Idents.MSet(ids)

View File

@ -1,21 +1,32 @@
package router
import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/ginx"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/engine"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/reader"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/writer"
)
type vectorsForm struct {
type promqlForm struct {
PromQL string `json:"promql"`
}
func vectorsPost(c *gin.Context) {
var f vectorsForm
func queryPromql(c *gin.Context) {
var f promqlForm
ginx.BindJSON(c, &f)
value, warnings, err := reader.Reader.Client.Query(c.Request.Context(), f.PromQL, time.Now())
@ -31,3 +42,94 @@ func vectorsPost(c *gin.Context) {
c.JSON(200, engine.ConvertVectors(value))
}
// remoteWrite handles the prometheus remote-write endpoint: it decodes
// the snappy-compressed protobuf request body, resolves each series'
// ident (label "ident", falling back to "agent_hostname" which is then
// renamed to "ident"), enriches series of known targets with their
// configured tags, registers the idents and pushes everything onto the
// writer queue.
func remoteWrite(c *gin.Context) {
	req, err := DecodeWriteRequest(c.Request.Body)
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	count := len(req.Timeseries)
	if count == 0 {
		// empty payload: nothing to do, reply OK
		c.String(200, "")
		return
	}

	var (
		now   = time.Now().Unix()
		ids   = make(map[string]interface{})
		lst   = make([]interface{}, count)
		ident string
	)

	for i := 0; i < count; i++ {
		ident = ""

		// find ident label
		for j := 0; j < len(req.Timeseries[i].Labels); j++ {
			if req.Timeseries[i].Labels[j].Name == "ident" {
				ident = req.Timeseries[i].Labels[j].Value
			}
		}

		if ident == "" {
			// not found, try agent_hostname
			for j := 0; j < len(req.Timeseries[i].Labels); j++ {
				// agent_hostname for grafana-agent
				if req.Timeseries[i].Labels[j].Name == "agent_hostname" {
					req.Timeseries[i].Labels[j].Name = "ident"
					ident = req.Timeseries[i].Labels[j].Value
				}
			}
		}

		if len(ident) > 0 {
			// register host
			ids[ident] = now

			// fill tags configured for this target
			target, has := memsto.TargetCache.Get(ident)
			if has {
				for key, value := range target.TagsMap {
					req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, &prompb.Label{
						Name:  key,
						Value: value,
					})
				}
			}
		}

		lst[i] = req.Timeseries[i]
	}

	promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count))
	idents.Idents.MSet(ids)

	if writer.Writers.PushQueue(lst) {
		c.String(200, "")
	} else {
		c.String(http.StatusInternalServerError, "writer queue full")
	}
}
// DecodeWriteRequest reads a snappy-compressed, protobuf-encoded
// prompb.WriteRequest from r, handling the decompression, and returns
// the decoded request.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
	body, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, err
	}

	raw, err := snappy.Decode(nil, body)
	if err != nil {
		return nil, err
	}

	req := &prompb.WriteRequest{}
	if err := proto.Unmarshal(raw, req); err != nil {
		return nil, err
	}

	return req, nil
}

View File

@ -134,7 +134,7 @@ func (s Server) initialize() (func(), error) {
memsto.Sync()
// start heartbeat
if err = naming.Heartbeat(ctx, config.C.Heartbeat); err != nil {
if err = naming.Heartbeat(ctx); err != nil {
return fns.Ret(), err
}

View File

@ -3,6 +3,7 @@ package writer
import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"time"
@ -44,6 +45,10 @@ type WriterType struct {
}
func (w WriterType) Write(items []*prompb.TimeSeries) {
if len(items) == 0 {
return
}
req := &prompb.WriteRequest{
Timeseries: items,
}
@ -56,6 +61,7 @@ func (w WriterType) Write(items []*prompb.TimeSeries) {
if err := w.Post(snappy.Encode(nil, data)); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}
@ -82,7 +88,7 @@ func (w WriterType) Post(req []byte) error {
}
if resp.StatusCode >= 400 {
logger.Warningf("push data with remote write request got status code: %v, response body: %s", resp.StatusCode, string(body))
err = fmt.Errorf("push data with remote write request got status code: %v, response body: %s", resp.StatusCode, string(body))
return err
}

View File

@ -77,6 +77,7 @@ type Config struct {
RunMode string
I18N string
AdminRole string
MetricsYamlFile string
ContactKeys []ContactKey
NotifyChannels []string
Log logx.Config

View File

@ -42,6 +42,7 @@ var (
"The business group must retain at least one team": "业务组下要保留至少一个团队",
"At least one team have rw permission": "业务组下至少要有一个具备读写权限的团队",
"duplicate tagkey(%s)": "标签KEY(%s)重复了",
"Failed to create BusiGroup(%s)": "创建业务(%s)组失败",
}
langDict = map[string]map[string]string{
"zh": dict,

View File

@ -231,7 +231,10 @@ func configRoute(r *gin.Engine, version string) {
pages.PUT("/busi-group/:id/task/*url", jwtAuth(), user(), perm("/job-tasks/put"), bgrw(), taskProxy)
}
service := r.Group("/v1/n9e", gin.BasicAuth(config.C.BasicAuth))
service := r.Group("/v1/n9e")
if len(config.C.BasicAuth) > 0 {
service.Use(gin.BasicAuth(config.C.BasicAuth))
}
{
service.Any("/prometheus/*url", prometheusProxy)
service.POST("/users", userAddPost)

View File

@ -37,7 +37,18 @@ func busiGroupAdd(c *gin.Context) {
}
username := c.MustGet("username").(string)
ginx.NewRender(c).Message(models.BusiGroupAdd(f.Name, f.Members, username))
ginx.Dangerous(models.BusiGroupAdd(f.Name, f.Members, username))
// 如果创建成功拿着name去查应该可以查到
newbg, err := models.BusiGroupGet("name=?", f.Name)
ginx.Dangerous(err)
if newbg == nil {
ginx.NewRender(c).Message("Failed to create BusiGroup(%s)", f.Name)
return
}
ginx.NewRender(c).Data(newbg.Id, nil)
}
func busiGroupPut(c *gin.Context) {

View File

@ -12,7 +12,11 @@ import (
)
func metricsDescGetFile(c *gin.Context) {
fp := path.Join(runner.Cwd, "etc", "metrics.yaml")
fp := config.C.MetricsYamlFile
if fp == "" {
fp = path.Join(runner.Cwd, "etc", "metrics.yaml")
}
if !file.IsExist(fp) {
c.String(404, "%s not found", fp)
return