fix: concurrent map

This commit is contained in:
710leo 2020-04-16 15:35:33 +08:00
parent b1a0580603
commit 0b2d4d6d19
4 changed files with 19 additions and 3 deletions

View File

@ -40,7 +40,11 @@ func (c *CounterTsMap) Clean(now, timeDuration int64, endpoint, metric string) {
func (c *CounterTsMap) GetCounters() map[string]int64 { func (c *CounterTsMap) GetCounters() map[string]int64 {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
return c.M m := make(map[string]int64)
for k, v := range c.M {
m[k] = v
}
return m
} }
func (c *CounterTsMap) Len() int { func (c *CounterTsMap) Len() int {

View File

@ -38,7 +38,9 @@ func (e *EndpointIndexMap) Push(item dataobj.IndexModel, now int64) {
metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, tags, now)) metricIndexMap.SetMetricIndex(metric, NewMetricIndex(item, tags, now))
return return
} }
metricIndexMap.Lock()
metricIndex.Set(item, tags, now) metricIndex.Set(item, tags, now)
metricIndexMap.Unlock()
} }
func (e *EndpointIndexMap) Clean(timeDuration int64) { func (e *EndpointIndexMap) Clean(timeDuration int64) {

View File

@ -215,9 +215,11 @@ func WriteIndexToFile(indexDir, endpoint string) error {
return fmt.Errorf("endpoint index doesn't found") return fmt.Errorf("endpoint index doesn't found")
} }
metricIndexMap.Lock() metricIndexMap.RLock()
body, err := json.Marshal(metricIndexMap) body, err := json.Marshal(metricIndexMap)
metricIndexMap.Unlock() stats.Counter.Set("write.file", 1)
metricIndexMap.RUnlock()
if err != nil { if err != nil {
return fmt.Errorf("marshal struct to json failed:%v", err) return fmt.Errorf("marshal struct to json failed:%v", err)
} }

View File

@ -78,7 +78,9 @@ func DelCounter(c *gin.Context) {
for _, tagPair := range recv.Tagkv { for _, tagPair := range recv.Tagkv {
for _, v := range tagPair.Values { for _, v := range tagPair.Values {
metricIndex.Lock()
metricIndex.TagkvMap.DelTag(tagPair.Key, v) metricIndex.TagkvMap.DelTag(tagPair.Key, v)
metricIndex.Unlock()
} }
} }
} }
@ -104,7 +106,9 @@ func GetTagPairs(c *gin.Context) {
continue continue
} }
metricIndex.RLock()
tagkvMap := metricIndex.TagkvMap.GetTagkvMap() tagkvMap := metricIndex.TagkvMap.GetTagkvMap()
metricIndex.RUnlock()
for tagk, tagvs := range tagkvMap { for tagk, tagvs := range tagkvMap {
tagvFilter, exists := tagkvFilter[tagk] tagvFilter, exists := tagkvFilter[tagk]
@ -192,12 +196,14 @@ func GetIndexByFullTags(c *gin.Context) {
continue continue
} }
metricIndex.RLock()
if step == 0 || dsType == "" { if step == 0 || dsType == "" {
step = metricIndex.Step step = metricIndex.Step
dsType = metricIndex.DsType dsType = metricIndex.DsType
} }
countersMap := metricIndex.CounterMap.GetCounters() countersMap := metricIndex.CounterMap.GetCounters()
metricIndex.RUnlock()
tagPairs := cache.GetSortTags(cache.TagPairToMap(tagkv)) tagPairs := cache.GetSortTags(cache.TagPairToMap(tagkv))
tags := cache.GetAllCounter(tagPairs) tags := cache.GetAllCounter(tagPairs)
@ -290,6 +296,7 @@ func GetIndexByClude(c *gin.Context) {
continue continue
} }
metricIndex.RLock()
if step == 0 || dsType == "" { if step == 0 || dsType == "" {
step = metricIndex.Step step = metricIndex.Step
dsType = metricIndex.DsType dsType = metricIndex.DsType
@ -298,6 +305,7 @@ func GetIndexByClude(c *gin.Context) {
// 校验和 tag 有关的 counter 是否存在 // 校验和 tag 有关的 counter 是否存在
// 如果一个指标,比如 port.listen 有 name=uic,port=8056 和 name=hsp,port=8002。避免产生 4 个曲线 // 如果一个指标,比如 port.listen 有 name=uic,port=8056 和 name=hsp,port=8002。避免产生 4 个曲线
counterMap := metricIndex.CounterMap.GetCounters() counterMap := metricIndex.CounterMap.GetCounters()
metricIndex.RUnlock()
var err error var err error
var tags []string var tags []string