refactor zookeeper plugin

This commit is contained in:
Ulric Qin 2022-06-20 18:08:14 +08:00
parent 0f05600c16
commit c364661025
1 changed files with 63 additions and 36 deletions

View File

@ -22,7 +22,7 @@ import (
const (
inputName = "zookeeper"
commandNotAllowedTmpl = "warning: %q command isn't allowed at %q, see '4lw.commands.whitelist' ZK config parameter"
commandNotAllowedTmpl = "E!: %q command isn't allowed at %q, see '4lw.commands.whitelist' ZK config parameter\n"
instanceNotServingMessage = "This ZooKeeper instance is not currently serving requests"
cmdNotExecutedSffx = "is not executed because it is not in the whitelist."
)
@ -33,28 +33,29 @@ var (
)
type Instance struct {
Addresses string `toml:"addresses"`
Timeout int `toml:"timeout"`
ClusterName string `toml:"cluster_name"`
Labels map[string]string `toml:"labels"`
Addresses string `toml:"addresses"`
Timeout int `toml:"timeout"`
ClusterName string `toml:"cluster_name"`
Labels map[string]string `toml:"labels"`
IntervalTimes int64 `toml:"interval_times"`
tls.ClientConfig
}
func (i *Instance) ZkHosts() []string {
return strings.Fields(i.Addresses)
func (ins *Instance) ZkHosts() []string {
return strings.Fields(ins.Addresses)
}
func (i *Instance) ZkConnect(host string) (net.Conn, error) {
dialer := net.Dialer{Timeout: time.Duration(i.Timeout) * time.Second}
func (ins *Instance) ZkConnect(host string) (net.Conn, error) {
dialer := net.Dialer{Timeout: time.Duration(ins.Timeout) * time.Second}
tcpaddr, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
return nil, fmt.Errorf("failed to resolve zookeeper(cluster: %s) address: %s: %v", i.ClusterName, host, err)
return nil, fmt.Errorf("failed to resolve zookeeper(cluster: %s) address: %s: %v", ins.ClusterName, host, err)
}
if !i.UseTLS {
if !ins.UseTLS {
return dialer.Dial("tcp", tcpaddr.String())
}
tlsConfig, err := i.TLSConfig()
tlsConfig, err := ins.TLSConfig()
if err != nil {
return nil, fmt.Errorf("failed to init tls config: %v", err)
}
@ -63,10 +64,9 @@ func (i *Instance) ZkConnect(host string) (net.Conn, error) {
type Zookeeper struct {
config.Interval
counter uint64
waitgrp sync.WaitGroup
Instances []*Instance `toml:"instances"`
Counter uint64
wg sync.WaitGroup
}
func init() {
@ -89,24 +89,46 @@ func (z *Zookeeper) Init() error {
func (z *Zookeeper) Drop() {}
func (z *Zookeeper) Gather(slist *list.SafeList) {
atomic.AddUint64(&z.Counter, 1)
atomic.AddUint64(&z.counter, 1)
for i := range z.Instances {
ins := z.Instances[i]
zkHosts := ins.ZkHosts()
if len(zkHosts) == 0 {
log.Printf("E! no target zookeeper cluster %s addresses specified", ins.ClusterName)
continue
}
for _, zkHost := range zkHosts {
z.wg.Add(1)
go z.gatherOnce(slist, ins, zkHost)
}
z.waitgrp.Add(1)
go func(slist *list.SafeList, ins *Instance) {
defer z.waitgrp.Done()
if ins.IntervalTimes > 0 {
counter := atomic.LoadUint64(&z.counter)
if counter%uint64(ins.IntervalTimes) != 0 {
return
}
}
ins.gatherOnce(slist)
}(slist, ins)
}
z.wg.Wait()
z.waitgrp.Wait()
}
func (z *Zookeeper) gatherOnce(slist *list.SafeList, ins *Instance, zkHost string) {
defer z.wg.Done()
func (ins *Instance) gatherOnce(slist *list.SafeList) {
hosts := ins.ZkHosts()
if len(hosts) == 0 {
log.Println("E! addresses empty")
return
}
wg := new(sync.WaitGroup)
for i := 0; i < len(hosts); i++ {
wg.Add(1)
go ins.gatherOneHost(wg, slist, hosts[i])
}
wg.Wait()
}
func (ins *Instance) gatherOneHost(wg *sync.WaitGroup, slist *list.SafeList, zkHost string) {
defer wg.Done()
tags := map[string]string{"zk_host": zkHost, "zk_cluster": ins.ClusterName}
for k, v := range ins.Labels {
@ -122,28 +144,29 @@ func (z *Zookeeper) gatherOnce(slist *list.SafeList, ins *Instance, zkHost strin
}(begun)
// zk_up
conn, err := ins.ZkConnect(zkHost)
mntrConn, err := ins.ZkConnect(zkHost)
if err != nil {
slist.PushFront(inputs.NewSample("zk_up", 0, tags))
log.Println("E! :"+zkHost, "err:", err)
log.Println("E! failed to connect zookeeper:", zkHost, "error:", err)
return
}
defer conn.Close()
z.gatherMntrResult(conn, slist, ins, tags)
defer mntrConn.Close()
ins.gatherMntrResult(mntrConn, slist, tags)
// zk_ruok
ruokConn, err := ins.ZkConnect(zkHost)
if err != nil {
slist.PushFront(inputs.NewSample("zk_ruok", 0, tags))
log.Println("E! :"+zkHost, "err:", err)
log.Println("E! failed to connect zookeeper:", zkHost, "error:", err)
return
}
defer ruokConn.Close()
z.gatherRuokResult(ruokConn, slist, ins, tags)
ins.gatherRuokResult(ruokConn, slist, tags)
}
func (z *Zookeeper) gatherMntrResult(conn net.Conn, slist *list.SafeList, ins *Instance, globalTags map[string]string) {
func (ins *Instance) gatherMntrResult(conn net.Conn, slist *list.SafeList, globalTags map[string]string) {
res := sendZookeeperCmd(conn, "mntr")
// get slice of strings from response, like 'zk_avg_latency 0'
@ -171,6 +194,10 @@ func (z *Zookeeper) gatherMntrResult(conn net.Conn, slist *list.SafeList, ins *I
}
kv := strings.Fields(l)
if len(kv) != 2 {
continue
}
key := kv[0]
value := kv[1]
@ -207,7 +234,7 @@ func (z *Zookeeper) gatherMntrResult(conn net.Conn, slist *list.SafeList, ins *I
}
}
func (z *Zookeeper) gatherRuokResult(conn net.Conn, slist *list.SafeList, ins *Instance, globalTags map[string]string) {
func (ins *Instance) gatherRuokResult(conn net.Conn, slist *list.SafeList, globalTags map[string]string) {
res := sendZookeeperCmd(conn, "ruok")
if res == "imok" {
slist.PushFront(inputs.NewSample("zk_ruok", 1, globalTags))