// categraf/inputs/ping/ping.go

package ping
import (
"fmt"
"log"
"net"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/go-ping/ping"
"github.com/toolkits/pkg/container/list"
)
const (
	// inputName is the name this input registers under; it is also returned
	// by Ping.Prefix.
	inputName = "ping"

	// defaultPingDataBytesSize is the packet payload size (bytes) used when
	// an instance leaves Size unset.
	defaultPingDataBytesSize = 56
)
// PingInstance is one entry of the plugin's `instances` list: a set of
// targets that share labels and ping options.
type PingInstance struct {
	Targets       []string          `toml:"targets"`
	Labels        map[string]string `toml:"labels"`
	IntervalTimes int64             `toml:"interval_times"` // only gather every Nth cycle when > 0
	Count         int               `toml:"count"`          // ping -c <COUNT>
	PingInterval  float64           `toml:"ping_interval"`  // ping -i <INTERVAL>, seconds (min 0.2)
	Timeout       float64           `toml:"timeout"`        // ping -W <TIMEOUT>, seconds, fractions allowed
	Interface     string            `toml:"interface"`      // ping -I/-S <INTERFACE/SRC_ADDR>
	IPv6          bool              `toml:"ipv6"`           // Whether to resolve addresses using ipv6 or not.
	Size          *int              `toml:"size"`           // Packet size

	// Derived by Init and consumed by ping().
	calcInterval  time.Duration
	calcTimeout   time.Duration
	sourceAddress string
}

// Init validates the instance configuration and derives the values used
// when pinging.
//
// Defaults and clamping:
//   - Count < 1 becomes 1.
//   - PingInterval below 0.2s is raised to 0.2s.
//   - A non-positive Timeout becomes 3s.
//
// Interface may be either a literal IP address, used verbatim as the source,
// or an interface name. For a name, the first address of that interface
// whose family matches IPv6 is used, falling back to its first IP of any
// family. It returns an error if the interface cannot be found, its
// addresses cannot be listed, or it has no IP address at all.
func (ins *PingInstance) Init() error {
	if ins.Count < 1 {
		ins.Count = 1
	}

	if ins.PingInterval < 0.2 {
		ins.calcInterval = time.Duration(0.2 * float64(time.Second))
	} else {
		ins.calcInterval = time.Duration(ins.PingInterval * float64(time.Second))
	}

	// Scale in floating point before converting, so sub-second timeouts
	// (e.g. 0.5) are not truncated to zero.
	if ins.Timeout <= 0 {
		ins.calcTimeout = 3 * time.Second
	} else {
		ins.calcTimeout = time.Duration(ins.Timeout * float64(time.Second))
	}

	if ins.Interface == "" {
		return nil
	}

	if net.ParseIP(ins.Interface) != nil {
		ins.sourceAddress = ins.Interface
		return nil
	}

	iface, err := net.InterfaceByName(ins.Interface)
	if err != nil {
		return fmt.Errorf("failed to get interface: %w", err)
	}
	addrs, err := iface.Addrs()
	if err != nil {
		return fmt.Errorf("failed to get the address of interface: %w", err)
	}
	src, ok := ins.sourceFromAddrs(addrs)
	if !ok {
		return fmt.Errorf("interface %s has no usable IP address", ins.Interface)
	}
	ins.sourceAddress = src
	return nil
}

// sourceFromAddrs picks a source IP from an interface's addresses. It
// prefers the first IP whose family matches ins.IPv6, otherwise the first IP
// of any family. It reports false when addrs holds no IP.
func (ins *PingInstance) sourceFromAddrs(addrs []net.Addr) (string, bool) {
	var fallback net.IP
	for _, a := range addrs {
		var ip net.IP
		switch v := a.(type) {
		case *net.IPNet:
			ip = v.IP
		case *net.IPAddr:
			ip = v.IP
		}
		if ip == nil {
			continue
		}
		if isV6 := ip.To4() == nil; isV6 == ins.IPv6 {
			return ip.String(), true
		}
		if fallback == nil {
			fallback = ip
		}
	}
	if fallback == nil {
		return "", false
	}
	return fallback.String(), true
}
// Ping is the "ping" input plugin. It holds the configured instances and a
// cycle counter that Gather bumps on every run; instances with IntervalTimes
// set use that counter to skip cycles.
type Ping struct {
	// Counter is accessed with sync/atomic 64-bit operations (Gather,
	// gatherOnce). It must stay the FIRST field: on 32-bit platforms only the
	// first word of an allocated struct is guaranteed 64-bit aligned, and
	// misaligned 64-bit atomics panic there.
	Counter uint64

	Interval  config.Duration `toml:"interval"`
	Instances []*PingInstance `toml:"instances"`

	// wg tracks the per-instance goroutines of the Gather call in progress.
	wg sync.WaitGroup
}
// init registers a constructor for the ping input with the inputs registry
// under inputName. Each call to the constructor yields a fresh, zero-valued
// *Ping.
func init() {
	factory := func() inputs.Input { return new(Ping) }
	inputs.Add(inputName, factory)
}
func (p *Ping) Prefix() string {
2022-04-19 17:36:45 +08:00
return inputName
}
// GetInterval returns the collection interval configured for this input
// via the `interval` key.
func (p *Ping) GetInterval() config.Duration { return p.Interval }
func (p *Ping) Init() error {
if len(p.Instances) == 0 {
2022-04-25 12:08:56 +08:00
return types.ErrInstancesEmpty
2022-04-19 17:36:45 +08:00
}
for i := 0; i < len(p.Instances); i++ {
if err := p.Instances[i].Init(); err != nil {
return err
}
}
return nil
}
// Drop has nothing to release for this input, so it does nothing.
func (*Ping) Drop() {}
func (p *Ping) Gather(slist *list.SafeList) {
2022-04-19 17:36:45 +08:00
atomic.AddUint64(&p.Counter, 1)
for i := range p.Instances {
ins := p.Instances[i]
p.wg.Add(1)
go p.gatherOnce(slist, ins)
}
p.wg.Wait()
}
func (p *Ping) gatherOnce(slist *list.SafeList, ins *PingInstance) {
defer p.wg.Done()
2022-04-19 18:09:16 +08:00
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&p.Counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
2022-04-19 18:57:43 +08:00
if config.Config.DebugMode {
if len(ins.Targets) == 0 {
log.Println("D! ping targets empty")
}
}
2022-04-19 18:09:16 +08:00
wg := new(sync.WaitGroup)
for _, target := range ins.Targets {
wg.Add(1)
go func(target string) {
defer wg.Done()
ins.gather(slist, target)
}(target)
}
wg.Wait()
}
func (ins *PingInstance) gather(slist *list.SafeList, target string) {
2022-04-19 18:41:49 +08:00
if config.Config.DebugMode {
log.Println("D! ping...", target)
}
2022-04-19 18:09:16 +08:00
labels := map[string]string{"target": target}
for k, v := range ins.Labels {
labels[k] = v
}
fields := map[string]interface{}{}
defer func() {
for field, value := range fields {
slist.PushFront(inputs.NewSample(field, value, labels))
}
}()
stats, err := ins.ping(target)
if err != nil {
log.Println("E! failed to ping:", target, "error:", err)
if strings.Contains(err.Error(), "unknown") {
fields["result_code"] = 1
} else {
fields["result_code"] = 2
}
return
}
fields["result_code"] = 0
if stats.PacketsSent == 0 {
if config.Config.DebugMode {
log.Println("D! no packets sent, target:", target)
}
fields["result_code"] = 2
return
}
if stats.PacketsRecv == 0 {
if config.Config.DebugMode {
log.Println("D! no packets received, target:", target)
}
fields["result_code"] = 1
fields["percent_packet_loss"] = float64(100)
return
}
fields["percent_packet_loss"] = float64(stats.PacketLoss)
}
// pingStats is the outcome of one ping() run: go-ping's aggregate
// Statistics plus the TTL of the first received reply. ttl stays 0 when no
// reply was received.
type pingStats struct {
	ping.Statistics // embedded so callers read PacketsSent, PacketLoss, ... directly

	ttl int // TTL of the first received packet
}
func (ins *PingInstance) ping(destination string) (*pingStats, error) {
ps := &pingStats{}
pinger, err := ping.NewPinger(destination)
if err != nil {
return nil, fmt.Errorf("failed to create new pinger: %w", err)
}
pinger.SetPrivileged(true)
if ins.IPv6 {
pinger.SetNetwork("ip6")
}
pinger.Size = defaultPingDataBytesSize
if ins.Size != nil {
pinger.Size = *ins.Size
}
pinger.Source = ins.sourceAddress
pinger.Interval = ins.calcInterval
pinger.Timeout = ins.calcTimeout
2022-04-19 18:09:16 +08:00
// Get Time to live (TTL) of first response, matching original implementation
once := &sync.Once{}
pinger.OnRecv = func(pkt *ping.Packet) {
once.Do(func() {
ps.ttl = pkt.Ttl
})
}
pinger.Count = ins.Count
err = pinger.Run()
if err != nil {
if strings.Contains(err.Error(), "operation not permitted") {
if runtime.GOOS == "linux" {
return nil, fmt.Errorf("permission changes required, enable CAP_NET_RAW capabilities (refer to the ping plugin's README.md for more info)")
}
return nil, fmt.Errorf("permission changes required, refer to the ping plugin's README.md for more info")
}
return nil, fmt.Errorf("%w", err)
}
ps.Statistics = *pinger.Statistics()
return ps, nil
2022-04-19 17:36:45 +08:00
}