categraf/inputs/mysql/processlist.go

173 lines
5.6 KiB
Go

package mysql
import (
"database/sql"
"log"
"strings"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/tagx"
"github.com/toolkits/pkg/container/list"
)
// The two lookup tables below are effectively constants; they are declared
// with var only because Go has no constant maps. They are meant to be read,
// never written to.
var (
	// generalThreadStates is the set of canonical thread states that are
	// reported as metrics. Every value is zero: the map serves as a template
	// of keys with their initial counter values.
	generalThreadStates = map[string]uint32{
		"after create": 0,
		"altering table": 0,
		"analyzing": 0,
		"checking permissions": 0,
		"checking table": 0,
		"cleaning up": 0,
		"closing tables": 0,
		"converting heap to myisam": 0,
		"copying to tmp table": 0,
		"creating sort index": 0,
		"creating table": 0,
		"creating tmp table": 0,
		"deleting": 0,
		"executing": 0,
		"execution of init_command": 0,
		"end": 0,
		"freeing items": 0,
		"flushing tables": 0,
		"fulltext initialization": 0,
		"idle": 0,
		"init": 0,
		"killed": 0,
		"waiting for lock": 0,
		"logging slow query": 0,
		"login": 0,
		"manage keys": 0,
		"opening tables": 0,
		"optimizing": 0,
		"preparing": 0,
		"reading from net": 0,
		"removing duplicates": 0,
		"removing tmp table": 0,
		"reopen tables": 0,
		"repair by sorting": 0,
		"repair done": 0,
		"repair with keycache": 0,
		"replication master": 0,
		"rolling back": 0,
		"searching rows for update": 0,
		"sending data": 0,
		"sorting for group": 0,
		"sorting for order": 0,
		"sorting index": 0,
		"sorting result": 0,
		"statistics": 0,
		"updating": 0,
		"waiting for tables": 0,
		"waiting for table flush": 0,
		"waiting on cond": 0,
		"writing to net": 0,
		"other": 0,
	}

	// stateStatusMappings translates additional plain-text states into the
	// canonical state (a key of generalThreadStates) they are counted under.
	stateStatusMappings = map[string]string{
		"user sleep": "idle",
		"creating index": "altering table",
		"committing alter table to storage engine": "altering table",
		"discard or import tablespace": "altering table",
		"rename": "altering table",
		"setup": "altering table",
		"renaming result table": "altering table",
		"preparing for alter table": "altering table",
		"copying to group table": "copying to tmp table",
		"copy to tmp table": "copying to tmp table",
		"query end": "end",
		"update": "updating",
		"updating main table": "updating",
		"updating reference tables": "updating",
		"system lock": "waiting for lock",
		"user lock": "waiting for lock",
		"table lock": "waiting for lock",
		"deleting from main table": "deleting",
		"deleting from reference tables": "deleting",
	}
)
// gatherProcesslistByState runs SQL_INFO_SCHEMA_PROCESSLIST, sums the returned
// counts per normalized thread state and pushes one
// "processlist_processes_by_state" sample per state onto slist.
//
// It is a no-op unless ins.GatherProcessListProcessByState is set. Every key
// of generalThreadStates is reported, including states whose count is zero.
// Each sample is built with the labels returned by tagx.Copy(globalTags) plus
// a {"state": <name>} label map.
//
// If the query, a row scan or the row iteration fails, the error is logged
// and nothing is pushed, so partial counts are never reported.
func (m *MySQL) gatherProcesslistByState(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) {
	if !ins.GatherProcessListProcessByState {
		return
	}

	rows, err := db.Query(SQL_INFO_SCHEMA_PROCESSLIST)
	if err != nil {
		log.Println("E! failed to get processlist:", err)
		return
	}
	defer rows.Close()

	labels := tagx.Copy(globalTags)

	// Seed the counters from generalThreadStates so that every known state is
	// emitted, and so the shared package-level map is never mutated.
	stateCounts := make(map[string]uint32, len(generalThreadStates))
	for k, v := range generalThreadStates {
		stateCounts[k] = v
	}

	for rows.Next() {
		var (
			command string
			state   string
			count   uint32
		)
		if err = rows.Scan(&command, &state, &count); err != nil {
			log.Println("W! failed to scan rows:", err)
			return
		}
		// Fold the raw command/state pair into its canonical state and
		// accumulate the count there.
		stateCounts[findThreadState(command, state)] += count
	}
	// rows.Next also returns false when iteration is aborted by an error;
	// without this check a truncated result set would be reported as complete.
	if err = rows.Err(); err != nil {
		log.Println("E! failed to iterate processlist rows:", err)
		return
	}

	for s, c := range stateCounts {
		slist.PushFront(inputs.NewSample("processlist_processes_by_state", c, labels, map[string]string{"state": s}))
	}
}
// findThreadState resolves a raw command/state pair to the canonical thread
// state it should be counted under.
//
// Both inputs are lower-cased and have '_' replaced by a space before any
// comparison. Resolution order:
//  1. the state is already a key of generalThreadStates;
//  2. the state has an entry in stateStatusMappings;
//  3. the state contains both "waiting for" and "lock" -> "waiting for lock";
//  4. command "sleep" with an empty state -> "idle";
//  5. command "query" -> "executing";
//  6. command "binlog dump" -> "replication master";
//  7. anything else -> "other".
func findThreadState(rawCommand, rawState string) string {
	normalize := func(s string) string {
		return strings.ReplaceAll(strings.ToLower(s), "_", " ")
	}
	cmd, st := normalize(rawCommand), normalize(rawState)

	// Already a canonical state: use it unchanged.
	if _, known := generalThreadStates[st]; known {
		return st
	}
	// A known alias: use the canonical state it maps to.
	if canonical, aliased := stateStatusMappings[st]; aliased {
		return canonical
	}

	// Fallback rules; the first matching case wins.
	switch {
	case strings.Contains(st, "waiting for") && strings.Contains(st, "lock"):
		return "waiting for lock"
	case cmd == "sleep" && st == "":
		return "idle"
	case cmd == "query":
		return "executing"
	case cmd == "binlog dump":
		return "replication master"
	default:
		return "other"
	}
}