categraf/logs/sender/batch_strategy.go

// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package sender

import (
	"context"
	"log"
	"sync"
	"time"

	"flashcat.cloud/categraf/logs/message"
)

// batchStrategy contains all the logic to send logs in batch.
type batchStrategy struct {
	buffer *MessageBuffer
	// pipelineName provides a name for the strategy to differentiate it from other instances in other internal pipelines
	pipelineName     string
	serializer       Serializer
	batchWait        time.Duration
	climit           chan struct{}  // semaphore for limiting concurrent sends
	pendingSends     sync.WaitGroup // waitgroup for concurrent sends
	syncFlushTrigger chan struct{}  // trigger a synchronous flush
	syncFlushDone    chan struct{}  // wait for a synchronous flush to finish
}

// NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits.
// If `maxConcurrent` > 0, then at most that many payloads will be sent concurrently, else there is no concurrency
// and the pipeline will block while sending each payload.
func NewBatchStrategy(serializer Serializer, batchWait time.Duration, maxConcurrent int, maxBatchSize int, maxContentSize int, pipelineName string) Strategy {
	if maxConcurrent < 0 {
		maxConcurrent = 0
	}
	return &batchStrategy{
		buffer:           NewMessageBuffer(maxBatchSize, maxContentSize),
		serializer:       serializer,
		batchWait:        batchWait,
		climit:           make(chan struct{}, maxConcurrent),
		syncFlushTrigger: make(chan struct{}),
		syncFlushDone:    make(chan struct{}),
		pipelineName:     pipelineName,
	}
}
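
// exampleNewBatchStrategy is an illustrative sketch of how this constructor
// might be called; the serializer, limits, and pipeline name below are
// assumptions chosen for illustration, not values used by categraf itself.
func exampleNewBatchStrategy(serializer Serializer) Strategy {
	return NewBatchStrategy(
		serializer,    // any Serializer implementation
		5*time.Second, // flush at least every 5 seconds
		10,            // at most 10 payloads in flight concurrently
		1000,          // at most 1000 messages per batch
		1024*1024,     // at most 1 MiB of serialized content per batch
		"example",     // pipeline name used when logging dropped messages
	)
}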

// Flush triggers a synchronous flush of the buffer and waits for it to complete,
// unless the context has already been cancelled.
func (s *batchStrategy) Flush(ctx context.Context) {
	select {
	case <-ctx.Done():
		return
	default:
		s.syncFlushTrigger <- struct{}{}
		<-s.syncFlushDone
	}
}
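
// exampleFlush is an illustrative sketch: it triggers a synchronous flush under
// a bounded context. Note that Flush only checks the context before triggering;
// it does not abort a flush that is already in progress.
func exampleFlush(s *batchStrategy) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	s.Flush(ctx)
}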

// syncFlush drains every message currently available on inputChan into the buffer,
// then flushes the buffer and waits for all pending sends to finish.
func (s *batchStrategy) syncFlush(inputChan chan *message.Message, outputChan chan *message.Message, send func([]byte) error) {
	defer func() {
		s.flushBuffer(outputChan, send)
		s.pendingSends.Wait()
	}()
	for {
		select {
		case m, isOpen := <-inputChan:
			if !isOpen {
				return
			}
			s.processMessage(m, outputChan, send)
		default:
			return
		}
	}
}

// Send accumulates messages in a buffer and sends them when the buffer is full or outdated.
func (s *batchStrategy) Send(inputChan chan *message.Message, outputChan chan *message.Message, send func([]byte) error) {
	flushTicker := time.NewTicker(s.batchWait)
	defer func() {
		s.flushBuffer(outputChan, send)
		flushTicker.Stop()
		s.pendingSends.Wait()
	}()
	for {
		select {
		case m, isOpen := <-inputChan:
			if !isOpen {
				// inputChan has been closed, no more payloads are expected
				return
			}
			s.processMessage(m, outputChan, send)
		case <-flushTicker.C:
			// the first message that was added to the buffer has been here for too long, send the payload now
			s.flushBuffer(outputChan, send)
		case <-s.syncFlushTrigger:
			s.syncFlush(inputChan, outputChan, send)
			s.syncFlushDone <- struct{}{}
		}
	}
}
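
// exampleRunSend is an illustrative sketch of how Send is typically driven: in
// its own goroutine, fed by an input channel and drained through an output
// channel. The send function below is a stand-in that discards payloads.
func exampleRunSend(s *batchStrategy) (chan *message.Message, chan *message.Message) {
	inputChan := make(chan *message.Message, 100)
	outputChan := make(chan *message.Message, 100)
	go s.Send(inputChan, outputChan, func(payload []byte) error {
		// a real send function would transmit the payload here
		return nil
	})
	return inputChan, outputChan
}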

// processMessage records the message latency and adds the message to the buffer;
// if the buffer is full or cannot hold the message, it is flushed and the add is retried once.
func (s *batchStrategy) processMessage(m *message.Message, outputChan chan *message.Message, send func([]byte) error) {
	if m.Origin != nil {
		m.Origin.LogSource.LatencyStats.Add(m.GetLatency())
	}
	added := s.buffer.AddMessage(m)
	if !added || s.buffer.IsFull() {
		s.flushBuffer(outputChan, send)
	}
	if !added {
		// the message may have been rejected only because the buffer was full,
		// so retry once now that the buffer has been flushed
		if !s.buffer.AddMessage(m) {
			log.Printf("Dropped message in pipeline=%s reason=too-large ContentLength=%d ContentSizeLimit=%d\n", s.pipelineName, len(m.Content), s.buffer.ContentSizeLimit())
		}
	}
}

// flushBuffer sends all the messages that are stored in the buffer and forwards them
// to the next stage of the pipeline.
func (s *batchStrategy) flushBuffer(outputChan chan *message.Message, send func([]byte) error) {
	if s.buffer.IsEmpty() {
		return
	}
	messages := s.buffer.GetMessages()
	s.buffer.Clear()
	// if the channel is unbuffered then there is no concurrency and we block on sending each payload
	if cap(s.climit) == 0 {
		s.sendMessages(messages, outputChan, send)
		return
	}
	// acquire a slot in the semaphore before spawning the goroutine so that at most
	// cap(climit) sends are in flight at any time
	s.climit <- struct{}{}
	s.pendingSends.Add(1)
	go func() {
		s.sendMessages(messages, outputChan, send)
		s.pendingSends.Done()
		<-s.climit
	}()
}
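
// exampleSemaphore is an illustrative sketch of the channel-as-semaphore pattern
// that climit relies on: at most maxConcurrent jobs run at the same time. It
// assumes maxConcurrent > 0; flushBuffer special-cases a zero-capacity channel
// by sending synchronously instead.
func exampleSemaphore(jobs []func(), maxConcurrent int) {
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	for _, job := range jobs {
		sem <- struct{}{} // blocks while maxConcurrent jobs are in flight
		wg.Add(1)
		go func(run func()) {
			defer func() {
				wg.Done()
				<-sem
			}()
			run()
		}(job)
	}
	wg.Wait()
}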

// sendMessages serializes the batch, sends it with the provided send function, and
// forwards every message to outputChan for the next stage of the pipeline, unless
// the error indicates that sending should stop.
func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan chan *message.Message, send func([]byte) error) {
	err := send(s.serializer.Serialize(messages))
	if err != nil {
		if shouldStopSending(err) {
			return
		}
		log.Printf("Could not send payload: %v\n", err)
	}
	for _, m := range messages {
		outputChan <- m
	}
}
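
// exampleDrainOutput is an illustrative sketch: sendMessages blocks on outputChan
// for every message, so some consumer (in practice, the next stage of the
// pipeline) must keep draining the channel or sends will stall.
func exampleDrainOutput(outputChan chan *message.Message) {
	go func() {
		for range outputChan {
			// the next pipeline stage would record or forward each message here
		}
	}()
}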