second time: code refactor for pr 906. new concurrent-map when init; move lock to WritersType

This commit is contained in:
Ulric Qin 2022-04-14 12:43:39 +08:00
parent e73da37bc0
commit 8d6101ec5a
3 changed files with 81 additions and 75 deletions

View File

@ -101,6 +101,18 @@ func MustLoad(fpaths ...string) {
}
}
if C.WriterOpt.QueueMaxSize <= 0 {
C.WriterOpt.QueueMaxSize = 10000000
}
if C.WriterOpt.QueuePopSize <= 0 {
C.WriterOpt.QueuePopSize = 1000
}
if C.WriterOpt.SleepInterval <= 0 {
C.WriterOpt.SleepInterval = 50
}
fmt.Println("heartbeat.ip:", C.Heartbeat.IP)
fmt.Printf("heartbeat.interval: %dms\n", C.Heartbeat.Interval)
})

View File

@ -190,12 +190,10 @@ func handleOpenTSDB(c *gin.Context) {
if has {
common.AppendLabels(pt, target)
}
// 更改分发方式通过ident分发
writer.Writers.PushIdentChan(host, pt)
writer.Writers.PushSample(host, pt)
} else {
// 如果没有则默认放入指标名前缀的chan中
ident := arr[i].Metric[0:strings.Index(arr[i].Metric, "_")]
writer.Writers.PushIdentChan(ident, pt)
writer.Writers.PushSample("default_hash_string", pt)
}
succ++
@ -203,7 +201,6 @@ func handleOpenTSDB(c *gin.Context) {
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@ -47,8 +47,6 @@ type WriterType struct {
Client api.Client
}
var lock = sync.RWMutex{}
func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]string) {
if len(items) == 0 {
return
@ -107,10 +105,11 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
}
type WritersType struct {
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
IdentChanMap cmap.ConcurrentMap
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
chans cmap.ConcurrentMap
sync.RWMutex
}
func (ws *WritersType) Put(name string, writer WriterType) {
@ -121,99 +120,96 @@ func (ws *WritersType) PushQueue(vs []interface{}) bool {
return ws.queue.PushFrontBatch(vs)
}
//
// PushIdentChan 放入chan 以ident分发
// PushSample Push one sample to chan, hash by ident
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param vs
//
func (ws *WritersType) PushIdentChan(ident string, vs interface{}) {
if !ws.IdentChanMap.Has(ident) {
lock.Lock()
if !ws.IdentChanMap.Has(ident) {
func (ws *WritersType) PushSample(ident string, v interface{}) {
if !ws.chans.Has(ident) {
ws.Lock()
// important: check twice
if !ws.chans.Has(ident) {
c := make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
ws.IdentChanMap.Set(ident, c)
go func() {
ws.InitIdentChanWorker(ident, c)
}()
ws.chans.Set(ident, c)
go ws.StartConsumer(ident, c)
}
lock.Unlock()
ws.Unlock()
}
// 往chan扔会导致内存不断增大如果写入阻塞了需要提示
c, ok := ws.IdentChanMap.Get(ident)
ch := c.(chan *prompb.TimeSeries)
c, ok := ws.chans.Get(ident)
if ok {
ch := c.(chan *prompb.TimeSeries)
select {
case ch <- vs.(*prompb.TimeSeries):
case <-time.After(time.Duration(200) * time.Millisecond):
logger.Warningf("[%s] Write IdentChanMap Full, DropSize: %d", ident, len(ch))
case ch <- v.(*prompb.TimeSeries):
default:
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, len(ch))
}
}
}
//
// InitIdentChanWorker 初始化ident消费者
// StartConsumer every ident channel has a consumer, start it
// @Author: quzhihao
// @Description:
// @receiver ws
// @param ident
// @param data
//
func (ws *WritersType) InitIdentChanWorker(ident string, data chan *prompb.TimeSeries) {
popCounter := 0
batch := ws.globalOpt.QueuePopSize
if batch <= 0 {
batch = 1000
}
logger.Infof("[%s] Start Ident Chan Worker, MaxSize:%d, batchSize:%d", ident, ws.globalOpt.QueueMaxSize, batch)
series := make([]*prompb.TimeSeries, 0, batch)
closePrepareCounter := 0
func (ws *WritersType) StartConsumer(ident string, ch chan *prompb.TimeSeries) {
var (
batch = ws.globalOpt.QueuePopSize
max = ws.globalOpt.QueueMaxSize
batchCounter int
closeCounter int
series = make([]*prompb.TimeSeries, 0, batch)
)
logger.Infof("Starting channel(%s) consumer, max size:%d, batch:%d", ident, max, batch)
for {
select {
case item := <-data:
closePrepareCounter = 0
case item := <-ch:
// has data, no need to close
closeCounter = 0
series = append(series, item)
popCounter++
if popCounter >= ws.globalOpt.QueuePopSize {
popCounter = 0
// 发送到prometheus
ws.postPrometheus(ident, series)
batchCounter++
if batchCounter >= ws.globalOpt.QueuePopSize {
ws.post(ident, series)
// reset
batchCounter = 0
series = make([]*prompb.TimeSeries, 0, batch)
}
case <-time.After(10 * time.Second):
// 10秒清空一下如果有数据的话
case <-time.After(time.Second):
if len(series) > 0 {
ws.postPrometheus(ident, series)
// has data, no need to close
closeCounter = 0
ws.post(ident, series)
// reset
batchCounter = 0
series = make([]*prompb.TimeSeries, 0, batch)
closePrepareCounter = 0
} else {
closePrepareCounter++
closeCounter++
}
// 一小时没数据就关闭chan
if closePrepareCounter > 6*60 {
logger.Infof("[%s] Ident Chan Closing. Reason: No Data For An Hour.", ident)
lock.Lock()
close(data)
// 移除
ws.IdentChanMap.Remove(ident)
lock.Unlock()
logger.Infof("[%s] Ident Chan Closed Success.", ident)
if closeCounter > 3600 {
logger.Infof("Closing channel(%s) reason: no data for an hour", ident)
ws.Lock()
close(ch)
ws.chans.Remove(ident)
ws.Unlock()
logger.Infof("Closed channel(%s) reason: no data for an hour", ident)
return
}
}
}
}
//
// postPrometheus 发送数据至prometheus
// post post series to TSDB
// @Author: quzhihao
//
func (ws *WritersType) postPrometheus(ident string, series []*prompb.TimeSeries) {
func (ws *WritersType) post(ident string, series []*prompb.TimeSeries) {
wg := sync.WaitGroup{}
wg.Add(len(ws.m))
// maybe as backend hashstring
headers := map[string]string{"ident": ident}
for key := range ws.m {
go func(key string) {
@ -272,6 +268,7 @@ var Writers = NewWriters()
func Init(opts []Options, globalOpt GlobalOpt) error {
Writers.globalOpt = globalOpt
Writers.queue = list.NewSafeListLimited(globalOpt.QueueMaxSize)
Writers.chans = cmap.New()
for i := 0; i < len(opts); i++ {
cli, err := api.NewClient(api.Config{