input code refactor

This commit is contained in:
ziv 2022-08-01 19:00:28 +08:00
parent 89248e48fb
commit 13cc06e338
43 changed files with 50 additions and 200 deletions

View File

@ -56,19 +56,15 @@ func (a *Agent) startMetricsAgent() error {
continue
}
if err = input.Init(); err != nil {
if err = inputs.MayInit(input); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}
if input.GetInstances() != nil {
instances := input.GetInstances()
if len(instances) == 0 {
continue
}
instances := inputs.MayGetInstances(input)
if instances != nil {
empty := true
for i := 0; i < len(instances); i++ {
if err := instances[i].InitInternalConfig(); err != nil {
@ -76,13 +72,12 @@ func (a *Agent) startMetricsAgent() error {
continue
}
if err := instances[i].Init(); err != nil {
if err := inputs.MayInit(instances[i]); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}
empty = false
}

View File

@ -41,7 +41,7 @@ func NewInputReader(inputName string, in inputs.Input) *InputReader {
func (r *InputReader) Stop() {
r.quitChan <- struct{}{}
r.input.Drop()
inputs.MayDrop(r.input)
}
func (r *InputReader) startInput() {
@ -82,10 +82,10 @@ func (r *InputReader) gatherOnce() {
// plugin level, for system plugins
slist := types.NewSampleList()
r.input.Gather(slist)
inputs.MayGather(r.input, slist)
r.forward(r.input.Process(slist))
instances := r.input.GetInstances()
instances := inputs.MayGetInstances(r.input)
if len(instances) == 0 {
return
}
@ -106,7 +106,7 @@ func (r *InputReader) gatherOnce() {
}
insList := types.NewSampleList()
ins.Gather(insList)
inputs.MayGather(ins, insList)
r.forward(ins.Process(insList))
}(instances[i])
}

View File

@ -40,11 +40,6 @@ func init() {
})
}
// just placeholder
func (c *Conntrack) GetInstances() []inputs.Instance {
return nil
}
func (c *Conntrack) setDefaults() {
if len(c.Dirs) == 0 {
c.Dirs = dfltDirs
@ -60,8 +55,6 @@ func (c *Conntrack) Init() error {
return nil
}
func (c *Conntrack) Drop() {}
func (c *Conntrack) Gather(slist *types.SampleList) {
var metricKey string
fields := make(map[string]interface{})

View File

@ -28,10 +28,6 @@ func init() {
})
}
func (c *CPUStats) Init() error { return nil }
func (c *CPUStats) Drop() {}
func (c *CPUStats) GetInstances() []inputs.Instance { return nil }
func (c *CPUStats) Gather(slist *types.SampleList) {
times, err := c.ps.CPUTimes(c.CollectPerCPU, true)
if err != nil {

View File

@ -29,18 +29,6 @@ func init() {
})
}
// just placeholder
func (s *DiskStats) GetInstances() []inputs.Instance {
return nil
}
func (s *DiskStats) Init() error {
return nil
}
func (s *DiskStats) Drop() {
}
func (s *DiskStats) Gather(slist *types.SampleList) {
disks, partitions, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS)
if err != nil {

View File

@ -28,13 +28,6 @@ func init() {
})
}
// just placeholder
func (d *DiskIO) GetInstances() []inputs.Instance {
return nil
}
func (d *DiskIO) Drop() {}
func (d *DiskIO) Init() error {
for _, device := range d.Devices {
if filter.HasMeta(device) {

View File

@ -34,10 +34,6 @@ func init() {
})
}
func (dq *DnsQuery) Init() error { return nil }
func (dq *DnsQuery) Drop() {}
func (dq *DnsQuery) Gather(slist *types.SampleList) {}
func (dq *DnsQuery) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(dq.Instances))
for i := 0; i < len(dq.Instances); i++ {

View File

@ -50,10 +50,6 @@ func init() {
})
}
func (d *Docker) Init() error { return nil }
func (d *Docker) Drop() {}
func (d *Docker) Gather(slist *itypes.SampleList) {}
func (d *Docker) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(d.Instances))
for i := 0; i < len(d.Instances); i++ {

View File

@ -94,10 +94,6 @@ func init() {
})
}
func (r *Elasticsearch) Init() error { return nil }
func (r *Elasticsearch) Drop() {}
func (r *Elasticsearch) Gather(slist *types.SampleList) {}
func (r *Elasticsearch) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -45,10 +45,6 @@ func init() {
})
}
func (e *Exec) Init() error { return nil }
func (e *Exec) Drop() {}
func (e *Exec) Gather(slist *types.SampleList) {}
func (e *Exec) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(e.Instances))
for i := 0; i < len(e.Instances); i++ {

View File

@ -145,10 +145,6 @@ func init() {
})
}
func (h *HTTPResponse) Init() error { return nil }
func (h *HTTPResponse) Drop() {}
func (h *HTTPResponse) Gather(slist *types.SampleList) {}
func (h *HTTPResponse) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(h.Instances))
for i := 0; i < len(h.Instances); i++ {

View File

@ -5,16 +5,53 @@ import (
"flashcat.cloud/categraf/types"
)
type Initializer interface {
Init() error
}
type SampleGatherer interface {
Gather(*types.SampleList)
}
type Dropper interface {
Drop()
}
type InstancesGetter interface {
GetInstances() []Instance
}
func MayInit(t interface{}) error {
if initializer, ok := t.(Initializer); ok {
return initializer.Init()
}
return nil
}
func MayGather(t interface{}, slist *types.SampleList) {
if gather, ok := t.(SampleGatherer); ok {
gather.Gather(slist)
}
}
func MayDrop(t interface{}) {
if dropper, ok := t.(Dropper); ok {
dropper.Drop()
}
}
func MayGetInstances(t interface{}) []Instance {
if instancesGetter, ok := t.(InstancesGetter); ok {
return instancesGetter.GetInstances()
}
return nil
}
type Input interface {
GetLabels() map[string]string
GetInterval() config.Duration
InitInternalConfig() error
Process(*types.SampleList) *types.SampleList
Init() error
Drop()
Gather(*types.SampleList)
GetInstances() []Instance
}
type Creator func() Input
@ -30,7 +67,4 @@ type Instance interface {
GetIntervalTimes() int64
InitInternalConfig() error
Process(*types.SampleList) *types.SampleList
Init() error
Gather(*types.SampleList)
}

View File

@ -26,10 +26,6 @@ func init() {
})
}
func (r *JolokiaAgent) Init() error { return nil }
func (r *JolokiaAgent) Drop() {}
func (r *JolokiaAgent) Gather(slist *types.SampleList) {}
func (r *JolokiaAgent) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -25,10 +25,6 @@ func init() {
})
}
func (r *JolokiaProxy) Init() error { return nil }
func (r *JolokiaProxy) Drop() {}
func (r *JolokiaProxy) Gather(slist *types.SampleList) {}
func (r *JolokiaProxy) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -28,9 +28,6 @@ func init() {
})
}
func (r *Kafka) Init() error { return nil }
func (r *Kafka) Gather(slist *types.SampleList) {}
func (r *Kafka) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -43,10 +43,6 @@ func init() {
})
}
func (s *KernelStats) Init() error { return nil }
func (s *KernelStats) Drop() {}
func (s *KernelStats) GetInstances() []inputs.Instance { return nil }
func (s *KernelStats) Gather(slist *types.SampleList) {
data, err := s.getProcStat()
if err != nil {

View File

@ -32,10 +32,6 @@ func init() {
})
}
func (s *KernelVmstat) Init() error { return nil }
func (s *KernelVmstat) Drop() {}
func (s *KernelVmstat) GetInstances() []inputs.Instance { return nil }
func (s *KernelVmstat) Gather(slist *types.SampleList) {
data, err := s.getProcVmstat()
if err != nil {

View File

@ -32,10 +32,6 @@ func init() {
})
}
func (k *Kubernetes) Init() error { return nil }
func (k *Kubernetes) Drop() {}
func (k *Kubernetes) Gather(slist *types.SampleList) {}
func (k *Kubernetes) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(k.Instances))
for i := 0; i < len(k.Instances); i++ {

View File

@ -33,10 +33,6 @@ func init() {
})
}
func (s *SysctlFS) Init() error { return nil }
func (s *SysctlFS) Drop() {}
func (s *SysctlFS) GetInstances() []inputs.Instance { return nil }
func (s *SysctlFS) Gather(slist *types.SampleList) {
fields := map[string]interface{}{}

View File

@ -31,10 +31,6 @@ func init() {
})
}
func (l *Logstash) Init() error { return nil }
func (l *Logstash) Drop() {}
func (l *Logstash) Gather(slist *types.SampleList) {}
func (l *Logstash) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(l.Instances))
for i := 0; i < len(l.Instances); i++ {

View File

@ -28,10 +28,6 @@ func init() {
})
}
func (s *MemStats) Init() error { return nil }
func (s *MemStats) Drop() {}
func (s *MemStats) GetInstances() []inputs.Instance { return nil }
func (s *MemStats) Gather(slist *types.SampleList) {
vm, err := s.ps.VMStat()
if err != nil {

View File

@ -24,9 +24,6 @@ func init() {
})
}
func (r *MongoDB) Init() error { return nil }
func (r *MongoDB) Gather(slist *types.SampleList) {}
func (r *MongoDB) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -158,10 +158,6 @@ func init() {
})
}
func (m *MySQL) Init() error { return nil }
func (m *MySQL) Drop() {}
func (m *MySQL) Gather(slist *types.SampleList) {}
func (m *MySQL) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(m.Instances))
for i := 0; i < len(m.Instances); i++ {

View File

@ -33,9 +33,6 @@ func init() {
})
}
func (s *NetIOStats) Drop() {}
func (s *NetIOStats) GetInstances() []inputs.Instance { return nil }
func (s *NetIOStats) Init() error {
var err error

View File

@ -93,10 +93,6 @@ func init() {
})
}
func (n *NetResponse) Init() error { return nil }
func (n *NetResponse) Drop() {}
func (n *NetResponse) Gather(slist *types.SampleList) {}
func (n *NetResponse) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(n.Instances))
for i := 0; i < len(n.Instances); i++ {

View File

@ -26,10 +26,6 @@ func init() {
})
}
func (s *NetStats) Init() error { return nil }
func (s *NetStats) Drop() {}
func (s *NetStats) GetInstances() []inputs.Instance { return nil }
func (s *NetStats) Gather(slist *types.SampleList) {
netconns, err := s.ps.NetConnections()
if err != nil {

View File

@ -212,8 +212,6 @@ func (s *NfsClient) Init() error {
return nil
}
func (r *NfsClient) Drop() {}
func (s *NfsClient) Gather(slist *types.SampleList) {
file, err := os.Open(s.mountstatsPath)
if err != nil {
@ -231,9 +229,6 @@ func (s *NfsClient) Gather(slist *types.SampleList) {
log.Println("E!", err)
}
}
func (s *NfsClient) GetInstances() []inputs.Instance {
return nil
}
func convertToUint64(line []string) ([]uint64, error) {
/* A "line" of input data (a pre-split array of strings) is

View File

@ -32,10 +32,6 @@ func init() {
})
}
func (r *NginxUpstreamCheck) Init() error { return nil }
func (r *NginxUpstreamCheck) Drop() {}
func (r *NginxUpstreamCheck) Gather(slist *types.SampleList) {}
func (r *NginxUpstreamCheck) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -24,9 +24,6 @@ func init() {
})
}
func (n *NTPStat) Drop() {}
func (n *NTPStat) GetInstances() []inputs.Instance { return nil }
func (n *NTPStat) Init() error {
if len(n.NTPServers) == 0 {
return types.ErrInstancesEmpty

View File

@ -30,9 +30,6 @@ func init() {
})
}
func (s *GPUStats) Drop() {}
func (s *GPUStats) GetInstances() []inputs.Instance { return nil }
func (s *GPUStats) Init() error {
if s.NvidiaSmiCommand == "" {
return types.ErrInstancesEmpty

View File

@ -59,9 +59,6 @@ func init() {
})
}
func (o *Oracle) Init() error { return nil }
func (o *Oracle) Gather(slist *types.SampleList) {}
func (o *Oracle) Drop() {
for i := 0; i < len(o.Instances); i++ {
o.Instances[i].Drop()

View File

@ -89,10 +89,6 @@ func init() {
})
}
func (p *Ping) Init() error { return nil }
func (p *Ping) Drop() {}
func (p *Ping) Gather(slist *types.SampleList) {}
func (p *Ping) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(p.Instances))
for i := 0; i < len(p.Instances); i++ {

View File

@ -34,10 +34,6 @@ func init() {
})
}
func (p *Processes) Init() error { return nil }
func (p *Processes) Drop() {}
func (p *Processes) GetInstances() []inputs.Instance { return nil }
func (p *Processes) Gather(slist *types.SampleList) {
// Get an empty map of metric fields
fields := getEmptyFields()

View File

@ -70,10 +70,6 @@ func init() {
})
}
func (s *Procstat) Init() error { return nil }
func (s *Procstat) Drop() {}
func (s *Procstat) Gather(slist *types.SampleList) {}
func (s *Procstat) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(s.Instances))
for i := 0; i < len(s.Instances); i++ {

View File

@ -135,10 +135,6 @@ func init() {
})
}
func (p *Prometheus) Init() error { return nil }
func (p *Prometheus) Drop() {}
func (p *Prometheus) Gather(slist *types.SampleList) {}
func (p *Prometheus) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(p.Instances))
for i := 0; i < len(p.Instances); i++ {

View File

@ -29,10 +29,6 @@ func init() {
})
}
func (r *RabbitMQ) Init() error { return nil }
func (r *RabbitMQ) Drop() {}
func (r *RabbitMQ) Gather(slist *types.SampleList) {}
func (r *RabbitMQ) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -75,9 +75,6 @@ func init() {
})
}
func (r *Redis) Init() error { return nil }
func (r *Redis) Gather(slist *types.SampleList) {}
func (r *Redis) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -35,10 +35,6 @@ func init() {
})
}
func (r *RedisSentinel) Init() error { return nil }
func (r *RedisSentinel) Drop() {}
func (r *RedisSentinel) Gather(slist *types.SampleList) {}
func (r *RedisSentinel) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {

View File

@ -33,9 +33,6 @@ func init() {
})
}
func (s *Switch) Drop() {}
func (s *Switch) Gather(slist *types.SampleList) {}
func (s *Switch) MappingIP(ip string) string {
val, has := s.Mappings[ip]
if has {
@ -122,8 +119,6 @@ type Custom struct {
OID string `toml:"oid"`
}
func (ins *Instance) Init() error { return nil }
func (ins *Instance) RealInit() error {
if len(ins.IPs) == 0 {
return types.ErrInstancesEmpty

View File

@ -26,10 +26,6 @@ func init() {
})
}
func (s *SystemStats) Init() error { return nil }
func (s *SystemStats) Drop() {}
func (s *SystemStats) GetInstances() []inputs.Instance { return nil }
func (s *SystemStats) Gather(slist *types.SampleList) {
loadavg, err := load.Avg()
if err != nil && !strings.Contains(err.Error(), "not implemented") {

View File

@ -139,10 +139,6 @@ func init() {
})
}
func (t *Tomcat) Init() error { return nil }
func (t *Tomcat) Drop() {}
func (t *Tomcat) Gather(slist *types.SampleList) {}
func (t *Tomcat) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(t.Instances))
for i := 0; i < len(t.Instances); i++ {

View File

@ -3,7 +3,6 @@ package tpl
import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
)
const inputName = "plugin_tpl"
@ -19,11 +18,6 @@ func init() {
})
}
func (pt *PluginTpl) Prefix() string { return inputName }
func (pt *PluginTpl) Init() error { return nil }
func (pt *PluginTpl) Drop() {}
func (pt *PluginTpl) Gather(slist *types.SampleList) {}
func (pt *PluginTpl) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(pt.Instances))
for i := 0; i < len(pt.Instances); i++ {
@ -35,11 +29,3 @@ func (pt *PluginTpl) GetInstances() []inputs.Instance {
type Instance struct {
config.InstanceConfig
}
func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) Gather(slist *types.SampleList) {
}

View File

@ -71,10 +71,6 @@ func init() {
})
}
func (z *Zookeeper) Init() error { return nil }
func (z *Zookeeper) Drop() {}
func (z *Zookeeper) Gather(slist *types.SampleList) {}
func (z *Zookeeper) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(z.Instances))
for i := 0; i < len(z.Instances); i++ {