add diskio collector
This commit is contained in:
parent
d2af1d9f72
commit
e40c5ae3d2
|
@ -15,6 +15,7 @@ import (
|
|||
// auto registry
|
||||
_ "flashcat.cloud/categraf/inputs/cpu"
|
||||
_ "flashcat.cloud/categraf/inputs/disk"
|
||||
_ "flashcat.cloud/categraf/inputs/diskio"
|
||||
_ "flashcat.cloud/categraf/inputs/mem"
|
||||
_ "flashcat.cloud/categraf/inputs/redis"
|
||||
_ "flashcat.cloud/categraf/inputs/system"
|
||||
|
|
|
@ -9,4 +9,4 @@
|
|||
# mount_points = ["/"]
|
||||
|
||||
# Ignore mount points by filesystem type.
|
||||
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
|
||||
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
# # whether print configs
|
||||
# print_configs = false
|
||||
|
||||
# # collect interval, default 15s
|
||||
# interval_seconds = 15
|
||||
|
||||
# # By default, categraf will gather stats for all devices including disk partitions.
|
||||
# # Setting devices will restrict the stats to the specified devices.
|
||||
# devices = ["sda", "sdb", "vd*"]
|
1
go.mod
1
go.mod
|
@ -3,6 +3,7 @@ module flashcat.cloud/categraf
|
|||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/gobwas/glob v0.2.3
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/golang/snappy v0.0.4
|
||||
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
|
||||
|
|
2
go.sum
2
go.sum
|
@ -83,6 +83,8 @@ github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTM
|
|||
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
|
||||
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
|
||||
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
|
||||
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
|
||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
package diskio
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"flashcat.cloud/categraf/config"
|
||||
"flashcat.cloud/categraf/inputs"
|
||||
"flashcat.cloud/categraf/inputs/system"
|
||||
"flashcat.cloud/categraf/pkg/filter"
|
||||
"flashcat.cloud/categraf/types"
|
||||
)
|
||||
|
||||
const InputName = "diskio"
|
||||
|
||||
type DiskIO struct {
|
||||
quit chan struct{}
|
||||
ps system.PS
|
||||
|
||||
PrintConfigs bool `toml:"print_configs"`
|
||||
IntervalSeconds int64 `toml:"interval_seconds"`
|
||||
Devices []string `toml:"devices"`
|
||||
deviceFilter filter.Filter
|
||||
}
|
||||
|
||||
func init() {
|
||||
ps := system.NewSystemPS()
|
||||
inputs.Add(InputName, func() inputs.Input {
|
||||
return &DiskIO{
|
||||
quit: make(chan struct{}),
|
||||
ps: ps,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (d *DiskIO) getInterval() time.Duration {
|
||||
if d.IntervalSeconds != 0 {
|
||||
return time.Duration(d.IntervalSeconds) * time.Second
|
||||
}
|
||||
return config.GetInterval()
|
||||
}
|
||||
|
||||
// overwrite func
|
||||
func (d *DiskIO) Init() error {
|
||||
for _, device := range d.Devices {
|
||||
if filter.HasMeta(device) {
|
||||
deviceFilter, err := filter.Compile(d.Devices)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling device pattern: %s", err.Error())
|
||||
}
|
||||
d.deviceFilter = deviceFilter
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// overwrite func
|
||||
func (d *DiskIO) StopGoroutines() {
|
||||
d.quit <- struct{}{}
|
||||
}
|
||||
|
||||
// overwrite func
|
||||
func (d *DiskIO) StartGoroutines(queue chan *types.Sample) {
|
||||
go d.LoopGather(queue)
|
||||
}
|
||||
|
||||
func (d *DiskIO) LoopGather(queue chan *types.Sample) {
|
||||
interval := d.getInterval()
|
||||
for {
|
||||
select {
|
||||
case <-d.quit:
|
||||
close(d.quit)
|
||||
return
|
||||
default:
|
||||
time.Sleep(interval)
|
||||
d.Gather(queue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// overwrite func
|
||||
func (d *DiskIO) Gather(queue chan *types.Sample) {
|
||||
var samples []*types.Sample
|
||||
|
||||
defer func() {
|
||||
// if r := recover(); r != nil {
|
||||
// if strings.Contains(fmt.Sprint(r), "closed channel") {
|
||||
// return
|
||||
// } else {
|
||||
// log.Println("E! gather metrics panic:", r)
|
||||
// }
|
||||
// }
|
||||
|
||||
now := time.Now()
|
||||
for i := 0; i < len(samples); i++ {
|
||||
samples[i].Timestamp = now
|
||||
samples[i].Metric = InputName + "_" + samples[i].Metric
|
||||
queue <- samples[i]
|
||||
}
|
||||
}()
|
||||
|
||||
// ----------------------------------------------
|
||||
|
||||
devices := []string{}
|
||||
if d.deviceFilter == nil {
|
||||
// no glob chars
|
||||
devices = d.Devices
|
||||
}
|
||||
|
||||
diskio, err := d.ps.DiskIO(devices)
|
||||
if err != nil {
|
||||
log.Println("E! failed to get disk io:", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, io := range diskio {
|
||||
if d.deviceFilter != nil && !d.deviceFilter.Match(io.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"reads": io.ReadCount,
|
||||
"writes": io.WriteCount,
|
||||
"read_bytes": io.ReadBytes,
|
||||
"write_bytes": io.WriteBytes,
|
||||
"read_time": io.ReadTime,
|
||||
"write_time": io.WriteTime,
|
||||
"io_time": io.IoTime,
|
||||
"weighted_io_time": io.WeightedIO,
|
||||
"iops_in_progress": io.IopsInProgress,
|
||||
"merged_reads": io.MergedReadCount,
|
||||
"merged_writes": io.MergedWriteCount,
|
||||
}
|
||||
|
||||
samples = append(samples, inputs.NewSamples(fields, map[string]string{"name": io.Name})...)
|
||||
}
|
||||
}
|
|
@ -174,7 +174,7 @@ func (s *SystemPS) NetConnections() ([]net.ConnectionStat, error) {
|
|||
|
||||
func (s *SystemPS) DiskIO(names []string) (map[string]disk.IOCountersStat, error) {
|
||||
m, err := disk.IOCounters(names...)
|
||||
if strings.Contains(err.Error(), "not implemented") {
|
||||
if err != nil && strings.Contains(err.Error(), "not implemented") {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
package filter
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/gobwas/glob"
|
||||
)
|
||||
|
||||
type Filter interface {
|
||||
Match(string) bool
|
||||
}
|
||||
|
||||
// Compile takes a list of string filters and returns a Filter interface
|
||||
// for matching a given string against the filter list. The filter list
|
||||
// supports glob matching too, ie:
|
||||
//
|
||||
// f, _ := Compile([]string{"cpu", "mem", "net*"})
|
||||
// f.Match("cpu") // true
|
||||
// f.Match("network") // true
|
||||
// f.Match("memory") // false
|
||||
//
|
||||
func Compile(filters []string) (Filter, error) {
|
||||
// return if there is nothing to compile
|
||||
if len(filters) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// check if we can compile a non-glob filter
|
||||
noGlob := true
|
||||
for _, filter := range filters {
|
||||
if HasMeta(filter) {
|
||||
noGlob = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case noGlob:
|
||||
// return non-globbing filter if not needed.
|
||||
return compileFilterNoGlob(filters), nil
|
||||
case len(filters) == 1:
|
||||
return glob.Compile(filters[0])
|
||||
default:
|
||||
return glob.Compile("{" + strings.Join(filters, ",") + "}")
|
||||
}
|
||||
}
|
||||
|
||||
// HasMeta reports whether path contains any magic glob characters.
|
||||
func HasMeta(s string) bool {
|
||||
return strings.ContainsAny(s, "*?[")
|
||||
}
|
||||
|
||||
type filter struct {
|
||||
m map[string]struct{}
|
||||
}
|
||||
|
||||
func (f *filter) Match(s string) bool {
|
||||
_, ok := f.m[s]
|
||||
return ok
|
||||
}
|
||||
|
||||
type filtersingle struct {
|
||||
s string
|
||||
}
|
||||
|
||||
func (f *filtersingle) Match(s string) bool {
|
||||
return f.s == s
|
||||
}
|
||||
|
||||
func compileFilterNoGlob(filters []string) Filter {
|
||||
if len(filters) == 1 {
|
||||
return &filtersingle{s: filters[0]}
|
||||
}
|
||||
out := filter{m: make(map[string]struct{})}
|
||||
for _, filter := range filters {
|
||||
out.m[filter] = struct{}{}
|
||||
}
|
||||
return &out
|
||||
}
|
||||
|
||||
type IncludeExcludeFilter struct {
|
||||
include Filter
|
||||
exclude Filter
|
||||
includeDefault bool
|
||||
excludeDefault bool
|
||||
}
|
||||
|
||||
func NewIncludeExcludeFilter(
|
||||
include []string,
|
||||
exclude []string,
|
||||
) (Filter, error) {
|
||||
return NewIncludeExcludeFilterDefaults(include, exclude, true, false)
|
||||
}
|
||||
|
||||
func NewIncludeExcludeFilterDefaults(
|
||||
include []string,
|
||||
exclude []string,
|
||||
includeDefault bool,
|
||||
excludeDefault bool,
|
||||
) (Filter, error) {
|
||||
in, err := Compile(include)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ex, err := Compile(exclude)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &IncludeExcludeFilter{in, ex, includeDefault, excludeDefault}, nil
|
||||
}
|
||||
|
||||
func (f *IncludeExcludeFilter) Match(s string) bool {
|
||||
if f.include != nil {
|
||||
if !f.include.Match(s) {
|
||||
return false
|
||||
}
|
||||
} else if !f.includeDefault {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.exclude != nil {
|
||||
if f.exclude.Match(s) {
|
||||
return false
|
||||
}
|
||||
} else if f.excludeDefault {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
Loading…
Reference in New Issue