perf(opentsdb): 数据拉取以ident分发,并把list方式改为chan方式,提高消费效率。如果有多个prometheus实例,也可以通过header中的Ident字段进行一致性hash分发。 (#906)

Co-authored-by: zhihao.qu <zhihao.qu@ly.com>
This commit is contained in:
qzh 2022-04-14 10:31:36 +08:00 committed by GitHub
parent 42a6be95e8
commit 3d587a5762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 153 additions and 12 deletions

View File

@ -164,7 +164,6 @@ func handleOpenTSDB(c *gin.Context) {
succ int
fail int
msg = "data pushed to queue"
list = make([]interface{}, 0, len(arr))
ts = time.Now().Unix()
ids = make(map[string]interface{})
)
@ -191,17 +190,19 @@ func handleOpenTSDB(c *gin.Context) {
if has {
common.AppendLabels(pt, target)
}
// 更改分发方式通过ident分发
writer.Writers.PushIdentChan(host, pt)
} else {
// 如果没有则默认放入指标名前缀的chan中
ident := arr[i].Metric[0:strings.Index(arr[i].Metric, "_")]
writer.Writers.PushIdentChan(ident, pt)
}
list = append(list, pt)
succ++
}
if len(list) > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(len(list)))
if !writer.Writers.PushQueue(list) {
msg = "writer queue full"
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@ -4,8 +4,10 @@ import (
"bytes"
"context"
"fmt"
cmap "github.com/orcaman/concurrent-map"
"net"
"net/http"
"sync"
"time"
"github.com/golang/protobuf/proto"
@ -44,6 +46,8 @@ type WriterType struct {
Client api.Client
}
var lock = sync.RWMutex{}
func (w WriterType) Write(items []*prompb.TimeSeries) {
if len(items) == 0 {
return
@ -59,13 +63,34 @@ func (w WriterType) Write(items []*prompb.TimeSeries) {
return
}
if err := w.Post(snappy.Encode(nil, data)); err != nil {
if err := w.Post(snappy.Encode(nil, data), nil); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}
func (w WriterType) Post(req []byte) error {
func (w WriterType) WriteWithHeader(items []*prompb.TimeSeries, headerMap map[string]string) {
if len(items) == 0 {
return
}
req := &prompb.WriteRequest{
Timeseries: items,
}
data, err := proto.Marshal(req)
if err != nil {
logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
return
}
if err := w.Post(snappy.Encode(nil, data), headerMap); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
}
}
func (w WriterType) Post(req []byte, headerMap map[string]string) error {
httpReq, err := http.NewRequest("POST", w.Opts.Url, bytes.NewReader(req))
if err != nil {
logger.Warningf("create remote write request got error: %s", err.Error())
@ -77,6 +102,12 @@ func (w WriterType) Post(req []byte) error {
httpReq.Header.Set("User-Agent", "n9e")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if headerMap != nil {
for k, v := range headerMap {
httpReq.Header.Set(k, v)
}
}
if w.Opts.BasicAuthUser != "" {
httpReq.SetBasicAuth(w.Opts.BasicAuthUser, w.Opts.BasicAuthPass)
}
@ -96,9 +127,10 @@ func (w WriterType) Post(req []byte) error {
}
type WritersType struct {
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
IdentChanMap cmap.ConcurrentMap
}
func (ws *WritersType) Put(name string, writer WriterType) {
@ -109,6 +141,114 @@ func (ws *WritersType) PushQueue(vs []interface{}) bool {
return ws.queue.PushFrontBatch(vs)
}
//
// PushIdentChan 放入chan 以ident分发
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param vs
//
func (ws *WritersType) PushIdentChan(ident string, vs interface{}) {
if !ws.IdentChanMap.Has(ident) {
lock.Lock()
if !ws.IdentChanMap.Has(ident) {
c := make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
ws.IdentChanMap.Set(ident, c)
go func() {
ws.InitIdentChanWorker(ident, c)
}()
}
lock.Unlock()
}
// 往chan扔会导致内存不断增大如果写入阻塞了需要提示
c, ok := ws.IdentChanMap.Get(ident)
ch := c.(chan *prompb.TimeSeries)
if ok {
select {
case ch <- vs.(*prompb.TimeSeries):
case <-time.After(time.Duration(200) * time.Millisecond):
logger.Warningf("[%s] Write IdentChanMap Full, DropSize: %d", ident, len(ch))
}
}
}
//
// InitIdentChanWorker 初始化ident消费者
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param data
//
func (ws *WritersType) InitIdentChanWorker(ident string, data chan *prompb.TimeSeries) {
popCounter := 0
batch := ws.globalOpt.QueuePopSize
if batch <= 0 {
batch = 1000
}
logger.Infof("[%s] Start Ident Chan Worker, MaxSize:%d, batchSize:%d", ident, ws.globalOpt.QueueMaxSize, batch)
series := make([]*prompb.TimeSeries, 0, batch)
closePrepareCounter := 0
for {
select {
case item := <-data:
closePrepareCounter = 0
series = append(series, item)
popCounter++
if popCounter >= ws.globalOpt.QueuePopSize {
popCounter = 0
// 发送到prometheus
ws.postPrometheus(ident, series)
series = make([]*prompb.TimeSeries, 0, batch)
}
case <-time.After(10 * time.Second):
// 10秒清空一下如果有数据的话
if len(series) > 0 {
ws.postPrometheus(ident, series)
series = make([]*prompb.TimeSeries, 0, batch)
closePrepareCounter = 0
} else {
closePrepareCounter++
}
// 一小时没数据就关闭chan
if closePrepareCounter > 6*60 {
logger.Infof("[%s] Ident Chan Closing. Reason: No Data For An Hour.", ident)
lock.Lock()
close(data)
// 移除
ws.IdentChanMap.Remove(ident)
lock.Unlock()
logger.Infof("[%s] Ident Chan Closed Success.", ident)
return
}
}
}
}
//
// postPrometheus 发送数据至prometheus
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param series
//
func (ws *WritersType) postPrometheus(ident string, series []*prompb.TimeSeries) {
// 发送至prom
wg := sync.WaitGroup{}
wg.Add(len(ws.m))
headerMap := make(map[string]string, 1)
headerMap["ident"] = ident
for key := range ws.m {
go func(key string) {
defer wg.Done()
ws.m[key].WriteWithHeader(series, headerMap)
}(key)
}
wg.Wait()
}
func (ws *WritersType) Writes() {
batch := ws.globalOpt.QueuePopSize
if batch <= 0 {