rename consumer -> reader

This commit is contained in:
Ulric Qin 2022-04-14 00:05:48 +08:00
parent 6ae0309cf6
commit 46cd4a21ac
2 changed files with 10 additions and 10 deletions

View File

@ -41,9 +41,9 @@ func (a *Agent) Start() {
func (a *Agent) Stop() {
log.Println("I! agent stopping")
for name := range InputConsumers {
InputConsumers[name].Instance.StopGoroutines()
close(InputConsumers[name].Queue)
for name := range InputReaders {
InputReaders[name].Instance.StopGoroutines()
close(InputReaders[name].Queue)
}
}
@ -84,7 +84,7 @@ func (a *Agent) startInputs() error {
continue
}
c := &Consumer{
c := &Reader{
Instance: instance,
Queue: make(chan *types.Sample, 1000000),
}
@ -92,7 +92,7 @@ func (a *Agent) startInputs() error {
log.Println("I! input:", name, "started")
c.Start()
InputConsumers[name] = c
InputReaders[name] = c
}
return nil

View File

@ -13,22 +13,22 @@ import (
"github.com/prometheus/prometheus/prompb"
)
type Consumer struct {
type Reader struct {
Instance inputs.Input
Queue chan *types.Sample
}
var InputConsumers = map[string]*Consumer{}
var InputReaders = map[string]*Reader{}
func (c *Consumer) Start() {
func (c *Reader) Start() {
// start consumer goroutines
go consume(c.Queue)
go read(c.Queue)
// start collector goroutines
c.Instance.StartGoroutines(c.Queue)
}
func consume(queue chan *types.Sample) {
func read(queue chan *types.Sample) {
batch := config.Config.WriterOpt.Batch
if batch <= 0 {
batch = 2000