code refactor

This commit is contained in:
Ulric Qin 2022-04-15 18:19:15 +08:00
parent 0d09cfa89c
commit 1fd0ec4901
5 changed files with 104 additions and 76 deletions

View File

@ -67,15 +67,12 @@ func (c *CPUStats) LoopGather(queue chan *types.Sample) {
return
default:
time.Sleep(interval)
c.Gather(queue)
c.GatherOnce(queue)
}
}
}
// overwrite func
func (c *CPUStats) Gather(queue chan *types.Sample) {
var samples []*types.Sample
func (c *CPUStats) GatherOnce(queue chan *types.Sample) {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
@ -84,21 +81,29 @@ func (c *CPUStats) Gather(queue chan *types.Sample) {
log.Println("E! gather metrics panic:", r)
}
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}()
// ----------------------------------------------
samples := c.Gather()
if len(samples) == 0 {
return
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}
func (c *CPUStats) Gather() []*types.Sample {
var samples []*types.Sample
times, err := c.ps.CPUTimes(c.CollectPerCPU, true)
if err != nil {
log.Println("E! failed to get cpu metrics:", err)
return
return samples
}
for _, cts := range times {
@ -154,6 +159,8 @@ func (c *CPUStats) Gather(queue chan *types.Sample) {
for _, cts := range times {
c.lastStats[cts.CPU] = cts
}
return samples
}
func totalCPUTime(t cpuUtil.TimesStat) float64 {

View File

@ -65,15 +65,12 @@ func (s *DiskStats) LoopGather(queue chan *types.Sample) {
return
default:
time.Sleep(interval)
s.Gather(queue)
s.GatherOnce(queue)
}
}
}
// overwrite func
func (s *DiskStats) Gather(queue chan *types.Sample) {
var samples []*types.Sample
func (s *DiskStats) GatherOnce(queue chan *types.Sample) {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
@ -82,21 +79,29 @@ func (s *DiskStats) Gather(queue chan *types.Sample) {
log.Println("E! gather metrics panic:", r)
}
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}()
// ----------------------------------------------
samples := s.Gather()
if len(samples) == 0 {
return
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}
func (s *DiskStats) Gather() []*types.Sample {
var samples []*types.Sample
disks, partitions, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS)
if err != nil {
log.Println("E! failed to get disk usage:", err)
return
return samples
}
for i, du := range disks {
@ -129,6 +134,8 @@ func (s *DiskStats) Gather(queue chan *types.Sample) {
samples = append(samples, inputs.NewSamples(fields, tags)...)
}
return samples
}
type MountOptions []string

View File

@ -75,15 +75,12 @@ func (d *DiskIO) LoopGather(queue chan *types.Sample) {
return
default:
time.Sleep(interval)
d.Gather(queue)
d.GatherOnce(queue)
}
}
}
// overwrite func
func (d *DiskIO) Gather(queue chan *types.Sample) {
var samples []*types.Sample
func (d *DiskIO) GatherOnce(queue chan *types.Sample) {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
@ -92,16 +89,24 @@ func (d *DiskIO) Gather(queue chan *types.Sample) {
log.Println("E! gather metrics panic:", r)
}
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}()
// ----------------------------------------------
samples := d.Gather()
if len(samples) == 0 {
return
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}
func (d *DiskIO) Gather() []*types.Sample {
var samples []*types.Sample
devices := []string{}
if d.deviceFilter == nil {
@ -112,7 +117,7 @@ func (d *DiskIO) Gather(queue chan *types.Sample) {
diskio, err := d.ps.DiskIO(devices)
if err != nil {
log.Println("E! failed to get disk io:", err)
return
return samples
}
for _, io := range diskio {
@ -136,4 +141,6 @@ func (d *DiskIO) Gather(queue chan *types.Sample) {
samples = append(samples, inputs.NewSamples(fields, map[string]string{"name": io.Name})...)
}
return samples
}

View File

@ -67,15 +67,12 @@ func (s *MemStats) LoopGather(queue chan *types.Sample) {
return
default:
time.Sleep(interval)
s.Gather(queue)
s.GatherOnce(queue)
}
}
}
// overwrite func
func (s *MemStats) Gather(queue chan *types.Sample) {
var samples []*types.Sample
func (s *MemStats) GatherOnce(queue chan *types.Sample) {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
@ -84,21 +81,27 @@ func (s *MemStats) Gather(queue chan *types.Sample) {
log.Println("E! gather metrics panic:", r)
}
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}()
// ----------------------------------------------
samples := s.Gather()
if len(samples) == 0 {
return
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}
func (s *MemStats) Gather() []*types.Sample {
vm, err := s.ps.VMStat()
if err != nil {
log.Println("E! failed to get vmstat:", err)
return
return nil
}
fields := map[string]interface{}{
@ -163,5 +166,5 @@ func (s *MemStats) Gather(queue chan *types.Sample) {
}
}
samples = inputs.NewSamples(fields)
return inputs.NewSamples(fields)
}

View File

@ -64,15 +64,12 @@ func (s *SystemStats) LoopGather(queue chan *types.Sample) {
return
default:
time.Sleep(interval)
s.Gather(queue)
s.GatherOnce(queue)
}
}
}
// overwrite func
func (s *SystemStats) Gather(queue chan *types.Sample) {
var samples []*types.Sample
func (s *SystemStats) GatherOnce(queue chan *types.Sample) {
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
@ -81,27 +78,35 @@ func (s *SystemStats) Gather(queue chan *types.Sample) {
log.Println("E! gather metrics panic:", r)
}
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}()
// ----------------------------------------------
samples := s.Gather()
if len(samples) == 0 {
return
}
now := time.Now()
for i := 0; i < len(samples); i++ {
samples[i].Timestamp = now
samples[i].Metric = InputName + "_" + samples[i].Metric
queue <- samples[i]
}
}
func (s *SystemStats) Gather() []*types.Sample {
var samples []*types.Sample
loadavg, err := load.Avg()
if err != nil && !strings.Contains(err.Error(), "not implemented") {
log.Println("E! failed to gather system load:", err)
return
return samples
}
numCPUs, err := cpu.Counts(true)
if err != nil {
log.Println("E! failed to gather cpu number:", err)
return
return samples
}
fields := map[string]interface{}{
@ -117,11 +122,10 @@ func (s *SystemStats) Gather(queue chan *types.Sample) {
uptime, err := host.Uptime()
if err != nil {
log.Println("E! failed to get host uptime:", err)
return
} else {
fields["uptime"] = uptime
}
fields["uptime"] = uptime
if s.CollectUserNumber {
users, err := host.Users()
if err == nil {
@ -133,5 +137,5 @@ func (s *SystemStats) Gather(queue chan *types.Sample) {
}
}
samples = inputs.NewSamples(fields)
return inputs.NewSamples(fields)
}