From 6b1e432f6d8b70e9b5c962ceb8e6d20b8f774458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=87=95=E5=B0=8F=E4=B9=99?= <907974064@qq.com> Date: Fri, 15 Jan 2021 09:16:06 +0800 Subject: [PATCH] =?UTF-8?q?m3db=20writetagged=E5=BA=94=E8=AF=A5=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E5=81=9A=EF=BC=8C=E4=B8=8D=E7=84=B6=E4=BC=9A=E5=AF=BC?= =?UTF-8?q?=E8=87=B4transfer=20rpc=E5=8F=98=E6=85=A2=20(#514)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * m3db writetagged应该并发做,不然会导致transfer rpc变慢 * go func指针传参问题 --- src/modules/transfer/backend/m3db/m3db.go | 39 ++++++++++++++--------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/src/modules/transfer/backend/m3db/m3db.go b/src/modules/transfer/backend/m3db/m3db.go index 7a66f67a..e4467c74 100644 --- a/src/modules/transfer/backend/m3db/m3db.go +++ b/src/modules/transfer/backend/m3db/m3db.go @@ -3,6 +3,7 @@ package m3db import ( "fmt" "sync" + "sync/atomic" "time" "github.com/didi/nightingale/src/common/dataobj" @@ -89,23 +90,31 @@ func (p *Client) Push2Queue(items []*dataobj.MetricValue) { logger.Errorf("unable to get m3db session: %s", err) return } - - errCnt := 0 + var errCnt int32 + var ( + wg sync.WaitGroup + ) for _, item := range items { - if err := session.WriteTagged( - p.namespaceID, - mvID(item), - ident.NewTagsIterator(mvTags(item)), - time.Unix(item.Timestamp, 0), - item.Value, - xtime.Second, - nil, - ); err != nil { - logger.Errorf("unable to writeTagged: %s", err) - errCnt++ - } + wg.Add(1) + go func(dm *dataobj.MetricValue) { + err := session.WriteTagged( + p.namespaceID, + mvID(dm), + ident.NewTagsIterator(mvTags(dm)), + time.Unix(dm.Timestamp, 0), + dm.Value, + xtime.Second, + nil) + if err != nil { + logger.Errorf("unable to writeTagged: %s", err) + atomic.AddInt32(&errCnt, 1) + } + wg.Done() + }(item) + } - stats.Counter.Set("m3db.queue.err", errCnt) + wg.Wait() + stats.Counter.Set("m3db.queue.err", int(errCnt)) } // QueryData: || (|| endpoints...) (&& tags...)