add procstat, just statistic proc number

This commit is contained in:
Ulric Qin 2022-04-21 16:39:53 +08:00
parent b2dbb4e833
commit 4af5eef048
9 changed files with 478 additions and 0 deletions

View File

@ -29,6 +29,7 @@ import (
_ "flashcat.cloud/categraf/inputs/oracle"
_ "flashcat.cloud/categraf/inputs/ping"
_ "flashcat.cloud/categraf/inputs/processes"
_ "flashcat.cloud/categraf/inputs/procstat"
_ "flashcat.cloud/categraf/inputs/redis"
_ "flashcat.cloud/categraf/inputs/system"
)

View File

@ -0,0 +1,24 @@
# # collect interval
# interval = 15
[[instances]]
# executable name (ie, pgrep <search_exec_substring>)
search_exec_substring = "nginx"
# # pattern as argument for pgrep (ie, pgrep -f <search_cmdline_substring>)
# search_cmdline_substring = "n9e server"
# # windows service name
# search_win_service = ""
# # append some labels for series
# labels = { region="cloud", product="n9e" }
# # interval = global.interval * interval_times
# interval_times = 1
# # mode to use when calculating CPU usage. can be one of 'solaris' or 'irix'
# mode = "irix"
# # categraf will not gather proc's cpu|mem|fd|io...
# only_gather_proc_count = true

View File

@ -0,0 +1,90 @@
package procstat
import (
"fmt"
"os"
"strconv"
"strings"
"github.com/shirou/gopsutil/v3/process"
)
// NativeFinder uses gopsutil to find processes
type NativeFinder struct {
}
func NewNativeFinder() (PIDFinder, error) {
return &NativeFinder{}, nil
}
// Uid will return all pids for the given user
func (pg *NativeFinder) UID(user string) ([]PID, error) {
var dst []PID
procs, err := process.Processes()
if err != nil {
return dst, err
}
for _, p := range procs {
username, err := p.Username()
if err != nil {
//skip, this can happen if we don't have permissions or
//the pid no longer exists
continue
}
if username == user {
dst = append(dst, PID(p.Pid))
}
}
return dst, nil
}
// PidFile returns the pid from the pid file given.
func (pg *NativeFinder) PidFile(path string) ([]PID, error) {
var pids []PID
pidString, err := os.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("failed to read pidfile '%s'. Error: '%s'",
path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
return pids, nil
}
// FullPattern matches on the command line when the process was executed
func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
var pids []PID
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
cmd, err := p.Cmdline()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if strings.Contains(cmd, pattern) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}
func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) {
pids, err := process.Pids()
if err != nil {
return nil, err
}
result := make([]*process.Process, len(pids))
for i, pid := range pids {
result[i] = &process.Process{Pid: pid}
}
return result, nil
}

View File

@ -0,0 +1,30 @@
//go:build !windows
// +build !windows
package procstat
import (
"strings"
)
// Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
name, err := p.Exe()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if strings.Contains(name, pattern) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}

View File

@ -0,0 +1,27 @@
package procstat
import (
"strings"
)
// Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
procs, err := pg.FastProcessList()
if err != nil {
return pids, err
}
for _, p := range procs {
name, err := p.Name()
if err != nil {
//skip, this can be caused by the pid no longer existing
//or you having no permissions to access it
continue
}
if strings.Contains(name, pattern) {
pids = append(pids, PID(p.Pid))
}
}
return pids, err
}

View File

@ -0,0 +1,78 @@
package procstat
import (
"fmt"
"time"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/process"
)
type Process interface {
PID() PID
Tags() map[string]string
PageFaults() (*process.PageFaultsStat, error)
IOCounters() (*process.IOCountersStat, error)
MemoryInfo() (*process.MemoryInfoStat, error)
Name() (string, error)
Cmdline() (string, error)
NumCtxSwitches() (*process.NumCtxSwitchesStat, error)
NumFDs() (int32, error)
NumThreads() (int32, error)
Percent(interval time.Duration) (float64, error)
MemoryPercent() (float32, error)
Times() (*cpu.TimesStat, error)
RlimitUsage(bool) ([]process.RlimitStat, error)
Username() (string, error)
CreateTime() (int64, error)
Ppid() (int32, error)
}
type PIDFinder interface {
PidFile(path string) ([]PID, error)
Pattern(pattern string) ([]PID, error)
UID(user string) ([]PID, error)
FullPattern(path string) ([]PID, error)
}
type Proc struct {
hasCPUTimes bool
tags map[string]string
*process.Process
}
func NewProc(pid PID) (Process, error) {
p, err := process.NewProcess(int32(pid))
if err != nil {
return nil, err
}
proc := &Proc{
Process: p,
hasCPUTimes: false,
tags: make(map[string]string),
}
return proc, nil
}
func (p *Proc) Tags() map[string]string {
return p.tags
}
func (p *Proc) PID() PID {
return PID(p.Process.Pid)
}
func (p *Proc) Username() (string, error) {
return p.Process.Username()
}
func (p *Proc) Percent(_ time.Duration) (float64, error) {
cpuPerc, err := p.Process.Percent(time.Duration(0))
if !p.hasCPUTimes && err == nil {
p.hasCPUTimes = true
return 0, fmt.Errorf("must call Percent twice to compute percent cpu")
}
return cpuPerc, err
}

167
inputs/procstat/procstat.go Normal file
View File

@ -0,0 +1,167 @@
package procstat
import (
"errors"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "procstat"
type PID int32
type Instance struct {
SearchExecSubstring string `toml:"search_exec_substring"`
SearchCmdlineSubstring string `toml:"search_cmdline_substring"`
SearchWinService string `toml:"search_win_service"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
Mode string `toml:"mode"`
OnlyGatherProcCount bool `toml:"only_gather_proc_count"`
searchString string
solarisMode bool
}
func (ins *Instance) Init() error {
if ins.Mode == "" {
ins.Mode = "irix"
}
if strings.ToLower(ins.Mode) == "solaris" {
ins.solarisMode = true
}
if ins.SearchExecSubstring != "" {
ins.searchString = ins.SearchExecSubstring
log.Println("I! procstat: search_exec_substring:", ins.SearchExecSubstring)
} else if ins.SearchCmdlineSubstring != "" {
ins.searchString = ins.SearchCmdlineSubstring
log.Println("I! procstat: search_cmdline_substring:", ins.SearchCmdlineSubstring)
} else if ins.SearchWinService != "" {
ins.searchString = ins.SearchWinService
log.Println("I! procstat: search_win_service:", ins.SearchWinService)
} else {
return errors.New("the fields should not be all blank: search_exec_substring, search_cmdline_substring, search_win_service")
}
return nil
}
type Procstat struct {
Interval config.Duration `toml:"interval"`
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
inputs.Add(inputName, func() inputs.Input {
return &Procstat{}
})
}
func (s *Procstat) GetInputName() string {
return inputName
}
func (s *Procstat) GetInterval() config.Duration {
return s.Interval
}
func (s *Procstat) Init() error {
if len(s.Instances) == 0 {
return fmt.Errorf("instances empty")
}
for i := 0; i < len(s.Instances); i++ {
if err := s.Instances[i].Init(); err != nil {
return err
}
}
return nil
}
func (s *Procstat) Drop() {}
func (s *Procstat) Gather() (samples []*types.Sample) {
atomic.AddUint64(&s.Counter, 1)
slist := list.NewSafeList()
for i := range s.Instances {
ins := s.Instances[i]
s.wg.Add(1)
go s.gatherOnce(slist, ins)
}
s.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (s *Procstat) gatherOnce(slist *list.SafeList, ins *Instance) {
defer s.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&s.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
var (
pids []PID
err error
tags = map[string]string{"search_string": ins.searchString}
)
pg, _ := NewNativeFinder()
if ins.SearchExecSubstring != "" {
pids, err = pg.Pattern(ins.SearchExecSubstring)
} else if ins.SearchCmdlineSubstring != "" {
pids, err = pg.FullPattern(ins.SearchCmdlineSubstring)
} else if ins.SearchWinService != "" {
pids, err = s.winServicePIDs(ins.SearchWinService)
} else {
log.Println("E! Oops... search string not found")
return
}
if err != nil {
log.Println("E! procstat: failed to lookup pids, search string:", ins.searchString, "error:", err)
slist.PushFront(inputs.NewSample("lookup_count", 0, tags))
return
}
slist.PushFront(inputs.NewSample("lookup_count", len(pids), tags))
if ins.OnlyGatherProcCount {
return
}
}
func (s *Procstat) winServicePIDs(winService string) ([]PID, error) {
var pids []PID
pid, err := queryPidWithWinServiceName(winService)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
return pids, nil
}

View File

@ -0,0 +1,12 @@
//go:build !windows
// +build !windows
package procstat
import (
"fmt"
)
func queryPidWithWinServiceName(_ string) (uint32, error) {
return 0, fmt.Errorf("os not support win_service option")
}

View File

@ -0,0 +1,49 @@
//go:build windows
// +build windows
package procstat
import (
"unsafe"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc/mgr"
)
func getService(name string) (*mgr.Service, error) {
m, err := mgr.Connect()
if err != nil {
return nil, err
}
defer m.Disconnect()
srv, err := m.OpenService(name)
if err != nil {
return nil, err
}
return srv, nil
}
func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
srv, err := getService(winServiceName)
if err != nil {
return 0, err
}
var p *windows.SERVICE_STATUS_PROCESS
var bytesNeeded uint32
var buf []byte
if err := windows.QueryServiceStatusEx(srv.Handle, windows.SC_STATUS_PROCESS_INFO, nil, 0, &bytesNeeded); err != windows.ERROR_INSUFFICIENT_BUFFER {
return 0, err
}
buf = make([]byte, bytesNeeded)
p = (*windows.SERVICE_STATUS_PROCESS)(unsafe.Pointer(&buf[0]))
if err := windows.QueryServiceStatusEx(srv.Handle, windows.SC_STATUS_PROCESS_INFO, &buf[0], uint32(len(buf)), &bytesNeeded); err != nil {
return 0, err
}
return p.ProcessId, nil
}