diff --git a/src/modules/transfer/backend/m3db/m3db.go b/src/modules/transfer/backend/m3db/m3db.go index 7a66f67a..e4467c74 100644 --- a/src/modules/transfer/backend/m3db/m3db.go +++ b/src/modules/transfer/backend/m3db/m3db.go @@ -3,6 +3,7 @@ package m3db import ( "fmt" "sync" + "sync/atomic" "time" "github.com/didi/nightingale/src/common/dataobj" @@ -89,23 +90,31 @@ func (p *Client) Push2Queue(items []*dataobj.MetricValue) { logger.Errorf("unable to get m3db session: %s", err) return } - - errCnt := 0 + var errCnt int32 + var ( + wg sync.WaitGroup + ) for _, item := range items { - if err := session.WriteTagged( - p.namespaceID, - mvID(item), - ident.NewTagsIterator(mvTags(item)), - time.Unix(item.Timestamp, 0), - item.Value, - xtime.Second, - nil, - ); err != nil { - logger.Errorf("unable to writeTagged: %s", err) - errCnt++ - } + wg.Add(1) + go func(dm *dataobj.MetricValue) { + err := session.WriteTagged( + p.namespaceID, + mvID(dm), + ident.NewTagsIterator(mvTags(dm)), + time.Unix(dm.Timestamp, 0), + dm.Value, + xtime.Second, + nil) + if err != nil { + logger.Errorf("unable to writeTagged: %s", err) + atomic.AddInt32(&errCnt, 1) + } + wg.Done() + }(item) + } - stats.Counter.Set("m3db.queue.err", errCnt) + wg.Wait() + stats.Counter.Set("m3db.queue.err", int(errCnt)) } // QueryData: || (|| endpoints...) (&& tags...)