support collect process mem util, mem used and cpu util metrics (#152)

* support send to influxdb

* fix influxdb config var

* fix name error

* fix remove unused variable

* support collect proc mem util, mem used and cpu util metrics

* opt calc timestamp

* opt code
This commit is contained in:
zengwh 2020-05-18 19:25:54 +08:00 committed by GitHub
parent 91d9f3b29f
commit 1accfdf9a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 15 deletions

View File

@ -5,12 +5,16 @@ import (
"time" "time"
"github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/dataobj"
process "github.com/shirou/gopsutil/process"
) )
var MetricHistory *History var MetricHistory *History
var ProcsCache *ProcessCache
func Init() { func Init() {
MetricHistory = NewHistory() MetricHistory = NewHistory()
ProcsCache = NewProcsCache()
} }
func NewHistory() *History { func NewHistory() *History {
@ -61,3 +65,51 @@ func (h *History) clean() {
} }
} }
} }
type ProcessCache struct {
sync.RWMutex
Data map[int32]*process.Process
}
func NewProcsCache() *ProcessCache{
pc := ProcessCache{
Data: make(map[int32]*process.Process),
}
go pc.Clean()
return &pc
}
func (pc *ProcessCache) Set(pid int32, p *process.Process){
pc.Lock()
defer pc.Unlock()
pc.Data[pid] = p
}
func (pc *ProcessCache) Get(pid int32)(*process.Process, bool){
pc.RLock()
defer pc.RUnlock()
p, exists := pc.Data[pid]
return p, exists
}
func (pc *ProcessCache) Clean() {
ticker := time.NewTicker(10 * time.Minute)
for {
select {
case <-ticker.C:
pc.clean()
}
}
}
func (pc *ProcessCache) clean() {
pc.Lock()
defer pc.Unlock()
for pid, procs := range pc.Data {
running, _ := procs.IsRunning()
if !running{
delete(pc.Data, pid)
}
}
}

View File

@ -5,11 +5,11 @@ import (
"time" "time"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/nux" process "github.com/shirou/gopsutil/process"
"github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/model" "github.com/didi/nightingale/src/model"
"github.com/didi/nightingale/src/modules/collector/sys/funcs" "github.com/didi/nightingale/src/modules/collector/sys/funcs"
"github.com/didi/nightingale/src/modules/collector/cache"
"github.com/didi/nightingale/src/toolkits/identity" "github.com/didi/nightingale/src/toolkits/identity"
) )
@ -45,32 +45,74 @@ func (p *ProcScheduler) Stop() {
} }
func ProcCollect(p *model.ProcCollect) { func ProcCollect(p *model.ProcCollect) {
ps, err := nux.AllProcs() ps, err := process.Processes()
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return return
} }
var memUsedTotal uint64 = 0
pslen := len(ps) var memUtilTotal = 0.0
var cpuUtilTotal = 0.0
var items [] *dataobj.MetricValue
cnt := 0 cnt := 0
for i := 0; i < pslen; i++ { for _, procs := range ps {
if isProc(ps[i], p.CollectMethod, p.Target) { if isProc(procs, p.CollectMethod, p.Target) {
cnt++ cnt++
procCache, exists := cache.ProcsCache.Get(procs.Pid)
if !exists{
cache.ProcsCache.Set(procs.Pid, procs)
procCache = procs
}
mem, err := procCache.MemoryInfo()
if err != nil {
logger.Error(err)
continue
}
memUsedTotal += mem.RSS
memUtil, err := procCache.MemoryPercent()
if err != nil {
logger.Error(err)
continue
}
memUtilTotal += float64(memUtil)
cpuUtil, err := procCache.Percent(0)
if err != nil {
logger.Error(err)
continue
}
cpuUtilTotal += cpuUtil
} }
} }
item := funcs.GaugeValue("proc.num", cnt, p.Tags) procNumItem := funcs.GaugeValue("proc.num", cnt, p.Tags)
item.Step = int64(p.Step) memUsedItem := funcs.GaugeValue("proc.mem.used", memUsedTotal, p.Tags)
item.Timestamp = time.Now().Unix() memUtilItem := funcs.GaugeValue("proc.mem.util", memUtilTotal, p.Tags)
item.Endpoint = identity.Identity cpuUtilItem := funcs.GaugeValue("proc.cpu.util", cpuUtilTotal, p.Tags)
items = []*dataobj.MetricValue{procNumItem, memUsedItem, memUtilItem, cpuUtilItem}
now := time.Now().Unix()
for _, item := range items{
item.Step = int64(p.Step)
item.Timestamp = now
item.Endpoint = identity.Identity
}
funcs.Push([]*dataobj.MetricValue{item}) funcs.Push(items)
} }
func isProc(p *nux.Proc, method, target string) bool { func isProc(p *process.Process, method, target string) bool {
if method == "name" && target == p.Name { name, err := p.Name()
if err != nil {
return false
}
cmdlines, err := p.Cmdline()
if err != nil {
return false
}
if method == "name" && target == name {
return true return true
} else if (method == "cmdline" || method == "cmd") && strings.Contains(p.Cmdline, target) { } else if (method == "cmdline" || method == "cmd") && strings.Contains(cmdlines, target) {
return true return true
} }
return false return false