From 0b2d4d6d1970f8c30127711440f8075c2966ef81 Mon Sep 17 00:00:00 2001 From: 710leo <710leo@gmail.com> Date: Thu, 16 Apr 2020 15:35:33 +0800 Subject: [PATCH] fix: concurrent map --- src/modules/index/cache/counter_map.go | 6 +++++- src/modules/index/cache/endpoint_map.go | 2 ++ src/modules/index/cache/indexdb.go | 6 ++++-- src/modules/index/http/routes/index_router.go | 8 ++++++++ 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/modules/index/cache/counter_map.go b/src/modules/index/cache/counter_map.go index 8a404784..926183c2 100644 --- a/src/modules/index/cache/counter_map.go +++ b/src/modules/index/cache/counter_map.go @@ -40,7 +40,11 @@ func (c *CounterTsMap) Clean(now, timeDuration int64, endpoint, metric string) { func (c *CounterTsMap) GetCounters() map[string]int64 { c.RLock() defer c.RUnlock() - return c.M + m := make(map[string]int64) + for k, v := range c.M { + m[k] = v + } + return m } func (c *CounterTsMap) Len() int { diff --git a/src/modules/index/cache/endpoint_map.go b/src/modules/index/cache/endpoint_map.go index 2d61343c..d63dcac0 100644 --- a/src/modules/index/cache/endpoint_map.go +++ b/src/modules/index/cache/endpoint_map.go @@ -38,7 +38,9 @@ func (e *EndpointIndexMap) Push(item dataobj.IndexModel, now int64) { metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, tags, now)) return } + metricIndexMap.Lock() metricIndex.Set(item, tags, now) + metricIndexMap.Unlock() } func (e *EndpointIndexMap) Clean(timeDuration int64) { diff --git a/src/modules/index/cache/indexdb.go b/src/modules/index/cache/indexdb.go index e565279d..467750f2 100644 --- a/src/modules/index/cache/indexdb.go +++ b/src/modules/index/cache/indexdb.go @@ -215,9 +215,11 @@ func WriteIndexToFile(indexDir, endpoint string) error { return fmt.Errorf("endpoint index doesn't found") } - metricIndexMap.Lock() + metricIndexMap.RLock() body, err := json.Marshal(metricIndexMap) - metricIndexMap.Unlock() + stats.Counter.Set("write.file", 1) + metricIndexMap.RUnlock() + if err != nil { return fmt.Errorf("marshal struct to json failed:%v", err) } diff --git a/src/modules/index/http/routes/index_router.go b/src/modules/index/http/routes/index_router.go index e3926e75..32b7c9b6 100644 --- a/src/modules/index/http/routes/index_router.go +++ b/src/modules/index/http/routes/index_router.go @@ -78,7 +78,9 @@ func DelCounter(c *gin.Context) { for _, tagPair := range recv.Tagkv { for _, v := range tagPair.Values { + metricIndex.Lock() metricIndex.TagkvMap.DelTag(tagPair.Key, v) + metricIndex.Unlock() } } } @@ -104,7 +106,9 @@ func GetTagPairs(c *gin.Context) { continue } + metricIndex.RLock() tagkvMap := metricIndex.TagkvMap.GetTagkvMap() + metricIndex.RUnlock() for tagk, tagvs := range tagkvMap { tagvFilter, exists := tagkvFilter[tagk] @@ -192,12 +196,14 @@ func GetIndexByFullTags(c *gin.Context) { continue } + metricIndex.RLock() if step == 0 || dsType == "" { step = metricIndex.Step dsType = metricIndex.DsType } countersMap := metricIndex.CounterMap.GetCounters() + metricIndex.RUnlock() tagPairs := cache.GetSortTags(cache.TagPairToMap(tagkv)) tags := cache.GetAllCounter(tagPairs) @@ -290,6 +296,7 @@ func GetIndexByClude(c *gin.Context) { continue } + metricIndex.RLock() if step == 0 || dsType == "" { step = metricIndex.Step dsType = metricIndex.DsType @@ -298,6 +305,7 @@ func GetIndexByClude(c *gin.Context) { // 校验和 tag 有关的 counter 是否存在 // 如果一个指标,比如 port.listen 有 name=uic,port=8056 和 name=hsp,port=8002。避免产生 4 个曲线 counterMap := metricIndex.CounterMap.GetCounters() + metricIndex.RUnlock() var err error var tags []string