feat: add timer: CleanStalePoints
This commit is contained in:
parent
de99077b32
commit
98fe1e0121
|
@ -49,6 +49,7 @@ func Send(points []*vos.MetricPoint) {
|
||||||
// 这个监控数据没有关联任何告警策略,省事了不用处理
|
// 这个监控数据没有关联任何告警策略,省事了不用处理
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("[point_match_alertRules][point:%+v][alertRuleNum:%+v]", points[i], rulesCount)
|
logger.Debugf("[point_match_alertRules][point:%+v][alertRuleNum:%+v]", points[i], rulesCount)
|
||||||
// 不同的告警规则,alert_duration字段大小不同,找到最大的,按照最大的值来缓存历史数据
|
// 不同的告警规则,alert_duration字段大小不同,找到最大的,按照最大的值来缓存历史数据
|
||||||
var maxAliveDuration = 0
|
var maxAliveDuration = 0
|
||||||
|
@ -57,10 +58,6 @@ func Send(points []*vos.MetricPoint) {
|
||||||
maxAliveDuration = alertRules[j].AlertDuration
|
maxAliveDuration = alertRules[j].AlertDuration
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(points[i].PK) < 2 {
|
|
||||||
logger.Debugf("[point:%+v] len(pk)<2", points[i])
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ll := PointCaches[points[i].PK[0:2]].PutPoint(points[i], int64(maxAliveDuration))
|
ll := PointCaches[points[i].PK[0:2]].PutPoint(points[i], int64(maxAliveDuration))
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ package judge
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/didi/nightingale/v5/vos"
|
"github.com/didi/nightingale/v5/vos"
|
||||||
)
|
)
|
||||||
|
@ -50,7 +51,7 @@ func (pc *PointCache) Len() int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pc *PointCache) CleanStale(before int64) {
|
func (pc *PointCache) CleanStale(before int64) {
|
||||||
keys := []string{}
|
var keys []string
|
||||||
|
|
||||||
pc.RLock()
|
pc.RLock()
|
||||||
for key, L := range pc.M {
|
for key, L := range pc.M {
|
||||||
|
@ -97,12 +98,25 @@ func (pc *PointCache) PutPoint(p *vos.MetricPoint, maxAliveDuration int64) *Safe
|
||||||
|
|
||||||
// This is a large map that is not thread-safe; it must be fully initialized in advance.
|
// This is a large map that is not thread-safe; it must be fully initialized in advance.
|
||||||
var PointCaches = make(map[string]*PointCache)
|
var PointCaches = make(map[string]*PointCache)
|
||||||
|
// pointChars lists the 16 lowercase hexadecimal digits; initPointCaches
// concatenates pairs of them to form two-character keys.
var pointChars = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
|
||||||
|
// pointHeadKeys collects the two-character keys generated by initPointCaches.
// Its capacity of 256 matches the 16*16 combinations that function appends.
var pointHeadKeys = make([]string, 0, 256)
|
||||||
|
|
||||||
func initPointCaches() {
|
// initPointCaches appends every two-character combination of pointChars
// ("00" through "ff", 256 keys) to pointHeadKeys, then stores a new PointCache
// in PointCaches under each of those keys.
func initPointCaches() {
|
||||||
arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
|
|
||||||
for i := 0; i < 16; i++ {
|
for i := 0; i < 16; i++ {
|
||||||
for j := 0; j < 16; j++ {
|
for j := 0; j < 16; j++ {
|
||||||
PointCaches[arr[i]+arr[j]] = NewPointCache()
|
// Record the key formed by the i-th and j-th hex digits.
pointHeadKeys = append(pointHeadKeys, pointChars[i]+pointChars[j])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create one cache per recorded key.
// NOTE(review): the fixed bound of 256 relies on pointHeadKeys holding exactly
// the keys appended above; a second call would append 256 more - confirm this
// function runs only once.
for i := 0; i < 256; i++ {
|
||||||
|
PointCaches[pointHeadKeys[i]] = NewPointCache()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CleanStalePoints calls CleanStale on the cache stored in PointCaches under
// each of the first 256 entries of pointHeadKeys, passing a cutoff (Unix
// seconds) of two days before the current time.
func CleanStalePoints() {
|
||||||
|
// If monitoring data has not been associated with any alert rule for 2 days, the corresponding alert rule has been deleted.
|
||||||
|
before := time.Now().Unix() - 3600*24*2
|
||||||
|
for i := 0; i < 256; i++ {
|
||||||
|
// NOTE(review): pointHeadKeys[0..255] is indexed unconditionally; this assumes
// initPointCaches has already filled pointHeadKeys and PointCaches - confirm
// the initialization order.
PointCaches[pointHeadKeys[i]].CleanStale(before)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,10 +38,14 @@ func Start(ctx context.Context) {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 启动心跳goroutine,如果挂了,trans可以及时感知
|
||||||
go loopHeartbeat()
|
go loopHeartbeat()
|
||||||
|
|
||||||
// PULL型的策略不着急,等一段时间(等哈希环是稳态的)再开始周期性干活
|
// PULL型的策略不着急,等一段时间(等哈希环是稳态的)再开始周期性干活
|
||||||
go syncPullRules(ctx)
|
go syncPullRules(ctx)
|
||||||
|
|
||||||
|
// 告警策略删除之后,针对这些告警策略缓存的监控数据要被清理
|
||||||
|
go loopCleanStalePoints()
|
||||||
}
|
}
|
||||||
|
|
||||||
func syncPullRules(ctx context.Context) {
|
func syncPullRules(ctx context.Context) {
|
||||||
|
@ -102,3 +106,10 @@ func heartbeat(endpoint string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// loopCleanStalePoints loops forever: it sleeps for one hour, calls
// CleanStalePoints, and repeats. It never returns.
func loopCleanStalePoints() {
|
||||||
|
for {
|
||||||
|
// Sleep before cleaning, so the first cleanup runs one hour after the loop starts.
time.Sleep(time.Hour)
|
||||||
|
CleanStalePoints()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue