refactor input reader

This commit is contained in:
Ulric Qin 2022-06-24 17:14:59 +08:00
parent e18d26f3ce
commit f3b2569d3b
1 changed files with 9 additions and 9 deletions

View File

@ -19,14 +19,14 @@ const agentHostnameLabelKey = "agent_hostname"
var metricReplacer = strings.NewReplacer("-", "_", ".", "_")
type InputReader struct {
instance inputs.Input
input inputs.Input
quitChan chan struct{}
queue chan *types.Sample
}
func NewInputReader(ins inputs.Input) *InputReader {
func NewInputReader(in inputs.Input) *InputReader {
return &InputReader{
instance: ins,
input: in,
quitChan: make(chan struct{}, 1),
queue: make(chan *types.Sample, config.Config.WriterOpt.ChanSize),
}
@ -43,13 +43,13 @@ func (r *InputReader) Start() {
func (r *InputReader) Stop() {
r.quitChan <- struct{}{}
close(r.queue)
r.instance.Drop()
r.input.Drop()
}
func (r *InputReader) startInstance() {
interval := config.GetInterval()
if r.instance.GetInterval() > 0 {
interval = time.Duration(r.instance.GetInterval())
if r.input.GetInterval() > 0 {
interval = time.Duration(r.input.GetInterval())
}
for {
select {
@ -76,7 +76,7 @@ func (r *InputReader) gatherOnce() {
// gather
slist := list.NewSafeList()
r.instance.Gather(slist)
r.input.Gather(slist)
// handle result
samples := slist.PopBackAll()
@ -97,8 +97,8 @@ func (r *InputReader) gatherOnce() {
s.Timestamp = now
}
if len(r.instance.Prefix()) > 0 {
s.Metric = r.instance.Prefix() + "_" + metricReplacer.Replace(s.Metric)
if len(r.input.Prefix()) > 0 {
s.Metric = r.input.Prefix() + "_" + metricReplacer.Replace(s.Metric)
} else {
s.Metric = metricReplacer.Replace(s.Metric)
}