From 5df96d41a7ebb73906d92349db53f21af65d2086 Mon Sep 17 00:00:00 2001 From: 710leo <710leo@gmail.com> Date: Thu, 19 Mar 2020 12:45:33 +0800 Subject: [PATCH] refactor: add some metrics --- src/modules/collector/stra/cron.go | 2 +- src/modules/collector/sys/funcs/push.go | 4 +++- src/modules/index/cache/endpoints.go | 6 ++++-- src/modules/index/cache/indexdb.go | 3 ++- src/modules/judge/backend/redi/funcs.go | 1 - src/modules/judge/backend/redi/redis.go | 4 ++++ src/modules/judge/judge/judge.go | 3 ++- src/modules/judge/stra/stra.go | 2 ++ src/modules/monapi/cron/checker_judge.go | 2 ++ src/modules/monapi/redisc/redis.go | 4 ++++ src/modules/transfer/backend/query.go | 3 +++ src/modules/transfer/backend/sender.go | 21 ++++++++++++--------- src/modules/transfer/rpc/push.go | 5 +++-- src/modules/tsdb/index/index.go | 2 +- src/modules/tsdb/rpc/push.go | 1 - src/modules/tsdb/rrdtool/sync_disk.go | 1 + 16 files changed, 44 insertions(+), 20 deletions(-) diff --git a/src/modules/collector/stra/cron.go b/src/modules/collector/stra/cron.go index 0f5d0a9f..a46bddd0 100644 --- a/src/modules/collector/stra/cron.go +++ b/src/modules/collector/stra/cron.go @@ -74,7 +74,7 @@ func getCollects() (CollectResp, error) { url := fmt.Sprintf("http://%s%s%s", addr, StraConfig.Api, identity.Identity) err = httplib.Get(url).SetTimeout(time.Duration(StraConfig.Timeout) * time.Millisecond).ToJSON(&res) if err != nil { - err = fmt.Errorf("get collects from remote failed, error:%v", err) + err = fmt.Errorf("get collects from remote:%s failed, error:%v", url, err) } return res, err diff --git a/src/modules/collector/sys/funcs/push.go b/src/modules/collector/sys/funcs/push.go index fde83019..bf9e732a 100644 --- a/src/modules/collector/sys/funcs/push.go +++ b/src/modules/collector/sys/funcs/push.go @@ -48,7 +48,9 @@ func Push(items []*dataobj.MetricValue) { logger.Error(err) continue } else { - logger.Info("push succ, reply: ", reply) + if reply.Msg != "ok" { + logger.Error("some item push err", reply) + } return } } diff --git a/src/modules/index/cache/endpoints.go b/src/modules/index/cache/endpoints.go index 29d588a4..aae9e2cb 100644 --- a/src/modules/index/cache/endpoints.go +++ b/src/modules/index/cache/endpoints.go @@ -6,6 +6,7 @@ import ( "time" "github.com/didi/nightingale/src/toolkits/address" + "github.com/didi/nightingale/src/toolkits/stats" "github.com/toolkits/pkg/concurrent/semaphore" "github.com/toolkits/pkg/logger" @@ -49,11 +50,12 @@ func reportEndpoint(endpoints []interface{}) { err := httplib.Post(url).JSONBodyQuiet(m).SetTimeout(3*time.Second).Header("x-srv-token", "monapi-builtin-token").ToJSON(&body) if err != nil { logger.Warningf("curl %s fail: %v. retry", url, err) + stats.Counter.Set("report.endpoint.err", 1) continue } - - if body.Err != "" { + if body.Err != "" { //数据库连接出错会出现此情况 logger.Warningf("curl %s fail: %s. retry", url, body.Err) + stats.Counter.Set("report.endpoint.err", 1) continue } diff --git a/src/modules/index/cache/indexdb.go b/src/modules/index/cache/indexdb.go index 6eacd61b..51a28a22 100644 --- a/src/modules/index/cache/indexdb.go +++ b/src/modules/index/cache/indexdb.go @@ -19,6 +19,7 @@ import ( "github.com/didi/nightingale/src/toolkits/compress" "github.com/didi/nightingale/src/toolkits/identity" "github.com/didi/nightingale/src/toolkits/report" + "github.com/didi/nightingale/src/toolkits/stats" ) type CacheSection struct { @@ -72,8 +73,8 @@ func StartPersist(interval int) { err := Persist("normal") if err != nil { logger.Error("Persist err:", err) + stats.Counter.Set("persist.err", 1) } - //logger.Infof("clean %+v, took %.2f ms\n", cleanRet, float64(time.Since(start).Nanoseconds())*1e-6) } } diff --git a/src/modules/judge/backend/redi/funcs.go b/src/modules/judge/backend/redi/funcs.go index 933f7d68..d671bc83 100644 --- a/src/modules/judge/backend/redi/funcs.go +++ b/src/modules/judge/backend/redi/funcs.go @@ -42,6 +42,5 @@ func Push(event *dataobj.Event) error { return nil } - stats.Counter.Set("redis.failed", 1) return fmt.Errorf("redis publish failed finally:%v", err) } diff --git a/src/modules/judge/backend/redi/redis.go b/src/modules/judge/backend/redi/redis.go index c1359e79..8b167630 100644 --- a/src/modules/judge/backend/redi/redis.go +++ b/src/modules/judge/backend/redi/redis.go @@ -4,6 +4,7 @@ import ( "log" "time" + "github.com/didi/nightingale/src/toolkits/stats" "github.com/garyburd/redigo/redis" "github.com/toolkits/pkg/logger" ) @@ -44,6 +45,7 @@ func Init(cfg RedisSection) { c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout)) if err != nil { logger.Errorf("conn redis err:%v", err) + stats.Counter.Set("redis.conn.failed", 1) return nil, err } @@ -51,6 +53,8 @@ func Init(cfg RedisSection) { if _, err := c.Do("AUTH", pass); err != nil { c.Close() logger.Errorf("ERR: redis auth fail:%v", err) + stats.Counter.Set("redis.conn.failed", 1) + return nil, err } } diff --git a/src/modules/judge/judge/judge.go b/src/modules/judge/judge/judge.go index 50e566a3..e50c935c 100644 --- a/src/modules/judge/judge/judge.go +++ b/src/modules/judge/judge/judge.go @@ -65,7 +65,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f stats.Counter.Set("running", 1) if len(exps) < 1 { - stats.Counter.Set("stra.err", 1) + stats.Counter.Set("stra.illegal", 1) logger.Warningf("stra:%v exp is null", stra) return } @@ -421,6 +421,7 @@ func sendEvent(event *dataobj.Event) { err := redi.Push(event) if err != nil { + stats.Counter.Set("redis.push.failed", 1) logger.Errorf("push event:%v err:%v", event, err) } } diff --git a/src/modules/judge/stra/stra.go b/src/modules/judge/stra/stra.go index 44abeebb..ee930105 100644 --- a/src/modules/judge/stra/stra.go +++ b/src/modules/judge/stra/stra.go @@ -56,11 +56,13 @@ func getStrategy(opts StrategySection) { if err != nil { logger.Warningf("get strategy from remote failed, error:%v", err) + stats.Counter.Set("stra.get.err", 1) continue } if resp.Err != "" { logger.Warningf("get strategy from remote failed, error:%v", resp.Err) + stats.Counter.Set("stra.get.err", 1) continue } diff --git a/src/modules/monapi/cron/checker_judge.go b/src/modules/monapi/cron/checker_judge.go index 21483a8f..96db8b31 100644 --- a/src/modules/monapi/cron/checker_judge.go +++ b/src/modules/monapi/cron/checker_judge.go @@ -11,6 +11,7 @@ import ( "github.com/didi/nightingale/src/model" "github.com/didi/nightingale/src/modules/monapi/config" "github.com/didi/nightingale/src/modules/monapi/scache" + "github.com/didi/nightingale/src/toolkits/stats" ) func CheckJudgeLoop() { @@ -19,6 +20,7 @@ func CheckJudgeLoop() { time.Sleep(duration) err := CheckJudge() if err != nil { + stats.Counter.Set("get.judge.err", 1) logger.Error("check judge fail: ", err) } } diff --git a/src/modules/monapi/redisc/redis.go b/src/modules/monapi/redisc/redis.go index c310c59d..576e3455 100644 --- a/src/modules/monapi/redisc/redis.go +++ b/src/modules/monapi/redisc/redis.go @@ -7,6 +7,7 @@ import ( "github.com/toolkits/pkg/logger" "github.com/didi/nightingale/src/modules/monapi/config" + "github.com/didi/nightingale/src/toolkits/stats" ) var RedisConnPool *redis.Pool @@ -29,6 +30,8 @@ func InitRedis() { Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout)) if err != nil { + logger.Errorf("conn redis err:%v", err) + stats.Counter.Set("redis.conn.failed", 1) return nil, err } @@ -36,6 +39,7 @@ func InitRedis() { if _, err := c.Do("AUTH", pass); err != nil { c.Close() logger.Error("redis auth fail, pass: ", pass) + stats.Counter.Set("redis.conn.failed", 1) return nil, err } } diff --git a/src/modules/transfer/backend/query.go b/src/modules/transfer/backend/query.go index b028b317..34bb9a3f 100644 --- a/src/modules/transfer/backend/query.go +++ b/src/modules/transfer/backend/query.go @@ -12,6 +12,7 @@ import ( "github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/modules/transfer/calc" "github.com/didi/nightingale/src/toolkits/address" + "github.com/didi/nightingale/src/toolkits/stats" "github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/net/httplib" @@ -161,10 +162,12 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i defer func() { <-worker }() + stats.Counter.Set("query.tsdb", 1) data, err := fetchData(start, end, consolFun, endpoint, counter, step) if err != nil { logger.Warning(err) + stats.Counter.Set("query.data.err", 1) } dataChan <- data return diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go index 4dc1f4e5..f28dde18 100644 --- a/src/modules/transfer/backend/sender.go +++ b/src/modules/transfer/backend/sender.go @@ -107,6 +107,7 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent // 将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定 func Push2TsdbSendQueue(items []*dataobj.MetricValue) { + errCnt := 0 for _, item := range items { tsdbItem := convert2TsdbItem(item) stats.Counter.Set("tsdb.queue.push", 1) @@ -118,19 +119,18 @@ func Push2TsdbSendQueue(items []*dataobj.MetricValue) { } cnode := Config.ClusterList[node] - errCnt := 0 for _, addr := range cnode.Addrs { Q := TsdbQueues[node+addr] if !Q.PushFront(tsdbItem) { errCnt += 1 } } + } - // statistics - if errCnt > 0 { - stats.Counter.Set("tsdb.queue.err", errCnt) - logger.Error("Push2TsdbSendQueue err num: ", errCnt) - } + // statistics + if errCnt > 0 { + stats.Counter.Set("tsdb.queue.err", errCnt) + logger.Error("Push2TsdbSendQueue err num: ", errCnt) } } @@ -172,7 +172,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { if !sendOk { stats.Counter.Set("points.out.judge.err", 1) - logger.Errorf("send judge %s fail: %v", addr, err) + logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err) } }(addr, judgeItems, count) @@ -180,6 +180,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { } func Push2JudgeSendQueue(items []*dataobj.MetricValue) { + errCnt := 0 for _, item := range items { key := str.PK(item.Metric, item.Endpoint) stras := cache.StraMap.GetByKey(key) @@ -203,11 +204,13 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) { q, exists := JudgeQueues.Get(stra.JudgeInstance) if exists { - q.PushFront(judgeItem) + if !q.PushFront(judgeItem) { + errCnt += 1 + } } } - } + stats.Counter.Set("judge.queue.err", errCnt) } // 打到Tsdb的数据,要根据rrdtool的特定 来限制 step、counterType、timestamp diff --git a/src/modules/transfer/rpc/push.go b/src/modules/transfer/rpc/push.go index 70b10c00..dddd38df 100644 --- a/src/modules/transfer/rpc/push.go +++ b/src/modules/transfer/rpc/push.go @@ -27,9 +27,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp err := v.CheckValidity() if err != nil { stats.Counter.Set("points.in.err", 1) - logger.Warningf("item is illegal item:%s err:%v", v, err) + msg := fmt.Sprintf("item is illegal item:%s err:%v", v, err) + logger.Warningf(msg) reply.Invalid += 1 - reply.Msg += fmt.Sprintf("%v\n", err) + reply.Msg += msg continue } diff --git a/src/modules/tsdb/index/index.go b/src/modules/tsdb/index/index.go index 661fb143..b68cdf1b 100644 --- a/src/modules/tsdb/index/index.go +++ b/src/modules/tsdb/index/index.go @@ -47,7 +47,7 @@ func GetIndexLoop() { func GetIndex() { instances, err := report.GetAlive("index", Config.HbsMod) if err != nil { - stats.Counter.Set("index.get.err", 1) + stats.Counter.Set("get.index.err", 1) logger.Warningf("get index list err:%v", err) return } diff --git a/src/modules/tsdb/rpc/push.go b/src/modules/tsdb/rpc/push.go index 28051778..9bdc3a4c 100644 --- a/src/modules/tsdb/rpc/push.go +++ b/src/modules/tsdb/rpc/push.go @@ -58,7 +58,6 @@ func handleItems(items []*dataobj.TsdbItem) { //todo hash冲突问题需要解决 if err := cache.Caches.Push(item.Key, item.Timestamp, item.Value); err != nil { stats.Counter.Set("points.in.err", 1) - logger.Warningf("push obj error, obj: %v, error: %v\n", items[i], err) fail++ } diff --git a/src/modules/tsdb/rrdtool/sync_disk.go b/src/modules/tsdb/rrdtool/sync_disk.go index d76fe31c..cd2885e9 100644 --- a/src/modules/tsdb/rrdtool/sync_disk.go +++ b/src/modules/tsdb/rrdtool/sync_disk.go @@ -231,6 +231,7 @@ func FlushRRD(flushChunks map[interface{}][]*cache.Chunk) { err := FlushFile(seriesID, items) if err != nil { + stats.Counter.Set("flush.rrd.err", 1) logger.Errorf("flush %v data to rrd err:%v", seriesID, err) continue }