From 6a70bed30f6edb987d192e1b6cb2cb650d58c401 Mon Sep 17 00:00:00 2001 From: qinyening <710leo@gmail.com> Date: Thu, 21 Jan 2021 00:38:18 +0800 Subject: [PATCH] modify proc info collect (#528) --- src/modules/agent/cache/cache.go | 50 ----- src/modules/agent/sys/procs/scheduler.go | 132 +++++++++----- src/modules/agent/sys/procs/sys.go | 223 +++++++++++++++++++++++ 3 files changed, 310 insertions(+), 95 deletions(-) create mode 100644 src/modules/agent/sys/procs/sys.go diff --git a/src/modules/agent/cache/cache.go b/src/modules/agent/cache/cache.go index b52e0c79..fa74be6b 100644 --- a/src/modules/agent/cache/cache.go +++ b/src/modules/agent/cache/cache.go @@ -5,15 +5,12 @@ import ( "time" "github.com/didi/nightingale/src/common/dataobj" - process "github.com/shirou/gopsutil/process" ) var MetricHistory *History -var ProcsCache *ProcessCache func Init() { MetricHistory = NewHistory() - ProcsCache = NewProcsCache() } func NewHistory() *History { @@ -64,50 +61,3 @@ func (h *History) clean() { } } } - -type ProcessCache struct { - sync.RWMutex - Data map[int32]*process.Process -} - -func NewProcsCache() *ProcessCache { - pc := ProcessCache{ - Data: make(map[int32]*process.Process), - } - go pc.Clean() - return &pc -} - -func (pc *ProcessCache) Set(pid int32, p *process.Process) { - pc.Lock() - defer pc.Unlock() - pc.Data[pid] = p -} - -func (pc *ProcessCache) Get(pid int32) (*process.Process, bool) { - pc.RLock() - defer pc.RUnlock() - p, exists := pc.Data[pid] - return p, exists -} - -func (pc *ProcessCache) Clean() { - ticker := time.NewTicker(10 * time.Minute) - for { - select { - case <-ticker.C: - pc.clean() - } - } -} - -func (pc *ProcessCache) clean() { - pc.Lock() - defer pc.Unlock() - for pid, procs := range pc.Data { - running, _ := procs.IsRunning() - if !running { - delete(pc.Data, pid) - } - } -} diff --git a/src/modules/agent/sys/procs/scheduler.go b/src/modules/agent/sys/procs/scheduler.go index ec746778..85136834 100644 --- a/src/modules/agent/sys/procs/scheduler.go +++ b/src/modules/agent/sys/procs/scheduler.go @@ -4,14 +4,13 @@ import ( "strings" "time" - process "github.com/shirou/gopsutil/process" - "github.com/toolkits/pkg/logger" - "github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/models" - "github.com/didi/nightingale/src/modules/agent/cache" "github.com/didi/nightingale/src/modules/agent/config" "github.com/didi/nightingale/src/modules/agent/core" + + "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/nux" ) type ProcScheduler struct { @@ -45,52 +44,97 @@ func (p *ProcScheduler) Stop() { close(p.Quit) } +var ( + rBytes map[int]uint64 + wBytes map[int]uint64 + procJiffy map[int]uint64 + jiffy uint64 +) + func ProcCollect(p *models.ProcCollect) { - ps, err := process.Processes() + ps, err := AllProcs() if err != nil { logger.Error(err) return } - var memUsedTotal uint64 = 0 - var memUtilTotal = 0.0 - var cpuUtilTotal = 0.0 + + newRBytes := make(map[int]uint64) + newWBytes := make(map[int]uint64) + newProcJiffy := make(map[int]uint64) + newJiffy := readJiffy() + + for _, proc := range ps { + newRBytes[proc.Pid] = proc.RBytes + newWBytes[proc.Pid] = proc.WBytes + if pj, err := readProcJiffy(proc.Pid); err == nil { + newProcJiffy[proc.Pid] = pj + } + } + var items []*dataobj.MetricValue - cnt := 0 - for _, procs := range ps { - if isProc(procs, p.CollectMethod, p.Target) { + var cnt int + var fdNum int + var memory uint64 + var cpu float64 + var ioWrite, ioRead uint64 + var uptime uint64 + + for _, proc := range ps { + if isProc(proc, p.CollectMethod, p.Target) { cnt++ - procCache, exists := cache.ProcsCache.Get(procs.Pid) - if !exists { - cache.ProcsCache.Set(procs.Pid, procs) - procCache = procs + memory += proc.Mem + fdNum += proc.FdCount + rOld := rBytes[proc.Pid] + if rOld != 0 && rOld <= proc.RBytes { + ioRead += proc.RBytes - rOld } - mem, err := procCache.MemoryInfo() - if err != nil { - logger.Error(err) + + wOld := wBytes[proc.Pid] + if wOld != 0 && wOld <= proc.WBytes { + ioWrite += proc.WBytes - wOld + } + + uptime = readUptime(proc.Pid) + + // jiffy 为零,表示第一次采集信息,不做cpu计算 + if jiffy == 0 { continue } - memUsedTotal += mem.RSS - memUtil, err := procCache.MemoryPercent() - if err != nil { - logger.Error(err) - continue - } - memUtilTotal += float64(memUtil) - cpuUtil, err := procCache.Percent(0) - if err != nil { - logger.Error(err) - continue - } - cpuUtilTotal += cpuUtil + + cpu += float64(newProcJiffy[proc.Pid] - procJiffy[proc.Pid]) } } procNumItem := core.GaugeValue("proc.num", cnt, p.Tags) - memUsedItem := core.GaugeValue("proc.mem.used", memUsedTotal, p.Tags) - memUtilItem := core.GaugeValue("proc.mem.util", memUtilTotal, p.Tags) - cpuUtilItem := core.GaugeValue("proc.cpu.util", cpuUtilTotal, p.Tags) - items = []*dataobj.MetricValue{procNumItem, memUsedItem, memUtilItem, cpuUtilItem} + procFdItem := core.GaugeValue("proc.uptime", uptime, p.Tags) + procUptimeItem := core.GaugeValue("proc.fdnum", fdNum, p.Tags) + memUsedItem := core.GaugeValue("proc.mem.used", memory*1024, p.Tags) + ioReadItem := core.GaugeValue("proc.io.read.bytes", ioRead, p.Tags) + ioWriteItem := core.GaugeValue("proc.io.write.bytes", ioWrite, p.Tags) + items = []*dataobj.MetricValue{procNumItem, memUsedItem, procFdItem, procUptimeItem, ioReadItem, ioWriteItem} + + if jiffy != 0 { + cpuUtil := cpu / float64(newJiffy-jiffy) * 100 + if cpuUtil > 100 { + cpuUtil = 100 + } + + cpuUtilItem := core.GaugeValue("proc.cpu.util", cpuUtil, p.Tags) + items = append(items, cpuUtilItem) + } + + sysMem, err := nux.MemInfo() + if err != nil { + logger.Error(err) + } + + if sysMem != nil && sysMem.MemTotal != 0 { + memUsedUtil := float64(memory*1024) / float64(sysMem.MemTotal) * 100 + memUtilItem := core.GaugeValue("proc.mem.util", memUsedUtil, p.Tags) + items = append(items, memUtilItem) + } + now := time.Now().Unix() for _, item := range items { item.Step = int64(p.Step) @@ -99,18 +143,16 @@ func ProcCollect(p *models.ProcCollect) { } core.Push(items) + + rBytes = newRBytes + wBytes = newWBytes + procJiffy = newProcJiffy + jiffy = readJiffy() } -func isProc(p *process.Process, method, target string) bool { - name, err := p.Name() - if err != nil { - return false - } - cmdlines, err := p.Cmdline() - if err != nil { - return false - } - if method == "name" && target == name { +func isProc(p *Proc, method, target string) bool { + cmdlines := p.Cmdline + if method == "name" && target == p.Name { return true } else if (method == "cmdline" || method == "cmd") && strings.Contains(cmdlines, target) { return true diff --git a/src/modules/agent/sys/procs/sys.go b/src/modules/agent/sys/procs/sys.go new file mode 100644 index 00000000..a5349efe --- /dev/null +++ b/src/modules/agent/sys/procs/sys.go @@ -0,0 +1,223 @@ +package procs + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "strconv" + "strings" + "time" + + "github.com/toolkits/pkg/file" + "github.com/toolkits/pkg/logger" +) + +type Proc struct { + Pid int + Name string + Exe string + Cmdline string + Mem uint64 + Cpu float64 + jiffy uint64 + + RBytes uint64 + WBytes uint64 + Uptime uint64 + FdCount int +} + +func (this *Proc) String() string { + return fmt.Sprintf("", + this.Pid, this.Name, this.Uptime, this.Exe, this.Mem, this.Cpu) +} + +func AllProcs() (ps []*Proc, err error) { + var dirs []string + dirs, err = file.DirsUnder("/proc") + if err != nil { + return + } + + size := len(dirs) + if size == 0 { + return + } + + for i := 0; i < size; i++ { + pid, e := strconv.Atoi(dirs[i]) + if e != nil { + continue + } + statusFile := fmt.Sprintf("/proc/%d/status", pid) + cmdlineFile := fmt.Sprintf("/proc/%d/cmdline", pid) + + if !file.IsExist(statusFile) || !file.IsExist(cmdlineFile) { + continue + } + + name, memory, e := ReadNameAndMem(statusFile) + if e != nil { + logger.Error("read pid status file err:", e) + continue + } + + cmdlineBytes, e := file.ToBytes(cmdlineFile) + if e != nil { + continue + } + + cmdlineBytesLen := len(cmdlineBytes) + if cmdlineBytesLen == 0 { + continue + } + + noNut := make([]byte, 0, cmdlineBytesLen) + for j := 0; j < cmdlineBytesLen; j++ { + if cmdlineBytes[j] != 0 { + noNut = append(noNut, cmdlineBytes[j]) + } + } + + p := Proc{Pid: pid, Name: name, Cmdline: string(noNut), Mem: memory} + ps = append(ps, &p) + } + + for _, p := range ps { + p.RBytes, p.WBytes = readIO(p.Pid) + p.FdCount = readProcFd(p.Pid) + p.Uptime = readUptime(p.Pid) + } + + return +} + +func ReadNameAndMem(path string) (name string, memory uint64, err error) { + var content []byte + content, err = ioutil.ReadFile(path) + if err != nil { + return + } + + reader := bufio.NewReader(bytes.NewBuffer(content)) + + for { + var bs []byte + bs, err = file.ReadLine(reader) + if err == io.EOF { + return + } + + line := string(bs) + + colonIndex := strings.Index(line, ":") + if colonIndex == -1 { + logger.Warning("line is illegal", path) + continue + } + + if strings.TrimSpace(line[0:colonIndex]) == "Name" { + name = strings.TrimSpace(line[colonIndex+1:]) + } else if strings.TrimSpace(line[0:colonIndex]) == "VmRSS" { + kbIndex := strings.Index(line, "kB") + memory, _ = strconv.ParseUint(strings.TrimSpace(line[colonIndex+1:kbIndex]), 10, 64) + break + } + + } + return +} + +func readJiffy() uint64 { + f, err := os.Open("/proc/stat") + if err != nil { + return 0 + } + defer f.Close() + scanner := bufio.NewScanner(f) + scanner.Scan() + s := scanner.Text() + if !strings.HasPrefix(s, "cpu ") { + return 0 + } + ss := strings.Split(s, " ") + var ret uint64 + for _, x := range ss { + if x == "" || x == "cpu" { + continue + } + if v, e := strconv.ParseUint(x, 10, 64); e == nil { + ret += v + } + } + return ret +} + +func readProcFd(pid int) int { + var fds []string + fds, err := file.FilesUnder(fmt.Sprintf("/proc/%d/fd", pid)) + if err != nil { + return 0 + } + return len(fds) +} + +func readProcJiffy(pid int) (uint64, error) { + f, err := os.Open(fmt.Sprintf("/proc/%d/stat", pid)) + if err != nil { + return 0, err + } + defer f.Close() + scanner := bufio.NewScanner(f) + scanner.Scan() + s := scanner.Text() + ss := strings.Split(s, " ") + if len(ss) < 15 { + return 0, fmt.Errorf("/porc/%s/stat illegal:%v", pid, ss) + } + var ret uint64 + for i := 13; i < 15; i++ { + v, e := strconv.ParseUint(ss[i], 10, 64) + if e != nil { + return 0, err + } + ret += v + } + return ret, nil +} + +func readIO(pid int) (r uint64, w uint64) { + f, err := os.Open(fmt.Sprintf("/proc/%d/io", pid)) + if err != nil { + return + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + s := scanner.Text() + if strings.HasPrefix(s, "read_bytes") || strings.HasPrefix(s, "write_bytes") { + v := strings.Split(s, " ") + if len(v) == 2 { + value, _ := strconv.ParseUint(v[1], 10, 64) + if s[0] == 'r' { + r = value + } else { + w = value + } + } + } + } + return +} + +func readUptime(pid int) uint64 { + fileInfo, err := os.Stat(fmt.Sprintf("/proc/%d", pid)) + if err != nil { + return 0 + } + duration := time.Now().Sub(fileInfo.ModTime()) + return uint64(duration.Seconds()) +}