a real big code refactor

This commit is contained in:
Ulric Qin 2022-07-24 00:18:00 +08:00
parent ee3c28a7d7
commit 62e0361350
54 changed files with 617 additions and 1450 deletions

View File

@ -58,6 +58,30 @@ func (a *Agent) startMetricsAgent() error {
continue
}
if input.GetInstances() != nil {
instances := input.GetInstances()
if len(instances) == 0 {
continue
}
empty := true
for i := 0; i < len(instances); i++ {
err := instances[i].Init()
if err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}
empty = false
}
if empty {
continue
}
}
a.StartInputReader(name, input)
log.Println("I! input:", name, "started")
}

View File

@ -3,6 +3,8 @@ package agent
import (
"log"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -19,9 +21,11 @@ const agentHostnameLabelKey = "agent_hostname"
var metricReplacer = strings.NewReplacer("-", "_", ".", "_", " ", "_", "'", "_", "\"", "_")
type InputReader struct {
inputName string
input inputs.Input
quitChan chan struct{}
inputName string
input inputs.Input
quitChan chan struct{}
runCounter uint64
waitGroup sync.WaitGroup
}
func (a *Agent) StartInputReader(name string, in inputs.Input) {
@ -72,6 +76,39 @@ func (r *InputReader) startInput() {
}
}
func (r *InputReader) work(slist *list.SafeList) {
instances := r.input.GetInstances()
if instances == nil {
r.input.Gather(slist)
return
}
if len(instances) == 0 {
return
}
atomic.AddUint64(&r.runCounter, 1)
for i := 0; i < len(instances); i++ {
r.waitGroup.Add(1)
go func(slist *list.SafeList, ins inputs.Instance) {
defer r.waitGroup.Done()
it := ins.GetIntervalTimes()
if it > 0 {
counter := atomic.LoadUint64(&r.runCounter)
if counter%uint64(it) != 0 {
return
}
}
ins.Gather(slist)
}(slist, instances[i])
}
r.waitGroup.Wait()
}
func (r *InputReader) gatherOnce() {
defer func() {
if rc := recover(); rc != nil {
@ -81,7 +118,7 @@ func (r *InputReader) gatherOnce() {
// gather
slist := list.NewSafeList()
r.input.Gather(slist)
r.work(slist)
// handle result
samples := slist.PopBackAll()

View File

@ -7,3 +7,20 @@ type Interval struct {
func (i Interval) GetInterval() Duration {
return i.Interval
}
type InstanceConfig struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
}
func (ic InstanceConfig) GetLabels() map[string]string {
if ic.Labels != nil {
return ic.Labels
}
return map[string]string{}
}
func (ic InstanceConfig) GetIntervalTimes() int64 {
return ic.IntervalTimes
}

View File

@ -42,6 +42,11 @@ func init() {
})
}
// just placeholder
func (c *Conntrack) GetInstances() []inputs.Instance {
return nil
}
func (c *Conntrack) Prefix() string {
return inputName
}

View File

@ -30,17 +30,10 @@ func init() {
})
}
func (s *CPUStats) Prefix() string {
return inputName
}
// overwrite func
func (c *CPUStats) Init() error {
return nil
}
func (c *CPUStats) Drop() {
}
func (c *CPUStats) Prefix() string { return inputName }
func (c *CPUStats) Init() error { return nil }
func (c *CPUStats) Drop() {}
func (c *CPUStats) GetInstances() []inputs.Instance { return nil }
func (c *CPUStats) Gather(slist *list.SafeList) {
times, err := c.ps.CPUTimes(c.CollectPerCPU, true)

View File

@ -31,6 +31,11 @@ func init() {
})
}
// just placeholder
func (s *DiskStats) GetInstances() []inputs.Instance {
return nil
}
func (s *DiskStats) Prefix() string {
return inputName
}

View File

@ -30,6 +30,11 @@ func init() {
})
}
// just placeholder
func (d *DiskIO) GetInstances() []inputs.Instance {
return nil
}
func (d *DiskIO) Prefix() string {
return inputName
}

View File

@ -6,7 +6,6 @@ import (
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -28,8 +27,6 @@ const (
type DnsQuery struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -39,57 +36,21 @@ func init() {
})
}
func (r *DnsQuery) Prefix() string {
return inputName
}
func (dq *DnsQuery) Prefix() string { return inputName }
func (dq *DnsQuery) Init() error { return nil }
func (dq *DnsQuery) Drop() {}
func (dq *DnsQuery) Gather(slist *list.SafeList) {}
func (r *DnsQuery) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
func (dq *DnsQuery) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(dq.Instances))
for i := 0; i < len(dq.Instances); i++ {
ret[i] = dq.Instances[i]
}
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
}
return nil
}
func (r *DnsQuery) Drop() {}
func (r *DnsQuery) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.Servers) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
// Domains or subdomains to query
Domains []string `toml:"domains"`
@ -112,7 +73,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.Servers) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.Network == "" {
@ -139,7 +100,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
var wg sync.WaitGroup
for _, domain := range ins.Domains {

View File

@ -9,7 +9,6 @@ import (
"log"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -45,8 +44,6 @@ var (
type Docker struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -56,57 +53,21 @@ func init() {
})
}
func (d *Docker) Prefix() string {
return ""
}
func (d *Docker) Init() error {
if len(d.Instances) == 0 {
return itypes.ErrInstancesEmpty
}
func (d *Docker) Prefix() string { return "" }
func (d *Docker) Init() error { return nil }
func (d *Docker) Drop() {}
func (d *Docker) Gather(slist *list.SafeList) {}
func (d *Docker) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(d.Instances))
for i := 0; i < len(d.Instances); i++ {
if err := d.Instances[i].Init(); err != nil {
return err
}
ret[i] = d.Instances[i]
}
return nil
}
func (d *Docker) Drop() {}
func (d *Docker) Gather(slist *list.SafeList) {
atomic.AddUint64(&d.counter, 1)
for i := range d.Instances {
ins := d.Instances[i]
if len(ins.Endpoint) == 0 {
continue
}
d.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer d.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&d.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
d.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
Endpoint string `toml:"endpoint"`
GatherServices bool `toml:"gather_services"`
@ -134,7 +95,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.Endpoint) == 0 {
return nil
return itypes.ErrInstancesEmpty
}
c, err := ins.getNewClient()
@ -168,7 +129,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
if ins.Endpoint == "" {
return
}

View File

@ -9,7 +9,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -89,8 +88,6 @@ type indexStat struct {
type Elasticsearch struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -100,57 +97,21 @@ func init() {
})
}
func (r *Elasticsearch) Prefix() string {
return inputName
}
func (r *Elasticsearch) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *Elasticsearch) Prefix() string { return inputName }
func (r *Elasticsearch) Init() error { return nil }
func (r *Elasticsearch) Drop() {}
func (r *Elasticsearch) Gather(slist *list.SafeList) {}
func (r *Elasticsearch) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
}
func (r *Elasticsearch) Drop() {}
func (r *Elasticsearch) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.Servers) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
Local bool `toml:"local"`
Servers []string `toml:"servers"`
@ -183,7 +144,7 @@ func (i serverInfo) isMaster() bool {
func (ins *Instance) Init() error {
if len(ins.Servers) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.HTTPTimeout <= 0 {
@ -222,7 +183,7 @@ func (ins *Instance) compileIndexMatchers() (map[string]filter.Filter, error) {
return indexMatchers, nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
if ins.ClusterStats || len(ins.IndicesInclude) > 0 || len(ins.IndicesLevel) > 0 {
var wgC sync.WaitGroup
wgC.Add(len(ins.Servers))

View File

@ -10,7 +10,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -28,18 +27,18 @@ import (
const inputName = "exec"
const MaxStderrBytes int = 512
type ExecInstance struct {
Commands []string `toml:"commands"`
Timeout config.Duration `toml:"timeout"`
IntervalTimes int64 `toml:"interval_times"`
DataFormat string `toml:"data_format"`
parser parser.Parser
type Instance struct {
config.InstanceConfig
Commands []string `toml:"commands"`
Timeout config.Duration `toml:"timeout"`
DataFormat string `toml:"data_format"`
parser parser.Parser
}
type Exec struct {
config.Interval
Instances []ExecInstance `toml:"instances"`
Counter uint64
Instances []*Instance `toml:"instances"`
}
func init() {
@ -48,65 +47,42 @@ func init() {
})
}
func (e *Exec) Prefix() string {
return ""
func (e *Exec) Prefix() string { return "" }
func (e *Exec) Init() error { return nil }
func (e *Exec) Drop() {}
func (e *Exec) Gather(slist *list.SafeList) {}
func (e *Exec) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(e.Instances))
for i := 0; i < len(e.Instances); i++ {
ret[i] = e.Instances[i]
}
return ret
}
func (e *Exec) Drop() {}
func (e *Exec) Init() error {
if len(e.Instances) == 0 {
func (ins *Instance) Init() error {
if len(ins.Commands) == 0 {
return types.ErrInstancesEmpty
}
for i := 0; i < len(e.Instances); i++ {
if len(e.Instances[i].Commands) == 0 {
continue
}
if e.Instances[i].DataFormat == "" || e.Instances[i].DataFormat == "influx" {
e.Instances[i].parser = influx.NewParser()
} else if e.Instances[i].DataFormat == "falcon" {
e.Instances[i].parser = falcon.NewParser()
} else if strings.HasPrefix(e.Instances[i].DataFormat, "prom") {
e.Instances[i].parser = prometheus.NewParser("", map[string]string{}, nil, nil, nil)
} else {
return fmt.Errorf("data_format(%s) not supported", e.Instances[i].DataFormat)
}
if ins.DataFormat == "" || ins.DataFormat == "influx" {
ins.parser = influx.NewParser()
} else if ins.DataFormat == "falcon" {
ins.parser = falcon.NewParser()
} else if strings.HasPrefix(ins.DataFormat, "prom") {
ins.parser = prometheus.NewParser("", map[string]string{}, nil, nil, nil)
} else {
return fmt.Errorf("data_format(%s) not supported", ins.DataFormat)
}
if e.Instances[i].Timeout == 0 {
e.Instances[i].Timeout = config.Duration(time.Second * 5)
}
if ins.Timeout == 0 {
ins.Timeout = config.Duration(time.Second * 5)
}
return nil
}
func (e *Exec) Gather(slist *list.SafeList) {
atomic.AddUint64(&e.Counter, 1)
var wg sync.WaitGroup
wg.Add(len(e.Instances))
for i := range e.Instances {
ins := e.Instances[i]
if len(ins.Commands) == 0 {
continue
}
go e.GatherOnce(&wg, slist, ins)
}
wg.Wait()
}
func (e *Exec) GatherOnce(wg *sync.WaitGroup, slist *list.SafeList, ins ExecInstance) {
defer wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&e.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
func (ins *Instance) Gather(slist *list.SafeList) {
var commands []string
for _, pattern := range ins.Commands {
cmdAndArgs := strings.SplitN(pattern, " ", 2)
@ -146,13 +122,13 @@ func (e *Exec) GatherOnce(wg *sync.WaitGroup, slist *list.SafeList, ins ExecInst
var waitCommands sync.WaitGroup
waitCommands.Add(len(commands))
for _, command := range commands {
go e.ProcessCommand(slist, command, ins, &waitCommands)
go ins.ProcessCommand(slist, command, &waitCommands)
}
waitCommands.Wait()
}
func (e *Exec) ProcessCommand(slist *list.SafeList, command string, ins ExecInstance, wg *sync.WaitGroup) {
func (ins *Instance) ProcessCommand(slist *list.SafeList, command string, wg *sync.WaitGroup) {
defer wg.Done()
out, errbuf, runErr := commandRun(command, time.Duration(ins.Timeout))

View File

@ -10,7 +10,6 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -34,19 +33,19 @@ const (
)
type Instance struct {
Targets []string `toml:"targets"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Interface string `toml:"interface"`
Method string `toml:"method"`
ResponseTimeout config.Duration `toml:"response_timeout"`
FollowRedirects bool `toml:"follow_redirects"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers []string `toml:"headers"`
Body string `toml:"body"`
ExpectResponseSubstring string `toml:"expect_response_substring"`
ExpectResponseStatusCode *int `toml:"expect_response_status_code"`
config.InstanceConfig
Targets []string `toml:"targets"`
Interface string `toml:"interface"`
Method string `toml:"method"`
ResponseTimeout config.Duration `toml:"response_timeout"`
FollowRedirects bool `toml:"follow_redirects"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers []string `toml:"headers"`
Body string `toml:"body"`
ExpectResponseSubstring string `toml:"expect_response_substring"`
ExpectResponseStatusCode *int `toml:"expect_response_status_code"`
config.HTTPProxy
tls.ClientConfig
@ -59,7 +58,7 @@ type httpClient interface {
func (ins *Instance) Init() error {
if len(ins.Targets) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.ResponseTimeout < config.Duration(time.Second) {
@ -139,8 +138,6 @@ func (ins *Instance) createHTTPClient() (*http.Client, error) {
type HTTPResponse struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -149,53 +146,22 @@ func init() {
})
}
func (h *HTTPResponse) Prefix() string {
return inputName
}
func (h *HTTPResponse) Init() error {
if len(h.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (h *HTTPResponse) Prefix() string { return inputName }
func (h *HTTPResponse) Init() error { return nil }
func (h *HTTPResponse) Drop() {}
func (h *HTTPResponse) Gather(slist *list.SafeList) {}
func (h *HTTPResponse) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(h.Instances))
for i := 0; i < len(h.Instances); i++ {
if err := h.Instances[i].Init(); err != nil {
return err
}
ret[i] = h.Instances[i]
}
return nil
return ret
}
func (h *HTTPResponse) Drop() {}
func (h *HTTPResponse) Gather(slist *list.SafeList) {
atomic.AddUint64(&h.Counter, 1)
for i := range h.Instances {
ins := h.Instances[i]
if len(ins.Targets) == 0 {
continue
}
h.wg.Add(1)
go h.gatherOnce(slist, ins)
}
h.wg.Wait()
}
func (h *HTTPResponse) gatherOnce(slist *list.SafeList, ins *Instance) {
defer h.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&h.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
if config.Config.DebugMode {
if len(ins.Targets) == 0 {
log.Println("D! http_response targets empty")
}
func (ins *Instance) Gather(slist *list.SafeList) {
if len(ins.Targets) == 0 {
return
}
wg := new(sync.WaitGroup)

View File

@ -11,6 +11,7 @@ type Input interface {
Prefix() string
GetInterval() config.Duration
Gather(slist *list.SafeList)
GetInstances() []Instance
}
type Creator func() Input
@ -20,3 +21,10 @@ var InputCreators = map[string]Creator{}
func Add(name string, creator Creator) {
InputCreators[name] = creator
}
type Instance interface {
GetLabels() map[string]string
GetIntervalTimes() int64
Init() error
Gather(slist *list.SafeList)
}

View File

@ -1,11 +1,9 @@
package jolokia_agent
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -20,8 +18,6 @@ const inputName = "jolokia_agent"
type JolokiaAgent struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -31,59 +27,21 @@ func init() {
})
}
func (r *JolokiaAgent) Prefix() string {
return ""
}
func (r *JolokiaAgent) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *JolokiaAgent) Prefix() string { return "" }
func (r *JolokiaAgent) Init() error { return nil }
func (r *JolokiaAgent) Drop() {}
func (r *JolokiaAgent) Gather(slist *list.SafeList) {}
func (r *JolokiaAgent) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
return err
}
}
ret[i] = r.Instances[i]
}
return nil
}
func (r *JolokiaAgent) Drop() {}
func (r *JolokiaAgent) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.URLs) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
URLs []string `toml:"urls"`
Username string `toml:"username"`
@ -102,7 +60,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.URLs) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.DefaultFieldSeparator == "" {
@ -112,7 +70,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
if ins.gatherer == nil {
ins.gatherer = jolokia.NewGatherer(ins.createMetrics())
}

View File

@ -1,11 +1,8 @@
package jolokia_proxy
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -20,8 +17,6 @@ const inputName = "jolokia_proxy"
type JolokiaProxy struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -31,54 +26,17 @@ func init() {
})
}
func (r *JolokiaProxy) Prefix() string {
return ""
}
func (r *JolokiaProxy) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *JolokiaProxy) Prefix() string { return "" }
func (r *JolokiaProxy) Init() error { return nil }
func (r *JolokiaProxy) Drop() {}
func (r *JolokiaProxy) Gather(slist *list.SafeList) {}
func (r *JolokiaProxy) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
return err
}
}
ret[i] = r.Instances[i]
}
return nil
}
func (r *JolokiaProxy) Drop() {}
func (r *JolokiaProxy) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.URL) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type JolokiaProxyTargetConfig struct {
@ -88,8 +46,7 @@ type JolokiaProxyTargetConfig struct {
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
URL string `toml:"url"`
Username string `toml:"username"`
@ -112,7 +69,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.URL) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.DefaultFieldSeparator == "" {
@ -122,7 +79,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
if ins.gatherer == nil {
ins.gatherer = jolokia.NewGatherer(ins.createMetrics())
}

View File

@ -5,8 +5,6 @@ import (
"log"
"os"
"strings"
"sync"
"sync/atomic"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
@ -23,8 +21,6 @@ const inputName = "kafka"
type Kafka struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -34,22 +30,16 @@ func init() {
})
}
func (r *Kafka) Prefix() string {
return ""
}
func (r *Kafka) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *Kafka) Prefix() string { return "" }
func (r *Kafka) Init() error { return nil }
func (r *Kafka) Gather(slist *list.SafeList) {}
func (r *Kafka) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
return ret
}
func (r *Kafka) Drop() {
@ -64,38 +54,10 @@ func (r *Kafka) Drop() {
}
}
func (r *Kafka) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.KafkaURIs) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
LogLevel string `toml:"log_level"`
config.InstanceConfig
LogLevel string `toml:"log_level"`
// Address (host:port) of Kafka server.
KafkaURIs []string `toml:"kafka_uris,omitempty"`
@ -163,7 +125,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.KafkaURIs) == 0 || ins.KafkaURIs[0] == "" {
return nil
return types.ErrInstancesEmpty
}
if ins.UseTLS && (ins.CertFile == "" || ins.KeyFile == "") {
@ -250,7 +212,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
err := inputs.Collect(ins.e, slist)
if err != nil {
log.Println("E! failed to collect metrics:", err)

View File

@ -43,15 +43,10 @@ func init() {
})
}
func (s *KernelStats) Prefix() string {
return inputName
}
func (s *KernelStats) Init() error {
return nil
}
func (s *KernelStats) Drop() {}
func (s *KernelStats) Prefix() string { return inputName }
func (s *KernelStats) Init() error { return nil }
func (s *KernelStats) Drop() {}
func (s *KernelStats) GetInstances() []inputs.Instance { return nil }
func (s *KernelStats) Gather(slist *list.SafeList) {
data, err := s.getProcStat()

View File

@ -32,15 +32,10 @@ func init() {
})
}
func (s *KernelVmstat) Prefix() string {
return inputName
}
func (s *KernelVmstat) Init() error {
return nil
}
func (s *KernelVmstat) Drop() {}
func (s *KernelVmstat) Prefix() string { return inputName }
func (s *KernelVmstat) Init() error { return nil }
func (s *KernelVmstat) Drop() {}
func (s *KernelVmstat) GetInstances() []inputs.Instance { return nil }
func (s *KernelVmstat) Gather(slist *list.SafeList) {
data, err := s.getProcVmstat()

View File

@ -7,8 +7,6 @@ import (
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -26,8 +24,6 @@ const (
type Kubernetes struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -37,57 +33,21 @@ func init() {
})
}
func (k *Kubernetes) Prefix() string {
return inputName
}
func (k *Kubernetes) Init() error {
if len(k.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (k *Kubernetes) Prefix() string { return inputName }
func (k *Kubernetes) Init() error { return nil }
func (k *Kubernetes) Drop() {}
func (k *Kubernetes) Gather(slist *list.SafeList) {}
func (k *Kubernetes) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(k.Instances))
for i := 0; i < len(k.Instances); i++ {
if err := k.Instances[i].Init(); err != nil {
return err
}
ret[i] = k.Instances[i]
}
return nil
}
func (k *Kubernetes) Drop() {}
func (k *Kubernetes) Gather(slist *list.SafeList) {
atomic.AddUint64(&k.counter, 1)
for i := range k.Instances {
ins := k.Instances[i]
if ins.URL == "" {
continue
}
k.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer k.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&k.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
k.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
URL string
@ -116,7 +76,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if ins.URL == "" {
return nil
return types.ErrInstancesEmpty
}
ins.URL = os.Expand(ins.URL, config.GetEnv)
@ -143,7 +103,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
summaryMetrics := &SummaryMetrics{}
urlpath := fmt.Sprintf("%s/stats/summary", ins.URL)
err := ins.LoadJSON(urlpath, summaryMetrics)

View File

@ -33,15 +33,10 @@ func init() {
})
}
func (s *SysctlFS) Prefix() string {
return inputName
}
func (s *SysctlFS) Init() error {
return nil
}
func (s *SysctlFS) Drop() {}
func (s *SysctlFS) Prefix() string { return inputName }
func (s *SysctlFS) Init() error { return nil }
func (s *SysctlFS) Drop() {}
func (s *SysctlFS) GetInstances() []inputs.Instance { return nil }
func (s *SysctlFS) Gather(slist *list.SafeList) {
fields := map[string]interface{}{}

View File

@ -8,8 +8,6 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -26,8 +24,6 @@ const inputName = "logstash"
type Logstash struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -37,57 +33,21 @@ func init() {
})
}
func (r *Logstash) Prefix() string {
return inputName
}
func (l *Logstash) Prefix() string { return inputName }
func (l *Logstash) Init() error { return nil }
func (l *Logstash) Drop() {}
func (l *Logstash) Gather(slist *list.SafeList) {}
func (r *Logstash) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
func (l *Logstash) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(l.Instances))
for i := 0; i < len(l.Instances); i++ {
ret[i] = l.Instances[i]
}
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
}
return nil
}
func (r *Logstash) Drop() {}
func (r *Logstash) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.URL) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
URL string `toml:"url"`
SinglePipeline bool `toml:"single_pipeline"`
@ -171,7 +131,7 @@ const pipelineStats = "/_node/stats/pipeline"
func (ins *Instance) Init() error {
if len(ins.URL) == 0 {
return nil
return types.ErrInstancesEmpty
}
client, err := ins.createHTTPClient()
@ -187,7 +147,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
if choice.Contains("jvm", ins.Collect) {
jvmURL, err := url.Parse(ins.URL + jvmStats)
if err != nil {

View File

@ -13,8 +13,7 @@ import (
const inputName = "mem"
type MemStats struct {
ps system.PS
platform string
ps system.PS
config.Interval
CollectPlatformFields bool `toml:"collect_platform_fields"`
@ -29,16 +28,10 @@ func init() {
})
}
func (s *MemStats) Prefix() string {
return inputName
}
func (s *MemStats) Drop() {}
func (s *MemStats) Init() error {
s.platform = runtime.GOOS
return nil
}
func (s *MemStats) Prefix() string { return inputName }
func (s *MemStats) Init() error { return nil }
func (s *MemStats) Drop() {}
func (s *MemStats) GetInstances() []inputs.Instance { return nil }
func (s *MemStats) Gather(slist *list.SafeList) {
vm, err := s.ps.VMStat()
@ -56,7 +49,7 @@ func (s *MemStats) Gather(slist *list.SafeList) {
}
if s.CollectPlatformFields {
switch s.platform {
switch runtime.GOOS {
case "darwin":
fields["active"] = vm.Active
fields["free"] = vm.Free

View File

@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
@ -19,8 +17,6 @@ const inputName = "mongodb"
type MongoDB struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -30,22 +26,16 @@ func init() {
})
}
func (r *MongoDB) Prefix() string {
return ""
}
func (r *MongoDB) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *MongoDB) Prefix() string { return "" }
func (r *MongoDB) Init() error { return nil }
func (r *MongoDB) Gather(slist *list.SafeList) {}
func (r *MongoDB) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
return ret
}
func (r *MongoDB) Drop() {
@ -60,38 +50,10 @@ func (r *MongoDB) Drop() {
}
}
func (r *MongoDB) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.MongodbURI) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
LogLevel string `toml:"log_level"`
config.InstanceConfig
LogLevel string `toml:"log_level"`
// Address (host:port) of MongoDB server.
MongodbURI string `toml:"mongodb_uri,omitempty"`
@ -117,7 +79,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.MongodbURI) == 0 {
return nil
return types.ErrInstancesEmpty
}
if len(ins.LogLevel) == 0 {
@ -167,7 +129,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
err := inputs.Collect(ins.e, slist, ins.Labels)
if err != nil {
log.Println("E! failed to collect metrics:", err)

View File

@ -11,7 +11,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherBinlog(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
func (ins *Instance) gatherBinlog(slist *list.SafeList, db *sql.DB, globalTags map[string]string) {
var logBin uint8
err := db.QueryRow(`SELECT @@log_bin`).Scan(&logBin)
if err != nil {

View File

@ -14,17 +14,17 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherCustomQueries(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
func (ins *Instance) gatherCustomQueries(slist *list.SafeList, db *sql.DB, globalTags map[string]string) {
wg := new(sync.WaitGroup)
defer wg.Wait()
for i := 0; i < len(ins.Queries); i++ {
wg.Add(1)
go m.gatherOneQuery(slist, ins, db, globalTags, wg, ins.Queries[i])
go ins.gatherOneQuery(slist, db, globalTags, wg, ins.Queries[i])
}
}
func (m *MySQL) gatherOneQuery(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, wg *sync.WaitGroup, query QueryConfig) {
func (ins *Instance) gatherOneQuery(slist *list.SafeList, db *sql.DB, globalTags map[string]string, wg *sync.WaitGroup, query QueryConfig) {
defer wg.Done()
timeout := time.Duration(query.Timeout)
@ -69,13 +69,13 @@ func (m *MySQL) gatherOneQuery(slist *list.SafeList, ins *Instance, db *sql.DB,
row[strings.ToLower(colName)] = string(*val)
}
if err = m.parseRow(row, query, slist, globalTags); err != nil {
if err = ins.parseRow(row, query, slist, globalTags); err != nil {
log.Println("E! failed to parse row:", err, "sql:", query.Request)
}
}
}
func (m *MySQL) parseRow(row map[string]string, query QueryConfig, slist *list.SafeList, globalTags map[string]string) error {
func (ins *Instance) parseRow(row map[string]string, query QueryConfig, slist *list.SafeList, globalTags map[string]string) error {
labels := tagx.Copy(globalTags)
for _, label := range query.LabelFields {

View File

@ -12,7 +12,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherEngineInnodbStatus(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
func (ins *Instance) gatherEngineInnodbStatus(slist *list.SafeList, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
rows, err := db.Query(SQL_ENGINE_INNODB_STATUS)
if err != nil {
log.Println("E! failed to query engine innodb status:", err)

View File

@ -8,7 +8,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherEngineInnodbStatusCompute(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
func (ins *Instance) gatherEngineInnodbStatusCompute(slist *list.SafeList, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
tags := tagx.Copy(globalTags)
pageUsed := cache["innodb_buffer_pool_pages_total"] - cache["innodb_buffer_pool_pages_free"]

View File

@ -16,7 +16,7 @@ import (
// Regexp to match various groups of status vars.
var globalStatusRE = regexp.MustCompile(`^(com|handler|connection_errors|innodb_buffer_pool_pages|innodb_rows|performance_schema)_(.*)$`)
func (m *MySQL) gatherGlobalStatus(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
func (ins *Instance) gatherGlobalStatus(slist *list.SafeList, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
rows, err := db.Query(SQL_GLOBAL_STATUS)
if err != nil {
log.Println("E! failed to query global status:", err)

View File

@ -12,7 +12,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherGlobalVariables(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
func (ins *Instance) gatherGlobalVariables(slist *list.SafeList, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
rows, err := db.Query(SQL_GLOBAL_VARIABLES)
if err != nil {
log.Println("E! failed to query global variables:", err)

View File

@ -4,8 +4,6 @@ import (
"database/sql"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -28,15 +26,15 @@ type QueryConfig struct {
}
type Instance struct {
config.InstanceConfig
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
Parameters string `toml:"parameters"`
TimeoutSeconds int64 `toml:"timeout_seconds"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Queries []QueryConfig `toml:"queries"`
Queries []QueryConfig `toml:"queries"`
ExtraStatusMetrics bool `toml:"extra_status_metrics"`
ExtraInnodbMetrics bool `toml:"extra_innodb_metrics"`
@ -54,7 +52,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if ins.Address == "" {
return nil
return types.ErrInstancesEmpty
}
if ins.UseTLS {
@ -153,9 +151,6 @@ func (ins *Instance) InitValidMetrics() {
type MySQL struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -164,49 +159,20 @@ func init() {
})
}
func (m *MySQL) Prefix() string {
return inputName
}
func (m *MySQL) Init() error {
if len(m.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (m *MySQL) Prefix() string { return inputName }
func (m *MySQL) Init() error { return nil }
func (m *MySQL) Drop() {}
func (m *MySQL) Gather(slist *list.SafeList) {}
func (m *MySQL) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(m.Instances))
for i := 0; i < len(m.Instances); i++ {
if err := m.Instances[i].Init(); err != nil {
return err
}
ret[i] = m.Instances[i]
}
return nil
return ret
}
func (m *MySQL) Drop() {}
func (m *MySQL) Gather(slist *list.SafeList) {
atomic.AddUint64(&m.Counter, 1)
for i := range m.Instances {
ins := m.Instances[i]
if len(ins.Address) == 0 {
continue
}
m.wg.Add(1)
go m.gatherOnce(slist, ins)
}
m.wg.Wait()
}
func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
defer m.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&m.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
func (ins *Instance) Gather(slist *list.SafeList) {
tags := map[string]string{"address": ins.Address}
for k, v := range ins.Labels {
tags[k] = v
@ -243,16 +209,16 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
cache := make(map[string]float64)
m.gatherGlobalStatus(slist, ins, db, tags, cache)
m.gatherGlobalVariables(slist, ins, db, tags, cache)
m.gatherEngineInnodbStatus(slist, ins, db, tags, cache)
m.gatherEngineInnodbStatusCompute(slist, ins, db, tags, cache)
m.gatherBinlog(slist, ins, db, tags)
m.gatherProcesslistByState(slist, ins, db, tags)
m.gatherProcesslistByUser(slist, ins, db, tags)
m.gatherSchemaSize(slist, ins, db, tags)
m.gatherTableSize(slist, ins, db, tags, false)
m.gatherTableSize(slist, ins, db, tags, true)
m.gatherSlaveStatus(slist, ins, db, tags)
m.gatherCustomQueries(slist, ins, db, tags)
ins.gatherGlobalStatus(slist, db, tags, cache)
ins.gatherGlobalVariables(slist, db, tags, cache)
ins.gatherEngineInnodbStatus(slist, db, tags, cache)
ins.gatherEngineInnodbStatusCompute(slist, db, tags, cache)
ins.gatherBinlog(slist, db, tags)
ins.gatherProcesslistByState(slist, db, tags)
ins.gatherProcesslistByUser(slist, db, tags)
ins.gatherSchemaSize(slist, db, tags)
ins.gatherTableSize(slist, db, tags, false)
ins.gatherTableSize(slist, db, tags, true)
ins.gatherSlaveStatus(slist, db, tags)
ins.gatherCustomQueries(slist, db, tags)
}

View File

@ -90,7 +90,7 @@ var (
}
)
func (m *MySQL) gatherProcesslistByState(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
func (ins *Instance) gatherProcesslistByState(slist *list.SafeList, db *sql.DB, globalTags map[string]string) {
if !ins.GatherProcessListProcessByState {
return
}

View File

@ -9,7 +9,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherProcesslistByUser(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
func (ins *Instance) gatherProcesslistByUser(slist *list.SafeList, db *sql.DB, globalTags map[string]string) {
if !ins.GatherProcessListProcessByUser {
return
}

View File

@ -9,7 +9,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherSchemaSize(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
func (ins *Instance) gatherSchemaSize(slist *list.SafeList, db *sql.DB, globalTags map[string]string) {
if !ins.GatherSchemaSize {
return
}

View File

@ -31,7 +31,7 @@ func querySlaveStatus(db *sql.DB) (rows *sql.Rows, err error) {
return
}
func (m *MySQL) gatherSlaveStatus(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
func (ins *Instance) gatherSlaveStatus(slist *list.SafeList, db *sql.DB, globalTags map[string]string) {
if !ins.GatherSlaveStatus {
return
}

View File

@ -9,7 +9,7 @@ import (
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherTableSize(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, isSystem bool) {
func (ins *Instance) gatherTableSize(slist *list.SafeList, db *sql.DB, globalTags map[string]string, isSystem bool) {
query := SQL_QUERY_TABLE_SIZE
if isSystem {
query = SQL_QUERY_SYSTEM_TABLE_SIZE

View File

@ -33,11 +33,9 @@ func init() {
})
}
func (s *NetIOStats) Prefix() string {
return inputName
}
func (s *NetIOStats) Drop() {}
func (s *NetIOStats) Prefix() string { return inputName }
func (s *NetIOStats) Drop() {}
func (s *NetIOStats) GetInstances() []inputs.Instance { return nil }
func (s *NetIOStats) Init() error {
var err error

View File

@ -9,7 +9,6 @@ import (
"net/textproto"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -29,19 +28,19 @@ const (
)
type Instance struct {
Targets []string `toml:"targets"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Protocol string `toml:"protocol"`
Timeout config.Duration `toml:"timeout"`
ReadTimeout config.Duration `toml:"read_timeout"`
Send string `toml:"send"`
Expect string `toml:"expect"`
config.InstanceConfig
Targets []string `toml:"targets"`
Protocol string `toml:"protocol"`
Timeout config.Duration `toml:"timeout"`
ReadTimeout config.Duration `toml:"read_timeout"`
Send string `toml:"send"`
Expect string `toml:"expect"`
}
func (ins *Instance) Init() error {
if len(ins.Targets) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.Protocol == "" {
@ -87,8 +86,6 @@ func (ins *Instance) Init() error {
type NetResponse struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -97,53 +94,22 @@ func init() {
})
}
func (n *NetResponse) Prefix() string {
return inputName
}
func (n *NetResponse) Init() error {
if len(n.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (n *NetResponse) Prefix() string { return inputName }
func (n *NetResponse) Init() error { return nil }
func (n *NetResponse) Drop() {}
func (n *NetResponse) Gather(slist *list.SafeList) {}
func (n *NetResponse) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(n.Instances))
for i := 0; i < len(n.Instances); i++ {
if err := n.Instances[i].Init(); err != nil {
return err
}
ret[i] = n.Instances[i]
}
return nil
return ret
}
func (n *NetResponse) Drop() {}
func (n *NetResponse) Gather(slist *list.SafeList) {
atomic.AddUint64(&n.Counter, 1)
for i := range n.Instances {
ins := n.Instances[i]
if len(ins.Targets) == 0 {
continue
}
n.wg.Add(1)
go n.gatherOnce(slist, ins)
}
n.wg.Wait()
}
func (n *NetResponse) gatherOnce(slist *list.SafeList, ins *Instance) {
defer n.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&n.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
if config.Config.DebugMode {
if len(ins.Targets) == 0 {
log.Println("D! net_response targets empty")
}
func (ins *Instance) Gather(slist *list.SafeList) {
if len(ins.Targets) == 0 {
return
}
wg := new(sync.WaitGroup)

View File

@ -26,15 +26,10 @@ func init() {
})
}
func (s *NetStats) Prefix() string {
return inputName
}
func (s *NetStats) Drop() {}
func (s *NetStats) Init() error {
return nil
}
func (s *NetStats) Prefix() string { return inputName }
func (s *NetStats) Init() error { return nil }
func (s *NetStats) Drop() {}
func (s *NetStats) GetInstances() []inputs.Instance { return nil }
func (s *NetStats) Gather(slist *list.SafeList) {
netconns, err := s.ps.NetConnections()

View File

@ -10,7 +10,6 @@ import (
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -25,8 +24,6 @@ const inputName = "nginx_upstream_check"
type NginxUpstreamCheck struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -36,57 +33,21 @@ func init() {
})
}
func (r *NginxUpstreamCheck) Prefix() string {
return inputName
}
func (r *NginxUpstreamCheck) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *NginxUpstreamCheck) Prefix() string { return inputName }
func (r *NginxUpstreamCheck) Init() error { return nil }
func (r *NginxUpstreamCheck) Drop() {}
func (r *NginxUpstreamCheck) Gather(slist *list.SafeList) {}
func (r *NginxUpstreamCheck) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
}
func (r *NginxUpstreamCheck) Drop() {}
func (r *NginxUpstreamCheck) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.Targets) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
Targets []string `toml:"targets"`
Interface string `toml:"interface"`
@ -108,7 +69,7 @@ type httpClient interface {
func (ins *Instance) Init() error {
if len(ins.Targets) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.Timeout < config.Duration(time.Second) {
@ -188,7 +149,7 @@ func (ins *Instance) createHTTPClient() (*http.Client, error) {
return client, nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
wg := new(sync.WaitGroup)
for _, target := range ins.Targets {
wg.Add(1)

View File

@ -25,11 +25,9 @@ func init() {
})
}
func (n *NTPStat) Prefix() string {
return inputName
}
func (n *NTPStat) Drop() {}
func (n *NTPStat) Prefix() string { return inputName }
func (n *NTPStat) Drop() {}
func (n *NTPStat) GetInstances() []inputs.Instance { return nil }
func (n *NTPStat) Init() error {
if len(n.NTPServers) == 0 {

View File

@ -31,11 +31,9 @@ func init() {
})
}
func (s *GPUStats) Prefix() string {
return inputName
}
func (s *GPUStats) Drop() {}
func (s *GPUStats) Prefix() string { return inputName }
func (s *GPUStats) Drop() {}
func (s *GPUStats) GetInstances() []inputs.Instance { return nil }
func (s *GPUStats) Init() error {
if s.NvidiaSmiCommand == "" {

View File

@ -25,16 +25,18 @@ import (
const inputName = "oracle"
type Instance struct {
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
IsSysDBA bool `toml:"is_sys_dba"`
IsSysOper bool `toml:"is_sys_oper"`
DisableConnectionPool bool `toml:"disable_connection_pool"`
MaxOpenConnections int `toml:"max_open_connections"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Metrics []MetricConfig `toml:"metrics"`
config.InstanceConfig
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
IsSysDBA bool `toml:"is_sys_dba"`
IsSysOper bool `toml:"is_sys_oper"`
DisableConnectionPool bool `toml:"disable_connection_pool"`
MaxOpenConnections int `toml:"max_open_connections"`
Metrics []MetricConfig `toml:"metrics"`
client *sqlx.DB
}
type MetricConfig struct {
@ -49,12 +51,8 @@ type MetricConfig struct {
type Oracle struct {
config.Interval
Instances []Instance `toml:"instances"`
Instances []*Instance `toml:"instances"`
Metrics []MetricConfig `toml:"metrics"`
dbconnpool map[string]*sqlx.DB // key: instance
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -63,67 +61,50 @@ func init() {
})
}
func (o *Oracle) Prefix() string {
return inputName
func (o *Oracle) Prefix() string { return inputName }
func (o *Oracle) Init() error { return nil }
func (o *Oracle) Gather(slist *list.SafeList) {}
func (o *Oracle) Drop() {
for i := 0; i < len(o.Instances); i++ {
o.Instances[i].Drop()
}
}
func (o *Oracle) Init() error {
if len(o.Instances) == 0 {
func (o *Oracle) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(o.Instances))
for i := 0; i < len(o.Instances); i++ {
ret[i] = o.Instances[i]
}
return ret
}
func (ins *Instance) Init() error {
if ins.Address == "" {
return types.ErrInstancesEmpty
}
o.dbconnpool = make(map[string]*sqlx.DB)
for i := 0; i < len(o.Instances); i++ {
dbConf := o.Instances[i]
if dbConf.Address == "" {
continue
}
connString := getConnectionString(dbConf)
db, err := sqlx.Open("godror", connString)
if err != nil {
return fmt.Errorf("failed to open oracle connection: %v", err)
}
db.SetMaxOpenConns(dbConf.MaxOpenConnections)
o.dbconnpool[dbConf.Address] = db
connString := ins.getConnectionString()
var err error
client, err = sqlx.Open("godror", connString)
if err != nil {
return fmt.Errorf("failed to open oracle connection: %v", err)
}
return nil
client.SetMaxOpenConns(ins.MaxOpenConnections)
}
func (o *Oracle) Drop() {
for address := range o.dbconnpool {
if config.Config.DebugMode {
log.Println("D! dropping oracle connection:", address)
}
if err := o.dbconnpool[address].Close(); err != nil {
log.Println("E! failed to close oracle connection:", address, "error:", err)
}
func (ins *Instance) Drop() error {
if config.Config.DebugMode {
log.Println("D! dropping oracle connection:", ins.Address)
}
if err := ins.Close(); err != nil {
log.Println("E! failed to close oracle connection:", ins.Address, "error:", err)
}
}
func (o *Oracle) Gather(slist *list.SafeList) {
atomic.AddUint64(&o.Counter, 1)
for i := range o.Instances {
ins := o.Instances[i]
if ins.Address == "" {
continue
}
o.wg.Add(1)
go o.gatherOnce(slist, ins)
}
o.wg.Wait()
}
func (o *Oracle) gatherOnce(slist *list.SafeList, ins Instance) {
defer o.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&o.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
func (ins *Instance) Gather(slist *list.SafeList) {
tags := map[string]string{"address": ins.Address}
for k, v := range ins.Labels {
tags[k] = v
@ -134,9 +115,7 @@ func (o *Oracle) gatherOnce(slist *list.SafeList, ins Instance) {
slist.PushFront(types.NewSample("scrape_use_seconds", use, tags))
}(time.Now())
db := o.dbconnpool[ins.Address]
if err := db.Ping(); err != nil {
if err := ins.client.Ping(); err != nil {
slist.PushFront(types.NewSample("up", 0, tags))
log.Println("E! failed to ping oracle:", ins.Address, "error:", err)
} else {
@ -148,26 +127,26 @@ func (o *Oracle) gatherOnce(slist *list.SafeList, ins Instance) {
for i := 0; i < len(o.Metrics); i++ {
m := o.Metrics[i]
waitMetrics.Add(1)
go ins.scrapeMetric(waitMetrics, slist, db, m, tags)
go ins.scrapeMetric(waitMetrics, slist, m, tags)
}
for i := 0; i < len(ins.Metrics); i++ {
m := ins.Metrics[i]
waitMetrics.Add(1)
go ins.scrapeMetric(waitMetrics, slist, db, m, tags)
go ins.scrapeMetric(waitMetrics, slist, m, tags)
}
waitMetrics.Wait()
}
func (ins *Instance) scrapeMetric(waitMetrics *sync.WaitGroup, slist *list.SafeList, db *sqlx.DB, metricConf MetricConfig, tags map[string]string) {
func (ins *Instance) scrapeMetric(waitMetrics *sync.WaitGroup, slist *list.SafeList, metricConf MetricConfig, tags map[string]string) {
defer waitMetrics.Done()
timeout := time.Duration(metricConf.Timeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
rows, err := db.QueryContext(ctx, metricConf.Request)
rows, err := client.QueryContext(ctx, metricConf.Request)
if ctx.Err() == context.DeadlineExceeded {
log.Println("E! oracle query timeout, request:", metricConf.Request)
@ -269,22 +248,22 @@ func cleanName(s string) string {
return s
}
func getConnectionString(args Instance) string {
func (ins *Instance) getConnectionString() string {
return godror.ConnectionParams{
StandaloneConnection: args.DisableConnectionPool,
StandaloneConnection: ins.DisableConnectionPool,
CommonParams: dsn.CommonParams{
Username: args.Username,
Password: dsn.NewPassword(args.Password),
ConnectString: args.Address,
Username: ins.Username,
Password: dsn.NewPassword(ins.Password),
ConnectString: ins.Address,
},
PoolParams: dsn.PoolParams{
MinSessions: 0,
MaxSessions: args.MaxOpenConnections,
MaxSessions: ins.MaxOpenConnections,
SessionIncrement: 1,
},
ConnParams: dsn.ConnParams{
IsSysDBA: args.IsSysDBA,
IsSysOper: args.IsSysOper,
IsSysDBA: ins.IsSysDBA,
IsSysOper: ins.IsSysOper,
},
}.StringWithPassword()
}

View File

@ -7,7 +7,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -23,15 +22,15 @@ const (
)
type Instance struct {
Targets []string `toml:"targets"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Count int `toml:"count"` // ping -c <COUNT>
PingInterval float64 `toml:"ping_interval"` // ping -i <INTERVAL>
Timeout float64 `toml:"timeout"` // ping -W <TIMEOUT>
Interface string `toml:"interface"` // ping -I/-S <INTERFACE/SRC_ADDR>
IPv6 bool `toml:"ipv6"` // Whether to resolve addresses using ipv6 or not.
Size *int `toml:"size"` // Packet size
config.InstanceConfig
Targets []string `toml:"targets"`
Count int `toml:"count"` // ping -c <COUNT>
PingInterval float64 `toml:"ping_interval"` // ping -i <INTERVAL>
Timeout float64 `toml:"timeout"` // ping -W <TIMEOUT>
Interface string `toml:"interface"` // ping -I/-S <INTERFACE/SRC_ADDR>
IPv6 bool `toml:"ipv6"` // Whether to resolve addresses using ipv6 or not.
Size *int `toml:"size"` // Packet size
calcInterval time.Duration
calcTimeout time.Duration
@ -40,7 +39,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if len(ins.Targets) == 0 {
return nil
return types.ErrInstancesEmpty
}
if ins.Count < 1 {
@ -83,8 +82,6 @@ func (ins *Instance) Init() error {
type Ping struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -93,53 +90,22 @@ func init() {
})
}
func (p *Ping) Prefix() string {
return inputName
}
func (p *Ping) Init() error {
if len(p.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (p *Ping) Prefix() string { return inputName }
func (p *Ping) Init() error { return nil }
func (p *Ping) Drop() {}
func (p *Ping) Gather(slist *list.SafeList) {}
func (p *Ping) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(p.Instances))
for i := 0; i < len(p.Instances); i++ {
if err := p.Instances[i].Init(); err != nil {
return err
}
ret[i] = p.Instances[i]
}
return nil
return ret
}
func (p *Ping) Drop() {}
func (p *Ping) Gather(slist *list.SafeList) {
atomic.AddUint64(&p.Counter, 1)
for i := range p.Instances {
ins := p.Instances[i]
if len(ins.Targets) == 0 {
continue
}
p.wg.Add(1)
go p.gatherOnce(slist, ins)
}
p.wg.Wait()
}
func (p *Ping) gatherOnce(slist *list.SafeList, ins *Instance) {
defer p.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&p.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
if config.Config.DebugMode {
if len(ins.Targets) == 0 {
log.Println("D! ping targets empty")
}
func (ins *Instance) Gather(slist *list.SafeList) {
if len(ins.Targets) == 0 {
return
}
wg := new(sync.WaitGroup)

View File

@ -34,15 +34,10 @@ func init() {
})
}
func (p *Processes) Prefix() string {
return inputName
}
func (p *Processes) Drop() {}
func (p *Processes) Init() error {
return nil
}
func (p *Processes) Prefix() string { return inputName }
func (p *Processes) Init() error { return nil }
func (p *Processes) Drop() {}
func (p *Processes) GetInstances() []inputs.Instance { return nil }
func (p *Processes) Gather(slist *list.SafeList) {
// Get an empty map of metric fields

View File

@ -6,8 +6,6 @@ import (
"log"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -22,15 +20,15 @@ const inputName = "procstat"
type PID int32
type Instance struct {
SearchExecSubstring string `toml:"search_exec_substring"`
SearchCmdlineSubstring string `toml:"search_cmdline_substring"`
SearchWinService string `toml:"search_win_service"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Mode string `toml:"mode"`
GatherTotal bool `toml:"gather_total"`
GatherPerPid bool `toml:"gather_per_pid"`
GatherMoreMetrics []string `toml:"gather_more_metrics"`
config.InstanceConfig
SearchExecSubstring string `toml:"search_exec_substring"`
SearchCmdlineSubstring string `toml:"search_cmdline_substring"`
SearchWinService string `toml:"search_win_service"`
Mode string `toml:"mode"`
GatherTotal bool `toml:"gather_total"`
GatherPerPid bool `toml:"gather_per_pid"`
GatherMoreMetrics []string `toml:"gather_more_metrics"`
searchString string
solarisMode bool
@ -65,8 +63,6 @@ func (ins *Instance) Init() error {
type Procstat struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -75,46 +71,20 @@ func init() {
})
}
func (s *Procstat) Prefix() string {
return inputName
}
func (s *Procstat) Init() error {
if len(s.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (s *Procstat) Prefix() string { return inputName }
func (s *Procstat) Init() error { return nil }
func (s *Procstat) Drop() {}
func (s *Procstat) Gather(slist *list.SafeList) {}
func (s *Procstat) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(s.Instances))
for i := 0; i < len(s.Instances); i++ {
if err := s.Instances[i].Init(); err != nil {
return err
}
ret[i] = s.Instances[i]
}
return nil
return ret
}
func (s *Procstat) Drop() {}
func (s *Procstat) Gather(slist *list.SafeList) {
atomic.AddUint64(&s.Counter, 1)
for i := range s.Instances {
ins := s.Instances[i]
s.wg.Add(1)
go s.gatherOnce(slist, ins)
}
s.wg.Wait()
}
func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {
defer s.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&s.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
func (ins *Instance) Gather(slist *list.SafeList) {
var (
pids []PID
err error
@ -131,7 +101,7 @@ func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {
} else if ins.SearchCmdlineSubstring != "" {
pids, err = pg.FullPattern(ins.SearchCmdlineSubstring)
} else if ins.SearchWinService != "" {
pids, err = s.winServicePIDs(ins.SearchWinService)
pids, err = ins.winServicePIDs()
} else {
log.Println("E! Oops... search string not found")
return
@ -152,7 +122,7 @@ func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {
return
}
s.updateProcesses(ins, pids)
ins.updateProcesses(pids)
for _, field := range ins.GatherMoreMetrics {
switch field {
@ -176,7 +146,7 @@ func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {
}
}
func (s *Procstat) updateProcesses(ins *Instance, pids []PID) {
func (ins *Instance) updateProcesses(pids []PID) {
procs := make(map[PID]Process)
for _, pid := range pids {
@ -367,10 +337,10 @@ func (ins *Instance) gatherLimit(slist *list.SafeList, procs map[PID]Process, ta
}
}
func (s *Procstat) winServicePIDs(winService string) ([]PID, error) {
func (ins *Instance) winServicePIDs() ([]PID, error) {
var pids []PID
pid, err := queryPidWithWinServiceName(winService)
pid, err := queryPidWithWinServiceName(ins.SearchWinService)
if err != nil {
return pids, err
}

View File

@ -1,7 +1,6 @@
package prometheus
import (
"errors"
"io"
"log"
"net/http"
@ -9,7 +8,6 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -25,19 +23,19 @@ const inputName = "prometheus"
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
type Instance struct {
URLs []string `toml:"urls"`
ConsulConfig ConsulConfig `toml:"consul"`
NamePrefix string `toml:"name_prefix"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
BearerTokenString string `toml:"bearer_token_string"`
BearerTokeFile string `toml:"bearer_token_file"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
IgnoreMetrics []string `toml:"ignore_metrics"`
IgnoreLabelKeys []string `toml:"ignore_label_keys"`
Headers []string `toml:"headers"`
config.InstanceConfig
URLs []string `toml:"urls"`
ConsulConfig ConsulConfig `toml:"consul"`
NamePrefix string `toml:"name_prefix"`
BearerTokenString string `toml:"bearer_token_string"`
BearerTokeFile string `toml:"bearer_token_file"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
IgnoreMetrics []string `toml:"ignore_metrics"`
IgnoreLabelKeys []string `toml:"ignore_label_keys"`
Headers []string `toml:"headers"`
config.UrlLabel
@ -61,7 +59,7 @@ func (ins *Instance) Empty() bool {
func (ins *Instance) Init() error {
if ins.Empty() {
return nil
return types.ErrInstancesEmpty
}
if ins.ConsulConfig.Enabled && len(ins.ConsulConfig.Queries) > 0 {
@ -130,9 +128,6 @@ func (ins *Instance) createHTTPClient() (*http.Client, error) {
type Prometheus struct {
config.Interval
Instances []*Instance `toml:"instances"`
counter uint64
waitgrp sync.WaitGroup
}
func init() {
@ -141,57 +136,20 @@ func init() {
})
}
func (p *Prometheus) Prefix() string {
return ""
}
func (p *Prometheus) Init() error {
if len(p.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (p *Prometheus) Prefix() string { return "" }
func (p *Prometheus) Init() error { return nil }
func (p *Prometheus) Drop() {}
func (p *Prometheus) Gather(slist *list.SafeList) {}
func (p *Prometheus) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(p.Instances))
for i := 0; i < len(p.Instances); i++ {
if err := p.Instances[i].Init(); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
return err
}
}
ret[i] = p.Instances[i]
}
return nil
return ret
}
func (p *Prometheus) Drop() {}
func (p *Prometheus) Gather(slist *list.SafeList) {
atomic.AddUint64(&p.counter, 1)
for i := range p.Instances {
ins := p.Instances[i]
if ins.Empty() {
continue
}
p.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer p.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&p.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
p.gatherOnce(slist, ins)
}(slist, ins)
}
p.waitgrp.Wait()
}
func (p *Prometheus) gatherOnce(slist *list.SafeList, ins *Instance) {
func (ins *Instance) Gather(slist *list.SafeList) {
urlwg := new(sync.WaitGroup)
defer urlwg.Wait()
@ -204,7 +162,7 @@ func (p *Prometheus) gatherOnce(slist *list.SafeList, ins *Instance) {
urlwg.Add(1)
go p.gatherUrl(urlwg, slist, ins, ScrapeUrl{URL: u, Tags: map[string]string{}})
go ins.gatherUrl(urlwg, slist, ScrapeUrl{URL: u, Tags: map[string]string{}})
}
urls, err := ins.UrlsFromConsul()
@ -215,11 +173,11 @@ func (p *Prometheus) gatherOnce(slist *list.SafeList, ins *Instance) {
for i := 0; i < len(urls); i++ {
urlwg.Add(1)
go p.gatherUrl(urlwg, slist, ins, urls[i])
go ins.gatherUrl(urlwg, slist, urls[i])
}
}
func (p *Prometheus) gatherUrl(urlwg *sync.WaitGroup, slist *list.SafeList, ins *Instance, uri ScrapeUrl) {
func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *list.SafeList, uri ScrapeUrl) {
defer urlwg.Done()
u := uri.URL

View File

@ -7,7 +7,6 @@ import (
"log"
"net/http"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -22,8 +21,6 @@ const inputName = "rabbitmq"
type RabbitMQ struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -33,62 +30,26 @@ func init() {
})
}
func (r *RabbitMQ) Prefix() string {
return inputName
}
func (r *RabbitMQ) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *RabbitMQ) Prefix() string { return inputName }
func (r *RabbitMQ) Init() error { return nil }
func (r *RabbitMQ) Drop() {}
func (r *RabbitMQ) Gather(slist *list.SafeList) {}
func (r *RabbitMQ) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
}
func (r *RabbitMQ) Drop() {}
func (r *RabbitMQ) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if ins.URL == "" {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
config.InstanceConfig
URL string `toml:"url"`
Username string `toml:"username"`
Password string `toml:"password"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
HeaderTimeout config.Duration `toml:"header_timeout"`
ClientTimeout config.Duration `toml:"client_timeout"`
@ -114,7 +75,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if ins.URL == "" {
return nil
return types.ErrInstancesEmpty
}
var err error
@ -385,7 +346,7 @@ var gatherFunctions = map[string]gatherFunc{
"queue": gatherQueues,
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
tags := map[string]string{"url": ins.URL}
begun := time.Now()

View File

@ -8,8 +8,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -31,13 +29,13 @@ type Command struct {
}
type Instance struct {
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
PoolSize int `toml:"pool_size"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Commands []Command `toml:"commands"`
config.InstanceConfig
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
PoolSize int `toml:"pool_size"`
Commands []Command `toml:"commands"`
tls.ClientConfig
client *redis.Client
@ -45,7 +43,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if ins.Address == "" {
return nil
return types.ErrInstancesEmpty
}
redisOptions := &redis.Options{
@ -70,9 +68,6 @@ func (ins *Instance) Init() error {
type Redis struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -81,22 +76,16 @@ func init() {
})
}
func (r *Redis) Prefix() string {
return inputName
}
func (r *Redis) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *Redis) Prefix() string { return inputName }
func (r *Redis) Init() error { return nil }
func (r *Redis) Gather(slist *list.SafeList) {}
func (r *Redis) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
return ret
}
func (r *Redis) Drop() {
@ -107,29 +96,7 @@ func (r *Redis) Drop() {
}
}
func (r *Redis) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.Counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if ins.Address == "" {
continue
}
r.wg.Add(1)
go r.gatherOnce(slist, ins)
}
r.wg.Wait()
}
func (r *Redis) gatherOnce(slist *list.SafeList, ins *Instance) {
defer r.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
func (ins *Instance) Gather(slist *list.SafeList) {
tags := map[string]string{"address": ins.Address}
for k, v := range ins.Labels {
tags[k] = v
@ -154,11 +121,11 @@ func (r *Redis) gatherOnce(slist *list.SafeList, ins *Instance) {
slist.PushFront(types.NewSample("up", 1, tags))
}
r.gatherInfoAll(slist, ins, tags)
r.gatherCommandValues(slist, ins, tags)
ins.gatherInfoAll(slist, tags)
ins.gatherCommandValues(slist, tags)
}
func (r *Redis) gatherCommandValues(slist *list.SafeList, ins *Instance, tags map[string]string) {
func (ins *Instance) gatherCommandValues(slist *list.SafeList, tags map[string]string) {
fields := make(map[string]interface{})
for _, cmd := range ins.Commands {
val, err := ins.client.Do(context.Background(), cmd.Command...).Result()
@ -181,7 +148,7 @@ func (r *Redis) gatherCommandValues(slist *list.SafeList, ins *Instance, tags ma
}
}
func (r *Redis) gatherInfoAll(slist *list.SafeList, ins *Instance, tags map[string]string) {
func (ins *Instance) gatherInfoAll(slist *list.SafeList, tags map[string]string) {
info, err := ins.client.Info(context.Background(), "ALL").Result()
if err != nil {
info, err = ins.client.Info(context.Background()).Result()

View File

@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
@ -28,8 +27,6 @@ const measurementReplicas = "redis_sentinel_replicas"
type RedisSentinel struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -39,57 +36,21 @@ func init() {
})
}
func (r *RedisSentinel) Prefix() string {
return ""
}
func (r *RedisSentinel) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (r *RedisSentinel) Prefix() string { return "" }
func (r *RedisSentinel) Init() error { return nil }
func (r *RedisSentinel) Drop() {}
func (r *RedisSentinel) Gather(slist *list.SafeList) {}
func (r *RedisSentinel) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
ret[i] = r.Instances[i]
}
return nil
}
func (r *RedisSentinel) Drop() {}
func (r *RedisSentinel) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
if len(ins.Servers) == 0 {
continue
}
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
Servers []string `toml:"servers"`
clients []*RedisSentinelClient
@ -103,7 +64,7 @@ type RedisSentinelClient struct {
func (ins *Instance) Init() error {
if len(ins.Servers) == 0 {
return nil
return types.ErrInstancesEmpty
}
ins.clients = make([]*RedisSentinelClient, len(ins.Servers))
@ -157,7 +118,7 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
var wg sync.WaitGroup
for _, client := range ins.clients {

View File

@ -5,7 +5,6 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -24,8 +23,6 @@ const inputName = "switch_legacy"
type Switch struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
SwitchIdLabel string `toml:"switch_id_label"`
Mappings map[string]string `toml:"mappings"`
@ -37,8 +34,16 @@ func init() {
})
}
func (s *Switch) Prefix() string {
return inputName
func (s *Switch) Prefix() string { return inputName }
func (s *Switch) Drop() {}
func (s *Switch) Gather(slist *list.SafeList) {}
func (s *Switch) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(s.Instances))
for i := 0; i < len(s.Instances); i++ {
ret[i] = s.Instances[i]
}
return ret
}
func (s *Switch) Init() error {
@ -47,7 +52,7 @@ func (s *Switch) Init() error {
}
for i := 0; i < len(s.Instances); i++ {
if err := s.Instances[i].Init(); err != nil {
if err := s.Instances[i].RealInit(); err != nil {
return err
} else {
s.Instances[i].parent = s
@ -57,39 +62,8 @@ func (s *Switch) Init() error {
return nil
}
func (s *Switch) Drop() {}
func (s *Switch) Gather(slist *list.SafeList) {
atomic.AddUint64(&s.counter, 1)
for i := range s.Instances {
ins := s.Instances[i]
if len(ins.IPs) == 0 {
continue
}
s.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer s.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&s.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
s.waitgrp.Wait()
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
IPs []string `toml:"ips"`
Community string `toml:"community"`
@ -142,9 +116,11 @@ type Custom struct {
OID string `toml:"oid"`
}
func (ins *Instance) Init() error {
func (ins *Instance) Init() error { return nil }
func (ins *Instance) RealInit() error {
if len(ins.IPs) == 0 {
return nil
return types.ErrInstancesEmpty
}
ips := ins.parseIPs()
@ -156,10 +132,10 @@ func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) error {
func (ins *Instance) Gather(slist *list.SafeList) {
ips := ins.parseIPs()
if len(ips) == 0 {
return nil
return
}
start := time.Now()
@ -188,8 +164,6 @@ func (ins *Instance) gatherOnce(slist *list.SafeList) error {
if len(ins.Customs) > 0 {
ins.gatherCustoms(ips, slist)
}
return nil
}
func (ins *Instance) gatherCustoms(ips []string, slist *list.SafeList) {

View File

@ -26,15 +26,10 @@ func init() {
})
}
func (s *SystemStats) Prefix() string {
return inputName
}
func (s *SystemStats) Init() error {
return nil
}
func (s *SystemStats) Drop() {}
func (s *SystemStats) Prefix() string { return inputName }
func (s *SystemStats) Init() error { return nil }
func (s *SystemStats) Drop() {}
func (s *SystemStats) GetInstances() []inputs.Instance { return nil }
func (s *SystemStats) Gather(slist *list.SafeList) {
loadavg, err := load.Avg()

View File

@ -6,8 +6,6 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -65,12 +63,12 @@ type RequestInfo struct {
}
type Instance struct {
URL string `toml:"url"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
URL string `toml:"url"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
tls.ClientConfig
client *http.Client
@ -79,7 +77,7 @@ type Instance struct {
func (ins *Instance) Init() error {
if ins.URL == "" {
return nil
return types.ErrInstancesEmpty
}
if ins.Timeout <= 0 {
@ -134,9 +132,6 @@ func (ins *Instance) createHTTPClient() (*http.Client, error) {
type Tomcat struct {
config.Interval
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -145,49 +140,20 @@ func init() {
})
}
func (t *Tomcat) Prefix() string {
return inputName
}
func (t *Tomcat) Init() error {
if len(t.Instances) == 0 {
return types.ErrInstancesEmpty
}
func (t *Tomcat) Prefix() string { return inputName }
func (t *Tomcat) Init() error { return nil }
func (t *Tomcat) Drop() {}
func (t *Tomcat) Gather(slist *list.SafeList) {}
func (t *Tomcat) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(t.Instances))
for i := 0; i < len(t.Instances); i++ {
if err := t.Instances[i].Init(); err != nil {
return err
}
ret[i] = t.Instances[i]
}
return nil
return ret
}
func (t *Tomcat) Drop() {}
func (t *Tomcat) Gather(slist *list.SafeList) {
atomic.AddUint64(&t.Counter, 1)
for i := range t.Instances {
ins := t.Instances[i]
if ins.URL == "" {
continue
}
t.wg.Add(1)
go t.gatherOnce(slist, ins)
}
t.wg.Wait()
}
func (t *Tomcat) gatherOnce(slist *list.SafeList, ins *Instance) {
defer t.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&t.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
func (ins *Instance) Gather(slist *list.SafeList) {
tags := map[string]string{"url": ins.URL}
for k, v := range ins.Labels {
tags[k] = v

View File

@ -1,12 +1,8 @@
package tpl
import (
"sync"
"sync/atomic"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
@ -14,8 +10,6 @@ const inputName = "plugin_tpl"
type PluginTpl struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -25,59 +19,27 @@ func init() {
})
}
func (r *PluginTpl) Prefix() string {
return inputName
}
func (pt *PluginTpl) Prefix() string { return inputName }
func (pt *PluginTpl) Init() error { return nil }
func (pt *PluginTpl) Drop() {}
func (pt *PluginTpl) Gather(slist *list.SafeList) {}
func (r *PluginTpl) Init() error {
if len(r.Instances) == 0 {
return types.ErrInstancesEmpty
func (pt *PluginTpl) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(pt.Instances))
for i := 0; i < len(pt.Instances); i++ {
ret[i] = pt.Instances[i]
}
for i := 0; i < len(r.Instances); i++ {
if err := r.Instances[i].Init(); err != nil {
return err
}
}
return nil
}
func (r *PluginTpl) Drop() {}
func (r *PluginTpl) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.counter, 1)
for i := range r.Instances {
ins := r.Instances[i]
r.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer r.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&r.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
r.waitgrp.Wait()
return ret
}
type Instance struct {
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
}
func (ins *Instance) Init() error {
return nil
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
}

View File

@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
@ -33,11 +32,11 @@ var (
)
type Instance struct {
Addresses string `toml:"addresses"`
Timeout int `toml:"timeout"`
ClusterName string `toml:"cluster_name"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
config.InstanceConfig
Addresses string `toml:"addresses"`
Timeout int `toml:"timeout"`
ClusterName string `toml:"cluster_name"`
tls.ClientConfig
}
@ -64,8 +63,6 @@ func (ins *Instance) ZkConnect(host string) (net.Conn, error) {
type Zookeeper struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
}
@ -75,51 +72,29 @@ func init() {
})
}
func (z *Zookeeper) Prefix() string {
return ""
func (z *Zookeeper) Prefix() string { return "" }
func (z *Zookeeper) Init() error { return nil }
func (z *Zookeeper) Drop() {}
func (z *Zookeeper) Gather(slist *list.SafeList) {}
func (z *Zookeeper) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(z.Instances))
for i := 0; i < len(z.Instances); i++ {
ret[i] = z.Instances[i]
}
return ret
}
func (z *Zookeeper) Init() error {
if len(z.Instances) == 0 {
func (ins *Instance) Init() error {
if len(ins.ZkHosts()) == 0 {
return types.ErrInstancesEmpty
}
return nil
}
func (z *Zookeeper) Drop() {}
func (z *Zookeeper) Gather(slist *list.SafeList) {
atomic.AddUint64(&z.counter, 1)
for i := range z.Instances {
ins := z.Instances[i]
if len(ins.Addresses) == 0 {
continue
}
z.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer z.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&z.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
z.waitgrp.Wait()
}
func (ins *Instance) gatherOnce(slist *list.SafeList) {
func (ins *Instance) Gather(slist *list.SafeList) {
hosts := ins.ZkHosts()
if len(hosts) == 0 {
log.Println("E! addresses empty")
return
}