code refactor

This commit is contained in:
Ulric Qin 2022-04-13 17:16:16 +08:00
parent 61019631b2
commit 6f0e1b26a7
4 changed files with 82 additions and 71 deletions

View File

@ -48,17 +48,23 @@ func (a *Agent) String() string {
func (a *Agent) Start() {
log.Println("I! agent starting")
StartInputs(a.ConfigDir)
a.startInputs()
}
func (a *Agent) Stop() {
log.Println("I! agent stopping")
for name := range InputConsumers {
InputConsumers[name].Instance.StopGoroutines()
close(InputConsumers[name].Queue)
}
}
func (a *Agent) Reload() {
log.Println("I! agent reloading")
a.Stop()
a.Start()
}
// -----
@ -66,13 +72,30 @@ func (a *Agent) Reload() {
type Consumer struct {
Instance inputs.Input
Queue chan *types.Sample
Quit chan struct{}
}
func (c *Consumer) Start() {
// start consumer goroutines
go consume(c.Queue)
// start collector goroutines
c.Instance.StartGoroutines(c.Queue)
}
func consume(queue chan *types.Sample) {
for s := range queue {
fmt.Println(s.Metric)
fmt.Println(s.Labels)
fmt.Println(s.Timestamp)
fmt.Println(s.Value)
fmt.Println()
}
}
var InputConsumers = map[string]*Consumer{}
func StartInputs(confd string) error {
names, err := getInputsByDirs(confd)
func (a *Agent) startInputs() error {
names, err := a.getInputsByDirs()
if err != nil {
return err
}
@ -93,7 +116,7 @@ func StartInputs(confd string) error {
instance := creator()
// set configurations for input instance
loadConfigs(path.Join(confd, "input."+name), instance.GetPointer())
loadConfigs(path.Join(a.ConfigDir, "input."+name), instance)
// check configurations
if err = instance.TidyConfig(); err != nil {
@ -103,15 +126,11 @@ func StartInputs(confd string) error {
c := &Consumer{
Instance: instance,
Quit: make(chan struct{}),
Queue: make(chan *types.Sample, 1000000),
}
// start consumer goroutines
go consume(c.Queue)
// start collector goroutines
instance.StartGoroutines(c.Queue)
log.Println("I! input:", name, "started")
c.Start()
InputConsumers[name] = c
}
@ -119,15 +138,6 @@ func StartInputs(confd string) error {
return nil
}
func consume(queue chan *types.Sample) {
for s := range queue {
fmt.Println(s.Metric)
fmt.Println(s.Labels)
fmt.Println(s.Timestamp)
fmt.Println(s.Value)
}
}
func loadConfigs(configDir string, configPtr interface{}) error {
loaders := []multiconfig.Loader{
&multiconfig.TagLoader{},
@ -156,16 +166,14 @@ func loadConfigs(configDir string, configPtr interface{}) error {
Validator: multiconfig.MultiValidator(&multiconfig.RequiredValidator{}),
}
m.MustLoad(configPtr)
return nil
return m.Load(configPtr)
}
// input dir should has prefix input.
func getInputsByDirs(confd string) ([]string, error) {
dirs, err := file.DirsUnder(confd)
func (a *Agent) getInputsByDirs() ([]string, error) {
dirs, err := file.DirsUnder(a.ConfigDir)
if err != nil {
return nil, fmt.Errorf("failed to get dirs under %s : %v", confd, err)
return nil, fmt.Errorf("failed to get dirs under %s : %v", a.ConfigDir, err)
}
count := len(dirs)

View File

@ -1,8 +1,22 @@
IntervalMS = 30000
TimeoutMS = 20000
Labels = {aaaa = "bbbb"}
# collect interval
IntervalSeconds = 30
# collect timeout per target
TimeoutSeconds = 20
# append labels for series
Labels = { aaaa = "bbbb", cccc = "ddddd" }
[[Targets]]
# redis server address
Address = "127.0.0.1:6379"
IntervalMS = 15000
TimeoutMS = 10000
# redis server password
Password = "1234"
# collect interval this target
IntervalSeconds = 5
# collect timeout this target
TimeoutSeconds = 5

View File

@ -4,21 +4,10 @@ import (
"flashcat.cloud/categraf/types"
)
type PluginDescriber interface {
// TidyConfig Validate and tidy configurations
TidyConfig() error
// Description returns a one-sentence description
Description() string
}
type Input interface {
PluginDescriber
TidyConfig() error
StartGoroutines(chan *types.Sample)
StopGoroutines()
GetPointer() interface{}
}
type Creator func() Input

View File

@ -1,7 +1,9 @@
package redis
import (
"fmt"
"log"
"strings"
"time"
"flashcat.cloud/categraf/inputs"
@ -14,47 +16,36 @@ var (
)
type Target struct {
IntervalMS int64
TimeoutMS int64
Address string
Password string
quit chan struct{}
Labels map[string]string
IntervalSeconds int64
TimeoutSeconds int64
Labels map[string]string
Address string
Password string
quit chan struct{}
}
type Redis struct {
ConfigBytes []byte
IntervalSeconds int64
TimeoutSeconds int64
Labels map[string]string
IntervalMS int64
TimeoutMS int64
Targets []*Target
Labels map[string]string
}
// overwrite func
func (r *Redis) Description() string {
return "Read metrics from one or many redis servers"
}
func (r *Redis) GetPointer() interface{} {
return r
Targets []*Target
}
// overwrite func
func (r *Redis) TidyConfig() error {
log.Printf("-------%#v", r)
if len(r.Targets) == 0 {
log.Println("I! [redis] Targets is empty")
}
return nil
}
// overwrite func
func (r *Redis) StopGoroutines() {
for i := 0; i < len(r.Targets); i++ {
close(r.Targets[i].quit)
r.Targets[i].quit <- struct{}{}
}
}
@ -67,12 +58,12 @@ func (r *Redis) StartGoroutines(queue chan *types.Sample) {
}
func (t *Target) getInterval(r *Redis) time.Duration {
if t.IntervalMS != 0 {
return time.Duration(t.IntervalMS) * time.Millisecond
if t.IntervalSeconds != 0 {
return time.Duration(t.IntervalSeconds) * time.Second
}
if r.IntervalMS != 0 {
return time.Duration(r.IntervalMS) * time.Millisecond
if r.IntervalSeconds != 0 {
return time.Duration(r.IntervalSeconds) * time.Second
}
return DefaultInterval
@ -85,6 +76,15 @@ func (t *Target) LoopGather(r *Redis, queue chan *types.Sample) {
return
default:
time.Sleep(t.getInterval(r))
defer func() {
if r := recover(); r != nil {
if strings.Contains(fmt.Sprint(r), "closed channel") {
return
} else {
log.Println("E! gather redis:", t.Address, " panic:", r)
}
}
}()
t.Gather(r, queue)
}
}