add logger with monapi.plugins as telegraf.Logger interface (#522)
This commit is contained in:
parent
c5ba127b9e
commit
8204641656
|
@ -1 +1,7 @@
|
|||
set names utf8;
|
||||
use n9e_mon;
|
||||
|
||||
|
||||
alter table collect_rule change `last_updator` `updater` varchar(64) NOT NULL DEFAULT '' COMMENT 'updater';
|
||||
alter table collect_rule add `created_at` bigint NOT NULL DEFAULT 0;
|
||||
alter table collect_rule add `updated_at` bigint NOT NULL DEFAULT 0;
|
||||
|
|
|
@ -331,9 +331,10 @@ CREATE TABLE `collect_rule` (
|
|||
`data` blob NULL COMMENT 'data',
|
||||
`tags` varchar(512) NOT NULL DEFAULT '' COMMENT 'tags',
|
||||
`creator` varchar(64) NOT NULL DEFAULT '' COMMENT 'creator',
|
||||
`last_updator` varchar(64) NOT NULL DEFAULT '' COMMENT 'last_updator',
|
||||
`updater` varchar(64) NOT NULL DEFAULT '' COMMENT 'updater',
|
||||
`created` datetime NOT NULL COMMENT 'created',
|
||||
`last_updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
`created_at` bigint not null default 0,
|
||||
`updated_at` bigint not null default 0,
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_nid` (`nid`),
|
||||
KEY `idx_collect_type` (`collect_type`)
|
||||
|
|
|
@ -130,7 +130,7 @@ func (a *AggrCalc) Update(cols ...string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = saveHist(a.Id, "calc", "update", a.Creator, string(straByte), session)
|
||||
err = saveHistory(a.Id, "calc", "update", a.Creator, string(straByte), session)
|
||||
if err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
|
@ -170,7 +170,7 @@ func AggrCalcDel(id int64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = saveHist(obj.Id, "calc", "delete", obj.Creator, string(straByte), session)
|
||||
err = saveHistory(obj.Id, "calc", "delete", obj.Creator, string(straByte), session)
|
||||
if err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
|
|
|
@ -304,7 +304,7 @@ func (p *PortCollect) Update() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(p.Id, "port", "update", p.Creator, string(portByte), session); err != nil {
|
||||
if err := saveHistory(p.Id, "port", "update", p.Creator, string(portByte), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -342,7 +342,7 @@ func (p *ProcCollect) Update() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(p.Id, "port", "update", p.Creator, string(b), session); err != nil {
|
||||
if err := saveHistory(p.Id, "port", "update", p.Creator, string(b), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -380,7 +380,7 @@ func (p *LogCollect) Update() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(p.Id, "log", "update", p.Creator, string(b), session); err != nil {
|
||||
if err := saveHistory(p.Id, "log", "update", p.Creator, string(b), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -438,7 +438,7 @@ func (p *PluginCollect) Update() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(p.Id, "plugin", "update", p.Creator, string(b), session); err != nil {
|
||||
if err := saveHistory(p.Id, "plugin", "update", p.Creator, string(b), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -513,7 +513,7 @@ func (a *ApiCollect) Update() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(a.Id, "api", "update", a.Creator, string(b), session); err != nil {
|
||||
if err := saveHistory(a.Id, "api", "update", a.Creator, string(b), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -545,7 +545,7 @@ func CreateCollect(collectType, creator string, collect interface{}) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(0, collectType, "create", creator, string(b), session); err != nil {
|
||||
if err := saveHistory(0, collectType, "create", creator, string(b), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -564,7 +564,7 @@ func DeleteCollectById(collectType, creator string, cid int64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(cid, collectType, "delete", creator, strconv.FormatInt(cid, 10), session); err != nil {
|
||||
if err := saveHistory(cid, collectType, "delete", creator, strconv.FormatInt(cid, 10), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
@ -572,7 +572,7 @@ func DeleteCollectById(collectType, creator string, cid int64) error {
|
|||
return session.Commit()
|
||||
}
|
||||
|
||||
func saveHist(id int64, tp string, action, username, body string, session *xorm.Session) error {
|
||||
func saveHistory(id int64, tp string, action, username, body string, session *xorm.Session) error {
|
||||
h := CollectHist{
|
||||
Cid: id,
|
||||
CollectType: tp,
|
||||
|
|
|
@ -3,9 +3,9 @@ package models
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/src/common/dataobj"
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -24,9 +24,9 @@ type CollectRule struct {
|
|||
Data json.RawMessage `json:"data"`
|
||||
Tags string `json:"tags" description:"k1=v1,k2=v2,k3=v3,..."`
|
||||
Creator string `json:"creator" description:"just for output"`
|
||||
LastUpdator string `json:"last_updator" description:"just for output"`
|
||||
Created time.Time `json:"created" description:"just for output"`
|
||||
LastUpdated time.Time `json:"last_updated" description:"just for output"`
|
||||
Updater string `json:"updater" description:"just for output"`
|
||||
CreatedAt int64 `json:"created_at" description:"just for output"`
|
||||
UpdatedAt int64 `json:"updated_at" description:"just for output"`
|
||||
}
|
||||
|
||||
type validator interface {
|
||||
|
@ -51,25 +51,51 @@ func (p *CollectRule) Validate(v ...interface{}) error {
|
|||
}
|
||||
|
||||
if len(v) > 0 && v[0] != nil {
|
||||
if err := json.Unmarshal(p.Data, v[0]); err != nil {
|
||||
obj := v[0]
|
||||
if err := json.Unmarshal(p.Data, obj); err != nil {
|
||||
return err
|
||||
}
|
||||
if o, ok := v[0].(validator); ok {
|
||||
if o, ok := obj.(validator); ok {
|
||||
if err := o.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.Data = json.RawMessage(b)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetCollectRules() ([]*CollectRule, error) {
|
||||
func DumpCollectRules() ([]*CollectRule, error) {
|
||||
rules := []*CollectRule{}
|
||||
err := DB["mon"].Find(&rules)
|
||||
return rules, err
|
||||
}
|
||||
|
||||
func GetCollectRules(typ string, nid int64, limit, offset int) (total int64, list []*CollectRule, err error) {
|
||||
search := func() *xorm.Session {
|
||||
session := DB["mon"].Where("1=1")
|
||||
if nid != 0 {
|
||||
session = session.And("nid=?", nid)
|
||||
}
|
||||
if typ != "" {
|
||||
return session.And("collect_type=?", typ)
|
||||
}
|
||||
return session
|
||||
}
|
||||
|
||||
if total, err = search().Count(new(CollectRule)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = search().Desc("created").Limit(limit, offset).Find(&list)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *CollectRule) Update() error {
|
||||
session := DB["mon"].NewSession()
|
||||
defer session.Close()
|
||||
|
@ -90,7 +116,7 @@ func (p *CollectRule) Update() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := saveHist(p.Id, p.CollectType, "update", p.Creator, string(b), session); err != nil {
|
||||
if err := saveHistory(p.Id, p.CollectType, "update", p.Creator, string(b), session); err != nil {
|
||||
session.Rollback()
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package collector
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/src/models"
|
||||
"github.com/influxdata/telegraf"
|
||||
|
@ -91,8 +92,11 @@ func (p BaseCollector) Create(data []byte, username string) error {
|
|||
return fmt.Errorf("permission deny")
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
collect.Creator = username
|
||||
collect.LastUpdator = username
|
||||
collect.CreatedAt = now
|
||||
collect.Updater = username
|
||||
collect.UpdatedAt = now
|
||||
|
||||
old, err := p.GetByNameAndNid(collect.Name, collect.Nid)
|
||||
if err != nil {
|
||||
|
@ -135,8 +139,8 @@ func (p BaseCollector) Update(data []byte, username string) error {
|
|||
return fmt.Errorf("采集不存在 type:%s id:%d", p.name, collect.Id)
|
||||
}
|
||||
|
||||
collect.Creator = username
|
||||
collect.LastUpdator = username
|
||||
collect.Updater = username
|
||||
collect.UpdatedAt = time.Now().Unix()
|
||||
|
||||
old, err := p.GetByNameAndNid(collect.Name, collect.Nid)
|
||||
if err != nil {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"unicode"
|
||||
|
@ -12,12 +13,13 @@ import (
|
|||
var fieldCache sync.Map // map[reflect.Type]structFields
|
||||
|
||||
type Field struct {
|
||||
skip bool `json:"-"`
|
||||
skip bool `json:"-"`
|
||||
def string `json:"-"`
|
||||
// definitions map[string][]Field `json:"-"`
|
||||
|
||||
Name string `json:"name,omitempty"`
|
||||
Label string `json:"label,omitempty"`
|
||||
Default string `json:"default,omitempty"`
|
||||
Default interface{} `json:"default,omitempty"`
|
||||
Example string `json:"example,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Required bool `json:"required,omitempty"`
|
||||
|
@ -137,7 +139,7 @@ func getTagOpt(sf reflect.StructField) (opt Field) {
|
|||
|
||||
opt.Name = name
|
||||
opt.Label = _s(sf.Tag.Get("label"))
|
||||
opt.Default = sf.Tag.Get("default")
|
||||
opt.def = sf.Tag.Get("default")
|
||||
opt.Example = sf.Tag.Get("example")
|
||||
opt.Description = _s(sf.Tag.Get("description"))
|
||||
|
||||
|
@ -189,15 +191,29 @@ func fieldType(t reflect.Type, in *Field, definitions map[string][]Field) {
|
|||
t = t.Elem()
|
||||
}
|
||||
|
||||
var def interface{}
|
||||
|
||||
switch t.Kind() {
|
||||
case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint32, reflect.Uint64:
|
||||
in.Type = "integer"
|
||||
if in.def != "" {
|
||||
def, _ = strconv.ParseInt(in.def, 10, 64)
|
||||
}
|
||||
case reflect.Float32, reflect.Float64:
|
||||
in.Type = "float"
|
||||
if in.def != "" {
|
||||
def, _ = strconv.ParseFloat(in.def, 64)
|
||||
}
|
||||
case reflect.Bool:
|
||||
in.Type = "boolean"
|
||||
if in.def != "" {
|
||||
def = in.def == "true"
|
||||
}
|
||||
case reflect.String:
|
||||
in.Type = "string"
|
||||
if in.def != "" {
|
||||
def = in.def
|
||||
}
|
||||
case reflect.Struct:
|
||||
name := t.String()
|
||||
if _, ok := definitions[name]; !ok {
|
||||
|
@ -222,8 +238,17 @@ func fieldType(t reflect.Type, in *Field, definitions map[string][]Field) {
|
|||
} else {
|
||||
panic(fmt.Sprintf("unspport type %s items %s", t.String(), t2.String()))
|
||||
}
|
||||
if t2.Kind() == reflect.String && in.def != "" {
|
||||
var s []string
|
||||
json.Unmarshal([]byte(in.def), &s)
|
||||
def = s
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("unspport type %s", t.String()))
|
||||
// in.Type = "string"
|
||||
}
|
||||
|
||||
if def != nil {
|
||||
in.Default = def
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,11 @@ func Config(r *gin.Engine) {
|
|||
sys.GET("/addr", addr)
|
||||
}
|
||||
|
||||
generic := r.Group("/api/mon").Use(GetCookieUser())
|
||||
{
|
||||
generic.GET("/regions", func(c *gin.Context) { renderData(c, config.Get().Region, nil) })
|
||||
}
|
||||
|
||||
node := r.Group("/api/mon/node").Use(GetCookieUser())
|
||||
{
|
||||
node.GET("/:id/maskconf", maskconfGets)
|
||||
|
@ -104,7 +109,7 @@ func Config(r *gin.Engine) {
|
|||
collectRules := r.Group("/api/mon/collect-rules").Use(GetCookieUser())
|
||||
{
|
||||
collectRules.POST("", collectRulePost) // create a collect rule
|
||||
collectRules.GET("/list", collectRulesGet) // get collect rules
|
||||
collectRules.GET("/list", collectRulesGetV2) // get collect rules
|
||||
collectRules.GET("", collectRuleGet) // get collect rule by type & id
|
||||
collectRules.PUT("", collectRulePut) // update collect rule by type & id
|
||||
collectRules.DELETE("", collectsRuleDel) // delete collect rules by type & ids
|
||||
|
|
|
@ -3,9 +3,11 @@ package http
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/didi/nightingale/src/models"
|
||||
"github.com/didi/nightingale/src/modules/monapi/collector"
|
||||
"github.com/didi/nightingale/src/modules/monapi/scache"
|
||||
|
||||
|
@ -83,6 +85,20 @@ func collectRulesGet(c *gin.Context) {
|
|||
renderData(c, resp, nil)
|
||||
}
|
||||
|
||||
func collectRulesGetV2(c *gin.Context) {
|
||||
nid := queryInt64(c, "nid", 0)
|
||||
limit := queryInt(c, "limit", 20)
|
||||
typ := queryStr(c, "type", "")
|
||||
|
||||
log.Printf("typ %s", typ)
|
||||
total, list, err := models.GetCollectRules(typ, nid, limit, offset(c, limit, 0))
|
||||
|
||||
renderData(c, map[string]interface{}{
|
||||
"total": total,
|
||||
"list": list,
|
||||
}, err)
|
||||
}
|
||||
|
||||
func collectRulePut(c *gin.Context) {
|
||||
var recv CollectRecv
|
||||
errors.Dangerous(c.ShouldBind(&recv))
|
||||
|
|
|
@ -2,7 +2,7 @@ package all
|
|||
|
||||
import (
|
||||
// remote
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/api"
|
||||
// _ "github.com/didi/nightingale/src/modules/monapi/plugins/api"
|
||||
// telegraf style
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb"
|
||||
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql"
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/didi/nightingale/src/modules/monapi/collector"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins/mongodb/mongodb"
|
||||
"github.com/didi/nightingale/src/toolkits/i18n"
|
||||
"github.com/influxdata/telegraf"
|
||||
|
@ -72,5 +73,6 @@ func (p *MongodbRule) TelegrafInput() (telegraf.Input, error) {
|
|||
GatherPerdbStats: p.GatherPerdbStats,
|
||||
GatherColStats: p.GatherColStats,
|
||||
ColStatsDbs: p.ColStatsDbs,
|
||||
Log: plugins.GetLogger(),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
|
@ -186,6 +185,7 @@ func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
|
|||
return server.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.ColStatsDbs)
|
||||
}
|
||||
|
||||
/*
|
||||
func init() {
|
||||
inputs.Add("mongodb", func() telegraf.Input {
|
||||
return &MongoDB{
|
||||
|
@ -197,3 +197,4 @@ func init() {
|
|||
}
|
||||
})
|
||||
}
|
||||
*/
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/didi/nightingale/src/modules/monapi/collector"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins"
|
||||
"github.com/didi/nightingale/src/toolkits/i18n"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/mysql"
|
||||
|
@ -64,10 +65,10 @@ var (
|
|||
)
|
||||
|
||||
type MysqlRule struct {
|
||||
Servers []string `label:"Servers" json:"servers,required" description:"specify servers via a url matching\n[username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify|custom]]\nsee https://github.com/go-sql-driver/mysql#dsn-data-source-name" example:"servers = ['user:passwd@tcp(127.0.0.1:3306)/?tls=false']\nservers = ["user@tcp(127.0.0.1:3306)/?tls=false"]"`
|
||||
PerfEventsStatementsDigestTextLimit int64 `label:"-" json:"-"`
|
||||
PerfEventsStatementsLimit int64 `label:"-" json:"-"`
|
||||
PerfEventsStatementsTimeLimit int64 `label:"-" json:"-"`
|
||||
Servers []string `label:"Servers" json:"servers,required" description:"specify servers via a url matching\n[username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify|custom]]\nsee https://github.com/go-sql-driver/mysql#dsn-data-source-name" example:"user:passwd@tcp(127.0.0.1:3306)/?tls=false"`
|
||||
PerfEventsStatementsDigestTextLimit int64 `label:"Perf Events Statements Digest Text Limit" json:"perf_events_statements_digest_text_limit" default:"120" description:"the limits for metrics form perf_events_statements"`
|
||||
PerfEventsStatementsLimit int64 `label:"Perf Events Statements Limit" json:"perf_events_statements_limit" default:"250" description:"the limits for metrics form perf_events_statements"`
|
||||
PerfEventsStatementsTimeLimit int64 `label:"Perf Events Statements Timelimit" json:"perf_events_statements_time_limit" default:"86400" description:"the limits for metrics form perf_events_statements"`
|
||||
TableSchemaDatabases []string `label:"Databases" json:"table_schema_databases" description:"if the list is empty, then metrics are gathered from all database tables"`
|
||||
GatherProcessList bool `label:"Process List" json:"gather_process_list" description:"gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST"`
|
||||
GatherUserStatistics bool `label:"User Statistics" json:"gather_user_statistics" description:"gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS"`
|
||||
|
@ -82,8 +83,8 @@ type MysqlRule struct {
|
|||
GatherTableSchema bool `label:"Tables" json:"gather_table_schema" description:"gather metrics from INFORMATION_SCHEMA.TABLES for databases provided above list"`
|
||||
GatherFileEventsStats bool `label:"File Events Stats" json:"gather_file_events_stats" description:"gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME"`
|
||||
GatherPerfEventsStatements bool `label:"Perf Events Statements" json:"gather_perf_events_statements" description:"gather metrics from PERFORMANCE_SCHEMA.EVENTS_STATEMENTS_SUMMARY_BY_DIGEST"`
|
||||
GatherGlobalVars bool `label:"-" json:"-"`
|
||||
IntervalSlow string `label:"Interval Slow" json:"interval_slow" desc:"Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)" example:"interval_slow = '30m'" json:"-"`
|
||||
GatherGlobalVars bool `label:"Global Vars" json:"gather_global_variables" description:"gather metrics from PERFORMANCE_SCHEMA.GLOBAL_VARIABLES" default:"true"`
|
||||
IntervalSlow string `label:"Interval Slow" json:"interval_slow" desc:"Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)" example:"30m"`
|
||||
MetricVersion int `label:"-" json:"-"`
|
||||
}
|
||||
|
||||
|
@ -91,6 +92,15 @@ func (p *MysqlRule) Validate() error {
|
|||
if len(p.Servers) == 0 || p.Servers[0] == "" {
|
||||
return fmt.Errorf("mysql.rule.servers must be set")
|
||||
}
|
||||
if p.PerfEventsStatementsDigestTextLimit == 0 {
|
||||
p.PerfEventsStatementsDigestTextLimit = 120
|
||||
}
|
||||
if p.PerfEventsStatementsLimit == 0 {
|
||||
p.PerfEventsStatementsLimit = 250
|
||||
}
|
||||
if p.PerfEventsStatementsTimeLimit == 0 {
|
||||
p.PerfEventsStatementsTimeLimit = 86400
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -101,9 +111,9 @@ func (p *MysqlRule) TelegrafInput() (telegraf.Input, error) {
|
|||
|
||||
return &mysql.Mysql{
|
||||
Servers: p.Servers,
|
||||
PerfEventsStatementsDigestTextLimit: 120,
|
||||
PerfEventsStatementsLimit: 250,
|
||||
PerfEventsStatementsTimeLimit: 86400,
|
||||
PerfEventsStatementsDigestTextLimit: p.PerfEventsStatementsDigestTextLimit,
|
||||
PerfEventsStatementsLimit: p.PerfEventsStatementsLimit,
|
||||
PerfEventsStatementsTimeLimit: p.PerfEventsStatementsTimeLimit,
|
||||
TableSchemaDatabases: p.TableSchemaDatabases,
|
||||
GatherProcessList: p.GatherProcessList,
|
||||
GatherUserStatistics: p.GatherUserStatistics,
|
||||
|
@ -118,8 +128,9 @@ func (p *MysqlRule) TelegrafInput() (telegraf.Input, error) {
|
|||
GatherTableSchema: p.GatherTableSchema,
|
||||
GatherFileEventsStats: p.GatherFileEventsStats,
|
||||
GatherPerfEventsStatements: p.GatherPerfEventsStatements,
|
||||
GatherGlobalVars: true,
|
||||
IntervalSlow: "0m",
|
||||
GatherGlobalVars: p.GatherGlobalVars,
|
||||
IntervalSlow: "",
|
||||
MetricVersion: 2,
|
||||
Log: plugins.GetLogger(),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/didi/nightingale/src/modules/monapi/collector"
|
||||
"github.com/didi/nightingale/src/modules/monapi/plugins"
|
||||
"github.com/didi/nightingale/src/toolkits/i18n"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/redis"
|
||||
|
@ -91,5 +92,6 @@ func (p *RedisRule) TelegrafInput() (telegraf.Input, error) {
|
|||
Servers: p.Servers,
|
||||
Commands: commands,
|
||||
Password: p.Password,
|
||||
Log: plugins.GetLogger(),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var defaultLogger = Logger{}
|
||||
|
||||
func GetLogger() *Logger {
|
||||
return &defaultLogger
|
||||
}
|
||||
|
||||
// telegraf.Logger
|
||||
type Logger struct{}
|
||||
|
||||
func (l *Logger) Errorf(format string, args ...interface{}) {
|
||||
logger.LogDepth(logger.ERROR, 1, format, args...)
|
||||
}
|
||||
func (l *Logger) Error(args ...interface{}) {
|
||||
logger.LogDepth(logger.ERROR, 1, fmt.Sprint(args...))
|
||||
}
|
||||
func (l *Logger) Debugf(format string, args ...interface{}) {
|
||||
logger.LogDepth(logger.DEBUG, 1, format, args...)
|
||||
}
|
||||
func (l *Logger) Debug(args ...interface{}) {
|
||||
logger.LogDepth(logger.DEBUG, 1, fmt.Sprint(args...))
|
||||
}
|
||||
func (l *Logger) Warnf(format string, args ...interface{}) {
|
||||
logger.LogDepth(logger.WARNING, 1, format, args...)
|
||||
}
|
||||
func (l *Logger) Warn(args ...interface{}) {
|
||||
logger.LogDepth(logger.WARNING, 1, fmt.Sprint(args...))
|
||||
}
|
||||
func (l *Logger) Infof(format string, args ...interface{}) {
|
||||
logger.LogDepth(logger.INFO, 1, format, args...)
|
||||
}
|
||||
func (l *Logger) Info(args ...interface{}) {
|
||||
logger.LogDepth(logger.INFO, 1, fmt.Sprint(args...))
|
||||
}
|
|
@ -107,7 +107,7 @@ func str(in interface{}) string {
|
|||
}
|
||||
|
||||
func (p *collectRuleCache) syncCollectRules() {
|
||||
rules, err := models.GetCollectRules()
|
||||
rules, err := models.DumpCollectRules()
|
||||
if err != nil {
|
||||
logger.Warningf("get log collectRules err:%v", err)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ type collectRule struct {
|
|||
tags map[string]string
|
||||
precision time.Duration
|
||||
metrics []*dataobj.MetricValue
|
||||
lastAt int64
|
||||
}
|
||||
|
||||
func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
|
||||
|
@ -120,7 +121,7 @@ func (p *collectRule) prepareMetrics() error {
|
|||
}
|
||||
|
||||
func (p *collectRule) update(rule *models.CollectRule) error {
|
||||
if p.CollectRule.LastUpdated == rule.LastUpdated {
|
||||
if p.CollectRule.UpdatedAt == rule.UpdatedAt {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -257,7 +258,7 @@ func (p *collectRule) AddError(err error) {
|
|||
if err == nil {
|
||||
return
|
||||
}
|
||||
logger.Debugf("Error in plugin: %v", err)
|
||||
logger.Debugf("collectRule %s.%s(%d) Error: %s", p.CollectType, p.Name, p.Id, err)
|
||||
}
|
||||
|
||||
func (p *collectRule) SetPrecision(precision time.Duration) {
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package manager
|
||||
|
||||
type ruleSummary struct {
|
||||
id int64 // collect rule id
|
||||
executeAt int64
|
||||
id int64 // collect rule id
|
||||
activeAt int64
|
||||
}
|
||||
|
||||
type ruleSummaryHeap []*ruleSummary
|
||||
|
@ -12,7 +12,7 @@ func (h ruleSummaryHeap) Len() int {
|
|||
}
|
||||
|
||||
func (h ruleSummaryHeap) Less(i, j int) bool {
|
||||
return h[i].executeAt < h[j].executeAt
|
||||
return h[i].activeAt < h[j].activeAt
|
||||
}
|
||||
|
||||
func (h ruleSummaryHeap) Swap(i, j int) {
|
||||
|
@ -30,5 +30,5 @@ func (h *ruleSummaryHeap) Pop() interface{} {
|
|||
}
|
||||
|
||||
func (h *ruleSummaryHeap) Top() *ruleSummary {
|
||||
return (*h)[len(*h)-1]
|
||||
return (*h)[0]
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ func (p *manager) schedule() error {
|
|||
if p.heap.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if p.heap.Top().executeAt > now {
|
||||
if p.heap.Top().activeAt > now {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,12 @@ func (p *manager) schedule() error {
|
|||
|
||||
p.collectRuleCh <- rule
|
||||
|
||||
summary.executeAt = now + int64(ruleConfig.Step)
|
||||
logger.Debugf("%s %s %d lastAt %ds before nextAt %ds later",
|
||||
rule.CollectType, rule.Name, rule.Id,
|
||||
now-rule.lastAt, ruleConfig.Step)
|
||||
|
||||
summary.activeAt = now + int64(ruleConfig.Step)
|
||||
rule.lastAt = now
|
||||
heap.Push(&p.heap, summary)
|
||||
|
||||
continue
|
||||
|
@ -134,8 +139,8 @@ func (p *manager) AddRule(rule *models.CollectRule) error {
|
|||
|
||||
p.index[rule.Id] = ruleEntity
|
||||
heap.Push(&p.heap, &ruleSummary{
|
||||
id: rule.Id,
|
||||
executeAt: time.Now().Unix() + int64(rule.Step),
|
||||
id: rule.Id,
|
||||
activeAt: time.Now().Unix() + int64(rule.Step),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ func main() {
|
|||
gin.SetMode(gin.ReleaseMode)
|
||||
}
|
||||
|
||||
// for manager -> core.Push()
|
||||
core.InitRpcClients()
|
||||
|
||||
manager.NewManager(cfg, cache.CollectRule).Start(ctx)
|
||||
|
@ -120,6 +121,7 @@ func start() {
|
|||
func ending(cancel context.CancelFunc) {
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
select {
|
||||
case <-c:
|
||||
fmt.Printf("stop signal caught, stopping... pid=%d\n", os.Getpid())
|
||||
|
|
Loading…
Reference in New Issue