diff --git a/agent/agent.go b/agent/agent.go index 44daaf9..2937068 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -15,6 +15,7 @@ import ( // auto registry _ "flashcat.cloud/categraf/inputs/cpu" _ "flashcat.cloud/categraf/inputs/disk" + _ "flashcat.cloud/categraf/inputs/diskio" _ "flashcat.cloud/categraf/inputs/mem" _ "flashcat.cloud/categraf/inputs/redis" _ "flashcat.cloud/categraf/inputs/system" diff --git a/conf/input.disk/disk.toml b/conf/input.disk/disk.toml index 0a074f5..231c48b 100644 --- a/conf/input.disk/disk.toml +++ b/conf/input.disk/disk.toml @@ -9,4 +9,4 @@ # mount_points = ["/"] # Ignore mount points by filesystem type. -ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"] \ No newline at end of file +ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"] diff --git a/conf/input.diskio/diskio.toml b/conf/input.diskio/diskio.toml new file mode 100644 index 0000000..ee572d8 --- /dev/null +++ b/conf/input.diskio/diskio.toml @@ -0,0 +1,9 @@ +# # whether print configs +# print_configs = false + +# # collect interval, default 15s +# interval_seconds = 15 + +# # By default, categraf will gather stats for all devices including disk partitions. +# # Setting devices will restrict the stats to the specified devices. +# devices = ["sda", "sdb", "vd*"] \ No newline at end of file diff --git a/go.mod b/go.mod index 639a797..8e6b0dd 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module flashcat.cloud/categraf go 1.17 require ( + github.com/gobwas/glob v0.2.3 github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7 diff --git a/go.sum b/go.sum index 67c04ec..ec78934 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,8 @@ github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTM github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= +github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= diff --git a/inputs/diskio/diskio.go b/inputs/diskio/diskio.go new file mode 100644 index 0000000..24f231b --- /dev/null +++ b/inputs/diskio/diskio.go @@ -0,0 +1,138 @@ +package diskio + +import ( + "fmt" + "log" + "time" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/inputs" + "flashcat.cloud/categraf/inputs/system" + "flashcat.cloud/categraf/pkg/filter" + "flashcat.cloud/categraf/types" +) + +const InputName = "diskio" + +type DiskIO struct { + quit chan struct{} + ps system.PS + + PrintConfigs bool `toml:"print_configs"` + IntervalSeconds int64 `toml:"interval_seconds"` + Devices []string `toml:"devices"` + deviceFilter filter.Filter +} + +func init() { + ps := system.NewSystemPS() + inputs.Add(InputName, func() inputs.Input { + return &DiskIO{ + quit: make(chan struct{}), + ps: ps, + } + }) +} + +func (d *DiskIO) getInterval() time.Duration { + if d.IntervalSeconds != 0 { + return time.Duration(d.IntervalSeconds) * time.Second + } + return config.GetInterval() +} + +// overwrite func +func (d *DiskIO) Init() error { + for _, device := range d.Devices { + if filter.HasMeta(device) { + deviceFilter, err := filter.Compile(d.Devices) + if err != nil { + return fmt.Errorf("error compiling device pattern: %s", err.Error()) + } + d.deviceFilter = deviceFilter + } + } + return nil +} + +// overwrite func +func (d *DiskIO) StopGoroutines() { + d.quit <- struct{}{} +} + +// overwrite func +func (d *DiskIO) StartGoroutines(queue chan *types.Sample) { + go d.LoopGather(queue) +} + +func (d *DiskIO) LoopGather(queue chan *types.Sample) { + interval := d.getInterval() + for { + select { + case <-d.quit: + close(d.quit) + return + default: + time.Sleep(interval) + d.Gather(queue) + } + } +} + +// overwrite func +func (d *DiskIO) Gather(queue chan *types.Sample) { + var samples []*types.Sample + + defer func() { + // if r := recover(); r != nil { + // if strings.Contains(fmt.Sprint(r), "closed channel") { + // return + // } else { + // log.Println("E! gather metrics panic:", r) + // } + // } + + now := time.Now() + for i := 0; i < len(samples); i++ { + samples[i].Timestamp = now + samples[i].Metric = InputName + "_" + samples[i].Metric + queue <- samples[i] + } + }() + + // ---------------------------------------------- + + devices := []string{} + if d.deviceFilter == nil { + // no glob chars + devices = d.Devices + } + + diskio, err := d.ps.DiskIO(devices) + if err != nil { + log.Println("E! failed to get disk io:", err) + return + } + + for _, io := range diskio { + if d.deviceFilter != nil && !d.deviceFilter.Match(io.Name) { + continue + } + + fields := map[string]interface{}{ + "reads": io.ReadCount, + "writes": io.WriteCount, + "read_bytes": io.ReadBytes, + "write_bytes": io.WriteBytes, + "read_time": io.ReadTime, + "write_time": io.WriteTime, + "io_time": io.IoTime, + "weighted_io_time": io.WeightedIO, + "iops_in_progress": io.IopsInProgress, + "merged_reads": io.MergedReadCount, + "merged_writes": io.MergedWriteCount, + } + + samples = append(samples, inputs.NewSamples(fields, map[string]string{"name": io.Name})...) + } +} diff --git a/inputs/system/ps.go b/inputs/system/ps.go index 34abab8..0924a5f 100644 --- a/inputs/system/ps.go +++ b/inputs/system/ps.go @@ -174,7 +174,7 @@ func (s *SystemPS) NetConnections() ([]net.ConnectionStat, error) { func (s *SystemPS) DiskIO(names []string) (map[string]disk.IOCountersStat, error) { m, err := disk.IOCounters(names...) - if strings.Contains(err.Error(), "not implemented") { + if err != nil && strings.Contains(err.Error(), "not implemented") { return nil, nil } diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go new file mode 100644 index 0000000..b4c4551 --- /dev/null +++ b/pkg/filter/filter.go @@ -0,0 +1,132 @@ +package filter + +import ( + "strings" + + "github.com/gobwas/glob" +) + +type Filter interface { + Match(string) bool +} + +// Compile takes a list of string filters and returns a Filter interface +// for matching a given string against the filter list. The filter list +// supports glob matching too, ie: +// +// f, _ := Compile([]string{"cpu", "mem", "net*"}) +// f.Match("cpu") // true +// f.Match("network") // true +// f.Match("memory") // false +// +func Compile(filters []string) (Filter, error) { + // return if there is nothing to compile + if len(filters) == 0 { + return nil, nil + } + + // check if we can compile a non-glob filter + noGlob := true + for _, filter := range filters { + if HasMeta(filter) { + noGlob = false + break + } + } + + switch { + case noGlob: + // return non-globbing filter if not needed. + return compileFilterNoGlob(filters), nil + case len(filters) == 1: + return glob.Compile(filters[0]) + default: + return glob.Compile("{" + strings.Join(filters, ",") + "}") + } +} + +// HasMeta reports whether path contains any magic glob characters. +func HasMeta(s string) bool { + return strings.ContainsAny(s, "*?[") +} + +type filter struct { + m map[string]struct{} +} + +func (f *filter) Match(s string) bool { + _, ok := f.m[s] + return ok +} + +type filtersingle struct { + s string +} + +func (f *filtersingle) Match(s string) bool { + return f.s == s +} + +func compileFilterNoGlob(filters []string) Filter { + if len(filters) == 1 { + return &filtersingle{s: filters[0]} + } + out := filter{m: make(map[string]struct{})} + for _, filter := range filters { + out.m[filter] = struct{}{} + } + return &out +} + +type IncludeExcludeFilter struct { + include Filter + exclude Filter + includeDefault bool + excludeDefault bool +} + +func NewIncludeExcludeFilter( + include []string, + exclude []string, +) (Filter, error) { + return NewIncludeExcludeFilterDefaults(include, exclude, true, false) +} + +func NewIncludeExcludeFilterDefaults( + include []string, + exclude []string, + includeDefault bool, + excludeDefault bool, +) (Filter, error) { + in, err := Compile(include) + if err != nil { + return nil, err + } + + ex, err := Compile(exclude) + if err != nil { + return nil, err + } + + return &IncludeExcludeFilter{in, ex, includeDefault, excludeDefault}, nil +} + +func (f *IncludeExcludeFilter) Match(s string) bool { + if f.include != nil { + if !f.include.Match(s) { + return false + } + } else if !f.includeDefault { + return false + } + + if f.exclude != nil { + if f.exclude.Match(s) { + return false + } + } else if f.excludeDefault { + return false + } + + return true +}