From 7f1a947226f41cb80f495aa4755ceb77464c60d1 Mon Sep 17 00:00:00 2001 From: 710leo <710leo@gmail.com> Date: Tue, 28 Apr 2020 19:26:07 +0800 Subject: [PATCH] refactor: add timestamp check --- src/dataobj/metric.go | 27 ++++++++++++------- src/modules/collector/sys/funcs/push.go | 4 ++- .../transfer/http/routes/push_router.go | 4 ++- src/modules/transfer/rpc/push.go | 2 +- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/dataobj/metric.go b/src/dataobj/metric.go index 0a10bf5e..4e60c5cc 100644 --- a/src/dataobj/metric.go +++ b/src/dataobj/metric.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" "sync" - "time" ) const ( @@ -61,7 +60,7 @@ func (m *MetricValue) PK() string { return ret.String() } -func (m *MetricValue) CheckValidity() (err error) { +func (m *MetricValue) CheckValidity(now int64) (err error) { if m == nil { err = fmt.Errorf("item is nil") return @@ -86,7 +85,7 @@ func (m *MetricValue) CheckValidity() (err error) { if m.CounterType == "" { m.CounterType = GAUGE } - if m.CounterType != COUNTER && m.CounterType != GAUGE && m.CounterType != DERIVE { + if m.CounterType != GAUGE { err = fmt.Errorf("wrong counter type") return } @@ -108,16 +107,24 @@ func (m *MetricValue) CheckValidity() (err error) { } } - m.Tags = SortedTags(m.TagsMap) - // TODO(): why 510? - if len(m.Metric)+len(m.Tags) > 510 { - err = fmt.Errorf("len(m.Metric)+len(m.Tags) is too large") + if len(m.Metric) > 255 { + err = fmt.Errorf("len(m.Metric) is too large") return } - //规范时间戳 - now := time.Now().Unix() - if m.Timestamp <= 0 || m.Timestamp > now*2 { + m.Tags = SortedTags(m.TagsMap) + if len(m.Tags) > 255 { + err = fmt.Errorf("len(m.Tags) is too large") + return + } + + //时间超前5分钟则报错 + if m.Timestamp-now > 300 { + err = fmt.Errorf("point timestamp:%d is ahead of now:%d") + return + } + + if m.Timestamp <= 0 { m.Timestamp = now } diff --git a/src/modules/collector/sys/funcs/push.go b/src/modules/collector/sys/funcs/push.go index c0f66602..ad4f323e 100644 --- a/src/modules/collector/sys/funcs/push.go +++ b/src/modules/collector/sys/funcs/push.go @@ -22,12 +22,14 @@ import ( func Push(metricItems []*dataobj.MetricValue) error { var err error var items []*dataobj.MetricValue + now := time.Now().Unix() + for _, item := range metricItems { logger.Debug("->recv: ", item) if item.Endpoint == "" { item.Endpoint = identity.Identity } - err = item.CheckValidity() + err = item.CheckValidity(now) if err != nil { msg := fmt.Errorf("metric:%v err:%v", item, err) logger.Warning(msg) diff --git a/src/modules/transfer/http/routes/push_router.go b/src/modules/transfer/http/routes/push_router.go index a924b47e..a9d08b6f 100644 --- a/src/modules/transfer/http/routes/push_router.go +++ b/src/modules/transfer/http/routes/push_router.go @@ -2,6 +2,7 @@ package routes import ( "fmt" + "time" "github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/modules/transfer/backend" @@ -14,6 +15,7 @@ import ( ) func PushData(c *gin.Context) { + now := time.Now().Unix() if c.Request.ContentLength == 0 { render.Message(c, "blank body") return @@ -28,7 +30,7 @@ func PushData(c *gin.Context) { logger.Debug("->recv: ", v) stats.Counter.Set("points.in", 1) - err := v.CheckValidity() + err := v.CheckValidity(now) if err != nil { stats.Counter.Set("points.in.err", 1) msg += fmt.Sprintf("recv metric %v err:%v\n", v, err) diff --git a/src/modules/transfer/rpc/push.go b/src/modules/transfer/rpc/push.go index d2fcf74f..e54369ad 100644 --- a/src/modules/transfer/rpc/push.go +++ b/src/modules/transfer/rpc/push.go @@ -24,7 +24,7 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp for _, v := range args { logger.Debug("->recv: ", v) stats.Counter.Set("points.in", 1) - if err := v.CheckValidity(); err != nil { + if err := v.CheckValidity(start.Unix()); err != nil { stats.Counter.Set("points.in.err", 1) msg := fmt.Sprintf("illegal item:%s err:%v", v, err) logger.Warningf(msg)