feat: event support extra field

This commit is contained in:
710leo 2020-04-13 17:48:24 +08:00
parent 0604c690d4
commit 99ca7fb862
18 changed files with 115 additions and 107 deletions

1
.gitignore vendored
View File

@ -29,6 +29,7 @@ _test
/build /build
/dist /dist
/etc/*.local.yml /etc/*.local.yml
/etc/log/log.test.json
/data* /data*
.idea .idea

View File

@ -21,4 +21,5 @@ type History struct {
Tags map[string]string `json:"tags,omitempty"` // endpoint/counter Tags map[string]string `json:"tags,omitempty"` // endpoint/counter
Granularity int `json:"-"` // alarm补齐数据时需要 Granularity int `json:"-"` // alarm补齐数据时需要
Points []*RRDData `json:"points"` // 现场值 Points []*RRDData `json:"points"` // 现场值
Extra string `json:"extra"`
} }

View File

@ -18,6 +18,7 @@ type JudgeItem struct {
DsType string `json:"dstype"` DsType string `json:"dstype"`
Step int `json:"step"` Step int `json:"step"`
Sid int64 `json:"sid"` Sid int64 `json:"sid"`
Extra string `json:"extra"`
} }
func (j *JudgeItem) PrimaryKey() string { func (j *JudgeItem) PrimaryKey() string {

View File

@ -27,6 +27,7 @@ type MetricValue struct {
CounterType string `json:"counterType"` CounterType string `json:"counterType"`
Tags string `json:"tags"` Tags string `json:"tags"`
TagsMap map[string]string `json:"tagsMap"` //保留2种格式方便后端组件使用 TagsMap map[string]string `json:"tagsMap"` //保留2种格式方便后端组件使用
Extra string `json:"extra"`
} }
var bufferPool = sync.Pool{ var bufferPool = sync.Pool{

View File

@ -39,6 +39,7 @@ type EventDetail struct {
Tags map[string]string `json:"tags"` Tags map[string]string `json:"tags"`
Points []*EventDetailPoint `json:"points"` Points []*EventDetailPoint `json:"points"`
PredPoints []*EventDetailPoint `json:"pred_points,omitempty"` // 预测值, 预测值不为空时, 现场值对应的是实际值 PredPoints []*EventDetailPoint `json:"pred_points,omitempty"` // 预测值, 预测值不为空时, 现场值对应的是实际值
Extra string `json:"extra"`
} }
type EventDetailPoint struct { type EventDetailPoint struct {

View File

@ -35,7 +35,6 @@ func Push(metricItems []*dataobj.MetricValue) error {
} }
if item.CounterType == dataobj.COUNTER { if item.CounterType == dataobj.COUNTER {
if err := CounterToGauge(item); err != nil { if err := CounterToGauge(item); err != nil {
//旧值不存在则不推送
logger.Warning(err) logger.Warning(err)
continue continue
} }

View File

@ -28,22 +28,22 @@ func NewPluginScheduler(p *Plugin) *PluginScheduler {
return &scheduler return &scheduler
} }
func (this *PluginScheduler) Schedule() { func (p *PluginScheduler) Schedule() {
go func() { go func() {
for { for {
select { select {
case <-this.Ticker.C: case <-p.Ticker.C:
PluginRun(this.Plugin) PluginRun(p.Plugin)
case <-this.Quit: case <-p.Quit:
this.Ticker.Stop() p.Ticker.Stop()
return return
} }
} }
}() }()
} }
func (this *PluginScheduler) Stop() { func (p *PluginScheduler) Stop() {
close(this.Quit) close(p.Quit)
} }
func PluginRun(plugin *Plugin) { func PluginRun(plugin *Plugin) {

View File

@ -24,22 +24,22 @@ func NewPortScheduler(p *model.PortCollect) *PortScheduler {
return &scheduler return &scheduler
} }
func (this *PortScheduler) Schedule() { func (p *PortScheduler) Schedule() {
go func() { go func() {
for { for {
select { select {
case <-this.Ticker.C: case <-p.Ticker.C:
PortCollect(this.Port) PortCollect(p.Port)
case <-this.Quit: case <-p.Quit:
this.Ticker.Stop() p.Ticker.Stop()
return return
} }
} }
}() }()
} }
func (this *PortScheduler) Stop() { func (p *PortScheduler) Stop() {
close(this.Quit) close(p.Quit)
} }
func PortCollect(p *model.PortCollect) { func PortCollect(p *model.PortCollect) {

View File

@ -26,22 +26,22 @@ func NewProcScheduler(p *model.ProcCollect) *ProcScheduler {
return &scheduler return &scheduler
} }
func (this *ProcScheduler) Schedule() { func (p *ProcScheduler) Schedule() {
go func() { go func() {
for { for {
select { select {
case <-this.Ticker.C: case <-p.Ticker.C:
ProcCollect(this.Proc) ProcCollect(p.Proc)
case <-this.Quit: case <-p.Quit:
this.Ticker.Stop() p.Ticker.Stop()
return return
} }
} }
}() }()
} }
func (this *ProcScheduler) Stop() { func (p *ProcScheduler) Stop() {
close(this.Quit) close(p.Quit)
} }
func ProcCollect(p *model.ProcCollect) { func ProcCollect(p *model.ProcCollect) {

View File

@ -66,8 +66,8 @@ func createOnePool(name string, address string, connTimeout time.Duration, maxCo
} }
// 同步发送, 完成发送或超时后 才能返回 // 同步发送, 完成发送或超时后 才能返回
func (this *ConnPools) Call(method string, args interface{}, resp interface{}) error { func (cp *ConnPools) Call(method string, args interface{}, resp interface{}) error {
connPool := this.Get() connPool := cp.Get()
conn, err := connPool.Fetch() conn, err := connPool.Fetch()
if err != nil { if err != nil {
@ -75,7 +75,7 @@ func (this *ConnPools) Call(method string, args interface{}, resp interface{}) e
} }
rpcClient := conn.(RpcClient) rpcClient := conn.(RpcClient)
callTimeout := time.Duration(this.CallTimeout) * time.Millisecond callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
@ -97,12 +97,12 @@ func (this *ConnPools) Call(method string, args interface{}, resp interface{}) e
} }
} }
func (this *ConnPools) Get() *pool.ConnPool { func (cp *ConnPools) Get() *pool.ConnPool {
this.RLock() cp.RLock()
defer this.RUnlock() defer cp.RUnlock()
i := rand.Intn(len(this.Pools)) i := rand.Intn(len(cp.Pools))
return this.Pools[i] return cp.Pools[i]
} }
// RpcCient, 要实现io.Closer接口 // RpcCient, 要实现io.Closer接口
@ -111,23 +111,23 @@ type RpcClient struct {
name string name string
} }
func (this RpcClient) Name() string { func (r RpcClient) Name() string {
return this.name return r.name
} }
func (this RpcClient) Closed() bool { func (r RpcClient) Closed() bool {
return this.cli == nil return r.cli == nil
} }
func (this RpcClient) Close() error { func (r RpcClient) Close() error {
if this.cli != nil { if r.cli != nil {
err := this.cli.Close() err := r.cli.Close()
this.cli = nil r.cli = nil
return err return err
} }
return nil return nil
} }
func (this RpcClient) Call(method string, args interface{}, reply interface{}) error { func (r RpcClient) Call(method string, args interface{}, reply interface{}) error {
return this.cli.Call(method, args, reply) return r.cli.Call(method, args, reply)
} }

View File

@ -15,15 +15,15 @@ var (
LastEvents = &SafeEventMap{M: make(map[string]*dataobj.Event)} LastEvents = &SafeEventMap{M: make(map[string]*dataobj.Event)}
) )
func (this *SafeEventMap) Get(key string) (*dataobj.Event, bool) { func (s *SafeEventMap) Get(key string) (*dataobj.Event, bool) {
this.RLock() s.RLock()
defer this.RUnlock() defer s.RUnlock()
event, exists := this.M[key] event, exists := s.M[key]
return event, exists return event, exists
} }
func (this *SafeEventMap) Set(key string, event *dataobj.Event) { func (s *SafeEventMap) Set(key string, event *dataobj.Event) {
this.Lock() s.Lock()
defer this.Unlock() defer s.Unlock()
this.M[key] = event s.M[key] = event
} }

View File

@ -15,30 +15,30 @@ func NewJudgeItemMap() *JudgeItemMap {
return &JudgeItemMap{M: make(map[string]*SafeLinkedList)} return &JudgeItemMap{M: make(map[string]*SafeLinkedList)}
} }
func (this *JudgeItemMap) Get(key string) (*SafeLinkedList, bool) { func (j *JudgeItemMap) Get(key string) (*SafeLinkedList, bool) {
this.RLock() j.RLock()
defer this.RUnlock() defer j.RUnlock()
val, ok := this.M[key] val, ok := j.M[key]
return val, ok return val, ok
} }
func (this *JudgeItemMap) Set(key string, val *SafeLinkedList) { func (j *JudgeItemMap) Set(key string, val *SafeLinkedList) {
this.Lock() j.Lock()
defer this.Unlock() defer j.Unlock()
this.M[key] = val j.M[key] = val
} }
func (this *JudgeItemMap) Len() int { func (j *JudgeItemMap) Len() int {
this.RLock() j.RLock()
defer this.RUnlock() defer j.RUnlock()
return len(this.M) return len(j.M)
} }
func (this *JudgeItemMap) CleanStale(before int64) { func (j *JudgeItemMap) CleanStale(before int64) {
keys := []string{} keys := []string{}
this.RLock() j.RLock()
for key, L := range this.M { for key, L := range j.M {
front := L.Front() front := L.Front()
if front == nil { if front == nil {
continue continue
@ -48,21 +48,21 @@ func (this *JudgeItemMap) CleanStale(before int64) {
keys = append(keys, key) keys = append(keys, key)
} }
} }
this.RUnlock() j.RUnlock()
this.BatchDelete(keys) j.BatchDelete(keys)
} }
func (this *JudgeItemMap) BatchDelete(keys []string) { func (j *JudgeItemMap) BatchDelete(keys []string) {
count := len(keys) count := len(keys)
if count == 0 { if count == 0 {
return return
} }
this.Lock() j.Lock()
defer this.Unlock() defer j.Unlock()
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
delete(this.M, keys[i]) delete(j.M, keys[i])
} }
} }

View File

@ -58,10 +58,10 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
} }
history := []dataobj.History{} history := []dataobj.History{}
Judge(stra, stra.Exprs, historyData, val, now, history, "", "", []bool{}) Judge(stra, stra.Exprs, historyData, val, now, history, "", "", "", []bool{})
} }
func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) { func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) {
stats.Counter.Set("running", 1) stats.Counter.Set("running", 1)
if len(exps) < 1 { if len(exps) < 1 {
@ -85,6 +85,10 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
Granularity: int(firstItem.Step), Granularity: int(firstItem.Step),
Points: historyData, Points: historyData,
} }
if len(history) == 0 {
//只有第一个指标是push的模式可以获取到extra字段
h.Extra = firstItem.Extra
}
history = append(history, h) history = append(history, h)
defer func() { defer func() {
@ -130,7 +134,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
Tags: "", Tags: "",
DsType: "GAUGE", DsType: "GAUGE",
} }
Judge(stra, exps[1:], []*dataobj.RRDData{}, judgeItem, now, history, info, value, status) Judge(stra, exps[1:], []*dataobj.RRDData{}, judgeItem, now, history, info, value, extra, status)
return return
} }
@ -138,7 +142,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
firstItem.Endpoint = respData[i].Endpoint firstItem.Endpoint = respData[i].Endpoint
firstItem.Tags = getTags(respData[i].Counter) firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step firstItem.Step = respData[i].Step
Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, status) Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, extra, status)
} }
} else { } else {
@ -157,7 +161,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
firstItem.Endpoint = respData[i].Endpoint firstItem.Endpoint = respData[i].Endpoint
firstItem.Tags = getTags(respData[i].Counter) firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step firstItem.Step = respData[i].Step
Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, status) Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, extra, status)
} }
} }
} }

View File

@ -49,10 +49,7 @@ func nodataJudge() {
} }
nodataJob.Acquire() nodataJob.Acquire()
go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) { go AsyncJudge(nodataJob, stra, stra.Exprs, []*dataobj.RRDData{}, judgeItem, now, []dataobj.History{}, "", "", "", []bool{})
defer nodataJob.Release()
Judge(stra, exps, historyData, firstItem, now, history, info, value, status)
}(stra, stra.Exprs, []*dataobj.RRDData{}, judgeItem, now, []dataobj.History{}, "", "", []bool{})
} }
return return
} }
@ -79,10 +76,12 @@ func nodataJudge() {
} }
nodataJob.Acquire() nodataJob.Acquire()
go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) { go AsyncJudge(nodataJob, stra, stra.Exprs, data.Values, judgeItem, now, []dataobj.History{}, "", "", "", []bool{})
defer nodataJob.Release()
Judge(stra, exps, historyData, firstItem, now, history, info, value, status)
}(stra, stra.Exprs, data.Values, judgeItem, now, []dataobj.History{}, "", "", []bool{})
} }
} }
} }
func AsyncJudge(sema *semaphore.Semaphore, stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) {
defer sema.Release()
Judge(stra, exps, historyData, firstItem, now, history, info, value, extra, status)
}

View File

@ -11,24 +11,24 @@ type ConsistentHashRing struct {
ring *consistent.Consistent ring *consistent.Consistent
} }
func (this *ConsistentHashRing) GetNode(pk string) (string, error) { func (c *ConsistentHashRing) GetNode(pk string) (string, error) {
this.RLock() c.RLock()
defer this.RUnlock() defer c.RUnlock()
return this.ring.Get(pk) return c.ring.Get(pk)
} }
func (this *ConsistentHashRing) Set(r *consistent.Consistent) { func (c *ConsistentHashRing) Set(r *consistent.Consistent) {
this.Lock() c.Lock()
defer this.Unlock() defer c.Unlock()
this.ring = r c.ring = r
} }
func (this *ConsistentHashRing) GetRing() *consistent.Consistent { func (c *ConsistentHashRing) GetRing() *consistent.Consistent {
this.RLock() c.RLock()
defer this.RUnlock() defer c.RUnlock()
return this.ring return c.ring
} }
func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing { func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing {

View File

@ -196,6 +196,7 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
TagsMap: item.TagsMap, TagsMap: item.TagsMap,
Step: int(item.Step), Step: int(item.Step),
Sid: stra.Id, Sid: stra.Id,
Extra: item.Extra,
} }
q, exists := JudgeQueues.Get(stra.JudgeInstance) q, exists := JudgeQueues.Get(stra.JudgeInstance)

View File

@ -15,16 +15,16 @@ var (
StraMap = &SafeStraMap{M: make(map[string]map[string][]*model.Stra)} StraMap = &SafeStraMap{M: make(map[string]map[string][]*model.Stra)}
) )
func (this *SafeStraMap) ReInit(m map[string]map[string][]*model.Stra) { func (s *SafeStraMap) ReInit(m map[string]map[string][]*model.Stra) {
this.Lock() s.Lock()
defer this.Unlock() defer s.Unlock()
this.M = m s.M = m
} }
func (this *SafeStraMap) GetByKey(key string) []*model.Stra { func (s *SafeStraMap) GetByKey(key string) []*model.Stra {
this.RLock() s.RLock()
defer this.RUnlock() defer s.RUnlock()
m, exists := this.M[key[0:2]] m, exists := s.M[key[0:2]]
if !exists { if !exists {
return []*model.Stra{} return []*model.Stra{}
} }
@ -32,11 +32,11 @@ func (this *SafeStraMap) GetByKey(key string) []*model.Stra {
return m[key] return m[key]
} }
func (this *SafeStraMap) GetAll() []*model.Stra { func (s *SafeStraMap) GetAll() []*model.Stra {
this.RLock() s.RLock()
defer this.RUnlock() defer s.RUnlock()
stras := []*model.Stra{} stras := []*model.Stra{}
for _, m := range this.M { for _, m := range s.M {
for _, stra := range m { for _, stra := range m {
stras = append(stras, stra...) stras = append(stras, stra...)
} }

View File

@ -5,7 +5,7 @@ import (
"github.com/didi/nightingale/src/modules/transfer/backend" "github.com/didi/nightingale/src/modules/transfer/backend"
) )
func (this *Transfer) Query(args []dataobj.QueryData, reply *dataobj.QueryDataResp) error { func (t *Transfer) Query(args []dataobj.QueryData, reply *dataobj.QueryDataResp) error {
//start := time.Now() //start := time.Now()
reply.Data = backend.FetchData(args) reply.Data = backend.FetchData(args)
return nil return nil