diff --git a/.gitignore b/.gitignore index ef34b927..c1360187 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,7 @@ _test /build /dist /etc/*.local.yml +/etc/log/log.test.json /data* .idea diff --git a/src/dataobj/event.go b/src/dataobj/event.go index 35f7790c..f62bc87d 100644 --- a/src/dataobj/event.go +++ b/src/dataobj/event.go @@ -21,4 +21,5 @@ type History struct { Tags map[string]string `json:"tags,omitempty"` // endpoint/counter Granularity int `json:"-"` // alarm补齐数据时需要 Points []*RRDData `json:"points"` // 现场值 + Extra string `json:"extra"` } diff --git a/src/dataobj/judge.go b/src/dataobj/judge.go index 507f5f3d..5031084b 100644 --- a/src/dataobj/judge.go +++ b/src/dataobj/judge.go @@ -18,6 +18,7 @@ type JudgeItem struct { DsType string `json:"dstype"` Step int `json:"step"` Sid int64 `json:"sid"` + Extra string `json:"extra"` } func (j *JudgeItem) PrimaryKey() string { diff --git a/src/dataobj/metric.go b/src/dataobj/metric.go index 75e1e891..0a10bf5e 100644 --- a/src/dataobj/metric.go +++ b/src/dataobj/metric.go @@ -27,6 +27,7 @@ type MetricValue struct { CounterType string `json:"counterType"` Tags string `json:"tags"` TagsMap map[string]string `json:"tagsMap"` //保留2种格式,方便后端组件使用 + Extra string `json:"extra"` } var bufferPool = sync.Pool{ diff --git a/src/model/event.go b/src/model/event.go index f2e8d4d7..c7a44acb 100644 --- a/src/model/event.go +++ b/src/model/event.go @@ -39,6 +39,7 @@ type EventDetail struct { Tags map[string]string `json:"tags"` Points []*EventDetailPoint `json:"points"` PredPoints []*EventDetailPoint `json:"pred_points,omitempty"` // 预测值, 预测值不为空时, 现场值对应的是实际值 + Extra string `json:"extra"` } type EventDetailPoint struct { diff --git a/src/modules/collector/sys/funcs/push.go b/src/modules/collector/sys/funcs/push.go index df3f4a73..ad61afdd 100644 --- a/src/modules/collector/sys/funcs/push.go +++ b/src/modules/collector/sys/funcs/push.go @@ -35,7 +35,6 @@ func Push(metricItems []*dataobj.MetricValue) error { } if item.CounterType == dataobj.COUNTER { if err := CounterToGauge(item); err != nil { - //旧值不存在则不推送 logger.Warning(err) continue } diff --git a/src/modules/collector/sys/plugins/scheduler.go b/src/modules/collector/sys/plugins/scheduler.go index f771b5e1..f9f28887 100644 --- a/src/modules/collector/sys/plugins/scheduler.go +++ b/src/modules/collector/sys/plugins/scheduler.go @@ -28,22 +28,22 @@ func NewPluginScheduler(p *Plugin) *PluginScheduler { return &scheduler } -func (this *PluginScheduler) Schedule() { +func (p *PluginScheduler) Schedule() { go func() { for { select { - case <-this.Ticker.C: - PluginRun(this.Plugin) - case <-this.Quit: - this.Ticker.Stop() + case <-p.Ticker.C: + PluginRun(p.Plugin) + case <-p.Quit: + p.Ticker.Stop() return } } }() } -func (this *PluginScheduler) Stop() { - close(this.Quit) +func (p *PluginScheduler) Stop() { + close(p.Quit) } func PluginRun(plugin *Plugin) { diff --git a/src/modules/collector/sys/ports/scheduler.go b/src/modules/collector/sys/ports/scheduler.go index 78431310..0eb20f40 100644 --- a/src/modules/collector/sys/ports/scheduler.go +++ b/src/modules/collector/sys/ports/scheduler.go @@ -24,22 +24,22 @@ func NewPortScheduler(p *model.PortCollect) *PortScheduler { return &scheduler } -func (this *PortScheduler) Schedule() { +func (p *PortScheduler) Schedule() { go func() { for { select { - case <-this.Ticker.C: - PortCollect(this.Port) - case <-this.Quit: - this.Ticker.Stop() + case <-p.Ticker.C: + PortCollect(p.Port) + case <-p.Quit: + p.Ticker.Stop() return } } }() } -func (this *PortScheduler) Stop() { - close(this.Quit) +func (p *PortScheduler) Stop() { + close(p.Quit) } func PortCollect(p *model.PortCollect) { diff --git a/src/modules/collector/sys/procs/scheduler.go b/src/modules/collector/sys/procs/scheduler.go index c27fc83f..036f3e10 100644 --- a/src/modules/collector/sys/procs/scheduler.go +++ b/src/modules/collector/sys/procs/scheduler.go @@ -26,22 +26,22 @@ func NewProcScheduler(p *model.ProcCollect) *ProcScheduler { return &scheduler } -func (this *ProcScheduler) Schedule() { +func (p *ProcScheduler) Schedule() { go func() { for { select { - case <-this.Ticker.C: - ProcCollect(this.Proc) - case <-this.Quit: - this.Ticker.Stop() + case <-p.Ticker.C: + ProcCollect(p.Proc) + case <-p.Quit: + p.Ticker.Stop() return } } }() } -func (this *ProcScheduler) Stop() { - close(this.Quit) +func (p *ProcScheduler) Stop() { + close(p.Quit) } func ProcCollect(p *model.ProcCollect) { diff --git a/src/modules/judge/backend/query/pool.go b/src/modules/judge/backend/query/pool.go index e5561978..06bfb1e4 100644 --- a/src/modules/judge/backend/query/pool.go +++ b/src/modules/judge/backend/query/pool.go @@ -66,8 +66,8 @@ func createOnePool(name string, address string, connTimeout time.Duration, maxCo } // 同步发送, 完成发送或超时后 才能返回 -func (this *ConnPools) Call(method string, args interface{}, resp interface{}) error { - connPool := this.Get() +func (cp *ConnPools) Call(method string, args interface{}, resp interface{}) error { + connPool := cp.Get() conn, err := connPool.Fetch() if err != nil { @@ -75,7 +75,7 @@ func (this *ConnPools) Call(method string, args interface{}, resp interface{}) e } rpcClient := conn.(RpcClient) - callTimeout := time.Duration(this.CallTimeout) * time.Millisecond + callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond done := make(chan error, 1) go func() { @@ -97,12 +97,12 @@ func (this *ConnPools) Call(method string, args interface{}, resp interface{}) e } } -func (this *ConnPools) Get() *pool.ConnPool { - this.RLock() - defer this.RUnlock() - i := rand.Intn(len(this.Pools)) +func (cp *ConnPools) Get() *pool.ConnPool { + cp.RLock() + defer cp.RUnlock() + i := rand.Intn(len(cp.Pools)) - return this.Pools[i] + return cp.Pools[i] } // RpcCient, 要实现io.Closer接口 @@ -111,23 +111,23 @@ type RpcClient struct { name string } -func (this RpcClient) Name() string { - return this.name +func (r RpcClient) Name() string { + return r.name } -func (this RpcClient) Closed() bool { - return this.cli == nil +func (r RpcClient) Closed() bool { + return r.cli == nil } -func (this RpcClient) Close() error { - if this.cli != nil { - err := this.cli.Close() - this.cli = nil +func (r RpcClient) Close() error { + if r.cli != nil { + err := r.cli.Close() + r.cli = nil return err } return nil } -func (this RpcClient) Call(method string, args interface{}, reply interface{}) error { - return this.cli.Call(method, args, reply) +func (r RpcClient) Call(method string, args interface{}, reply interface{}) error { + return r.cli.Call(method, args, reply) } diff --git a/src/modules/judge/cache/event.go b/src/modules/judge/cache/event.go index 4d4f89f4..4108ba08 100644 --- a/src/modules/judge/cache/event.go +++ b/src/modules/judge/cache/event.go @@ -15,15 +15,15 @@ var ( LastEvents = &SafeEventMap{M: make(map[string]*dataobj.Event)} ) -func (this *SafeEventMap) Get(key string) (*dataobj.Event, bool) { - this.RLock() - defer this.RUnlock() - event, exists := this.M[key] +func (s *SafeEventMap) Get(key string) (*dataobj.Event, bool) { + s.RLock() + defer s.RUnlock() + event, exists := s.M[key] return event, exists } -func (this *SafeEventMap) Set(key string, event *dataobj.Event) { - this.Lock() - defer this.Unlock() - this.M[key] = event +func (s *SafeEventMap) Set(key string, event *dataobj.Event) { + s.Lock() + defer s.Unlock() + s.M[key] = event } diff --git a/src/modules/judge/cache/history.go b/src/modules/judge/cache/history.go index f046c9ae..bb5c90df 100644 --- a/src/modules/judge/cache/history.go +++ b/src/modules/judge/cache/history.go @@ -15,30 +15,30 @@ func NewJudgeItemMap() *JudgeItemMap { return &JudgeItemMap{M: make(map[string]*SafeLinkedList)} } -func (this *JudgeItemMap) Get(key string) (*SafeLinkedList, bool) { - this.RLock() - defer this.RUnlock() - val, ok := this.M[key] +func (j *JudgeItemMap) Get(key string) (*SafeLinkedList, bool) { + j.RLock() + defer j.RUnlock() + val, ok := j.M[key] return val, ok } -func (this *JudgeItemMap) Set(key string, val *SafeLinkedList) { - this.Lock() - defer this.Unlock() - this.M[key] = val +func (j *JudgeItemMap) Set(key string, val *SafeLinkedList) { + j.Lock() + defer j.Unlock() + j.M[key] = val } -func (this *JudgeItemMap) Len() int { - this.RLock() - defer this.RUnlock() - return len(this.M) +func (j *JudgeItemMap) Len() int { + j.RLock() + defer j.RUnlock() + return len(j.M) } -func (this *JudgeItemMap) CleanStale(before int64) { +func (j *JudgeItemMap) CleanStale(before int64) { keys := []string{} - this.RLock() - for key, L := range this.M { + j.RLock() + for key, L := range j.M { front := L.Front() if front == nil { continue @@ -48,21 +48,21 @@ func (this *JudgeItemMap) CleanStale(before int64) { keys = append(keys, key) } } - this.RUnlock() + j.RUnlock() - this.BatchDelete(keys) + j.BatchDelete(keys) } -func (this *JudgeItemMap) BatchDelete(keys []string) { +func (j *JudgeItemMap) BatchDelete(keys []string) { count := len(keys) if count == 0 { return } - this.Lock() - defer this.Unlock() + j.Lock() + defer j.Unlock() for i := 0; i < count; i++ { - delete(this.M, keys[i]) + delete(j.M, keys[i]) } } diff --git a/src/modules/judge/judge/judge.go b/src/modules/judge/judge/judge.go index 78698949..cb9dc1ea 100644 --- a/src/modules/judge/judge/judge.go +++ b/src/modules/judge/judge/judge.go @@ -58,10 +58,10 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem, } history := []dataobj.History{} - Judge(stra, stra.Exprs, historyData, val, now, history, "", "", []bool{}) + Judge(stra, stra.Exprs, historyData, val, now, history, "", "", "", []bool{}) } -func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) { +func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) { stats.Counter.Set("running", 1) if len(exps) < 1 { @@ -85,6 +85,10 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f Granularity: int(firstItem.Step), Points: historyData, } + if len(history) == 0 { + //只有第一个指标是push的模式,可以获取到extra字段 + h.Extra = firstItem.Extra + } history = append(history, h) defer func() { @@ -130,7 +134,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f Tags: "", DsType: "GAUGE", } - Judge(stra, exps[1:], []*dataobj.RRDData{}, judgeItem, now, history, info, value, status) + Judge(stra, exps[1:], []*dataobj.RRDData{}, judgeItem, now, history, info, value, extra, status) return } @@ -138,7 +142,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f firstItem.Endpoint = respData[i].Endpoint firstItem.Tags = getTags(respData[i].Counter) firstItem.Step = respData[i].Step - Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, status) + Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, extra, status) } } else { @@ -157,7 +161,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f firstItem.Endpoint = respData[i].Endpoint firstItem.Tags = getTags(respData[i].Counter) firstItem.Step = respData[i].Step - Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, status) + Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, extra, status) } } } diff --git a/src/modules/judge/judge/nodata.go b/src/modules/judge/judge/nodata.go index 5a7c406c..0b2a3d2f 100644 --- a/src/modules/judge/judge/nodata.go +++ b/src/modules/judge/judge/nodata.go @@ -49,10 +49,7 @@ func nodataJudge() { } nodataJob.Acquire() - go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) { - defer nodataJob.Release() - Judge(stra, exps, historyData, firstItem, now, history, info, value, status) - }(stra, stra.Exprs, []*dataobj.RRDData{}, judgeItem, now, []dataobj.History{}, "", "", []bool{}) + go AsyncJudge(nodataJob, stra, stra.Exprs, []*dataobj.RRDData{}, judgeItem, now, []dataobj.History{}, "", "", "", []bool{}) } return } @@ -79,10 +76,12 @@ func nodataJudge() { } nodataJob.Acquire() - go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) { - defer nodataJob.Release() - Judge(stra, exps, historyData, firstItem, now, history, info, value, status) - }(stra, stra.Exprs, data.Values, judgeItem, now, []dataobj.History{}, "", "", []bool{}) + go AsyncJudge(nodataJob, stra, stra.Exprs, data.Values, judgeItem, now, []dataobj.History{}, "", "", "", []bool{}) } } } + +func AsyncJudge(sema *semaphore.Semaphore, stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) { + defer sema.Release() + Judge(stra, exps, historyData, firstItem, now, history, info, value, extra, status) +} diff --git a/src/modules/transfer/backend/ring.go b/src/modules/transfer/backend/ring.go index 8cfc1e76..cd53ec81 100644 --- a/src/modules/transfer/backend/ring.go +++ b/src/modules/transfer/backend/ring.go @@ -11,24 +11,24 @@ type ConsistentHashRing struct { ring *consistent.Consistent } -func (this *ConsistentHashRing) GetNode(pk string) (string, error) { - this.RLock() - defer this.RUnlock() +func (c *ConsistentHashRing) GetNode(pk string) (string, error) { + c.RLock() + defer c.RUnlock() - return this.ring.Get(pk) + return c.ring.Get(pk) } -func (this *ConsistentHashRing) Set(r *consistent.Consistent) { - this.Lock() - defer this.Unlock() - this.ring = r +func (c *ConsistentHashRing) Set(r *consistent.Consistent) { + c.Lock() + defer c.Unlock() + c.ring = r } -func (this *ConsistentHashRing) GetRing() *consistent.Consistent { - this.RLock() - defer this.RUnlock() +func (c *ConsistentHashRing) GetRing() *consistent.Consistent { + c.RLock() + defer c.RUnlock() - return this.ring + return c.ring } func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing { diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go index f3ea01a0..d3b4e8ca 100644 --- a/src/modules/transfer/backend/sender.go +++ b/src/modules/transfer/backend/sender.go @@ -196,6 +196,7 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) { TagsMap: item.TagsMap, Step: int(item.Step), Sid: stra.Id, + Extra: item.Extra, } q, exists := JudgeQueues.Get(stra.JudgeInstance) diff --git a/src/modules/transfer/cache/stra.go b/src/modules/transfer/cache/stra.go index 902ce6ec..49a7f62f 100644 --- a/src/modules/transfer/cache/stra.go +++ b/src/modules/transfer/cache/stra.go @@ -15,16 +15,16 @@ var ( StraMap = &SafeStraMap{M: make(map[string]map[string][]*model.Stra)} ) -func (this *SafeStraMap) ReInit(m map[string]map[string][]*model.Stra) { - this.Lock() - defer this.Unlock() - this.M = m +func (s *SafeStraMap) ReInit(m map[string]map[string][]*model.Stra) { + s.Lock() + defer s.Unlock() + s.M = m } -func (this *SafeStraMap) GetByKey(key string) []*model.Stra { - this.RLock() - defer this.RUnlock() - m, exists := this.M[key[0:2]] +func (s *SafeStraMap) GetByKey(key string) []*model.Stra { + s.RLock() + defer s.RUnlock() + m, exists := s.M[key[0:2]] if !exists { return []*model.Stra{} } @@ -32,11 +32,11 @@ func (this *SafeStraMap) GetByKey(key string) []*model.Stra { return m[key] } -func (this *SafeStraMap) GetAll() []*model.Stra { - this.RLock() - defer this.RUnlock() +func (s *SafeStraMap) GetAll() []*model.Stra { + s.RLock() + defer s.RUnlock() stras := []*model.Stra{} - for _, m := range this.M { + for _, m := range s.M { for _, stra := range m { stras = append(stras, stra...) } diff --git a/src/modules/transfer/rpc/query.go b/src/modules/transfer/rpc/query.go index 272d147b..f995b893 100644 --- a/src/modules/transfer/rpc/query.go +++ b/src/modules/transfer/rpc/query.go @@ -5,7 +5,7 @@ import ( "github.com/didi/nightingale/src/modules/transfer/backend" ) -func (this *Transfer) Query(args []dataobj.QueryData, reply *dataobj.QueryDataResp) error { +func (t *Transfer) Query(args []dataobj.QueryData, reply *dataobj.QueryDataResp) error { //start := time.Now() reply.Data = backend.FetchData(args) return nil