Third time: code refactor for PR 906 — use a channel as the queue for all of the receivers

This commit is contained in:
Ulric Qin 2022-04-14 12:57:30 +08:00
parent 8d6101ec5a
commit 3a97a67c7e
5 changed files with 24 additions and 83 deletions

View File

@ -127,8 +127,6 @@ func pushMetrics() {
}
}
var series []interface{}
// 有心跳target_up = 1
// 如果找到target就把target的tags补充到series上
// 如果没有target就在数据库创建target
@ -170,7 +168,7 @@ func pushMetrics() {
common.AppendLabels(pt, target)
}
series = append(series, pt)
writer.Writers.PushSample(active, pt)
}
// 把actives传给TargetCache,看看除了active的部分还有没有别的target;有的话返回,设置target_up = 0
@ -195,10 +193,6 @@ func pushMetrics() {
})
common.AppendLabels(pt, dead)
series = append(series, pt)
}
if !writer.Writers.PushQueue(series) {
logger.Errorf("handle_idents: writer queue full")
writer.Writers.PushSample(ident, pt)
}
}

View File

@ -230,7 +230,6 @@ func datadogSeries(c *gin.Context) {
succ int
fail int
msg = "data pushed to queue"
list []interface{}
ts = time.Now().Unix()
ids = make(map[string]interface{})
)
@ -262,18 +261,17 @@ func datadogSeries(c *gin.Context) {
if has {
common.AppendLabels(pt, target)
}
writer.Writers.PushSample(ident, pt)
} else {
writer.Writers.PushSample("default_hash_string", pt)
}
list = append(list, pt)
succ++
}
if len(list) > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(len(list)))
if !writer.Writers.PushQueue(list) {
msg = "writer queue full"
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@ -181,7 +181,6 @@ func falconPush(c *gin.Context) {
succ int
fail int
msg = "data pushed to queue"
list = make([]interface{}, 0, len(arr))
ts = time.Now().Unix()
ids = make(map[string]interface{})
)
@ -207,18 +206,17 @@ func falconPush(c *gin.Context) {
if has {
common.AppendLabels(pt, target)
}
writer.Writers.PushSample(ident, pt)
} else {
writer.Writers.PushSample("default_hash_string", pt)
}
list = append(list, pt)
succ++
}
if len(list) > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "openfalcon").Add(float64(len(list)))
if !writer.Writers.PushQueue(list) {
msg = "writer queue full"
}
if succ > 0 {
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "openfalcon").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@ -61,7 +61,6 @@ func remoteWrite(c *gin.Context) {
var (
now = time.Now().Unix()
ids = make(map[string]interface{})
lst = make([]interface{}, count)
ident string
)
@ -95,18 +94,15 @@ func remoteWrite(c *gin.Context) {
if has {
common.AppendLabels(req.Timeseries[i], target)
}
}
lst[i] = req.Timeseries[i]
writer.Writers.PushSample(ident, req.Timeseries[i])
} else {
writer.Writers.PushSample("default_hash_string", req.Timeseries[i])
}
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count))
idents.Idents.MSet(ids)
if writer.Writers.PushQueue(lst) {
c.String(200, "")
} else {
c.String(http.StatusInternalServerError, "writer queue full")
}
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling

View File

@ -15,7 +15,6 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/client_golang/api"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/logger"
)
@ -106,18 +105,13 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
type WritersType struct {
globalOpt GlobalOpt
m map[string]WriterType
queue *list.SafeListLimited
backends map[string]WriterType
chans cmap.ConcurrentMap
sync.RWMutex
}
func (ws *WritersType) Put(name string, writer WriterType) {
ws.m[name] = writer
}
func (ws *WritersType) PushQueue(vs []interface{}) bool {
return ws.queue.PushFrontBatch(vs)
ws.backends[name] = writer
}
// PushSample pushes one sample onto the per-backend chan, hashed by ident.
@ -207,59 +201,23 @@ func (ws *WritersType) StartConsumer(ident string, ch chan *prompb.TimeSeries) {
// @Author: quzhihao
func (ws *WritersType) post(ident string, series []*prompb.TimeSeries) {
wg := sync.WaitGroup{}
wg.Add(len(ws.m))
wg.Add(len(ws.backends))
// maybe as backend hashstring
headers := map[string]string{"ident": ident}
for key := range ws.m {
for key := range ws.backends {
go func(key string) {
defer wg.Done()
ws.m[key].Write(series, headers)
ws.backends[key].Write(series, headers)
}(key)
}
wg.Wait()
}
func (ws *WritersType) Writes() {
batch := ws.globalOpt.QueuePopSize
if batch <= 0 {
batch = 2000
}
duration := time.Duration(ws.globalOpt.SleepInterval) * time.Millisecond
for {
items := ws.queue.PopBackBy(batch)
count := len(items)
if count == 0 {
time.Sleep(duration)
continue
}
series := make([]*prompb.TimeSeries, 0, count)
for i := 0; i < count; i++ {
item, ok := items[i].(*prompb.TimeSeries)
if !ok {
// in theory, it can be converted successfully
continue
}
series = append(series, item)
}
if len(series) == 0 {
continue
}
for key := range ws.m {
go ws.m[key].Write(series)
}
}
}
func NewWriters() WritersType {
return WritersType{
m: make(map[string]WriterType),
backends: make(map[string]WriterType),
}
}
@ -267,7 +225,6 @@ var Writers = NewWriters()
func Init(opts []Options, globalOpt GlobalOpt) error {
Writers.globalOpt = globalOpt
Writers.queue = list.NewSafeListLimited(globalOpt.QueueMaxSize)
Writers.chans = cmap.New()
for i := 0; i < len(opts); i++ {
@ -302,7 +259,5 @@ func Init(opts []Options, globalOpt GlobalOpt) error {
Writers.Put(opts[i].Url, writer)
}
go Writers.Writes()
return nil
}