support type: SUBTRACT
This commit is contained in:
parent
018d19857d
commit
0b4d1639c6
|
@ -10,10 +10,11 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
GAUGE = "GAUGE"
|
||||
COUNTER = "COUNTER"
|
||||
DERIVE = "DERIVE"
|
||||
SPLIT = "/"
|
||||
GAUGE = "GAUGE"
|
||||
COUNTER = "COUNTER"
|
||||
SUBTRACT = "SUBTRACT"
|
||||
DERIVE = "DERIVE"
|
||||
SPLIT = "/"
|
||||
)
|
||||
|
||||
type MetricValue struct {
|
||||
|
|
|
@ -37,8 +37,14 @@ func Push(metricItems []*dataobj.MetricValue) error {
|
|||
continue
|
||||
}
|
||||
if item.CounterType == dataobj.COUNTER {
|
||||
if err := CounterToGauge(item); err != nil {
|
||||
logger.Warning(err)
|
||||
item = CounterToGauge(item)
|
||||
if item == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if item.CounterType == dataobj.SUBTRACT {
|
||||
item = SubtractToGauge(item)
|
||||
if item == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -143,25 +149,59 @@ func rpcClient(addr string) (*rpc.Client, error) {
|
|||
return client, nil
|
||||
}
|
||||
|
||||
func CounterToGauge(item *dataobj.MetricValue) error {
|
||||
func CounterToGauge(item *dataobj.MetricValue) *dataobj.MetricValue {
|
||||
key := item.PK()
|
||||
|
||||
old, exists := cache.MetricHistory.Get(key)
|
||||
cache.MetricHistory.Set(key, *item)
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("not found old item:%v", item)
|
||||
logger.Debugf("not found old item:%v, maybe this is the first item", item)
|
||||
return nil
|
||||
}
|
||||
|
||||
if old.Value > item.Value {
|
||||
return fmt.Errorf("item:%v old value:%v greater than new value:%v", item, old.Value, item.Value)
|
||||
logger.Warningf("item:%v old value:%v greater than new value:%v", item, old.Value, item.Value)
|
||||
return nil
|
||||
}
|
||||
|
||||
if old.Timestamp >= item.Timestamp {
|
||||
return fmt.Errorf("item:%v old timestamp:%v greater than new timestamp:%v", item, old.Timestamp, item.Timestamp)
|
||||
logger.Warningf("item:%v old timestamp:%v greater than new timestamp:%v", item, old.Timestamp, item.Timestamp)
|
||||
return nil
|
||||
}
|
||||
|
||||
item.ValueUntyped = (item.Value - old.Value) / float64(item.Timestamp-old.Timestamp)
|
||||
item.CounterType = dataobj.GAUGE
|
||||
return nil
|
||||
return item
|
||||
}
|
||||
|
||||
func SubtractToGauge(item *dataobj.MetricValue) *dataobj.MetricValue {
|
||||
key := item.PK()
|
||||
|
||||
old, exists := cache.MetricHistory.Get(key)
|
||||
cache.MetricHistory.Set(key, *item)
|
||||
|
||||
if !exists {
|
||||
logger.Debugf("not found old item:%v, maybe this is the first item", item)
|
||||
return nil
|
||||
}
|
||||
|
||||
if old.Value > item.Value {
|
||||
logger.Warningf("item:%v old value:%v greater than new value:%v", item, old.Value, item.Value)
|
||||
return nil
|
||||
}
|
||||
|
||||
if old.Timestamp >= item.Timestamp {
|
||||
logger.Warningf("item:%v old timestamp:%v greater than new timestamp:%v", item, old.Timestamp, item.Timestamp)
|
||||
return nil
|
||||
}
|
||||
|
||||
if old.Timestamp <= item.Timestamp-2*item.Step {
|
||||
logger.Warningf("item:%v old timestamp:%v too old <= %v = (new timestamp: %v - 2 * step: %v), maybe some point lost", item, old.Timestamp, item.Timestamp-2*item.Step, item.Timestamp, item.Step)
|
||||
return nil
|
||||
}
|
||||
|
||||
item.ValueUntyped = item.Value - old.Value
|
||||
item.CounterType = dataobj.GAUGE
|
||||
return item
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue