Merge branch 'master' of https://github.com/didi/nightingale
This commit is contained in:
commit
472ed62c12
|
@ -4,7 +4,7 @@ v3.x终于来了,文档正在编写中,稍安勿躁...
|
|||
|
||||
# 升级说明
|
||||
|
||||
v3.x的版本和v2.x差别巨大,如果短期没办法迁移,可以继续使用 [v.2.8.0](https://github.com/didi/nightingale/tree/v2.8.0) ,我们之所以决定升级到v3.x,具体原因 [请看这里](https://mp.weixin.qq.com/s/BoGcqPiIQIuiK7cM3PTvrw) ,简而言之,我们是希望夜莺逐渐演化为一个运维平台
|
||||
v3.x的版本和v2.x差别巨大,没办法平滑迁移,可以继续使用 [v.2.8.0](https://github.com/didi/nightingale/tree/v2.8.0) ,我们之所以决定升级到v3.x,具体原因 [请看这里](https://mp.weixin.qq.com/s/BoGcqPiIQIuiK7cM3PTvrw) ,简而言之,我们是希望夜莺逐渐演化为一个运维平台。如果v2.x用着也能满足需求,可以继续用v2.x,毕竟,适合自己的才是最好的
|
||||
|
||||
# 新版效果
|
||||
|
||||
|
@ -64,8 +64,8 @@ grep redis -r .
|
|||
|
||||
```shell script
|
||||
cd /home/n9e
|
||||
wget http://116.85.64.82/pub.0927.tar.gz
|
||||
tar zxvf pub.0927.tar.gz
|
||||
wget http://116.85.64.82/pub.20200928.tar.gz
|
||||
tar zxvf pub.20200928.tar.gz
|
||||
```
|
||||
|
||||
6、覆盖nginx.conf,建议大家还是看一下这个配置,熟悉一下nginx配置,夜莺不同web侧组件就是通过nginx的不同location区分的。覆盖完了配置记得reload一下或者重启nginx
|
||||
|
|
|
@ -51,6 +51,11 @@ redis:
|
|||
read: 3000
|
||||
write: 3000
|
||||
|
||||
rabbitmq:
|
||||
enable: false
|
||||
addr: amqp://root:1234@127.0.0.1:5672/
|
||||
queue: test
|
||||
|
||||
sender:
|
||||
mail:
|
||||
# three choice: smtp|shell|api
|
||||
|
|
|
@ -84,11 +84,11 @@ type CludeRecv struct {
|
|||
}
|
||||
|
||||
type XcludeResp struct {
|
||||
Endpoints []string `json:"endpoints"`
|
||||
Metric string `json:"metric"`
|
||||
Tags []string `json:"tags"`
|
||||
Step int `json:"step"`
|
||||
DsType string `json:"dstype"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Metric string `json:"metric"`
|
||||
Tags []string `json:"tags"`
|
||||
Step int `json:"step"`
|
||||
DsType string `json:"dstype"`
|
||||
}
|
||||
|
||||
type IndexByFullTagsRecv struct {
|
||||
|
|
|
@ -76,8 +76,9 @@ type timeoutSection struct {
|
|||
}
|
||||
|
||||
type rabbitmqSection struct {
|
||||
Addr string `yaml:"addr"`
|
||||
Queue string `yaml:"queue"`
|
||||
Enable bool `yaml:"enable"`
|
||||
Addr string `yaml:"addr"`
|
||||
Queue string `yaml:"queue"`
|
||||
}
|
||||
|
||||
var Config *ConfigT
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -19,6 +21,54 @@ func Init(url string) {
|
|||
}
|
||||
}
|
||||
|
||||
// ping 测试rabbitmq连接是否正常
|
||||
func ping() (err error) {
|
||||
if conn == nil {
|
||||
return fmt.Errorf("conn is nil")
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
close()
|
||||
return err
|
||||
}
|
||||
|
||||
defer ch.Close()
|
||||
|
||||
err = ch.ExchangeDeclare("ping.ping", "topic", false, true, false, true, nil)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
msgContent := "ping.ping"
|
||||
err = ch.Publish("ping.ping", "ping.ping", false, false, amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(msgContent),
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
err = ch.ExchangeDelete("ping.ping", false, false)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func close() {
|
||||
if conn != nil {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
conn = nil
|
||||
}
|
||||
}
|
||||
|
||||
func Shutdown() {
|
||||
conn.Close()
|
||||
exit <- true
|
||||
|
|
|
@ -3,26 +3,42 @@ package rabbitmq
|
|||
import (
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func Consume(queueName string) {
|
||||
go func(queueName string) {
|
||||
for {
|
||||
// Consume 消费消息
|
||||
func Consume(url, queueName string) {
|
||||
for {
|
||||
select {
|
||||
case <-exit:
|
||||
return
|
||||
default:
|
||||
if err := ping(); err != nil {
|
||||
logger.Error("rabbitmq conn failed")
|
||||
conn, err = amqp.Dial(url)
|
||||
if err != nil {
|
||||
conn = nil
|
||||
logger.Error(err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
sleep := consume(queueName)
|
||||
if sleep {
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
}
|
||||
|
||||
if _, ok := <-exit; ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(queueName)
|
||||
}
|
||||
}
|
||||
|
||||
// 如果操作MQ出现问题,或者没有load到数据,就sleep一下
|
||||
func consume(queueName string) bool {
|
||||
if conn == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
|
@ -45,7 +61,7 @@ func consume(queueName string) bool {
|
|||
}
|
||||
|
||||
err = ch.Qos(
|
||||
0, // prefetch count
|
||||
80, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
|
|
|
@ -68,9 +68,9 @@ func main() {
|
|||
cron.InitWorker()
|
||||
|
||||
// 初始化 rabbitmq 处理部分异步逻辑
|
||||
if len(config.Config.RabbitMQ.Addr) > 0 {
|
||||
if config.Config.RabbitMQ.Enable {
|
||||
rabbitmq.Init(config.Config.RabbitMQ.Addr)
|
||||
rabbitmq.Consume(config.Config.RabbitMQ.Queue)
|
||||
go rabbitmq.Consume(config.Config.RabbitMQ.Addr, config.Config.RabbitMQ.Queue)
|
||||
}
|
||||
|
||||
go cron.ConsumeMail()
|
||||
|
@ -102,7 +102,7 @@ func endingProc() {
|
|||
http.Shutdown()
|
||||
redisc.CloseRedis()
|
||||
|
||||
if len(config.Config.RabbitMQ.Addr) > 0 {
|
||||
if config.Config.RabbitMQ.Enable {
|
||||
rabbitmq.Shutdown()
|
||||
}
|
||||
|
||||
|
|
|
@ -36,43 +36,51 @@ func (query *ShowSeries) renderEndpoints() {
|
|||
}
|
||||
endpointPart = endpointPart[:len(endpointPart)-len("OR")]
|
||||
endpointPart += ")"
|
||||
query.RawQuery = fmt.Sprintf("\"%s\" WHERE \"%s\"", query.RawQuery, endpointPart)
|
||||
query.RawQuery = fmt.Sprintf("%s WHERE %s", query.RawQuery, endpointPart)
|
||||
}
|
||||
}
|
||||
|
||||
func (query *ShowSeries) renderInclude() {
|
||||
if len(query.Include) > 0 {
|
||||
// include
|
||||
if len(query.Include) == 1 && query.Include[0] == nil {
|
||||
return
|
||||
}
|
||||
includePart := "("
|
||||
for _, include := range query.Include {
|
||||
for _, value := range include.Values {
|
||||
includePart += fmt.Sprintf(" \"%s\"='%s' OR", include.Key, value)
|
||||
}
|
||||
}
|
||||
includePart = includePart[:len(includePart)-len("OR")]
|
||||
includePart = includePart[:len(includePart)-len("AND")]
|
||||
includePart += ")"
|
||||
if !strings.Contains(query.RawQuery, "WHERE") {
|
||||
query.RawQuery += " WHERE"
|
||||
query.RawQuery = fmt.Sprintf(" %s WHERE %s", query.RawQuery, includePart)
|
||||
} else {
|
||||
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, includePart)
|
||||
}
|
||||
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, includePart)
|
||||
}
|
||||
}
|
||||
|
||||
func (query *ShowSeries) renderExclude() {
|
||||
if len(query.Exclude) > 0 {
|
||||
// exclude
|
||||
if len(query.Exclude) == 1 && query.Exclude[0] == nil {
|
||||
return
|
||||
}
|
||||
excludePart := "("
|
||||
for _, exclude := range query.Exclude {
|
||||
for _, value := range exclude.Values {
|
||||
excludePart += fmt.Sprintf(" \"%s\"='%s' OR", exclude.Key, value)
|
||||
excludePart += fmt.Sprintf(" \"%s\"!='%s' AND", exclude.Key, value)
|
||||
}
|
||||
}
|
||||
excludePart = excludePart[:len(excludePart)-len("OR")]
|
||||
excludePart = excludePart[:len(excludePart)-len("AND")]
|
||||
excludePart += ")"
|
||||
if !strings.Contains(query.RawQuery, "WHERE") {
|
||||
query.RawQuery += " WHERE"
|
||||
query.RawQuery = fmt.Sprintf(" %s WHERE %s", query.RawQuery, excludePart)
|
||||
} else {
|
||||
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, excludePart)
|
||||
}
|
||||
query.RawQuery = fmt.Sprintf(" %s AND %s", query.RawQuery, excludePart)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,15 +25,34 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat
|
|||
return nil
|
||||
}
|
||||
|
||||
respMap := make(map[string]*dataobj.TsdbQueryResponse)
|
||||
queryResponse := make([]*dataobj.TsdbQueryResponse, 0)
|
||||
for _, input := range inputs {
|
||||
for _, counter := range input.Counters {
|
||||
items := strings.Split(counter, "/")
|
||||
items := strings.SplitN(counter, "/", 2)
|
||||
metric := items[0]
|
||||
var tags = make([]string, 0)
|
||||
if len(items) > 1 && len(items[1]) > 0 {
|
||||
tags := make([]string, 0)
|
||||
if len(items) > 1 {
|
||||
tags = strings.Split(items[1], ",")
|
||||
tagMap := dataobj.DictedTagstring(items[1])
|
||||
if counter, err = dataobj.GetCounter(metric, "", tagMap); err != nil {
|
||||
logger.Warningf("get counter error: %+v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for _, endpoint := range input.Endpoints {
|
||||
key := fmt.Sprintf("%s%s", endpoint, counter)
|
||||
respMap[key] = &dataobj.TsdbQueryResponse{
|
||||
Start: input.Start,
|
||||
End: input.End,
|
||||
Endpoint: endpoint,
|
||||
Counter: counter,
|
||||
DsType: input.DsType,
|
||||
Step: input.Step,
|
||||
}
|
||||
}
|
||||
|
||||
influxdbQuery := QueryData{
|
||||
Start: input.Start,
|
||||
End: input.End,
|
||||
|
@ -42,11 +61,13 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat
|
|||
Tags: tags,
|
||||
Step: input.Step,
|
||||
DsType: input.DsType,
|
||||
GroupKey: []string{"*"},
|
||||
}
|
||||
influxdbQuery.renderSelect()
|
||||
influxdbQuery.renderEndpoints()
|
||||
influxdbQuery.renderTags()
|
||||
influxdbQuery.renderTimeRange()
|
||||
influxdbQuery.renderGroupBy()
|
||||
logger.Debugf("query influxql %s", influxdbQuery.RawQuery)
|
||||
|
||||
query := client.NewQuery(influxdbQuery.RawQuery, c.Database, c.Precision)
|
||||
|
@ -54,31 +75,34 @@ func (influxdb *InfluxdbDataSource) QueryData(inputs []dataobj.QueryData) []*dat
|
|||
for _, result := range response.Results {
|
||||
for _, series := range result.Series {
|
||||
|
||||
// fixme : influx client get series.Tags is nil
|
||||
endpoint := series.Tags["endpoint"]
|
||||
delete(series.Tags, endpoint)
|
||||
counter, err := dataobj.GetCounter(series.Name, "", series.Tags)
|
||||
delete(series.Tags, "endpoint")
|
||||
|
||||
influxCounter, err := dataobj.GetCounter(series.Name, "", series.Tags)
|
||||
if err != nil {
|
||||
logger.Warningf("get counter error: %+v", err)
|
||||
continue
|
||||
}
|
||||
values := convertValues(series)
|
||||
|
||||
resp := &dataobj.TsdbQueryResponse{
|
||||
Start: influxdbQuery.Start,
|
||||
End: influxdbQuery.End,
|
||||
Endpoint: endpoint,
|
||||
Counter: counter,
|
||||
DsType: influxdbQuery.DsType,
|
||||
Step: influxdbQuery.Step,
|
||||
Values: values,
|
||||
key := fmt.Sprintf("%s%s", endpoint, influxCounter)
|
||||
if _, exists := respMap[key]; exists {
|
||||
respMap[key].Values = convertValues(series)
|
||||
}
|
||||
queryResponse = append(queryResponse, resp)
|
||||
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
logger.Warningf("query data point on influxdb error %v.", err)
|
||||
} else if response.Error() != nil {
|
||||
logger.Warningf("query data point on influxdb, resp error: %v.", response.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, resp := range respMap {
|
||||
queryResponse = append(queryResponse, resp)
|
||||
}
|
||||
return queryResponse
|
||||
}
|
||||
|
||||
|
@ -174,6 +198,12 @@ func (influxdb *InfluxdbDataSource) QueryMetrics(recv dataobj.EndpointsRecv) *da
|
|||
}
|
||||
}
|
||||
return resp
|
||||
} else {
|
||||
if err != nil {
|
||||
logger.Warningf("query metrics on influxdb error %v.", err)
|
||||
} else if response.Error() != nil {
|
||||
logger.Warningf("query metrics on influxdb, resp error: %v.", response.Error())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -198,7 +228,7 @@ func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRec
|
|||
Tagkv: make([]*dataobj.TagPair, 0),
|
||||
}
|
||||
// show tag keys
|
||||
keys := showTagKeys(c, metric, influxdb.Section.Database)
|
||||
keys := showTagKeys(c, metric, influxdb.Section.Database, recv.Endpoints)
|
||||
if len(keys) > 0 {
|
||||
// show tag values
|
||||
tagkvResp.Tagkv = showTagValues(c, keys, metric, influxdb.Section.Database)
|
||||
|
@ -211,9 +241,17 @@ func (influxdb *InfluxdbDataSource) QueryTagPairs(recv dataobj.EndpointMetricRec
|
|||
|
||||
// show tag keys on n9e from metric where ...
|
||||
// (exclude default endpoint tag)
|
||||
func showTagKeys(c *InfluxClient, metric, database string) []string {
|
||||
func showTagKeys(c *InfluxClient, metric, database string, endpoints []string) []string {
|
||||
keys := make([]string, 0)
|
||||
influxql := fmt.Sprintf("SHOW TAG KEYS ON \"%s\" FROM \"%s\"", database, metric)
|
||||
if len(endpoints) > 0 {
|
||||
endpointPart := ""
|
||||
for _, endpoint := range endpoints {
|
||||
endpointPart += fmt.Sprintf(" \"endpoint\"='%s' OR", endpoint)
|
||||
}
|
||||
endpointPart = endpointPart[:len(endpointPart)-len("OR")]
|
||||
influxql = fmt.Sprintf("%s WHERE %s", influxql, endpointPart)
|
||||
}
|
||||
query := client.NewQuery(influxql, c.Database, c.Precision)
|
||||
if response, err := c.Client.Query(query); err == nil && response.Error() == nil {
|
||||
for _, result := range response.Results {
|
||||
|
@ -227,6 +265,12 @@ func showTagKeys(c *InfluxClient, metric, database string) []string {
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
logger.Warningf("query tag keys on influxdb error %v.", err)
|
||||
} else if response.Error() != nil {
|
||||
logger.Warningf("query tag keys on influxdb, resp error: %v.", response.Error())
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
@ -258,6 +302,12 @@ func showTagValues(c *InfluxClient, keys []string, metric, database string) []*d
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
logger.Warningf("query tag values on influxdb error %v.", err)
|
||||
} else if response.Error() != nil {
|
||||
logger.Warningf("query tag values on influxdb, resp error: %v.", response.Error())
|
||||
}
|
||||
}
|
||||
return tagkv
|
||||
}
|
||||
|
@ -275,19 +325,23 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv)
|
|||
}
|
||||
resp := make([]dataobj.XcludeResp, 0)
|
||||
for _, recv := range recvs {
|
||||
xcludeResp := dataobj.XcludeResp{
|
||||
Endpoints: recv.Endpoints,
|
||||
Metric: recv.Metric,
|
||||
Tags: make([]string, 0),
|
||||
Step: -1, // fixme
|
||||
DsType: "GAUGE",
|
||||
}
|
||||
|
||||
if len(recv.Endpoints) == 0 {
|
||||
resp = append(resp, xcludeResp)
|
||||
continue
|
||||
}
|
||||
|
||||
xcludeRespMap := make(map[string]*dataobj.XcludeResp)
|
||||
for _, endpoint := range recv.Endpoints {
|
||||
key := fmt.Sprintf("endpoint=%s", endpoint)
|
||||
xcludeRespMap[key] = &dataobj.XcludeResp{
|
||||
Endpoint: endpoint,
|
||||
Metric: recv.Metric,
|
||||
Tags: make([]string, 0),
|
||||
Step: 10,
|
||||
DsType: "GAUGE",
|
||||
}
|
||||
}
|
||||
|
||||
showSeries := ShowSeries{
|
||||
Database: influxdb.Section.Database,
|
||||
Metric: recv.Metric,
|
||||
|
@ -307,7 +361,7 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv)
|
|||
for _, result := range response.Results {
|
||||
for _, series := range result.Series {
|
||||
for _, valuePair := range series.Values {
|
||||
|
||||
var curItem string
|
||||
// proc.port.listen,endpoint=localhost,port=22,service=sshd
|
||||
tagKey := valuePair[0].(string)
|
||||
|
||||
|
@ -315,29 +369,45 @@ func (influxdb *InfluxdbDataSource) QueryIndexByClude(recvs []dataobj.CludeRecv)
|
|||
items := strings.Split(tagKey, ",")
|
||||
newItems := make([]string, 0)
|
||||
for _, item := range items {
|
||||
if item != recv.Metric && !strings.Contains(item, "endpoint") {
|
||||
if strings.HasPrefix(item, "endpoint=") {
|
||||
curItem = item
|
||||
continue
|
||||
}
|
||||
if item != recv.Metric {
|
||||
newItems = append(newItems, item)
|
||||
}
|
||||
}
|
||||
|
||||
if curItem == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(newItems) > 0 {
|
||||
if tags, err := dataobj.SplitTagsString(strings.Join(newItems, ",")); err == nil {
|
||||
xcludeResp.Tags = append(xcludeResp.Tags, dataobj.SortedTags(tags))
|
||||
xcludeRespMap[curItem].Tags = append(xcludeRespMap[curItem].Tags, dataobj.SortedTags(tags))
|
||||
xcludeRespMap[curItem].Step = 10
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
logger.Warningf("query index by clude on influxdb error: %v.", err)
|
||||
} else if response.Error() != nil {
|
||||
logger.Warningf("query index by clude on influxdb, resp error: %v.", response.Error())
|
||||
}
|
||||
}
|
||||
for _, xcludeResp := range xcludeRespMap {
|
||||
resp = append(resp, *xcludeResp)
|
||||
}
|
||||
resp = append(resp, xcludeResp)
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
// show series from metric where ...
|
||||
func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj.
|
||||
IndexByFullTagsResp {
|
||||
func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp {
|
||||
logger.Debugf("query IndexByFullTags , recv: %+v", recvs)
|
||||
|
||||
c, err := NewInfluxdbClient(influxdb.Section)
|
||||
|
@ -354,7 +424,7 @@ func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexBy
|
|||
Endpoints: recv.Endpoints,
|
||||
Metric: recv.Metric,
|
||||
Tags: make([]string, 0),
|
||||
Step: -1, // FIXME
|
||||
Step: 10,
|
||||
DsType: "GAUGE",
|
||||
}
|
||||
|
||||
|
@ -403,6 +473,12 @@ func (influxdb *InfluxdbDataSource) QueryIndexByFullTags(recvs []dataobj.IndexBy
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
logger.Warningf("query index by full tags on influxdb error %v.", err)
|
||||
} else if response.Error() != nil {
|
||||
logger.Warningf("query index by full tags on influxdb error %v.", response.Error())
|
||||
}
|
||||
}
|
||||
resp = append(resp, fullTagResp)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue