reader code refactor
This commit is contained in:
parent
b74a9ce888
commit
76c0124214
|
@ -57,8 +57,8 @@ func (a *Agent) startMetricsAgent() error {
|
|||
continue
|
||||
}
|
||||
|
||||
reader := NewInputReader(inp)
|
||||
reader.Start(name)
|
||||
reader := NewInputReader(name, inp)
|
||||
reader.Start()
|
||||
a.InputReaders[name] = reader
|
||||
|
||||
log.Println("I! input:", name, "started")
|
||||
|
|
|
@ -19,25 +19,28 @@ const agentHostnameLabelKey = "agent_hostname"
|
|||
var metricReplacer = strings.NewReplacer("-", "_", ".", "_", " ", "_", "'", "_", "\"", "_")
|
||||
|
||||
type InputReader struct {
|
||||
input inputs.Input
|
||||
quitChan chan struct{}
|
||||
queue chan *types.Sample
|
||||
inputName string
|
||||
input inputs.Input
|
||||
quitChan chan struct {
|
||||
}
|
||||
queue chan *types.Sample
|
||||
}
|
||||
|
||||
func NewInputReader(in inputs.Input) *InputReader {
|
||||
func NewInputReader(inputName string, in inputs.Input) *InputReader {
|
||||
return &InputReader{
|
||||
input: in,
|
||||
quitChan: make(chan struct{}, 1),
|
||||
queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
|
||||
inputName: inputName,
|
||||
input: in,
|
||||
quitChan: make(chan struct{}, 1),
|
||||
queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *InputReader) Start(inputName string) {
|
||||
func (r *InputReader) Start() {
|
||||
// start consumer goroutines
|
||||
go r.read(inputName)
|
||||
go r.read()
|
||||
|
||||
// start collector instance
|
||||
go r.startInput(inputName)
|
||||
go r.startInput()
|
||||
}
|
||||
|
||||
func (r *InputReader) Stop() {
|
||||
|
@ -45,7 +48,7 @@ func (r *InputReader) Stop() {
|
|||
r.input.Drop()
|
||||
}
|
||||
|
||||
func (r *InputReader) startInput(inputName string) {
|
||||
func (r *InputReader) startInput() {
|
||||
interval := config.GetInterval()
|
||||
if r.input.GetInterval() > 0 {
|
||||
interval = time.Duration(r.input.GetInterval())
|
||||
|
@ -62,23 +65,23 @@ func (r *InputReader) startInput(inputName string) {
|
|||
var start time.Time
|
||||
if config.Config.DebugMode {
|
||||
start = time.Now()
|
||||
log.Println("D!", inputName, ": before gather once")
|
||||
log.Println("D!", r.inputName, ": before gather once")
|
||||
}
|
||||
|
||||
r.gatherOnce(inputName)
|
||||
r.gatherOnce()
|
||||
|
||||
if config.Config.DebugMode {
|
||||
ms := time.Since(start).Milliseconds()
|
||||
log.Println("D!", inputName, ": after gather once,", "duration:", ms, "ms")
|
||||
log.Println("D!", r.inputName, ": after gather once,", "duration:", ms, "ms")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *InputReader) gatherOnce(inputName string) {
|
||||
func (r *InputReader) gatherOnce() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Println("E!", inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
|
||||
if rc := recover(); rc != nil {
|
||||
log.Println("E!", r.inputName, ": gather metrics panic:", r, string(runtimex.Stack(3)))
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -95,7 +98,7 @@ func (r *InputReader) gatherOnce(inputName string) {
|
|||
}
|
||||
|
||||
if config.Config.DebugMode {
|
||||
log.Println("D!", inputName, ": gathered samples size:", size)
|
||||
log.Println("D!", r.inputName, ": gathered samples size:", size)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
@ -145,7 +148,7 @@ func (r *InputReader) gatherOnce(inputName string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *InputReader) read(inputName string) {
|
||||
func (r *InputReader) read() {
|
||||
batch := config.Config.WriterOpt.Batch
|
||||
if batch <= 0 {
|
||||
batch = 2000
|
||||
|
|
Loading…
Reference in New Issue