nightingale/trans/push.go

140 lines
3.6 KiB
Go
Raw Normal View History

package trans
import (
"bytes"
"fmt"
"sort"
"sync"
"time"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/backend"
"github.com/didi/nightingale/v5/cache"
"github.com/didi/nightingale/v5/models"
"github.com/didi/nightingale/v5/naming"
"github.com/didi/nightingale/v5/vos"
)
func Push(points []*vos.MetricPoint) error {
if points == nil {
return fmt.Errorf("param(points) is nil")
}
count := len(points)
if count == 0 {
return fmt.Errorf("param(points) is empty")
}
var reterr error
// 把ident->alias做成map放内存里后续要周期性与DB中的数据对比更新resource表
aliasMapper := make(map[string]interface{})
now := time.Now().Unix()
validPoints := make([]*vos.MetricPoint, 0, count)
for i := 0; i < count; i++ {
logger.Debugf("recv %+v", points[i])
// 如果tags中发现有__ident__和__alias__就提到外层这个逻辑是为了应对snmp之类的场景
if val, has := points[i].TagsMap["__ident__"]; has {
points[i].Ident = val
delete(points[i].TagsMap, "__ident__")
}
if val, has := points[i].TagsMap["__alias__"]; has {
points[i].Alias = val
delete(points[i].TagsMap, "__alias__")
}
if err := points[i].Tidy(now); err != nil {
// 如果有部分point校验失败没关系把error返回即可正常的可以继续往下走
logger.Warningf("point %+v is invalid, err:%v ", points[i], err)
reterr = err
} else {
if points[i].Ident != "" {
// 把当前时间也带上处理的时候只处理最近的数据避免alias发生变化且数据分散在多个server造成的alias不一致的问题
aliasMapper[points[i].Ident] = &models.AliasTime{Alias: points[i].Alias, Time: now}
}
// 将resource的tag追加到曲线的tag中根据tagsmap生成tagslst排序生成primarykey
enrich(points[i])
validPoints = append(validPoints, points[i])
}
}
models.AliasMapper.MSet(aliasMapper)
// 路由数据做转发的逻辑可以做成异步这个过程如果有错都是系统内部错误不需要暴露给client侧
go DispatchPoints(validPoints)
return reterr
}
func DispatchPoints(points []*vos.MetricPoint) {
// send to push endpoints
pushEndpoints, err := backend.GetPushEndpoints()
if err != nil {
logger.Errorf("could not find pushendpoint:%v", err)
} else {
for _, pushendpoint := range pushEndpoints {
go pushendpoint.Push2Queue(points)
}
}
// send to judge queue
for i := range points {
node, err := naming.HashRing.GetNode(points[i].PK)
if err != nil {
logger.Errorf("could not find node:%v", err)
continue
}
q, exists := queues.Get(node)
if !exists {
logger.Errorf("could not find queue by %s", node)
continue
}
q.PushFront(points[i])
}
}
var bufferPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
func enrich(point *vos.MetricPoint) {
// 把res的tags附到point上
resAndTags, exists := cache.ResTags.Get(point.Ident)
if exists {
for k, v := range resAndTags.Tags {
point.TagsMap[k] = v
}
}
// 根据tagsmap生成tagslstsort
count := len(point.TagsMap)
if count == 0 {
point.TagsLst = []string{}
} else {
lst := make([]string, 0, count)
for k, v := range point.TagsMap {
lst = append(lst, k+"="+v)
}
sort.Strings(lst)
point.TagsLst = lst
}
// ident metric tagslst 生成 pk
ret := bufferPool.Get().(*bytes.Buffer)
ret.Reset()
defer bufferPool.Put(ret)
ret.WriteString(point.Ident)
ret.WriteString(point.Metric)
for i := 0; i < len(point.TagsLst); i++ {
ret.WriteString(point.TagsLst[i])
}
point.PK = str.MD5(ret.String())
}