refactor: add timestamp check
This commit is contained in:
parent
f648a6c8c2
commit
7f1a947226
|
@ -7,7 +7,6 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -61,7 +60,7 @@ func (m *MetricValue) PK() string {
|
|||
return ret.String()
|
||||
}
|
||||
|
||||
func (m *MetricValue) CheckValidity() (err error) {
|
||||
func (m *MetricValue) CheckValidity(now int64) (err error) {
|
||||
if m == nil {
|
||||
err = fmt.Errorf("item is nil")
|
||||
return
|
||||
|
@ -86,7 +85,7 @@ func (m *MetricValue) CheckValidity() (err error) {
|
|||
if m.CounterType == "" {
|
||||
m.CounterType = GAUGE
|
||||
}
|
||||
if m.CounterType != COUNTER && m.CounterType != GAUGE && m.CounterType != DERIVE {
|
||||
if m.CounterType != GAUGE {
|
||||
err = fmt.Errorf("wrong counter type")
|
||||
return
|
||||
}
|
||||
|
@ -108,16 +107,24 @@ func (m *MetricValue) CheckValidity() (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
m.Tags = SortedTags(m.TagsMap)
|
||||
// TODO(): why 510?
|
||||
if len(m.Metric)+len(m.Tags) > 510 {
|
||||
err = fmt.Errorf("len(m.Metric)+len(m.Tags) is too large")
|
||||
if len(m.Metric) > 255 {
|
||||
err = fmt.Errorf("len(m.Metric) is too large")
|
||||
return
|
||||
}
|
||||
|
||||
//规范时间戳
|
||||
now := time.Now().Unix()
|
||||
if m.Timestamp <= 0 || m.Timestamp > now*2 {
|
||||
m.Tags = SortedTags(m.TagsMap)
|
||||
if len(m.Tags) > 255 {
|
||||
err = fmt.Errorf("len(m.Tags) is too large")
|
||||
return
|
||||
}
|
||||
|
||||
//时间超前5分钟则报错
|
||||
if m.Timestamp-now > 300 {
|
||||
err = fmt.Errorf("point timestamp:%d is ahead of now:%d")
|
||||
return
|
||||
}
|
||||
|
||||
if m.Timestamp <= 0 {
|
||||
m.Timestamp = now
|
||||
}
|
||||
|
||||
|
|
|
@ -22,12 +22,14 @@ import (
|
|||
func Push(metricItems []*dataobj.MetricValue) error {
|
||||
var err error
|
||||
var items []*dataobj.MetricValue
|
||||
now := time.Now().Unix()
|
||||
|
||||
for _, item := range metricItems {
|
||||
logger.Debug("->recv: ", item)
|
||||
if item.Endpoint == "" {
|
||||
item.Endpoint = identity.Identity
|
||||
}
|
||||
err = item.CheckValidity()
|
||||
err = item.CheckValidity(now)
|
||||
if err != nil {
|
||||
msg := fmt.Errorf("metric:%v err:%v", item, err)
|
||||
logger.Warning(msg)
|
||||
|
|
|
@ -2,6 +2,7 @@ package routes
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/src/dataobj"
|
||||
"github.com/didi/nightingale/src/modules/transfer/backend"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
)
|
||||
|
||||
func PushData(c *gin.Context) {
|
||||
now := time.Now().Unix()
|
||||
if c.Request.ContentLength == 0 {
|
||||
render.Message(c, "blank body")
|
||||
return
|
||||
|
@ -28,7 +30,7 @@ func PushData(c *gin.Context) {
|
|||
logger.Debug("->recv: ", v)
|
||||
stats.Counter.Set("points.in", 1)
|
||||
|
||||
err := v.CheckValidity()
|
||||
err := v.CheckValidity(now)
|
||||
if err != nil {
|
||||
stats.Counter.Set("points.in.err", 1)
|
||||
msg += fmt.Sprintf("recv metric %v err:%v\n", v, err)
|
||||
|
|
|
@ -24,7 +24,7 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
|
|||
for _, v := range args {
|
||||
logger.Debug("->recv: ", v)
|
||||
stats.Counter.Set("points.in", 1)
|
||||
if err := v.CheckValidity(); err != nil {
|
||||
if err := v.CheckValidity(start.Unix()); err != nil {
|
||||
stats.Counter.Set("points.in.err", 1)
|
||||
msg := fmt.Sprintf("illegal item:%s err:%v", v, err)
|
||||
logger.Warningf(msg)
|
||||
|
|
Loading…
Reference in New Issue