add queue count control chan number

This commit is contained in:
Ulric Qin 2022-04-21 12:24:26 +08:00
parent 808fa5839a
commit 7a1a65c31b
2 changed files with 23 additions and 52 deletions

View File

@ -180,8 +180,10 @@ MaxIdleConns = 100
MaxIdleConnsPerHost = 10
[WriterOpt]
# queue channel count
QueueCount = 100
# queue max size
QueueMaxSize = 100000
QueueMaxSize = 200000
# once pop samples number from queue
QueuePopSize = 2000

View File

@ -4,13 +4,11 @@ import (
"bytes"
"context"
"fmt"
"hash/crc32"
"net"
"net/http"
"sync"
"time"
cmap "github.com/orcaman/concurrent-map"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/api"
@ -36,6 +34,7 @@ type Options struct {
}
type GlobalOpt struct {
QueueCount int
QueueMaxSize int
QueuePopSize int
}
@ -105,8 +104,7 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
type WritersType struct {
globalOpt GlobalOpt
backends map[string]WriterType
chans cmap.ConcurrentMap
sync.RWMutex
chans map[int]chan *prompb.TimeSeries
}
func (ws *WritersType) Put(name string, writer WriterType) {
@ -116,51 +114,36 @@ func (ws *WritersType) Put(name string, writer WriterType) {
// PushSample Push one sample to chan, hash by ident
// @Author: quzhihao
func (ws *WritersType) PushSample(ident string, v interface{}) {
if !ws.chans.Has(ident) {
ws.Lock()
// important: check twice
if !ws.chans.Has(ident) {
c := make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
ws.chans.Set(ident, c)
go ws.StartConsumer(ident, c)
}
ws.Unlock()
}
hashkey := crc32.ChecksumIEEE([]byte(ident)) % uint32(ws.globalOpt.QueueCount)
c, ok := ws.chans.Get(ident)
c, ok := ws.chans[int(hashkey)]
if ok {
ch := c.(chan *prompb.TimeSeries)
select {
case ch <- v.(*prompb.TimeSeries):
case c <- v.(*prompb.TimeSeries):
default:
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, len(ch))
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, len(c))
}
}
}
// StartConsumer every ident channel has a consumer, start it
// @Author: quzhihao
func (ws *WritersType) StartConsumer(ident string, ch chan *prompb.TimeSeries) {
func (ws *WritersType) StartConsumer(ch chan *prompb.TimeSeries) {
var (
batch = ws.globalOpt.QueuePopSize
max = ws.globalOpt.QueueMaxSize
batchCounter int
closeCounter int
series = make([]*prompb.TimeSeries, 0, batch)
batchCounter int
)
logger.Infof("Starting channel(%s) consumer, max size:%d, batch:%d", ident, max, batch)
for {
select {
case item := <-ch:
// has data, no need to close
closeCounter = 0
series = append(series, item)
batchCounter++
if batchCounter >= ws.globalOpt.QueuePopSize {
ws.post(ident, series)
ws.post(series)
// reset
batchCounter = 0
@ -168,29 +151,11 @@ func (ws *WritersType) StartConsumer(ident string, ch chan *prompb.TimeSeries) {
}
case <-time.After(time.Second):
if len(series) > 0 {
// has data, no need to close
closeCounter = 0
ws.post(ident, series)
ws.post(series)
// reset
batchCounter = 0
series = make([]*prompb.TimeSeries, 0, batch)
} else {
closeCounter++
}
if closeCounter > 3600 {
logger.Infof("Closing channel(%s) reason: no data for an hour", ident)
ws.Lock()
close(ch)
ws.chans.Remove(ident)
ws.Unlock()
logger.Infof("Closed channel(%s) reason: no data for an hour", ident)
return
}
}
}
@ -198,11 +163,9 @@ func (ws *WritersType) StartConsumer(ident string, ch chan *prompb.TimeSeries) {
// post post series to TSDB
// @Author: quzhihao
func (ws *WritersType) post(ident string, series []*prompb.TimeSeries) {
// maybe as backend hashstring
headers := map[string]string{"ident": ident}
func (ws *WritersType) post(series []*prompb.TimeSeries) {
for key := range ws.backends {
go ws.backends[key].Write(series, headers)
go ws.backends[key].Write(series)
}
}
@ -216,7 +179,13 @@ var Writers = NewWriters()
func Init(opts []Options, globalOpt GlobalOpt) error {
Writers.globalOpt = globalOpt
Writers.chans = cmap.New()
Writers.chans = make(map[int]chan *prompb.TimeSeries)
// init channels
for i := 0; i < globalOpt.QueueCount; i++ {
Writers.chans[i] = make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
go Writers.StartConsumer(Writers.chans[i])
}
for i := 0; i < len(opts); i++ {
cli, err := api.NewClient(api.Config{