forward samples in sequence

This commit is contained in:
Ulric Qin 2022-06-10 14:20:18 +08:00
parent a8c60c9f2b
commit ea2249c30c
2 changed files with 20 additions and 4 deletions

View File

@ -180,8 +180,8 @@ BasicAuthUser = ""
# Basic auth password
BasicAuthPass = ""
# timeout settings, unit: ms
Timeout = 30000
DialTimeout = 10000
Timeout = 10000
DialTimeout = 3000
TLSHandshakeTimeout = 30000
ExpectContinueTimeout = 1000
IdleConnTimeout = 90000

View File

@ -7,6 +7,7 @@ import (
"hash/crc32"
"net"
"net/http"
"sync"
"time"
"github.com/didi/nightingale/v5/src/server/config"
@ -143,8 +144,23 @@ func (ws *WritersType) StartConsumer(index int, ch chan *prompb.TimeSeries) {
// @Author: quzhihao
func (ws *WritersType) post(index int, series []*prompb.TimeSeries) {
header := map[string]string{"hash": fmt.Sprintf("%s-%d", config.C.Heartbeat.Endpoint, index)}
for key := range ws.backends {
go ws.backends[key].Write(series, header)
if len(ws.backends) == 1 {
for key := range ws.backends {
ws.backends[key].Write(series, header)
}
return
}
if len(ws.backends) > 1 {
wg := new(sync.WaitGroup)
for key := range ws.backends {
wg.Add(1)
go func(wg *sync.WaitGroup, backend WriterType, items []*prompb.TimeSeries, header map[string]string) {
defer wg.Done()
backend.Write(series, header)
}(wg, ws.backends[key], series, header)
}
wg.Wait()
}
}