diff --git a/conf/input.mysql/mysql.toml b/conf/input.mysql/mysql.toml index f1a9240..749154c 100644 --- a/conf/input.mysql/mysql.toml +++ b/conf/input.mysql/mysql.toml @@ -34,4 +34,14 @@ labels = { instance="n9e-10.2.3.4:3306" } # tls_cert = "/etc/categraf/cert.pem" # tls_key = "/etc/categraf/key.pem" ## Use TLS but skip chain & host verification -# insecure_skip_verify = true \ No newline at end of file +# insecure_skip_verify = true + +[[instances.queries]] +mesurement = "users" +metric_fields = [ "total" ] +label_fields = [ "product" ] +# field_to_append = "" +timeout = "3s" +request = ''' +select 'n9e' as product, count(*) as total from n9e_v5.users +''' \ No newline at end of file diff --git a/inputs/mysql/custom_queries.go b/inputs/mysql/custom_queries.go new file mode 100644 index 0000000..37110bf --- /dev/null +++ b/inputs/mysql/custom_queries.go @@ -0,0 +1,116 @@ +package mysql + +import ( + "context" + "database/sql" + "fmt" + "log" + "strings" + "sync" + "time" + + "flashcat.cloud/categraf/inputs" + "flashcat.cloud/categraf/pkg/conv" + "flashcat.cloud/categraf/pkg/tagx" + "github.com/toolkits/pkg/container/list" +) + +func (m *MySQL) gatherCustomQueries(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string) { + wg := new(sync.WaitGroup) + defer wg.Wait() + + for i := 0; i < len(ins.Queries); i++ { + wg.Add(1) + go m.gatherOneQuery(slist, ins, db, globalTags, wg, ins.Queries[i]) + } +} + +func (m *MySQL) gatherOneQuery(slist *list.SafeList, ins *Instance, db *sql.DB, globalTags map[string]string, wg *sync.WaitGroup, query QueryConfig) { + defer wg.Done() + + timeout := time.Duration(query.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + rows, err := db.QueryContext(ctx, query.Request) + if ctx.Err() == context.DeadlineExceeded { + log.Println("E! query timeout, request:", query.Request) + return + } + + if err != nil { + log.Println("E! failed to query:", err) + return + } + + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + log.Println("E! failed to get columns:", err) + return + } + + for rows.Next() { + columns := make([]interface{}, len(cols)) + columnPointers := make([]interface{}, len(cols)) + for i := range columns { + columnPointers[i] = &columns[i] + } + + // Scan the result into the column pointers... + if err := rows.Scan(columnPointers...); err != nil { + log.Println("E! failed to scan:", err) + return + } + + row := make(map[string]string) + for i, colName := range cols { + val := columnPointers[i].(*interface{}) + row[strings.ToLower(colName)] = fmt.Sprint(*val) + } + + if err = m.parseRow(row, query, slist, globalTags); err != nil { + log.Println("E! failed to parse row:", err, "sql:", query.Request) + } + } +} + +func (m *MySQL) parseRow(row map[string]string, query QueryConfig, slist *list.SafeList, globalTags map[string]string) error { + labels := tagx.Copy(globalTags) + + for _, label := range query.LabelFields { + labelValue, has := row[label] + if has { + labels[label] = strings.Replace(labelValue, " ", "_", -1) + } + } + + for _, column := range query.MetricFields { + value, err := conv.ToFloat64(row[column]) + if err != nil { + log.Println("E! failed to convert field:", column, "value:", value, "error:", err) + return err + } + + if query.FieldToAppend == "" { + slist.PushFront(inputs.NewSample(query.Mesurement+"_"+column, value, labels)) + } else { + suffix := cleanName(row[query.FieldToAppend]) + slist.PushFront(inputs.NewSample(query.Mesurement+"_"+suffix+"_"+column, value, labels)) + } + } + + return nil +} + +func cleanName(s string) string { + s = strings.Replace(s, " ", "_", -1) // Remove spaces + s = strings.Replace(s, "(", "", -1) // Remove open parenthesis + s = strings.Replace(s, ")", "", -1) // Remove close parenthesis + s = strings.Replace(s, "/", "", -1) // Remove forward slashes + s = strings.Replace(s, "*", "", -1) // Remove asterisks + s = strings.Replace(s, "%", "percent", -1) + s = strings.ToLower(s) + return s +} diff --git a/inputs/mysql/mysql.go b/inputs/mysql/mysql.go index 7a7142b..19bc45e 100644 --- a/inputs/mysql/mysql.go +++ b/inputs/mysql/mysql.go @@ -19,6 +19,15 @@ import ( const inputName = "mysql" +type QueryConfig struct { + Mesurement string `toml:"mesurement"` + LabelFields []string `toml:"label_fields"` + MetricFields []string `toml:"metric_fields"` + FieldToAppend string `toml:"field_to_append"` + Timeout config.Duration `toml:"timeout"` + Request string `toml:"request"` +} + type Instance struct { Address string `toml:"address"` Username string `toml:"username"` @@ -28,6 +37,7 @@ type Instance struct { Labels map[string]string `toml:"labels"` IntervalTimes int64 `toml:"interval_times"` + Queries []QueryConfig `toml:"queries"` ExtraStatusMetrics bool `toml:"extra_status_metrics"` ExtraInnodbMetrics bool `toml:"extra_innodb_metrics"` @@ -228,6 +238,7 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) { if err = db.Ping(); err != nil { slist.PushFront(inputs.NewSample("up", 0, tags)) log.Println("E! failed to ping mysql:", err) + return } slist.PushFront(inputs.NewSample("up", 1, tags)) @@ -245,4 +256,5 @@ func (m *MySQL) gatherOnce(slist *list.SafeList, ins *Instance) { m.gatherTableSize(slist, ins, db, tags, false) m.gatherTableSize(slist, ins, db, tags, true) m.gatherSlaveStatus(slist, ins, db, tags) + m.gatherCustomQueries(slist, ins, db, tags) } diff --git a/inputs/oracle/oracle.go b/inputs/oracle/oracle.go index 0b01c6c..5f5900c 100644 --- a/inputs/oracle/oracle.go +++ b/inputs/oracle/oracle.go @@ -36,7 +36,7 @@ type OrclInstance struct { type MetricConfig struct { Mesurement string `toml:"mesurement"` LabelFields []string `toml:"label_fields"` - MetricFields []string `toml:"metric_fields"` // column_name -> value type(float64, bool, int64) + MetricFields []string `toml:"metric_fields"` FieldToAppend string `toml:"field_to_append"` Timeout config.Duration `toml:"timeout"` Request string `toml:"request"`