first time: code refactor for pr 906

This commit is contained in:
Ulric Qin 2022-04-14 11:11:14 +08:00
parent 3d587a5762
commit e73da37bc0
1 changed files with 11 additions and 35 deletions

View File

@ -4,12 +4,13 @@ import (
"bytes"
"context"
"fmt"
cmap "github.com/orcaman/concurrent-map"
"net"
"net/http"
"sync"
"time"
cmap "github.com/orcaman/concurrent-map"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/api"
@ -48,7 +49,7 @@ type WriterType struct {
var lock = sync.RWMutex{}
func (w WriterType) Write(items []*prompb.TimeSeries) {
func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]string) {
if len(items) == 0 {
return
}
@ -63,34 +64,13 @@ func (w WriterType) Write(items []*prompb.TimeSeries) {
return
}
if err := w.Post(snappy.Encode(nil, data), nil); err != nil {
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}
func (w WriterType) WriteWithHeader(items []*prompb.TimeSeries, headerMap map[string]string) {
if len(items) == 0 {
return
}
req := &prompb.WriteRequest{
Timeseries: items,
}
data, err := proto.Marshal(req)
if err != nil {
logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
return
}
if err := w.Post(snappy.Encode(nil, data), headerMap); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}
func (w WriterType) Post(req []byte, headerMap map[string]string) error {
func (w WriterType) Post(req []byte, headers ...map[string]string) error {
httpReq, err := http.NewRequest("POST", w.Opts.Url, bytes.NewReader(req))
if err != nil {
logger.Warningf("create remote write request got error: %s", err.Error())
@ -102,8 +82,8 @@ func (w WriterType) Post(req []byte, headerMap map[string]string) error {
httpReq.Header.Set("User-Agent", "n9e")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if headerMap != nil {
for k, v := range headerMap {
if len(headers) > 0 {
for k, v := range headers[0] {
httpReq.Header.Set(k, v)
}
}
@ -229,23 +209,19 @@ func (ws *WritersType) InitIdentChanWorker(ident string, data chan *prompb.TimeS
//
// postPrometheus 发送数据至prometheus
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param series
//
func (ws *WritersType) postPrometheus(ident string, series []*prompb.TimeSeries) {
// 发送至prom
wg := sync.WaitGroup{}
wg.Add(len(ws.m))
headerMap := make(map[string]string, 1)
headerMap["ident"] = ident
headers := map[string]string{"ident": ident}
for key := range ws.m {
go func(key string) {
defer wg.Done()
ws.m[key].WriteWithHeader(series, headerMap)
ws.m[key].Write(series, headers)
}(key)
}
wg.Wait()
}