新增rabbitmq、haproxy组件采集 (#575)
* add * add prober plugin for rabbitmq * add prober plugin for haproxy Co-authored-by: root <root@localhost.localdomain> Co-authored-by: UlricQin <ulric.qin@gmail.com>
This commit is contained in:
parent
e839c6bd6b
commit
3fdd61edfc
|
@ -11,6 +11,8 @@ import (
|
|||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/redis"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/nginx"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/elasticsearch"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/rabbitmq"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/haproxy"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/tengine"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/zookeeper"
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
package haproxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/didi/nightingale/src/modules/monapi/collector"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins"
|
||||
"github.com/didi/nightingale/src/toolkits/i18n"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins/haproxy/haproxy"
|
||||
)
|
||||
|
||||
func init() {
|
||||
collector.CollectorRegister(NewHaproxyCollector()) // for monapi
|
||||
i18n.DictRegister(langDict)
|
||||
}
|
||||
|
||||
type HaproxyCollector struct {
|
||||
*collector.BaseCollector
|
||||
}
|
||||
|
||||
func NewHaproxyCollector() *HaproxyCollector {
|
||||
return &HaproxyCollector{BaseCollector: collector.NewBaseCollector(
|
||||
"haproxy",
|
||||
collector.RemoteCategory,
|
||||
func() collector.TelegrafPlugin { return &HaproxyRule{} },
|
||||
)}
|
||||
}
|
||||
|
||||
var (
|
||||
langDict = map[string]map[string]string{
|
||||
"zh": map[string]string{
|
||||
"Servers": "Servers",
|
||||
"Username": "用户名",
|
||||
"Password": "密码",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
type HaproxyRule struct {
|
||||
Servers []string `label:"Servers" json:"servers,required" example:"http://myhaproxy.com:1936/haproxy?stats"`
|
||||
KeepFieldNames bool `label:"KeepFieldNames" json:"keepFieldNames" default:"false" description:"Setting this option to true results in the plugin keeping the original"`
|
||||
Username string `label:"Username" json:"username" description:"specify username"`
|
||||
Password string `label:"Password" json:"password" format:"password" description:"specify server password"`
|
||||
|
||||
plugins.ClientConfig
|
||||
}
|
||||
|
||||
func (p *HaproxyRule) Validate() error {
|
||||
if len(p.Servers) == 0 || p.Servers[0] == "" {
|
||||
return fmt.Errorf("haproxy.rule.servers must be set")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *HaproxyRule) TelegrafInput() (telegraf.Input, error) {
|
||||
if err := p.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ha := &haproxy.Haproxy{
|
||||
|
||||
Servers: p.Servers,
|
||||
KeepFieldNames: p.KeepFieldNames,
|
||||
Username: p.Username,
|
||||
Password: p.Password,
|
||||
ClientConfig: p.ClientConfig.TlsClientConfig(),
|
||||
}
|
||||
|
||||
return ha, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,313 @@
|
|||
package haproxy
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
)
|
||||
|
||||
//CSV format: https://cbonte.github.io/haproxy-dconv/1.5/configuration.html#9.1
|
||||
|
||||
type Haproxy struct {
|
||||
Servers []string
|
||||
KeepFieldNames bool
|
||||
Username string
|
||||
Password string
|
||||
tls.ClientConfig
|
||||
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## An array of address to gather stats about. Specify an ip on hostname
|
||||
## with optional port. ie localhost, 10.10.3.33:1936, etc.
|
||||
## Make sure you specify the complete path to the stats endpoint
|
||||
## including the protocol, ie http://10.10.3.33:1936/haproxy?stats
|
||||
|
||||
## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats
|
||||
servers = ["http://myhaproxy.com:1936/haproxy?stats"]
|
||||
|
||||
## Credentials for basic HTTP authentication
|
||||
# username = "admin"
|
||||
# password = "admin"
|
||||
|
||||
## You can also use local socket with standard wildcard globbing.
|
||||
## Server address not starting with 'http' will be treated as a possible
|
||||
## socket, so both examples below are valid.
|
||||
# servers = ["socket:/run/haproxy/admin.sock", "/run/haproxy/*.sock"]
|
||||
|
||||
## By default, some of the fields are renamed from what haproxy calls them.
|
||||
## Setting this option to true results in the plugin keeping the original
|
||||
## field names.
|
||||
# keep_field_names = false
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
## Use TLS but skip chain & host verification
|
||||
# insecure_skip_verify = false
|
||||
`
|
||||
|
||||
func (r *Haproxy) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (r *Haproxy) Description() string {
|
||||
return "Read metrics of haproxy, via socket or csv stats page"
|
||||
}
|
||||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
func (g *Haproxy) Gather(acc telegraf.Accumulator) error {
|
||||
if len(g.Servers) == 0 {
|
||||
return g.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc)
|
||||
}
|
||||
|
||||
endpoints := make([]string, 0, len(g.Servers))
|
||||
|
||||
for _, endpoint := range g.Servers {
|
||||
|
||||
if strings.HasPrefix(endpoint, "http") {
|
||||
endpoints = append(endpoints, endpoint)
|
||||
continue
|
||||
}
|
||||
|
||||
socketPath := getSocketAddr(endpoint)
|
||||
|
||||
matches, err := filepath.Glob(socketPath)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(matches) == 0 {
|
||||
endpoints = append(endpoints, socketPath)
|
||||
} else {
|
||||
for _, match := range matches {
|
||||
endpoints = append(endpoints, match)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(endpoints))
|
||||
for _, server := range endpoints {
|
||||
go func(serv string) {
|
||||
defer wg.Done()
|
||||
if err := g.gatherServer(serv, acc); err != nil {
|
||||
acc.AddError(err)
|
||||
}
|
||||
}(server)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
|
||||
socketPath := getSocketAddr(addr)
|
||||
|
||||
c, err := net.Dial("unix", socketPath)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not connect to socket '%s': %s", addr, err)
|
||||
}
|
||||
|
||||
_, errw := c.Write([]byte("show stat\n"))
|
||||
|
||||
if errw != nil {
|
||||
return fmt.Errorf("Could not write to socket '%s': %s", addr, errw)
|
||||
}
|
||||
|
||||
return g.importCsvResult(c, acc, socketPath)
|
||||
}
|
||||
|
||||
func (g *Haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||
if !strings.HasPrefix(addr, "http") {
|
||||
return g.gatherServerSocket(addr, acc)
|
||||
}
|
||||
|
||||
if g.client == nil {
|
||||
tlsCfg, err := g.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr := &http.Transport{
|
||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: time.Duration(4 * time.Second),
|
||||
}
|
||||
g.client = client
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(addr, ";csv") {
|
||||
addr += "/;csv"
|
||||
}
|
||||
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable parse server address '%s': %s", addr, err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", addr, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create new request '%s': %s", addr, err)
|
||||
}
|
||||
if u.User != nil {
|
||||
p, _ := u.User.Password()
|
||||
req.SetBasicAuth(u.User.Username(), p)
|
||||
u.User = &url.Userinfo{}
|
||||
addr = u.String()
|
||||
}
|
||||
|
||||
if g.Username != "" || g.Password != "" {
|
||||
req.SetBasicAuth(g.Username, g.Password)
|
||||
}
|
||||
|
||||
res, err := g.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to connect to haproxy server '%s': %s", addr, err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
return fmt.Errorf("unable to get valid stat result from '%s', http response code : %d", addr, res.StatusCode)
|
||||
}
|
||||
|
||||
if err := g.importCsvResult(res.Body, acc, u.Host); err != nil {
|
||||
return fmt.Errorf("unable to parse stat result from '%s': %s", addr, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getSocketAddr(sock string) string {
|
||||
socketAddr := strings.Split(sock, ":")
|
||||
|
||||
if len(socketAddr) >= 2 {
|
||||
return socketAddr[1]
|
||||
} else {
|
||||
return socketAddr[0]
|
||||
}
|
||||
}
|
||||
|
||||
var typeNames = []string{"frontend", "backend", "server", "listener"}
|
||||
var fieldRenames = map[string]string{
|
||||
"pxname": "proxy",
|
||||
"svname": "sv",
|
||||
"act": "active_servers",
|
||||
"bck": "backup_servers",
|
||||
"cli_abrt": "cli_abort",
|
||||
"srv_abrt": "srv_abort",
|
||||
"hrsp_1xx": "http_response.1xx",
|
||||
"hrsp_2xx": "http_response.2xx",
|
||||
"hrsp_3xx": "http_response.3xx",
|
||||
"hrsp_4xx": "http_response.4xx",
|
||||
"hrsp_5xx": "http_response.5xx",
|
||||
"hrsp_other": "http_response.other",
|
||||
}
|
||||
|
||||
func (g *Haproxy) importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error {
|
||||
csvr := csv.NewReader(r)
|
||||
now := time.Now()
|
||||
|
||||
headers, err := csvr.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(headers[0]) <= 2 || headers[0][:2] != "# " {
|
||||
return fmt.Errorf("did not receive standard haproxy headers")
|
||||
}
|
||||
headers[0] = headers[0][2:]
|
||||
|
||||
for {
|
||||
row, err := csvr.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
tags := map[string]string{
|
||||
"server": host,
|
||||
}
|
||||
|
||||
if len(row) != len(headers) {
|
||||
return fmt.Errorf("number of columns does not match number of headers. headers=%d columns=%d", len(headers), len(row))
|
||||
}
|
||||
for i, v := range row {
|
||||
if v == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
colName := headers[i]
|
||||
fieldName := colName
|
||||
if !g.KeepFieldNames {
|
||||
if fieldRename, ok := fieldRenames[colName]; ok {
|
||||
fieldName = fieldRename
|
||||
}
|
||||
}
|
||||
|
||||
switch colName {
|
||||
case "pxname", "svname":
|
||||
tags[fieldName] = v
|
||||
case "type":
|
||||
vi, err := strconv.ParseInt(v, 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to parse type value '%s'", v)
|
||||
}
|
||||
if vi >= int64(len(typeNames)) {
|
||||
return fmt.Errorf("received unknown type value: %d", vi)
|
||||
}
|
||||
tags[fieldName] = typeNames[vi]
|
||||
case "check_desc", "agent_desc":
|
||||
// do nothing. These fields are just a more verbose description of the check_status & agent_status fields
|
||||
case "status", "check_status", "last_chk", "mode", "tracked", "agent_status", "last_agt", "addr", "cookie":
|
||||
// these are string fields
|
||||
fields[fieldName] = v
|
||||
case "lastsess":
|
||||
vi, err := strconv.ParseInt(v, 10, 64)
|
||||
if err != nil {
|
||||
//TODO log the error. And just once (per column) so we don't spam the log
|
||||
continue
|
||||
}
|
||||
fields[fieldName] = vi
|
||||
default:
|
||||
vi, err := strconv.ParseUint(v, 10, 64)
|
||||
if err != nil {
|
||||
//TODO log the error. And just once (per column) so we don't spam the log
|
||||
continue
|
||||
}
|
||||
fields[fieldName] = vi
|
||||
}
|
||||
}
|
||||
acc.AddFields("haproxy", fields, tags, now)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("haproxy", func() telegraf.Input {
|
||||
return &Haproxy{}
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"reflect"
|
||||
"github.com/didi/nightingale/src/modules/monapi/collector"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins"
|
||||
"github.com/didi/nightingale/src/toolkits/i18n"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/rabbitmq"
|
||||
)
|
||||
|
||||
func init() {
|
||||
collector.CollectorRegister(NewRabbitMQCollector()) // for monapi
|
||||
i18n.DictRegister(langDict)
|
||||
}
|
||||
|
||||
type RabbitMQCollector struct {
|
||||
*collector.BaseCollector
|
||||
}
|
||||
|
||||
func NewRabbitMQCollector() *RabbitMQCollector {
|
||||
return &RabbitMQCollector{BaseCollector: collector.NewBaseCollector(
|
||||
"rabbitMQ",
|
||||
collector.RemoteCategory,
|
||||
func() collector.TelegrafPlugin { return &RabbitMQRule{} },
|
||||
)}
|
||||
}
|
||||
|
||||
var (
|
||||
langDict = map[string]map[string]string{
|
||||
"zh": map[string]string{
|
||||
"URL": "URL",
|
||||
"Name": "节点名称",
|
||||
"Username": "用户名",
|
||||
"Password": "密码",
|
||||
"header time out": "请求超时时间",
|
||||
"client time out": "连接超时时间",
|
||||
"nodes": "MQ节点",
|
||||
"queues": "队列",
|
||||
"exchanges": "Exchange交换机",
|
||||
"QueueNameInclude": "包含队列",
|
||||
"QueueNameExclude": "排除队列",
|
||||
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
type RabbitMQRule struct {
|
||||
URL string `label:"URL" json:"url,required" example:"http://localhost:15672"`
|
||||
Name string `label:"Name" json:"Name" description:"Tag added to rabbitmq_overview series"`
|
||||
Username string `label:"Username" json:"username,required" description:"specify username"`
|
||||
Password string `label:"Password" json:"password,required" format:"password" description:"specify server password"`
|
||||
ResponseHeaderTimeout int `label:"header time out" json:"header_timeout" default:"3" description:"for a server's response headers after fully writing the request"`
|
||||
ClientTimeout int `label:"client time out" json:"client_timeout" default:"4" description:"for a server's response headers after fully writing the request"`
|
||||
Nodes []string `label:"nodes" json:"nodes" description:"A list of nodes to gather as the rabbitmq_node measurement"`
|
||||
Queues []string `label:"queues" json:"queues" description:"A list of queues to gather as the rabbitmq_queue measurement"`
|
||||
Exchanges []string `label:"exchanges" json:"exchanges" description:"A list of exchanges to gather as the rabbitmq_exchange measurement"`
|
||||
QueueNameInclude []string `label:"queue name include" json:"queue_name_include" description:"Queues to include."`
|
||||
QueueNameExclude []string `label:"queue name exclude" json:"queue_name_exclude" description:"Queues to exclude."`
|
||||
FederationUpstreamInclude []string `label:"FederationUpstreamInclude" json:"federation_upstream_include" description:"exchange filters include"`
|
||||
FederationUpstreamExclude []string `label:"FederationUpstreamExclude" json:"federation_upstream_exclude" description:"exchange filters exclude"`
|
||||
plugins.ClientConfig
|
||||
}
|
||||
|
||||
func (p *RabbitMQRule) Validate() error {
|
||||
if len(p.URL) == 0 || p.URL == "" {
|
||||
return fmt.Errorf("rabbitmq.rule.servers must be set")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RabbitMQRule) TelegrafInput() (telegraf.Input, error) {
|
||||
if err := p.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mq := &rabbitmq.RabbitMQ{
|
||||
|
||||
URL: p.URL,
|
||||
Name: p.Name,
|
||||
Username: p.Username,
|
||||
Password: p.Password,
|
||||
Nodes: p.Nodes,
|
||||
Queues: p.Queues,
|
||||
Exchanges: p.Exchanges,
|
||||
QueueInclude: p.QueueNameInclude,
|
||||
QueueExclude: p.QueueNameExclude,
|
||||
ClientConfig: p.ClientConfig.TlsClientConfig(),
|
||||
}
|
||||
v := reflect.ValueOf(&(mq.ResponseHeaderTimeout.Duration)).Elem()
|
||||
v.Set(reflect.ValueOf(time.Second * time.Duration(p.ResponseHeaderTimeout)))
|
||||
v1 := reflect.ValueOf(&(mq.ClientTimeout.Duration)).Elem()
|
||||
v1.Set(reflect.ValueOf(time.Second * time.Duration(p.ClientTimeout)))
|
||||
return mq, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue