code refactor

This commit is contained in:
Ulric Qin 2022-07-23 20:25:27 +08:00
parent fecc1f8384
commit e91c8d2897
2 changed files with 7 additions and 12 deletions

View File

@ -57,10 +57,7 @@ func (a *Agent) startMetricsAgent() error {
continue
}
reader := NewInputReader(name, inp)
reader.Start()
a.InputReaders[name] = reader
a.StartReader(name, inp)
log.Println("I! input:", name, "started")
}

View File

@ -22,7 +22,12 @@ type InputReader struct {
inputName string
input inputs.Input
quitChan chan struct{}
queue chan *types.Sample
}
func (a *Agent) StartReader(name string, in inputs.Input) {
reader := NewInputReader(name, in)
go reader.startInput()
a.InputReaders[name] = reader
}
func NewInputReader(inputName string, in inputs.Input) *InputReader {
@ -30,15 +35,9 @@ func NewInputReader(inputName string, in inputs.Input) *InputReader {
inputName: inputName,
input: in,
quitChan: make(chan struct{}, 1),
queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
}
}
func (r *InputReader) Start() {
// start collector instance
go r.startInput()
}
func (r *InputReader) Stop() {
r.quitChan <- struct{}{}
r.input.Drop()
@ -54,7 +53,6 @@ func (r *InputReader) startInput() {
select {
case <-r.quitChan:
close(r.quitChan)
close(r.queue)
return
default:
time.Sleep(interval)