refactor: change transfer stats metric

This commit is contained in:
710leo 2020-03-27 14:58:36 +08:00
parent aac5fd1a93
commit 2f86831c60
2 changed files with 10 additions and 11 deletions

View File

@ -40,6 +40,9 @@ func (i *IndexMap) Set(id int64, hash string, s Series) {
} }
func (i *IndexMap) Get(id int64) []Series { func (i *IndexMap) Get(id int64) []Series {
i.RLock()
defer i.RUnlock()
seriess := []Series{} seriess := []Series{}
if ss, exists := i.Data[id]; exists { if ss, exists := i.Data[id]; exists {
for _, s := range ss { for _, s := range ss {

View File

@ -62,17 +62,15 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
for { for {
items := Q.PopBackBy(batch) items := Q.PopBackBy(batch)
count := len(items) count := len(items)
stats.Counter.Set("tsdb.queue.push", count)
if count == 0 { if count == 0 {
time.Sleep(DefaultSendTaskSleepInterval) time.Sleep(DefaultSendTaskSleepInterval)
continue continue
} }
tsdbItems := make([]*dataobj.TsdbItem, count) tsdbItems := make([]*dataobj.TsdbItem, count)
stats.Counter.Set("points.out.tsdb", count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
tsdbItems[i] = items[i].(*dataobj.TsdbItem) tsdbItems[i] = items[i].(*dataobj.TsdbItem)
stats.Counter.Set("points.out.tsdb", 1)
logger.Debug("send to tsdb->: ", tsdbItems[i]) logger.Debug("send to tsdb->: ", tsdbItems[i])
} }
@ -93,10 +91,8 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
} }
// statistics
//atomic.AddInt64(&PointOut2Tsdb, int64(count))
if !sendOk { if !sendOk {
stats.Counter.Set("points.out.tsdb.err", 1) stats.Counter.Set("points.out.tsdb.err", count)
logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err) logger.Errorf("send %v to tsdb %s:%s fail: %v", tsdbItems, node, addr, err)
} else { } else {
logger.Debugf("send to tsdb %s:%s ok", node, addr) logger.Debugf("send to tsdb %s:%s ok", node, addr)
@ -145,11 +141,10 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
time.Sleep(DefaultSendTaskSleepInterval) time.Sleep(DefaultSendTaskSleepInterval)
continue continue
} }
judgeItems := make([]*dataobj.JudgeItem, count) judgeItems := make([]*dataobj.JudgeItem, count)
stats.Counter.Set("points.out.judge", count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
judgeItems[i] = items[i].(*dataobj.JudgeItem) judgeItems[i] = items[i].(*dataobj.JudgeItem)
stats.Counter.Set("points.out.judge", 1)
logger.Debug("send to judge: ", judgeItems[i]) logger.Debug("send to judge: ", judgeItems[i])
} }
@ -171,8 +166,10 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
} }
if !sendOk { if !sendOk {
stats.Counter.Set("points.out.judge.err", 1) stats.Counter.Set("points.out.judge.err", count)
logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err) for _, item := range judgeItems {
logger.Errorf("send %v to judge %s fail: %v", item, addr, err)
}
} }
}(addr, judgeItems, count) }(addr, judgeItems, count)
@ -186,7 +183,6 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
stras := cache.StraMap.GetByKey(key) stras := cache.StraMap.GetByKey(key)
for _, stra := range stras { for _, stra := range stras {
if !TagMatch(stra.Tags, item.TagsMap) { if !TagMatch(stra.Tags, item.TagsMap) {
continue continue
} }