refactor framework

This commit is contained in:
Ulric Qin 2022-04-25 15:34:15 +08:00
parent 9b11b6a799
commit 4980450a97
24 changed files with 119 additions and 162 deletions

View File

@ -14,6 +14,7 @@ import (
"flashcat.cloud/categraf/writer"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/container/list"
)
const agentHostnameLabelKey = "agent_hostname"
@ -62,7 +63,12 @@ func (r *Reader) gatherOnce() {
}
}()
samples := r.Instance.Gather()
// gather
slist := list.NewSafeList()
r.Instance.Gather(slist)
// handle result
samples := slist.PopBackAll()
if len(samples) == 0 {
return
@ -74,17 +80,19 @@ func (r *Reader) gatherOnce() {
continue
}
if samples[i].Timestamp.IsZero() {
samples[i].Timestamp = now
s := samples[i].(*types.Sample)
if s.Timestamp.IsZero() {
s.Timestamp = now
}
if len(r.Instance.GetInputName()) > 0 {
samples[i].Metric = r.Instance.GetInputName() + "_" + strings.ReplaceAll(samples[i].Metric, "-", "_")
s.Metric = r.Instance.GetInputName() + "_" + strings.ReplaceAll(s.Metric, "-", "_")
} else {
samples[i].Metric = strings.ReplaceAll(samples[i].Metric, "-", "_")
s.Metric = strings.ReplaceAll(s.Metric, "-", "_")
}
r.Queue <- samples[i]
r.Queue <- s
}
}
@ -186,10 +194,6 @@ func convert(item *types.Sample) *prompb.TimeSeries {
pt := &prompb.TimeSeries{}
if item.Timestamp.IsZero() {
item.Timestamp = time.Now()
}
timestamp := item.Timestamp.UnixMilli()
if config.Config.Global.Precision == "s" {
timestamp = item.Timestamp.Unix()

View File

@ -6,6 +6,9 @@ address = "127.0.0.1:3306"
username = "root"
password = "1234"
# # timeout
# timeout_seconds = 3
# # interval = global.interval * interval_times
# interval_times = 1

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.17
require (
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
github.com/gobwas/glob v0.2.3
github.com/godror/godror v0.33.0
github.com/golang/protobuf v1.5.2

View File

@ -4,11 +4,11 @@ import (
"log"
cpuUtil "github.com/shirou/gopsutil/v3/cpu"
"github.com/toolkits/pkg/container/list"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/types"
)
const inputName = "cpu"
@ -46,13 +46,11 @@ func (c *CPUStats) Init() error {
func (c *CPUStats) Drop() {
}
func (c *CPUStats) Gather() []*types.Sample {
var samples []*types.Sample
func (c *CPUStats) Gather(slist *list.SafeList) {
times, err := c.ps.CPUTimes(c.CollectPerCPU, true)
if err != nil {
log.Println("E! failed to get cpu metrics:", err)
return samples
return
}
for _, cts := range times {
@ -101,15 +99,13 @@ func (c *CPUStats) Gather() []*types.Sample {
"usage_active": 100 * (active - lastActive) / totalDelta,
}
samples = append(samples, inputs.NewSamples(fields, tags)...)
inputs.PushSamples(slist, fields, tags)
}
c.lastStats = make(map[string]cpuUtil.TimesStat)
for _, cts := range times {
c.lastStats[cts.CPU] = cts
}
return samples
}
func totalCPUTime(t cpuUtil.TimesStat) float64 {

View File

@ -8,7 +8,7 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/pkg/choice"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "disk"
@ -46,13 +46,11 @@ func (s *DiskStats) Init() error {
func (s *DiskStats) Drop() {
}
func (s *DiskStats) Gather() []*types.Sample {
var samples []*types.Sample
func (s *DiskStats) Gather(slist *list.SafeList) {
disks, partitions, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS)
if err != nil {
log.Println("E! failed to get disk usage:", err)
return samples
return
}
for i, du := range disks {
@ -90,10 +88,8 @@ func (s *DiskStats) Gather() []*types.Sample {
"inodes_used": du.InodesUsed,
}
samples = append(samples, inputs.NewSamples(fields, tags)...)
inputs.PushSamples(slist, fields, tags)
}
return samples
}
type MountOptions []string

View File

@ -8,7 +8,7 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/pkg/filter"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "diskio"
@ -53,9 +53,7 @@ func (d *DiskIO) Init() error {
return nil
}
func (d *DiskIO) Gather() []*types.Sample {
var samples []*types.Sample
func (d *DiskIO) Gather(slist *list.SafeList) {
devices := []string{}
if d.deviceFilter == nil {
// no glob chars
@ -65,7 +63,7 @@ func (d *DiskIO) Gather() []*types.Sample {
diskio, err := d.ps.DiskIO(devices)
if err != nil {
log.Println("E! failed to get disk io:", err)
return samples
return
}
for _, io := range diskio {
@ -87,8 +85,6 @@ func (d *DiskIO) Gather() []*types.Sample {
"merged_writes": io.MergedWriteCount,
}
samples = append(samples, inputs.NewSamples(fields, map[string]string{"name": io.Name})...)
inputs.PushSamples(slist, fields, map[string]string{"name": io.Name})
}
return samples
}

View File

@ -79,11 +79,9 @@ func (e *Exec) Init() error {
return nil
}
func (e *Exec) Gather() (samples []*types.Sample) {
func (e *Exec) Gather(slist *list.SafeList) {
atomic.AddUint64(&e.Counter, 1)
slist := list.NewSafeList()
var wg sync.WaitGroup
wg.Add(len(e.Instances))
for i := range e.Instances {
@ -92,13 +90,6 @@ func (e *Exec) Gather() (samples []*types.Sample) {
}
wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (e *Exec) GatherOnce(wg *sync.WaitGroup, slist *list.SafeList, ins ExecInstance) {

View File

@ -164,23 +164,14 @@ func (h *HTTPResponse) Init() error {
func (h *HTTPResponse) Drop() {}
func (h *HTTPResponse) Gather() (samples []*types.Sample) {
func (h *HTTPResponse) Gather(slist *list.SafeList) {
atomic.AddUint64(&h.Counter, 1)
slist := list.NewSafeList()
for i := range h.Instances {
ins := h.Instances[i]
h.wg.Add(1)
go h.gatherOnce(slist, ins)
}
h.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (h *HTTPResponse) gatherOnce(slist *list.SafeList, ins *Instance) {

View File

@ -4,6 +4,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/pkg/conv"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
type Input interface {
@ -11,7 +12,7 @@ type Input interface {
Drop()
GetInputName() string
GetInterval() config.Duration
Gather() []*types.Sample
Gather(slist *list.SafeList)
}
type Creator func() Input
@ -57,3 +58,13 @@ func NewSamples(fields map[string]interface{}, labels ...map[string]string) []*t
return samples
}
func PushSamples(slist *list.SafeList, fields map[string]interface{}, labels ...map[string]string) {
for metric, value := range fields {
floatValue, err := conv.ToFloat64(value)
if err != nil {
continue
}
slist.PushFront(NewSample(metric, floatValue, labels...))
}
}

View File

@ -14,6 +14,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "kernel"
@ -57,7 +58,7 @@ func (s *KernelStats) Init() error {
func (s *KernelStats) Drop() {}
func (s *KernelStats) Gather() (samples []*types.Sample) {
func (s *KernelStats) Gather(slist *list.SafeList) {
data, err := s.getProcStat()
if err != nil {
log.Println("E! failed to read:", s.statFile, "error:", err)
@ -121,8 +122,7 @@ func (s *KernelStats) Gather() (samples []*types.Sample) {
}
}
samples = append(samples, inputs.NewSamples(fields)...)
return
inputs.PushSamples(slist, fields)
}
func (s *KernelStats) getProcStat() ([]byte, error) {

View File

@ -13,6 +13,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "kernelvmstat"
@ -46,7 +47,7 @@ func (s *KernelVmstat) Init() error {
func (s *KernelVmstat) Drop() {}
func (s *KernelVmstat) Gather() (samples []*types.Sample) {
func (s *KernelVmstat) Gather(slist *list.SafeList) {
data, err := s.getProcVmstat()
if err != nil {
log.Println("E! failed to gather vmstat:", err)
@ -77,7 +78,7 @@ func (s *KernelVmstat) Gather() (samples []*types.Sample) {
}
}
return inputs.NewSamples(fields)
inputs.PushSamples(slist, fields)
}
func (s *KernelVmstat) getProcVmstat() ([]byte, error) {

View File

@ -15,6 +15,7 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/osx"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "linuxsysctlfs"
@ -47,7 +48,7 @@ func (s *SysctlFS) Init() error {
func (s *SysctlFS) Drop() {}
func (s *SysctlFS) Gather() []*types.Sample {
func (s *SysctlFS) Gather(slist *list.SafeList) {
fields := map[string]interface{}{}
for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} {
@ -71,7 +72,7 @@ func (s *SysctlFS) Gather() []*types.Sample {
log.Println("E! failed to gather file-nr:", err)
}
return inputs.NewSamples(fields)
inputs.PushSamples(fields)
}
func (s *SysctlFS) gatherOne(name string, fields map[string]interface{}) error {

View File

@ -7,7 +7,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "mem"
@ -44,11 +44,11 @@ func (s *MemStats) Init() error {
return nil
}
func (s *MemStats) Gather() []*types.Sample {
func (s *MemStats) Gather(slist *list.SafeList) {
vm, err := s.ps.VMStat()
if err != nil {
log.Println("E! failed to get vmstat:", err)
return nil
return
}
fields := map[string]interface{}{
@ -113,5 +113,5 @@ func (s *MemStats) Gather() []*types.Sample {
}
}
return inputs.NewSamples(fields)
inputs.PushSamples(slist, fields)
}

View File

@ -1,7 +1,10 @@
package mysql
import (
"database/sql"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
@ -10,18 +13,22 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/tls"
"flashcat.cloud/categraf/types"
"github.com/go-sql-driver/mysql"
"github.com/toolkits/pkg/container/list"
)
const inputName = "mysql"
type Instance struct {
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
TimeoutSeconds int64 `toml:"timeout_seconds"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
dsn string
tls.ClientConfig
}
@ -29,6 +36,23 @@ func (ins *Instance) Init() error {
if ins.Address == "" {
return errors.New("address is blank")
}
ins.dsn = fmt.Sprintf("%s:%s@tcp(%s)/", ins.Username, ins.Password, ins.Address)
conf, err := mysql.ParseDSN(ins.dsn)
if err != nil {
return err
}
if conf.Timeout == 0 {
if ins.TimeoutSeconds == 0 {
ins.TimeoutSeconds = 3
}
conf.Timeout = time.Second * time.Duration(ins.TimeoutSeconds)
}
ins.dsn = conf.FormatDSN()
return nil
}
@ -70,24 +94,14 @@ func (m *MySQL) Init() error {
func (m *MySQL) Drop() {}
func (m *MySQL) Gather() (samples []*types.Sample) {
func (m *MySQL) Gather(slist *list.SafeList) {
atomic.AddUint64(&m.Counter, 1)
slist := list.NewSafeList()
for i := range m.Instances {
ins := m.Instances[i]
m.wg.Add(1)
go m.gatherOnce(slist, ins)
}
m.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
@ -113,5 +127,14 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
slist.PushFront(inputs.NewSample("scrape_use_seconds", use, tags))
}(begun)
//
db, err := sql.Open("mysql", ins.dsn)
if err != nil {
slist.PushFront(inputs.NewSample("up", 0, tags))
log.Println("E! failed to open mysql:", err)
return
}
slist.PushFront(inputs.NewSample("up", 1, tags))
defer db.Close()
}

View File

@ -9,7 +9,7 @@ import (
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/pkg/filter"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "net"
@ -56,17 +56,17 @@ func (s *NetIOStats) Init() error {
return nil
}
func (s *NetIOStats) Gather() []*types.Sample {
func (s *NetIOStats) Gather(slist *list.SafeList) {
netio, err := s.ps.NetIO()
if err != nil {
log.Println("E! failed to get net io metrics:", err)
return nil
return
}
interfaces, err := net.Interfaces()
if err != nil {
log.Println("E! failed to list interfaces:", err)
return nil
return
}
interfacesByName := map[string]net.Interface{}
@ -74,8 +74,6 @@ func (s *NetIOStats) Gather() []*types.Sample {
interfacesByName[iface.Name] = iface
}
var samples []*types.Sample
for _, io := range netio {
if len(s.Interfaces) > 0 {
var found bool
@ -117,8 +115,6 @@ func (s *NetIOStats) Gather() []*types.Sample {
"drop_out": io.Dropout,
}
samples = append(samples, inputs.NewSamples(fields, tags)...)
inputs.PushSamples(slist, fields, tags)
}
return samples
}

View File

@ -121,23 +121,14 @@ func (n *NetResponse) Init() error {
func (n *NetResponse) Drop() {}
func (n *NetResponse) Gather() (samples []*types.Sample) {
func (n *NetResponse) Gather(slist *list.SafeList) {
atomic.AddUint64(&n.Counter, 1)
slist := list.NewSafeList()
for i := range n.Instances {
ins := n.Instances[i]
n.wg.Add(1)
go n.gatherOnce(slist, ins)
}
n.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (n *NetResponse) gatherOnce(slist *list.SafeList, ins *Instance) {

View File

@ -7,7 +7,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/system"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "netstat"
@ -41,7 +41,7 @@ func (s *NetStats) Init() error {
return nil
}
func (s *NetStats) Gather() (samples []*types.Sample) {
func (s *NetStats) Gather(slist *list.SafeList) {
netconns, err := s.ps.NetConnections()
if err != nil {
log.Println("E! failed to get net connections:", err)
@ -81,6 +81,5 @@ func (s *NetStats) Gather() (samples []*types.Sample) {
"udp_socket": counts["UDP"],
}
samples = append(samples, inputs.NewSamples(fields, tags)...)
return
inputs.PushSamples(slist, fields, tags)
}

View File

@ -7,7 +7,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/nux"
)
@ -42,7 +42,7 @@ func (n *NTPStat) Init() error {
return nil
}
func (n *NTPStat) Gather() (samples []*types.Sample) {
func (n *NTPStat) Gather(slist *list.SafeList) {
for _, server := range n.NTPServers {
if n.server == "" {
n.server = server
@ -62,9 +62,7 @@ func (n *NTPStat) Gather() (samples []*types.Sample) {
duration := ((serverReciveTime.UnixNano() - orgTime.UnixNano()) + (serverTransmitTime.UnixNano() - dstTime.UnixNano())) / 2
delta := duration / 1e6 // convert to ms
samples = append(samples, inputs.NewSample("offset_ms", delta))
slist.PushFront(inputs.NewSample("offset_ms", delta))
break
}
return
}

View File

@ -101,24 +101,14 @@ func (o *Oracle) Drop() {
}
}
func (o *Oracle) Gather() (samples []*types.Sample) {
func (o *Oracle) Gather(slist *list.SafeList) {
atomic.AddUint64(&o.Counter, 1)
slist := list.NewSafeList()
for i := range o.Instances {
ins := o.Instances[i]
o.wg.Add(1)
go o.gatherOnce(slist, ins)
}
o.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (o *Oracle) gatherOnce(slist *list.SafeList, ins OrclInstance) {

View File

@ -113,24 +113,14 @@ func (p *Ping) Init() error {
func (p *Ping) Drop() {}
func (p *Ping) Gather() (samples []*types.Sample) {
func (p *Ping) Gather(slist *list.SafeList) {
atomic.AddUint64(&p.Counter, 1)
slist := list.NewSafeList()
for i := range p.Instances {
ins := p.Instances[i]
p.wg.Add(1)
go p.gatherOnce(slist, ins)
}
p.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (p *Ping) gatherOnce(slist *list.SafeList, ins *PingInstance) {

View File

@ -17,7 +17,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/osx"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "processes"
@ -48,7 +48,7 @@ func (p *Processes) Init() error {
return nil
}
func (p *Processes) Gather() (samples []*types.Sample) {
func (p *Processes) Gather(slist *list.SafeList) {
// Get an empty map of metric fields
fields := getEmptyFields()
@ -74,7 +74,7 @@ func (p *Processes) Gather() (samples []*types.Sample) {
}
}
return inputs.NewSamples(fields)
inputs.PushSamples(slist, fields)
}
// Gets empty fields of metrics based on the OS

View File

@ -96,23 +96,14 @@ func (s *Procstat) Init() error {
func (s *Procstat) Drop() {}
func (s *Procstat) Gather() (samples []*types.Sample) {
func (s *Procstat) Gather(slist *list.SafeList) {
atomic.AddUint64(&s.Counter, 1)
slist := list.NewSafeList()
for i := range s.Instances {
ins := s.Instances[i]
s.wg.Add(1)
go s.gatherOnce(slist, ins)
}
s.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {

View File

@ -108,24 +108,14 @@ func (r *Redis) Drop() {
}
}
func (r *Redis) Gather() (samples []*types.Sample) {
func (r *Redis) Gather(slist *list.SafeList) {
atomic.AddUint64(&r.Counter, 1)
slist := list.NewSafeList()
for i := range r.Instances {
ins := r.Instances[i]
r.wg.Add(1)
go r.gatherOnce(slist, ins)
}
r.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (r *Redis) gatherOnce(slist *list.SafeList, ins *Instance) {

View File

@ -7,10 +7,10 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/load"
"github.com/toolkits/pkg/container/list"
)
const inputName = "system"
@ -40,19 +40,17 @@ func (s *SystemStats) Init() error {
func (s *SystemStats) Drop() {}
func (s *SystemStats) Gather() []*types.Sample {
var samples []*types.Sample
func (s *SystemStats) Gather(slist *list.SafeList) {
loadavg, err := load.Avg()
if err != nil && !strings.Contains(err.Error(), "not implemented") {
log.Println("E! failed to gather system load:", err)
return samples
return
}
numCPUs, err := cpu.Counts(true)
if err != nil {
log.Println("E! failed to gather cpu number:", err)
return samples
return
}
fields := map[string]interface{}{
@ -83,5 +81,5 @@ func (s *SystemStats) Gather() []*types.Sample {
}
}
return inputs.NewSamples(fields)
inputs.PushSamples(slist, fields)
}