From 5df96d41a7ebb73906d92349db53f21af65d2086 Mon Sep 17 00:00:00 2001
From: 710leo <710leo@gmail.com>
Date: Thu, 19 Mar 2020 12:45:33 +0800
Subject: [PATCH 1/2] refactor: add some metrics

---
 src/modules/collector/stra/cron.go       |  2 +-
 src/modules/collector/sys/funcs/push.go  |  4 +++-
 src/modules/index/cache/endpoints.go     |  6 ++++--
 src/modules/index/cache/indexdb.go       |  3 ++-
 src/modules/judge/backend/redi/funcs.go  |  1 -
 src/modules/judge/backend/redi/redis.go  |  4 ++++
 src/modules/judge/judge/judge.go         |  3 ++-
 src/modules/judge/stra/stra.go           |  2 ++
 src/modules/monapi/cron/checker_judge.go |  2 ++
 src/modules/monapi/redisc/redis.go       |  4 ++++
 src/modules/transfer/backend/query.go    |  3 +++
 src/modules/transfer/backend/sender.go   | 21 ++++++++++++---------
 src/modules/transfer/rpc/push.go         |  5 +++--
 src/modules/tsdb/index/index.go          |  2 +-
 src/modules/tsdb/rpc/push.go             |  1 -
 src/modules/tsdb/rrdtool/sync_disk.go    |  1 +
 16 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/src/modules/collector/stra/cron.go b/src/modules/collector/stra/cron.go
index 0f5d0a9f..a46bddd0 100644
--- a/src/modules/collector/stra/cron.go
+++ b/src/modules/collector/stra/cron.go
@@ -74,7 +74,7 @@ func getCollects() (CollectResp, error) {
 	url := fmt.Sprintf("http://%s%s%s", addr, StraConfig.Api, identity.Identity)
 	err = httplib.Get(url).SetTimeout(time.Duration(StraConfig.Timeout) * time.Millisecond).ToJSON(&res)
 	if err != nil {
-		err = fmt.Errorf("get collects from remote failed, error:%v", err)
+		err = fmt.Errorf("get collects from remote:%s failed, error:%v", url, err)
 	}
 
 	return res, err
diff --git a/src/modules/collector/sys/funcs/push.go b/src/modules/collector/sys/funcs/push.go
index fde83019..bf9e732a 100644
--- a/src/modules/collector/sys/funcs/push.go
+++ b/src/modules/collector/sys/funcs/push.go
@@ -48,7 +48,9 @@ func Push(items []*dataobj.MetricValue) {
 				logger.Error(err)
 				continue
 			} else {
-				logger.Info("push succ, reply: ", reply)
+				if reply.Msg != "ok" {
+					logger.Error("some item push err", reply)
+				}
 				return
 			}
 		}
diff --git a/src/modules/index/cache/endpoints.go b/src/modules/index/cache/endpoints.go
index 29d588a4..aae9e2cb 100644
--- a/src/modules/index/cache/endpoints.go
+++ b/src/modules/index/cache/endpoints.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/didi/nightingale/src/toolkits/address"
+	"github.com/didi/nightingale/src/toolkits/stats"
 
 	"github.com/toolkits/pkg/concurrent/semaphore"
 	"github.com/toolkits/pkg/logger"
@@ -49,11 +50,12 @@ func reportEndpoint(endpoints []interface{}) {
 			err := httplib.Post(url).JSONBodyQuiet(m).SetTimeout(3*time.Second).Header("x-srv-token", "monapi-builtin-token").ToJSON(&body)
 			if err != nil {
 				logger.Warningf("curl %s fail: %v. retry", url, err)
+				stats.Counter.Set("report.endpoint.err", 1)
 				continue
 			}
-
-			if body.Err != "" {
+			if body.Err != "" { // this case occurs when the database connection fails
 				logger.Warningf("curl %s fail: %s. retry", url, body.Err)
+				stats.Counter.Set("report.endpoint.err", 1)
 				continue
 			}
 
diff --git a/src/modules/index/cache/indexdb.go b/src/modules/index/cache/indexdb.go
index 6eacd61b..51a28a22 100644
--- a/src/modules/index/cache/indexdb.go
+++ b/src/modules/index/cache/indexdb.go
@@ -19,6 +19,7 @@ import (
 	"github.com/didi/nightingale/src/toolkits/compress"
 	"github.com/didi/nightingale/src/toolkits/identity"
 	"github.com/didi/nightingale/src/toolkits/report"
+	"github.com/didi/nightingale/src/toolkits/stats"
 )
 
 type CacheSection struct {
@@ -72,8 +73,8 @@ func StartPersist(interval int) {
 		err := Persist("normal")
 		if err != nil {
 			logger.Error("Persist err:", err)
+			stats.Counter.Set("persist.err", 1)
 		}
-		//logger.Infof("clean %+v, took %.2f ms\n", cleanRet, float64(time.Since(start).Nanoseconds())*1e-6)
 	}
 }
 
diff --git a/src/modules/judge/backend/redi/funcs.go b/src/modules/judge/backend/redi/funcs.go
index 933f7d68..d671bc83 100644
--- a/src/modules/judge/backend/redi/funcs.go
+++ b/src/modules/judge/backend/redi/funcs.go
@@ -42,6 +42,5 @@ func Push(event *dataobj.Event) error {
 		return nil
 	}
 
-	stats.Counter.Set("redis.failed", 1)
 	return fmt.Errorf("redis publish failed finally:%v", err)
 }
diff --git a/src/modules/judge/backend/redi/redis.go b/src/modules/judge/backend/redi/redis.go
index c1359e79..8b167630 100644
--- a/src/modules/judge/backend/redi/redis.go
+++ b/src/modules/judge/backend/redi/redis.go
@@ -4,6 +4,7 @@ import (
 	"log"
 	"time"
 
+	"github.com/didi/nightingale/src/toolkits/stats"
 	"github.com/garyburd/redigo/redis"
 	"github.com/toolkits/pkg/logger"
 )
@@ -44,6 +45,7 @@ func Init(cfg RedisSection) {
 				c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout))
 				if err != nil {
 					logger.Errorf("conn redis err:%v", err)
+					stats.Counter.Set("redis.conn.failed", 1)
 					return nil, err
 				}
 
@@ -51,6 +53,8 @@ func Init(cfg RedisSection) {
 					if _, err := c.Do("AUTH", pass); err != nil {
 						c.Close()
 						logger.Errorf("ERR: redis auth fail:%v", err)
+						stats.Counter.Set("redis.conn.failed", 1)
+
 						return nil, err
 					}
 				}
diff --git a/src/modules/judge/judge/judge.go b/src/modules/judge/judge/judge.go
index 50e566a3..e50c935c 100644
--- a/src/modules/judge/judge/judge.go
+++ b/src/modules/judge/judge/judge.go
@@ -65,7 +65,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
 	stats.Counter.Set("running", 1)
 
 	if len(exps) < 1 {
-		stats.Counter.Set("stra.err", 1)
+		stats.Counter.Set("stra.illegal", 1)
 		logger.Warningf("stra:%v exp is null", stra)
 		return
 	}
@@ -421,6 +421,7 @@ func sendEvent(event *dataobj.Event) {
 
 	err := redi.Push(event)
 	if err != nil {
+		stats.Counter.Set("redis.push.failed", 1)
 		logger.Errorf("push event:%v err:%v", event, err)
 	}
 }
diff --git a/src/modules/judge/stra/stra.go b/src/modules/judge/stra/stra.go
index 44abeebb..ee930105 100644
--- a/src/modules/judge/stra/stra.go
+++ b/src/modules/judge/stra/stra.go
@@ -56,11 +56,13 @@ func getStrategy(opts StrategySection) {
 
 		if err != nil {
 			logger.Warningf("get strategy from remote failed, error:%v", err)
+			stats.Counter.Set("stra.get.err", 1)
 			continue
 		}
 
 		if resp.Err != "" {
 			logger.Warningf("get strategy from remote failed, error:%v", resp.Err)
+			stats.Counter.Set("stra.get.err", 1)
 			continue
 		}
 
diff --git a/src/modules/monapi/cron/checker_judge.go b/src/modules/monapi/cron/checker_judge.go
index 21483a8f..96db8b31 100644
--- a/src/modules/monapi/cron/checker_judge.go
+++ b/src/modules/monapi/cron/checker_judge.go
@@ -11,6 +11,7 @@ import (
 	"github.com/didi/nightingale/src/model"
 	"github.com/didi/nightingale/src/modules/monapi/config"
 	"github.com/didi/nightingale/src/modules/monapi/scache"
+	"github.com/didi/nightingale/src/toolkits/stats"
 )
 
 func CheckJudgeLoop() {
@@ -19,6 +20,7 @@ func CheckJudgeLoop() {
 		time.Sleep(duration)
 		err := CheckJudge()
 		if err != nil {
+			stats.Counter.Set("get.judge.err", 1)
 			logger.Error("check judge fail: ", err)
 		}
 	}
diff --git a/src/modules/monapi/redisc/redis.go b/src/modules/monapi/redisc/redis.go
index c310c59d..576e3455 100644
--- a/src/modules/monapi/redisc/redis.go
+++ b/src/modules/monapi/redisc/redis.go
@@ -7,6 +7,7 @@ import (
 	"github.com/toolkits/pkg/logger"
 
 	"github.com/didi/nightingale/src/modules/monapi/config"
+	"github.com/didi/nightingale/src/toolkits/stats"
 )
 
 var RedisConnPool *redis.Pool
@@ -29,6 +30,8 @@ func InitRedis() {
 		Dial: func() (redis.Conn, error) {
 			c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout))
 			if err != nil {
+				logger.Errorf("conn redis err:%v", err)
+				stats.Counter.Set("redis.conn.failed", 1)
 				return nil, err
 			}
 
@@ -36,6 +39,7 @@ func InitRedis() {
 				if _, err := c.Do("AUTH", pass); err != nil {
 					c.Close()
 					logger.Error("redis auth fail, pass: ", pass)
+					stats.Counter.Set("redis.conn.failed", 1)
 					return nil, err
 				}
 			}
diff --git a/src/modules/transfer/backend/query.go b/src/modules/transfer/backend/query.go
index b028b317..34bb9a3f 100644
--- a/src/modules/transfer/backend/query.go
+++ b/src/modules/transfer/backend/query.go
@@ -12,6 +12,7 @@ import (
 	"github.com/didi/nightingale/src/dataobj"
 	"github.com/didi/nightingale/src/modules/transfer/calc"
 	"github.com/didi/nightingale/src/toolkits/address"
+	"github.com/didi/nightingale/src/toolkits/stats"
 
 	"github.com/toolkits/pkg/logger"
 	"github.com/toolkits/pkg/net/httplib"
@@ -161,10 +162,12 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i
 	defer func() {
 		<-worker
 	}()
+	stats.Counter.Set("query.tsdb", 1)
 
 	data, err := fetchData(start, end, consolFun, endpoint, counter, step)
 	if err != nil {
 		logger.Warning(err)
+		stats.Counter.Set("query.data.err", 1)
 	}
 	dataChan <- data
 	return
diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go
index 4dc1f4e5..f28dde18 100644
--- a/src/modules/transfer/backend/sender.go
+++ b/src/modules/transfer/backend/sender.go
@@ -107,6 +107,7 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
 
 // 将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定
 func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
+	errCnt := 0
 	for _, item := range items {
 		tsdbItem := convert2TsdbItem(item)
 		stats.Counter.Set("tsdb.queue.push", 1)
@@ -118,19 +119,18 @@ func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
 		}
 
 		cnode := Config.ClusterList[node]
-		errCnt := 0
 		for _, addr := range cnode.Addrs {
 			Q := TsdbQueues[node+addr]
 			if !Q.PushFront(tsdbItem) {
 				errCnt += 1
 			}
 		}
+	}
 
-		// statistics
-		if errCnt > 0 {
-			stats.Counter.Set("tsdb.queue.err", errCnt)
-			logger.Error("Push2TsdbSendQueue err num: ", errCnt)
-		}
+	// statistics
+	if errCnt > 0 {
+		stats.Counter.Set("tsdb.queue.err", errCnt)
+		logger.Error("Push2TsdbSendQueue err num: ", errCnt)
 	}
 }
 
@@ -172,7 +172,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
 
 			if !sendOk {
 				stats.Counter.Set("points.out.judge.err", 1)
-				logger.Errorf("send judge %s fail: %v", addr, err)
+				logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err)
 			}
 
 		}(addr, judgeItems, count)
@@ -180,6 +180,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
 }
 
 func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
+	errCnt := 0
 	for _, item := range items {
 		key := str.PK(item.Metric, item.Endpoint)
 		stras := cache.StraMap.GetByKey(key)
@@ -203,11 +204,13 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
 
 			q, exists := JudgeQueues.Get(stra.JudgeInstance)
 			if exists {
-				q.PushFront(judgeItem)
+				if !q.PushFront(judgeItem) {
+					errCnt += 1
+				}
 			}
 		}
-
 	}
+	stats.Counter.Set("judge.queue.err", errCnt)
 }
 
 // 打到Tsdb的数据,要根据rrdtool的特定 来限制 step、counterType、timestamp
diff --git a/src/modules/transfer/rpc/push.go b/src/modules/transfer/rpc/push.go
index 70b10c00..dddd38df 100644
--- a/src/modules/transfer/rpc/push.go
+++ b/src/modules/transfer/rpc/push.go
@@ -27,9 +27,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
 		err := v.CheckValidity()
 		if err != nil {
 			stats.Counter.Set("points.in.err", 1)
-			logger.Warningf("item is illegal item:%s err:%v", v, err)
+			msg := fmt.Sprintf("item is illegal item:%s err:%v", v, err)
+			logger.Warning(msg) // msg is already formatted; a *f call would re-interpret any '%' inside it
 			reply.Invalid += 1
-			reply.Msg += fmt.Sprintf("%v\n", err)
+			reply.Msg += msg + "\n" // keep the per-item newline separator the previous code emitted
 			continue
 		}
 
diff --git a/src/modules/tsdb/index/index.go b/src/modules/tsdb/index/index.go
index 661fb143..b68cdf1b 100644
--- a/src/modules/tsdb/index/index.go
+++ b/src/modules/tsdb/index/index.go
@@ -47,7 +47,7 @@ func GetIndexLoop() {
 func GetIndex() {
 	instances, err := report.GetAlive("index", Config.HbsMod)
 	if err != nil {
-		stats.Counter.Set("index.get.err", 1)
+		stats.Counter.Set("get.index.err", 1)
 		logger.Warningf("get index list err:%v", err)
 		return
 	}
diff --git a/src/modules/tsdb/rpc/push.go b/src/modules/tsdb/rpc/push.go
index 28051778..9bdc3a4c 100644
--- a/src/modules/tsdb/rpc/push.go
+++ b/src/modules/tsdb/rpc/push.go
@@ -58,7 +58,6 @@ func handleItems(items []*dataobj.TsdbItem) {
 		//todo hash冲突问题需要解决
 		if err := cache.Caches.Push(item.Key, item.Timestamp, item.Value); err != nil {
 			stats.Counter.Set("points.in.err", 1)
-
 			logger.Warningf("push obj error, obj: %v, error: %v\n", items[i], err)
 			fail++
 		}
diff --git a/src/modules/tsdb/rrdtool/sync_disk.go b/src/modules/tsdb/rrdtool/sync_disk.go
index d76fe31c..cd2885e9 100644
--- a/src/modules/tsdb/rrdtool/sync_disk.go
+++ b/src/modules/tsdb/rrdtool/sync_disk.go
@@ -231,6 +231,7 @@ func FlushRRD(flushChunks map[interface{}][]*cache.Chunk) {
 
 				err := FlushFile(seriesID, items)
 				if err != nil {
+					stats.Counter.Set("flush.rrd.err", 1)
 					logger.Errorf("flush %v data to rrd err:%v", seriesID, err)
 					continue
 				}

From 3092b4b65d3a174e9e4cfac1fe9b9e25115a2615 Mon Sep 17 00:00:00 2001
From: 710leo <710leo@gmail.com>
Date: Thu, 19 Mar 2020 17:20:29 +0800
Subject: [PATCH 2/2] fix: recovery alert value is null & refactor stats

---
 src/modules/judge/judge/judge.go  | 8 ++++----
 src/toolkits/identity/identity.go | 3 ++-
 src/toolkits/stats/init.go        | 2 --
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/modules/judge/judge/judge.go b/src/modules/judge/judge/judge.go
index e50c935c..68739dcc 100644
--- a/src/modules/judge/judge/judge.go
+++ b/src/modules/judge/judge/judge.go
@@ -110,16 +110,16 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
 	}()
 
 	leftValue, isTriggered = judgeItemWithStrategy(stra, historyData, exps[0], firstItem, now)
-	if !isTriggered {
-		return
-	}
-
 	if value == "" {
 		value = fmt.Sprintf("%s: %v", exp.Metric, leftValue)
 	} else {
 		value += fmt.Sprintf("; %s: %v", exp.Metric, leftValue)
 	}
 
+	if !isTriggered {
+		return
+	}
+
 	//与条件情况下执行
 	if len(exps) > 1 {
 		if exps[1].Func == "nodata" { //nodata重新查询索引来进行告警判断
diff --git a/src/toolkits/identity/identity.go b/src/toolkits/identity/identity.go
index d39aeae6..7a8bdb28 100644
--- a/src/toolkits/identity/identity.go
+++ b/src/toolkits/identity/identity.go
@@ -18,11 +18,12 @@ type IdentitySection struct {
 func Init(identity IdentitySection) {
 	if identity.Specify != "" {
 		Identity = identity.Specify
+		return
 	}
 
 	var err error
 	Identity, err = sys.CmdOutTrim("bash", "-c", identity.Shell)
 	if err != nil {
-		log.Fatalln("[F] cannot get hostname")
+		log.Fatalln("[F] cannot get identity")
 	}
 }
diff --git a/src/toolkits/stats/init.go b/src/toolkits/stats/init.go
index 40a09def..3ef9bca6 100644
--- a/src/toolkits/stats/init.go
+++ b/src/toolkits/stats/init.go
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/didi/nightingale/src/dataobj"
-	"github.com/didi/nightingale/src/toolkits/identity"
 
 	"github.com/toolkits/pkg/logger"
 )
@@ -42,7 +41,6 @@ func Push() {
 func NewMetricValue(metric string, value int64) *dataobj.MetricValue {
 	item := &dataobj.MetricValue{
 		Metric:       metric,
-		Endpoint:     identity.Identity,
 		Timestamp:    time.Now().Unix(),
 		ValueUntyped: value,
 		CounterType:  "GAUGE",