input plugin ping skelton done

This commit is contained in:
Ulric Qin 2022-04-19 17:36:45 +08:00
parent bc8294affa
commit 48803f41a9
4 changed files with 187 additions and 6 deletions

View File

@ -23,6 +23,7 @@ import (
_ "flashcat.cloud/categraf/inputs/net"
_ "flashcat.cloud/categraf/inputs/netstat"
_ "flashcat.cloud/categraf/inputs/oracle"
_ "flashcat.cloud/categraf/inputs/ping"
_ "flashcat.cloud/categraf/inputs/processes"
_ "flashcat.cloud/categraf/inputs/redis"
_ "flashcat.cloud/categraf/inputs/system"

43
conf/input.ping/ping.toml Normal file
View File

@ -0,0 +1,43 @@
# # collect interval
# interval = 15
[[instances]]
# send ping packets to
targets = ["example.org"]
# # interval = global.interval * interval_times
# interval_times = 1
## Number of ping packets to send per interval. Corresponds to the "-c"
## option of the ping command.
# count = 1
## Time to wait between sending ping packets in seconds. Operates like the
## "-i" option of the ping command.
# ping_interval = 1.0
## If set, the time to wait for a ping response in seconds. Operates like
## the "-W" option of the ping command.
# timeout = 1.0
## If set, the total ping deadline, in seconds. Operates like the -w option
## of the ping command.
# deadline = 10
## Interface or source address to send ping from. Operates like the -I or -S
## option of the ping command.
# interface = ""
## Use only IPv6 addresses when resolving a hostname.
# ipv6 = false
## Number of data bytes to be sent. Corresponds to the "-s"
## option of the ping command.
# size = 56
[[instances]]
# send ping packets to
targets = [
"127.0.0.1",
"192.168.8.12"
]

View File

@ -51,6 +51,7 @@ type Oracle struct {
dbconnpool map[string]*sqlx.DB // key: instance
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -103,13 +104,12 @@ func (o *Oracle) Gather() (samples []*types.Sample) {
slist := list.NewSafeList()
var wg sync.WaitGroup
for i := range o.Instances {
ins := o.Instances[i]
wg.Add(1)
go o.collectOnce(&wg, ins, slist)
o.wg.Add(1)
go o.gatherOnce(slist, ins)
}
wg.Wait()
o.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
@ -119,8 +119,8 @@ func (o *Oracle) Gather() (samples []*types.Sample) {
return
}
func (o *Oracle) collectOnce(wg *sync.WaitGroup, ins OrclInstance, slist *list.SafeList) {
defer wg.Done()
func (o *Oracle) gatherOnce(slist *list.SafeList, ins OrclInstance) {
defer o.wg.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&o.Counter)

137
inputs/ping/ping.go Normal file
View File

@ -0,0 +1,137 @@
package ping
import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/types"
"github.com/toolkits/pkg/container/list"
)
const inputName = "ping"
type PingInstance struct {
Targets []string `toml:"targets"`
IntervalTimes int64 `toml:"interval_times"`
Count int `toml:"count"` // ping -c <COUNT>
PingInterval float64 `toml:"ping_interval"` // ping -i <INTERVAL>
Timeout float64 `toml:"timeout"` // ping -W <TIMEOUT>
Deadline int `toml:"deadline"` // ping -w <DEADLINE>
Interface string `toml:"interface"` // ping -I/-S <INTERFACE/SRC_ADDR>
IPv6 bool `toml:"ipv6"` // Whether to resolve addresses using ipv6 or not.
Size int `toml:"size"` // Packet size
calcInterval time.Duration
calcTimeout time.Duration
sourceAddress string
}
func (ins *PingInstance) Init() error {
if ins.Count < 1 {
ins.Count = 1
}
if ins.PingInterval < 0.2 {
ins.calcInterval = time.Duration(0.2 * float64(time.Second))
} else {
ins.calcInterval = time.Duration(ins.PingInterval * float64(time.Second))
}
if ins.Timeout == 0 {
ins.calcTimeout = time.Duration(5) * time.Second
} else {
ins.calcTimeout = time.Duration(ins.Timeout) * time.Second
}
if ins.Deadline < 0 {
ins.Deadline = 10
}
if ins.Interface != "" {
if addr := net.ParseIP(ins.Interface); addr != nil {
ins.sourceAddress = ins.Interface
} else {
i, err := net.InterfaceByName(ins.Interface)
if err != nil {
return fmt.Errorf("failed to get interface: %v", err)
}
addrs, err := i.Addrs()
if err != nil {
return fmt.Errorf("failed to get the address of interface: %v", err)
}
ins.sourceAddress = addrs[0].(*net.IPNet).IP.String()
}
}
return nil
}
type Ping struct {
Interval config.Duration `toml:"interval"`
Instances []*PingInstance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
inputs.Add(inputName, func() inputs.Input {
return &Ping{}
})
}
func (p *Ping) GetInputName() string {
return inputName
}
func (p *Ping) GetInterval() config.Duration {
return p.Interval
}
func (p *Ping) Init() error {
if len(p.Instances) == 0 {
return fmt.Errorf("ping instances empty")
}
for i := 0; i < len(p.Instances); i++ {
if err := p.Instances[i].Init(); err != nil {
return err
}
}
return nil
}
func (p *Ping) Drop() {}
func (p *Ping) Gather() (samples []*types.Sample) {
atomic.AddUint64(&p.Counter, 1)
slist := list.NewSafeList()
for i := range p.Instances {
ins := p.Instances[i]
p.wg.Add(1)
go p.gatherOnce(slist, ins)
}
p.wg.Wait()
interfaceList := slist.PopBackAll()
for i := 0; i < len(interfaceList); i++ {
samples = append(samples, interfaceList[i].(*types.Sample))
}
return
}
func (p *Ping) gatherOnce(slist *list.SafeList, ins *PingInstance) {
defer p.wg.Done()
fmt.Println("ping.....", ins.Targets)
}