gather process list

This commit is contained in:
Ulric Qin 2022-04-27 11:42:59 +08:00
parent 20063541bb
commit 8fc15dc671
5 changed files with 229 additions and 0 deletions

View File

@ -11,6 +11,7 @@ password = "1234"
extra_status_metrics = true
extra_innodb_metrics = false
gather_process_list = true
# # timeout
# timeout_seconds = 3

View File

@ -31,6 +31,7 @@ type Instance struct {
ExtraStatusMetrics bool `toml:"extra_status_metrics"`
ExtraInnodbMetrics bool `toml:"extra_innodb_metrics"`
GatherProcessList bool `toml:"gather_process_list"`
validMetrics map[string]struct{}
dsn string
@ -233,4 +234,6 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) {
m.gatherEngineInnodbStatus(slist, ins, db, tags, cache)
m.gatherEngineInnodbStatusCompute(slist, ins, db, tags, cache)
m.gatherBinlog(slist, ins, db, tags, cache)
m.gatherProcesslist(slist, ins, db, tags, cache)
m.gatherProcesslistByUser(slist, ins, db, tags, cache)
}

177
inputs/mysql/processlist.go Normal file
View File

@ -0,0 +1,177 @@
package mysql
import (
"database/sql"
"log"
"strings"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/tagx"
"github.com/toolkits/pkg/container/list"
)
// These are const but can't be declared as such because golang doesn't allow const maps
var (
// status counter
generalThreadStates = map[string]uint32{
"after create": uint32(0),
"altering table": uint32(0),
"analyzing": uint32(0),
"checking permissions": uint32(0),
"checking table": uint32(0),
"cleaning up": uint32(0),
"closing tables": uint32(0),
"converting heap to myisam": uint32(0),
"copying to tmp table": uint32(0),
"creating sort index": uint32(0),
"creating table": uint32(0),
"creating tmp table": uint32(0),
"deleting": uint32(0),
"executing": uint32(0),
"execution of init_command": uint32(0),
"end": uint32(0),
"freeing items": uint32(0),
"flushing tables": uint32(0),
"fulltext initialization": uint32(0),
"idle": uint32(0),
"init": uint32(0),
"killed": uint32(0),
"waiting for lock": uint32(0),
"logging slow query": uint32(0),
"login": uint32(0),
"manage keys": uint32(0),
"opening tables": uint32(0),
"optimizing": uint32(0),
"preparing": uint32(0),
"reading from net": uint32(0),
"removing duplicates": uint32(0),
"removing tmp table": uint32(0),
"reopen tables": uint32(0),
"repair by sorting": uint32(0),
"repair done": uint32(0),
"repair with keycache": uint32(0),
"replication master": uint32(0),
"rolling back": uint32(0),
"searching rows for update": uint32(0),
"sending data": uint32(0),
"sorting for group": uint32(0),
"sorting for order": uint32(0),
"sorting index": uint32(0),
"sorting result": uint32(0),
"statistics": uint32(0),
"updating": uint32(0),
"waiting for tables": uint32(0),
"waiting for table flush": uint32(0),
"waiting on cond": uint32(0),
"writing to net": uint32(0),
"other": uint32(0),
}
// plaintext statuses
stateStatusMappings = map[string]string{
"user sleep": "idle",
"creating index": "altering table",
"committing alter table to storage engine": "altering table",
"discard or import tablespace": "altering table",
"rename": "altering table",
"setup": "altering table",
"renaming result table": "altering table",
"preparing for alter table": "altering table",
"copying to group table": "copying to tmp table",
"copy to tmp table": "copying to tmp table",
"query end": "end",
"update": "updating",
"updating main table": "updating",
"updating reference tables": "updating",
"system lock": "waiting for lock",
"user lock": "waiting for lock",
"table lock": "waiting for lock",
"deleting from main table": "deleting",
"deleting from reference tables": "deleting",
}
)
func (m *MySQL) gatherProcesslist(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
if !ins.GatherProcessList {
return
}
rows, err := db.Query(SQL_INFO_SCHEMA_PROCESSLIST)
if err != nil {
log.Println("E! failed to get processlist:", err)
return
}
defer rows.Close()
var (
command string
state string
count uint32
)
labels := tagx.Copy(globalTags)
// mapping of state with its counts
stateCounts := make(map[string]uint32, len(generalThreadStates))
// set map with keys and default values
for k, v := range generalThreadStates {
stateCounts[k] = v
}
for rows.Next() {
err = rows.Scan(&command, &state, &count)
if err != nil {
log.Println("W! failed to scan rows:", err)
return
}
// each state has its mapping
foundState := findThreadState(command, state)
// count each state
stateCounts[foundState] += count
}
for s, c := range stateCounts {
slist.PushFront(inputs.NewSample("process_list_threads", c, labels, map[string]string{"state": s}))
}
}
// findThreadState can be used to find thread state by command and plain state
func findThreadState(rawCommand, rawState string) string {
var (
// replace '_' symbol with space
command = strings.Replace(strings.ToLower(rawCommand), "_", " ", -1)
state = strings.Replace(strings.ToLower(rawState), "_", " ", -1)
)
// if the state is already valid, then return it
if _, ok := generalThreadStates[state]; ok {
return state
}
// if state is plain, return the mapping
if mappedState, ok := stateStatusMappings[state]; ok {
return mappedState
}
// if the state is any lock, return the special state
if strings.Contains(state, "waiting for") && strings.Contains(state, "lock") {
return "waiting for lock"
}
if command == "sleep" && state == "" {
return "idle"
}
if command == "query" {
return "executing"
}
if command == "binlog dump" {
return "replication master"
}
// if no mappings found and state is invalid, then return "other" state
return "other"
}
// newNamespace can be used to make a namespace
func newNamespace(words ...string) string {
return strings.Replace(strings.Join(words, "_"), " ", "_", -1)
}

View File

@ -0,0 +1,39 @@
package mysql
import (
"database/sql"
"log"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/tagx"
"github.com/toolkits/pkg/container/list"
)
func (m *MySQL) gatherProcesslistByUser(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, cache map[string]float64) {
if !ins.GatherProcessList {
return
}
rows, err := db.Query(SQL_INFO_SCHEMA_PROCESSLIST_BY_USER)
if err != nil {
log.Println("E! failed to get processlist:", err)
return
}
defer rows.Close()
labels := tagx.Copy(globalTags)
for rows.Next() {
var user string
var connections int64
err = rows.Scan(&user, &connections)
if err != nil {
log.Println("E! failed to scan rows:", err)
return
}
slist.PushFront(inputs.NewSample("processlist_processes_by_user", connections, labels, map[string]string{"user": user}))
}
}

View File

@ -11,6 +11,15 @@ const (
SQL_ENGINE_INNODB_STATUS = `SHOW /*!50000 ENGINE*/ INNODB STATUS`
SQL_INFO_SCHEMA_PROCESSLIST = `
SELECT COALESCE(command,''),COALESCE(state,''),count(*)
FROM information_schema.processlist
WHERE ID != connection_id()
GROUP BY command,state
ORDER BY null`
SQL_INFO_SCHEMA_PROCESSLIST_BY_USER = `SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user`
SQL_95TH_PERCENTILE = `SELECT avg_us, ro as percentile FROM
(SELECT avg_us, @rownum := @rownum + 1 as ro FROM
(SELECT ROUND(avg_timer_wait / 1000000) as avg_us