diff --git a/agent/metrics_agent.go b/agent/metrics_agent.go index 50eee0e..72e4c0b 100644 --- a/agent/metrics_agent.go +++ b/agent/metrics_agent.go @@ -57,8 +57,8 @@ func (a *Agent) startMetricsAgent() error { continue } - reader := NewInputReader(inp) - reader.Start(name) + reader := NewInputReader(name, inp) + reader.Start() a.InputReaders[name] = reader log.Println("I! input:", name, "started") diff --git a/agent/metrics_reader.go b/agent/metrics_reader.go index e08bc38..e940312 100644 --- a/agent/metrics_reader.go +++ b/agent/metrics_reader.go @@ -19,25 +19,28 @@ const agentHostnameLabelKey = "agent_hostname" var metricReplacer = strings.NewReplacer("-", "_", ".", "_", " ", "_", "'", "_", "\"", "_") type InputReader struct { - input inputs.Input - quitChan chan struct{} - queue chan *types.Sample + inputName string + input inputs.Input + quitChan chan struct { + } + queue chan *types.Sample } -func NewInputReader(in inputs.Input) *InputReader { +func NewInputReader(inputName string, in inputs.Input) *InputReader { return &InputReader{ - input: in, - quitChan: make(chan struct{}, 1), - queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize), + inputName: inputName, + input: in, + quitChan: make(chan struct{}, 1), + queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize), } } -func (r *InputReader) Start(inputName string) { +func (r *InputReader) Start() { // start consumer goroutines - go r.read(inputName) + go r.read() // start collector instance - go r.startInput(inputName) + go r.startInput() } func (r *InputReader) Stop() { @@ -45,7 +48,7 @@ func (r *InputReader) Stop() { r.input.Drop() } -func (r *InputReader) startInput(inputName string) { +func (r *InputReader) startInput() { interval := config.GetInterval() if r.input.GetInterval() > 0 { interval = time.Duration(r.input.GetInterval()) @@ -62,23 +65,23 @@ func (r *InputReader) startInput(inputName string) { var start time.Time if config.Config.DebugMode { start = time.Now() - log.Println("D!", inputName, ": before gather once") + log.Println("D!", r.inputName, ": before gather once") } - r.gatherOnce(inputName) + r.gatherOnce() if config.Config.DebugMode { ms := time.Since(start).Milliseconds() - log.Println("D!", inputName, ": after gather once,", "duration:", ms, "ms") + log.Println("D!", r.inputName, ": after gather once,", "duration:", ms, "ms") } } } } -func (r *InputReader) gatherOnce(inputName string) { +func (r *InputReader) gatherOnce() { defer func() { - if r := recover(); r != nil { - log.Println("E!", inputName, ": gather metrics panic:", r, string(runtimex.Stack(3))) + if rc := recover(); rc != nil { + log.Println("E!", r.inputName, ": gather metrics panic:", r, string(runtimex.Stack(3))) } }() @@ -95,7 +98,7 @@ func (r *InputReader) gatherOnce(inputName string) { } if config.Config.DebugMode { - log.Println("D!", inputName, ": gathered samples size:", size) + log.Println("D!", r.inputName, ": gathered samples size:", size) } now := time.Now() @@ -145,7 +148,7 @@ func (r *InputReader) gatherOnce(inputName string) { } } -func (r *InputReader) read(inputName string) { +func (r *InputReader) read() { batch := config.Config.WriterOpt.Batch if batch <= 0 { batch = 2000