From e73da37bc0630210bd34da94b537aeb4ef5e5f78 Mon Sep 17 00:00:00 2001 From: Ulric Qin Date: Thu, 14 Apr 2022 11:11:14 +0800 Subject: [PATCH] first time: code refactor for pr 906 --- src/server/writer/writer.go | 46 +++++++++---------------------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/src/server/writer/writer.go b/src/server/writer/writer.go index 5b28e13e..239266ba 100644 --- a/src/server/writer/writer.go +++ b/src/server/writer/writer.go @@ -4,12 +4,13 @@ import ( "bytes" "context" "fmt" - cmap "github.com/orcaman/concurrent-map" "net" "net/http" "sync" "time" + cmap "github.com/orcaman/concurrent-map" + "github.com/golang/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/client_golang/api" @@ -48,7 +49,7 @@ type WriterType struct { var lock = sync.RWMutex{} -func (w WriterType) Write(items []*prompb.TimeSeries) { +func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]string) { if len(items) == 0 { return } @@ -63,34 +64,13 @@ func (w WriterType) Write(items []*prompb.TimeSeries) { return } - if err := w.Post(snappy.Encode(nil, data), nil); err != nil { + if err := w.Post(snappy.Encode(nil, data), headers...); err != nil { logger.Warningf("post to %s got error: %v", w.Opts.Url, err) logger.Warning("example timeseries:", items[0].String()) } } -func (w WriterType) WriteWithHeader(items []*prompb.TimeSeries, headerMap map[string]string) { - if len(items) == 0 { - return - } - - req := &prompb.WriteRequest{ - Timeseries: items, - } - - data, err := proto.Marshal(req) - if err != nil { - logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items) - return - } - - if err := w.Post(snappy.Encode(nil, data), headerMap); err != nil { - logger.Warningf("post to %s got error: %v", w.Opts.Url, err) - logger.Warning("example timeseries:", items[0].String()) - } -} - -func (w WriterType) Post(req []byte, headerMap map[string]string) error { +func (w WriterType) Post(req []byte, headers ...map[string]string) error { httpReq, err := http.NewRequest("POST", w.Opts.Url, bytes.NewReader(req)) if err != nil { logger.Warningf("create remote write request got error: %s", err.Error()) @@ -102,8 +82,8 @@ func (w WriterType) Post(req []byte, headerMap map[string]string) error { httpReq.Header.Set("User-Agent", "n9e") httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - if headerMap != nil { - for k, v := range headerMap { + if len(headers) > 0 { + for k, v := range headers[0] { httpReq.Header.Set(k, v) } } @@ -229,23 +209,19 @@ func (ws *WritersType) InitIdentChanWorker(ident string, data chan *prompb.TimeS // // postPrometheus 发送数据至prometheus // @Author: quzhihao -// @Description: -// @receiver ws -// @param ident -// @param series // func (ws *WritersType) postPrometheus(ident string, series []*prompb.TimeSeries) { - // 发送至prom wg := sync.WaitGroup{} wg.Add(len(ws.m)) - headerMap := make(map[string]string, 1) - headerMap["ident"] = ident + + headers := map[string]string{"ident": ident} for key := range ws.m { go func(key string) { defer wg.Done() - ws.m[key].WriteWithHeader(series, headerMap) + ws.m[key].Write(series, headers) }(key) } + wg.Wait() }