fix mysql plugin: collect slave status
This commit is contained in:
parent
e2a95a91c3
commit
535c38b821
2
Makefile
2
Makefile
|
@ -1,7 +1,7 @@
|
|||
.PHONY: start build
|
||||
|
||||
APP = categraf
|
||||
VER = 0.1.0
|
||||
VER = 0.1.1
|
||||
|
||||
all: build
|
||||
|
||||
|
|
|
@ -7,98 +7,110 @@ import (
|
|||
"strings"
|
||||
|
||||
"flashcat.cloud/categraf/inputs"
|
||||
"flashcat.cloud/categraf/pkg/tagx"
|
||||
"github.com/toolkits/pkg/container/list"
|
||||
)
|
||||
|
||||
var slaveStatusQueries = [2]string{"SHOW ALL SLAVES STATUS", "SHOW SLAVE STATUS"}
|
||||
var slaveStatusQuerySuffixes = [3]string{" NONBLOCKING", " NOLOCK", ""}
|
||||
|
||||
func querySlaveStatus(db *sql.DB) (rows *sql.Rows, err error) {
|
||||
for _, query := range slaveStatusQueries {
|
||||
rows, err = db.Query(query)
|
||||
if err == nil {
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// Leverage lock-free SHOW SLAVE STATUS by guessing the right suffix
|
||||
for _, suffix := range slaveStatusQuerySuffixes {
|
||||
rows, err = db.Query(fmt.Sprint(query, suffix))
|
||||
if err == nil {
|
||||
return rows, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *MySQL) gatherSlaveStatus(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
|
||||
if !ins.GatherSlaveStatus {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
rows *sql.Rows
|
||||
err error
|
||||
)
|
||||
// Try the both syntax for MySQL/Percona and MariaDB
|
||||
for _, query := range slaveStatusQueries {
|
||||
rows, err = db.Query(query)
|
||||
if err != nil { // MySQL/Percona
|
||||
// Leverage lock-free SHOW SLAVE STATUS by guessing the right suffix
|
||||
for _, suffix := range slaveStatusQuerySuffixes {
|
||||
rows, err = db.Query(fmt.Sprint(query, suffix))
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else { // MariaDB
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := querySlaveStatus(db)
|
||||
if err != nil {
|
||||
log.Println("E! failed to query slave status:", err)
|
||||
return
|
||||
}
|
||||
|
||||
if rows == nil {
|
||||
log.Println("E! failed to query slave status: rows is nil")
|
||||
return
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
var (
|
||||
tags = tagx.Copy(globalTags)
|
||||
fields = make(map[string]interface{})
|
||||
textItems = map[string]string{
|
||||
"master_host": "",
|
||||
"master_uuid": "",
|
||||
"channel_name": "",
|
||||
"connection_name": "",
|
||||
}
|
||||
)
|
||||
slaveCols, err := rows.Columns()
|
||||
if err != nil {
|
||||
log.Println("E! failed to get columns of slave rows:", err)
|
||||
return
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var key string
|
||||
var val sql.RawBytes
|
||||
// As the number of columns varies with mysqld versions,
|
||||
// and sql.Scan requires []interface{}, we need to create a
|
||||
// slice of pointers to the elements of slaveData.
|
||||
scanArgs := make([]interface{}, len(slaveCols))
|
||||
for i := range scanArgs {
|
||||
scanArgs[i] = &sql.RawBytes{}
|
||||
}
|
||||
|
||||
if err = rows.Scan(&key, &val); err != nil {
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// key to lower
|
||||
key = strings.ToLower(key)
|
||||
masterUUID := columnValue(scanArgs, slaveCols, "Master_UUID")
|
||||
masterHost := columnValue(scanArgs, slaveCols, "Master_Host")
|
||||
channelName := columnValue(scanArgs, slaveCols, "Channel_Name") // MySQL & Percona
|
||||
connectionName := columnValue(scanArgs, slaveCols, "Connection_name") // MariaDB
|
||||
|
||||
// collect some string fields
|
||||
if _, has := textItems[key]; has {
|
||||
textItems[key] = string(val)
|
||||
continue
|
||||
if connectionName != "" {
|
||||
channelName = connectionName
|
||||
}
|
||||
|
||||
// collect float fields
|
||||
if _, has := ins.validMetrics[key]; !has {
|
||||
continue
|
||||
if channelName == "" {
|
||||
channelName = "default"
|
||||
}
|
||||
|
||||
if floatVal, ok := parseStatus(val); ok {
|
||||
fields[key] = floatVal
|
||||
continue
|
||||
for i, col := range slaveCols {
|
||||
key := strings.ToLower(col)
|
||||
if _, has := ins.validMetrics[key]; !has {
|
||||
continue
|
||||
}
|
||||
|
||||
if value, ok := parseStatus(*scanArgs[i].(*sql.RawBytes)); ok {
|
||||
slist.PushFront(inputs.NewSample("slave_status_"+key, value, globalTags, map[string]string{
|
||||
"master_host": masterHost,
|
||||
"master_uuid": masterUUID,
|
||||
"channel_name": channelName,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if textItems["connection_name"] != "" {
|
||||
textItems["channel_name"] = textItems["connection_name"]
|
||||
}
|
||||
|
||||
// default channel name is empty
|
||||
if textItems["channel_name"] == "" {
|
||||
textItems["channel_name"] = "default"
|
||||
}
|
||||
|
||||
for k, v := range fields {
|
||||
slist.PushFront(inputs.NewSample("slave_status_"+k, v, tags, map[string]string{
|
||||
"master_host": textItems["master_host"],
|
||||
"master_uuid": textItems["master_uuid"],
|
||||
"channel_name": textItems["channel_name"],
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func columnIndex(slaveCols []string, colName string) int {
|
||||
for idx := range slaveCols {
|
||||
if slaveCols[idx] == colName {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func columnValue(scanArgs []interface{}, slaveCols []string, colName string) string {
|
||||
var columnIndex = columnIndex(slaveCols, colName)
|
||||
if columnIndex == -1 {
|
||||
return ""
|
||||
}
|
||||
return string(*scanArgs[columnIndex].(*sql.RawBytes))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue