diff --git a/sql/n9e_mon-patch.sql b/sql/n9e_mon-patch.sql index 8b137891..0026a55e 100644 --- a/sql/n9e_mon-patch.sql +++ b/sql/n9e_mon-patch.sql @@ -1 +1,7 @@ +set names utf8; +use n9e_mon; + +alter table collect_rule change `last_updator` `updater` varchar(64) NOT NULL DEFAULT '' COMMENT 'updater'; +alter table collect_rule add `created_at` bigint NOT NULL DEFAULT 0; +alter table collect_rule add `updated_at` bigint NOT NULL DEFAULT 0; diff --git a/sql/n9e_mon.sql b/sql/n9e_mon.sql index 7586fe49..b38534c8 100644 --- a/sql/n9e_mon.sql +++ b/sql/n9e_mon.sql @@ -331,9 +331,10 @@ CREATE TABLE `collect_rule` ( `data` blob NULL COMMENT 'data', `tags` varchar(512) NOT NULL DEFAULT '' COMMENT 'tags', `creator` varchar(64) NOT NULL DEFAULT '' COMMENT 'creator', - `last_updator` varchar(64) NOT NULL DEFAULT '' COMMENT 'last_updator', + `updater` varchar(64) NOT NULL DEFAULT '' COMMENT 'updater', `created` datetime NOT NULL COMMENT 'created', - `last_updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `created_at` bigint not null default 0, + `updated_at` bigint not null default 0, PRIMARY KEY (`id`), KEY `idx_nid` (`nid`), KEY `idx_collect_type` (`collect_type`) diff --git a/src/models/mon_aggr.go b/src/models/mon_aggr.go index bbdeaa62..268b71de 100644 --- a/src/models/mon_aggr.go +++ b/src/models/mon_aggr.go @@ -130,7 +130,7 @@ func (a *AggrCalc) Update(cols ...string) error { return err } - err = saveHist(a.Id, "calc", "update", a.Creator, string(straByte), session) + err = saveHistory(a.Id, "calc", "update", a.Creator, string(straByte), session) if err != nil { session.Rollback() return err @@ -170,7 +170,7 @@ func AggrCalcDel(id int64) error { return err } - err = saveHist(obj.Id, "calc", "delete", obj.Creator, string(straByte), session) + err = saveHistory(obj.Id, "calc", "delete", obj.Creator, string(straByte), session) if err != nil { session.Rollback() return err diff --git a/src/models/mon_collect.go b/src/models/mon_collect.go index 3fead87f..85aaa265 100644 --- a/src/models/mon_collect.go +++ b/src/models/mon_collect.go @@ -304,7 +304,7 @@ func (p *PortCollect) Update() error { return err } - if err := saveHist(p.Id, "port", "update", p.Creator, string(portByte), session); err != nil { + if err := saveHistory(p.Id, "port", "update", p.Creator, string(portByte), session); err != nil { session.Rollback() return err } @@ -342,7 +342,7 @@ func (p *ProcCollect) Update() error { return err } - if err := saveHist(p.Id, "port", "update", p.Creator, string(b), session); err != nil { + if err := saveHistory(p.Id, "port", "update", p.Creator, string(b), session); err != nil { session.Rollback() return err } @@ -380,7 +380,7 @@ func (p *LogCollect) Update() error { return err } - if err := saveHist(p.Id, "log", "update", p.Creator, string(b), session); err != nil { + if err := saveHistory(p.Id, "log", "update", p.Creator, string(b), session); err != nil { session.Rollback() return err } @@ -438,7 +438,7 @@ func (p *PluginCollect) Update() error { return err } - if err := saveHist(p.Id, "plugin", "update", p.Creator, string(b), session); err != nil { + if err := saveHistory(p.Id, "plugin", "update", p.Creator, string(b), session); err != nil { session.Rollback() return err } @@ -513,7 +513,7 @@ func (a *ApiCollect) Update() error { return err } - if err := saveHist(a.Id, "api", "update", a.Creator, string(b), session); err != nil { + if err := saveHistory(a.Id, "api", "update", a.Creator, string(b), session); err != nil { session.Rollback() return err } @@ -545,7 +545,7 @@ func CreateCollect(collectType, creator string, collect interface{}) error { return err } - if err := saveHist(0, collectType, "create", creator, string(b), session); err != nil { + if err := saveHistory(0, collectType, "create", creator, string(b), session); err != nil { session.Rollback() return err } @@ -564,7 +564,7 @@ func DeleteCollectById(collectType, creator string, cid int64) error { return err } - if err := saveHist(cid, collectType, "delete", creator, strconv.FormatInt(cid, 10), session); err != nil { + if err := saveHistory(cid, collectType, "delete", creator, strconv.FormatInt(cid, 10), session); err != nil { session.Rollback() return err } @@ -572,7 +572,7 @@ func DeleteCollectById(collectType, creator string, cid int64) error { return session.Commit() } -func saveHist(id int64, tp string, action, username, body string, session *xorm.Session) error { +func saveHistory(id int64, tp string, action, username, body string, session *xorm.Session) error { h := CollectHist{ Cid: id, CollectType: tp, diff --git a/src/models/mon_collect_rule.go b/src/models/mon_collect_rule.go index 0ca5ee89..65334dbc 100644 --- a/src/models/mon_collect_rule.go +++ b/src/models/mon_collect_rule.go @@ -3,9 +3,9 @@ package models import ( "encoding/json" "fmt" - "time" "github.com/didi/nightingale/src/common/dataobj" + "xorm.io/xorm" ) const ( @@ -24,9 +24,9 @@ type CollectRule struct { Data json.RawMessage `json:"data"` Tags string `json:"tags" description:"k1=v1,k2=v2,k3=v3,..."` Creator string `json:"creator" description:"just for output"` - LastUpdator string `json:"last_updator" description:"just for output"` - Created time.Time `json:"created" description:"just for output"` - LastUpdated time.Time `json:"last_updated" description:"just for output"` + Updater string `json:"updater" description:"just for output"` + CreatedAt int64 `json:"created_at" description:"just for output"` + UpdatedAt int64 `json:"updated_at" description:"just for output"` } type validator interface { @@ -51,25 +51,51 @@ func (p *CollectRule) Validate(v ...interface{}) error { } if len(v) > 0 && v[0] != nil { - if err := json.Unmarshal(p.Data, v[0]); err != nil { + obj := v[0] + if err := json.Unmarshal(p.Data, obj); err != nil { return err } - if o, ok := v[0].(validator); ok { + if o, ok := obj.(validator); ok { if err := o.Validate(); err != nil { return err } } + b, err := json.Marshal(obj) + if err != nil { + return err + } + p.Data = json.RawMessage(b) } return nil } -func GetCollectRules() ([]*CollectRule, error) { +func DumpCollectRules() ([]*CollectRule, error) { rules := []*CollectRule{} err := DB["mon"].Find(&rules) return rules, err } +func GetCollectRules(typ string, nid int64, limit, offset int) (total int64, list []*CollectRule, err error) { + search := func() *xorm.Session { + session := DB["mon"].Where("1=1") + if nid != 0 { + session = session.And("nid=?", nid) + } + if typ != "" { + return session.And("collect_type=?", typ) + } + return session + } + + if total, err = search().Count(new(CollectRule)); err != nil { + return + } + + err = search().Desc("created").Limit(limit, offset).Find(&list) + return +} + func (p *CollectRule) Update() error { session := DB["mon"].NewSession() defer session.Close() @@ -90,7 +116,7 @@ func (p *CollectRule) Update() error { return err } - if err := saveHist(p.Id, p.CollectType, "update", p.Creator, string(b), session); err != nil { + if err := saveHistory(p.Id, p.CollectType, "update", p.Creator, string(b), session); err != nil { session.Rollback() return err } diff --git a/src/modules/monapi/collector/basecollector.go b/src/modules/monapi/collector/basecollector.go index 6618eeb0..37018638 100644 --- a/src/modules/monapi/collector/basecollector.go +++ b/src/modules/monapi/collector/basecollector.go @@ -3,6 +3,7 @@ package collector import ( "encoding/json" "fmt" + "time" "github.com/didi/nightingale/src/models" "github.com/influxdata/telegraf" @@ -91,8 +92,11 @@ func (p BaseCollector) Create(data []byte, username string) error { return fmt.Errorf("permission deny") } + now := time.Now().Unix() collect.Creator = username - collect.LastUpdator = username + collect.CreatedAt = now + collect.Updater = username + collect.UpdatedAt = now old, err := p.GetByNameAndNid(collect.Name, collect.Nid) if err != nil { @@ -135,8 +139,8 @@ func (p BaseCollector) Update(data []byte, username string) error { return fmt.Errorf("采集不存在 type:%s id:%d", p.name, collect.Id) } - collect.Creator = username - collect.LastUpdator = username + collect.Updater = username + collect.UpdatedAt = time.Now().Unix() old, err := p.GetByNameAndNid(collect.Name, collect.Nid) if err != nil { diff --git a/src/modules/monapi/collector/template.go b/src/modules/monapi/collector/template.go index 2a1a6cf7..f8e8c314 100644 --- a/src/modules/monapi/collector/template.go +++ b/src/modules/monapi/collector/template.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "reflect" + "strconv" "strings" "sync" "unicode" @@ -12,12 +13,13 @@ import ( var fieldCache sync.Map // map[reflect.Type]structFields type Field struct { - skip bool `json:"-"` + skip bool `json:"-"` + def string `json:"-"` // definitions map[string][]Field `json:"-"` Name string `json:"name,omitempty"` Label string `json:"label,omitempty"` - Default string `json:"default,omitempty"` + Default interface{} `json:"default,omitempty"` Example string `json:"example,omitempty"` Description string `json:"description,omitempty"` Required bool `json:"required,omitempty"` @@ -137,7 +139,7 @@ func getTagOpt(sf reflect.StructField) (opt Field) { opt.Name = name opt.Label = _s(sf.Tag.Get("label")) - opt.Default = sf.Tag.Get("default") + opt.def = sf.Tag.Get("default") opt.Example = sf.Tag.Get("example") opt.Description = _s(sf.Tag.Get("description")) @@ -189,15 +191,29 @@ func fieldType(t reflect.Type, in *Field, definitions map[string][]Field) { t = t.Elem() } + var def interface{} + switch t.Kind() { case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint32, reflect.Uint64: in.Type = "integer" + if in.def != "" { + def, _ = strconv.ParseInt(in.def, 10, 64) + } case reflect.Float32, reflect.Float64: in.Type = "float" + if in.def != "" { + def, _ = strconv.ParseFloat(in.def, 64) + } case reflect.Bool: in.Type = "boolean" + if in.def != "" { + def = in.def == "true" + } case reflect.String: in.Type = "string" + if in.def != "" { + def = in.def + } case reflect.Struct: name := t.String() if _, ok := definitions[name]; !ok { @@ -222,8 +238,17 @@ func fieldType(t reflect.Type, in *Field, definitions map[string][]Field) { } else { panic(fmt.Sprintf("unspport type %s items %s", t.String(), t2.String())) } + if t2.Kind() == reflect.String && in.def != "" { + var s []string + json.Unmarshal([]byte(in.def), &s) + def = s + } default: panic(fmt.Sprintf("unspport type %s", t.String())) // in.Type = "string" } + + if def != nil { + in.Default = def + } } diff --git a/src/modules/monapi/http/router.go b/src/modules/monapi/http/router.go index 217e0425..ceaf41b4 100644 --- a/src/modules/monapi/http/router.go +++ b/src/modules/monapi/http/router.go @@ -20,6 +20,11 @@ func Config(r *gin.Engine) { sys.GET("/addr", addr) } + generic := r.Group("/api/mon").Use(GetCookieUser()) + { + generic.GET("/regions", func(c *gin.Context) { renderData(c, config.Get().Region, nil) }) + } + node := r.Group("/api/mon/node").Use(GetCookieUser()) { node.GET("/:id/maskconf", maskconfGets) @@ -104,7 +109,7 @@ func Config(r *gin.Engine) { collectRules := r.Group("/api/mon/collect-rules").Use(GetCookieUser()) { collectRules.POST("", collectRulePost) // create a collect rule - collectRules.GET("/list", collectRulesGet) // get collect rules + collectRules.GET("/list", collectRulesGetV2) // get collect rules collectRules.GET("", collectRuleGet) // get collect rule by type & id collectRules.PUT("", collectRulePut) // update collect rule by type & id collectRules.DELETE("", collectsRuleDel) // delete collect rules by type & ids diff --git a/src/modules/monapi/http/router_collect.go b/src/modules/monapi/http/router_collect.go index 223e7ac3..490f41f0 100644 --- a/src/modules/monapi/http/router_collect.go +++ b/src/modules/monapi/http/router_collect.go @@ -3,9 +3,11 @@ package http import ( "encoding/json" "fmt" + "log" "regexp" "strings" + "github.com/didi/nightingale/src/models" "github.com/didi/nightingale/src/modules/monapi/collector" "github.com/didi/nightingale/src/modules/monapi/scache" @@ -83,6 +85,20 @@ func collectRulesGet(c *gin.Context) { renderData(c, resp, nil) } +func collectRulesGetV2(c *gin.Context) { + nid := queryInt64(c, "nid", 0) + limit := queryInt(c, "limit", 20) + typ := queryStr(c, "type", "") + + log.Printf("typ %s", typ) + total, list, err := models.GetCollectRules(typ, nid, limit, offset(c, limit, 0)) + + renderData(c, map[string]interface{}{ + "total": total, + "list": list, + }, err) +} + func collectRulePut(c *gin.Context) { var recv CollectRecv errors.Dangerous(c.ShouldBind(&recv)) diff --git a/src/modules/monapi/plugins/all/all.go b/src/modules/monapi/plugins/all/all.go index 2c0f3788..54e635ab 100644 --- a/src/modules/monapi/plugins/all/all.go +++ b/src/modules/monapi/plugins/all/all.go @@ -2,7 +2,7 @@ package all import ( // remote - _ "github.com/didi/nightingale/src/modules/monapi/plugins/api" + // _ "github.com/didi/nightingale/src/modules/monapi/plugins/api" // telegraf style _ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb" _ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql" diff --git a/src/modules/monapi/plugins/mongodb/mongodb.go b/src/modules/monapi/plugins/mongodb/mongodb.go index 808680c5..b23fa93d 100644 --- a/src/modules/monapi/plugins/mongodb/mongodb.go +++ b/src/modules/monapi/plugins/mongodb/mongodb.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/didi/nightingale/src/modules/monapi/collector" + "github.com/didi/nightingale/src/modules/monapi/plugins" "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb/mongodb" "github.com/didi/nightingale/src/toolkits/i18n" "github.com/influxdata/telegraf" @@ -72,5 +73,6 @@ func (p *MongodbRule) TelegrafInput() (telegraf.Input, error) { GatherPerdbStats: p.GatherPerdbStats, GatherColStats: p.GatherColStats, ColStatsDbs: p.ColStatsDbs, + Log: plugins.GetLogger(), }, nil } diff --git a/src/modules/monapi/plugins/mongodb/mongodb/mongodb.go b/src/modules/monapi/plugins/mongodb/mongodb/mongodb.go index 1feb40d0..c65b6fd7 100644 --- a/src/modules/monapi/plugins/mongodb/mongodb/mongodb.go +++ b/src/modules/monapi/plugins/mongodb/mongodb/mongodb.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/telegraf" tlsint "github.com/influxdata/telegraf/plugins/common/tls" - "github.com/influxdata/telegraf/plugins/inputs" "gopkg.in/mgo.v2" ) @@ -186,6 +185,7 @@ func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error { return server.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.ColStatsDbs) } +/* func init() { inputs.Add("mongodb", func() telegraf.Input { return &MongoDB{ @@ -197,3 +197,4 @@ func init() { } }) } +*/ diff --git a/src/modules/monapi/plugins/mysql/mysql.go b/src/modules/monapi/plugins/mysql/mysql.go index 7510180a..bdcc6231 100644 --- a/src/modules/monapi/plugins/mysql/mysql.go +++ b/src/modules/monapi/plugins/mysql/mysql.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/didi/nightingale/src/modules/monapi/collector" + "github.com/didi/nightingale/src/modules/monapi/plugins" "github.com/didi/nightingale/src/toolkits/i18n" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs/mysql" @@ -64,10 +65,10 @@ var ( ) type MysqlRule struct { - Servers []string `label:"Servers" json:"servers,required" description:"specify servers via a url matching\n[username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify|custom]]\nsee https://github.com/go-sql-driver/mysql#dsn-data-source-name" example:"servers = ['user:passwd@tcp(127.0.0.1:3306)/?tls=false']\nservers = ["user@tcp(127.0.0.1:3306)/?tls=false"]"` - PerfEventsStatementsDigestTextLimit int64 `label:"-" json:"-"` - PerfEventsStatementsLimit int64 `label:"-" json:"-"` - PerfEventsStatementsTimeLimit int64 `label:"-" json:"-"` + Servers []string `label:"Servers" json:"servers,required" description:"specify servers via a url matching\n[username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify|custom]]\nsee https://github.com/go-sql-driver/mysql#dsn-data-source-name" example:"user:passwd@tcp(127.0.0.1:3306)/?tls=false"` + PerfEventsStatementsDigestTextLimit int64 `label:"Perf Events Statements Digest Text Limit" json:"perf_events_statements_digest_text_limit" default:"120" description:"the limits for metrics form perf_events_statements"` + PerfEventsStatementsLimit int64 `label:"Perf Events Statements Limit" json:"perf_events_statements_limit" default:"250" description:"the limits for metrics form perf_events_statements"` + PerfEventsStatementsTimeLimit int64 `label:"Perf Events Statements Timelimit" json:"perf_events_statements_time_limit" default:"86400" description:"the limits for metrics form perf_events_statements"` TableSchemaDatabases []string `label:"Databases" json:"table_schema_databases" description:"if the list is empty, then metrics are gathered from all database tables"` GatherProcessList bool `label:"Process List" json:"gather_process_list" description:"gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST"` GatherUserStatistics bool `label:"User Statistics" json:"gather_user_statistics" description:"gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS"` @@ -82,8 +83,8 @@ type MysqlRule struct { GatherTableSchema bool `label:"Tables" json:"gather_table_schema" description:"gather metrics from INFORMATION_SCHEMA.TABLES for databases provided above list"` GatherFileEventsStats bool `label:"File Events Stats" json:"gather_file_events_stats" description:"gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME"` GatherPerfEventsStatements bool `label:"Perf Events Statements" json:"gather_perf_events_statements" description:"gather metrics from PERFORMANCE_SCHEMA.EVENTS_STATEMENTS_SUMMARY_BY_DIGEST"` - GatherGlobalVars bool `label:"-" json:"-"` - IntervalSlow string `label:"Interval Slow" json:"interval_slow" desc:"Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)" example:"interval_slow = '30m'" json:"-"` + GatherGlobalVars bool `label:"Global Vars" json:"gather_global_variables" description:"gather metrics from PERFORMANCE_SCHEMA.GLOBAL_VARIABLES" default:"true"` + IntervalSlow string `label:"Interval Slow" json:"interval_slow" desc:"Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)" example:"30m"` MetricVersion int `label:"-" json:"-"` } @@ -91,6 +92,15 @@ func (p *MysqlRule) Validate() error { if len(p.Servers) == 0 || p.Servers[0] == "" { return fmt.Errorf("mysql.rule.servers must be set") } + if p.PerfEventsStatementsDigestTextLimit == 0 { + p.PerfEventsStatementsDigestTextLimit = 120 + } + if p.PerfEventsStatementsLimit == 0 { + p.PerfEventsStatementsLimit = 250 + } + if p.PerfEventsStatementsTimeLimit == 0 { + p.PerfEventsStatementsTimeLimit = 86400 + } return nil } @@ -101,9 +111,9 @@ func (p *MysqlRule) TelegrafInput() (telegraf.Input, error) { return &mysql.Mysql{ Servers: p.Servers, - PerfEventsStatementsDigestTextLimit: 120, - PerfEventsStatementsLimit: 250, - PerfEventsStatementsTimeLimit: 86400, + PerfEventsStatementsDigestTextLimit: p.PerfEventsStatementsDigestTextLimit, + PerfEventsStatementsLimit: p.PerfEventsStatementsLimit, + PerfEventsStatementsTimeLimit: p.PerfEventsStatementsTimeLimit, TableSchemaDatabases: p.TableSchemaDatabases, GatherProcessList: p.GatherProcessList, GatherUserStatistics: p.GatherUserStatistics, @@ -118,8 +128,9 @@ func (p *MysqlRule) TelegrafInput() (telegraf.Input, error) { GatherTableSchema: p.GatherTableSchema, GatherFileEventsStats: p.GatherFileEventsStats, GatherPerfEventsStatements: p.GatherPerfEventsStatements, - GatherGlobalVars: true, - IntervalSlow: "0m", + GatherGlobalVars: p.GatherGlobalVars, + IntervalSlow: "", MetricVersion: 2, + Log: plugins.GetLogger(), }, nil } diff --git a/src/modules/monapi/plugins/redis/redis.go b/src/modules/monapi/plugins/redis/redis.go index ae913198..b0982911 100644 --- a/src/modules/monapi/plugins/redis/redis.go +++ b/src/modules/monapi/plugins/redis/redis.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/didi/nightingale/src/modules/monapi/collector" + "github.com/didi/nightingale/src/modules/monapi/plugins" "github.com/didi/nightingale/src/toolkits/i18n" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs/redis" @@ -91,5 +92,6 @@ func (p *RedisRule) TelegrafInput() (telegraf.Input, error) { Servers: p.Servers, Commands: commands, Password: p.Password, + Log: plugins.GetLogger(), }, nil } diff --git a/src/modules/monapi/plugins/util.go b/src/modules/monapi/plugins/util.go new file mode 100644 index 00000000..267f065f --- /dev/null +++ b/src/modules/monapi/plugins/util.go @@ -0,0 +1,41 @@ +package plugins + +import ( + "fmt" + + "github.com/toolkits/pkg/logger" +) + +var defaultLogger = Logger{} + +func GetLogger() *Logger { + return &defaultLogger +} + +// telegraf.Logger +type Logger struct{} + +func (l *Logger) Errorf(format string, args ...interface{}) { + logger.LogDepth(logger.ERROR, 1, format, args...) +} +func (l *Logger) Error(args ...interface{}) { + logger.LogDepth(logger.ERROR, 1, fmt.Sprint(args...)) +} +func (l *Logger) Debugf(format string, args ...interface{}) { + logger.LogDepth(logger.DEBUG, 1, format, args...) +} +func (l *Logger) Debug(args ...interface{}) { + logger.LogDepth(logger.DEBUG, 1, fmt.Sprint(args...)) +} +func (l *Logger) Warnf(format string, args ...interface{}) { + logger.LogDepth(logger.WARNING, 1, format, args...) +} +func (l *Logger) Warn(args ...interface{}) { + logger.LogDepth(logger.WARNING, 1, fmt.Sprint(args...)) +} +func (l *Logger) Infof(format string, args ...interface{}) { + logger.LogDepth(logger.INFO, 1, format, args...) +} +func (l *Logger) Info(args ...interface{}) { + logger.LogDepth(logger.INFO, 1, fmt.Sprint(args...)) +} diff --git a/src/modules/monapi/scache/collectrule.go b/src/modules/monapi/scache/collectrule.go index 9a0ff70d..0d1d73cf 100644 --- a/src/modules/monapi/scache/collectrule.go +++ b/src/modules/monapi/scache/collectrule.go @@ -107,7 +107,7 @@ func str(in interface{}) string { } func (p *collectRuleCache) syncCollectRules() { - rules, err := models.GetCollectRules() + rules, err := models.DumpCollectRules() if err != nil { logger.Warningf("get log collectRules err:%v", err) } diff --git a/src/modules/prober/manager/collectrule.go b/src/modules/prober/manager/collectrule.go index 73899ce8..d175f652 100644 --- a/src/modules/prober/manager/collectrule.go +++ b/src/modules/prober/manager/collectrule.go @@ -22,6 +22,7 @@ type collectRule struct { tags map[string]string precision time.Duration metrics []*dataobj.MetricValue + lastAt int64 } func newCollectRule(rule *models.CollectRule) (*collectRule, error) { @@ -120,7 +121,7 @@ func (p *collectRule) prepareMetrics() error { } func (p *collectRule) update(rule *models.CollectRule) error { - if p.CollectRule.LastUpdated == rule.LastUpdated { + if p.CollectRule.UpdatedAt == rule.UpdatedAt { return nil } @@ -257,7 +258,7 @@ func (p *collectRule) AddError(err error) { if err == nil { return } - logger.Debugf("Error in plugin: %v", err) + logger.Debugf("collectRule %s.%s(%d) Error: %s", p.CollectType, p.Name, p.Id, err) } func (p *collectRule) SetPrecision(precision time.Duration) { diff --git a/src/modules/prober/manager/heap.go b/src/modules/prober/manager/heap.go index ff93fc03..39c5b839 100644 --- a/src/modules/prober/manager/heap.go +++ b/src/modules/prober/manager/heap.go @@ -1,8 +1,8 @@ package manager type ruleSummary struct { - id int64 // collect rule id - executeAt int64 + id int64 // collect rule id + activeAt int64 } type ruleSummaryHeap []*ruleSummary @@ -12,7 +12,7 @@ func (h ruleSummaryHeap) Len() int { } func (h ruleSummaryHeap) Less(i, j int) bool { - return h[i].executeAt < h[j].executeAt + return h[i].activeAt < h[j].activeAt } func (h ruleSummaryHeap) Swap(i, j int) { @@ -30,5 +30,5 @@ func (h *ruleSummaryHeap) Pop() interface{} { } func (h *ruleSummaryHeap) Top() *ruleSummary { - return (*h)[len(*h)-1] + return (*h)[0] } diff --git a/src/modules/prober/manager/manager.go b/src/modules/prober/manager/manager.go index 059a8186..904a8c91 100644 --- a/src/modules/prober/manager/manager.go +++ b/src/modules/prober/manager/manager.go @@ -82,7 +82,7 @@ func (p *manager) schedule() error { if p.heap.Len() == 0 { return nil } - if p.heap.Top().executeAt > now { + if p.heap.Top().activeAt > now { return nil } @@ -108,7 +108,12 @@ func (p *manager) schedule() error { p.collectRuleCh <- rule - summary.executeAt = now + int64(ruleConfig.Step) + logger.Debugf("%s %s %d lastAt %ds before nextAt %ds later", + rule.CollectType, rule.Name, rule.Id, + now-rule.lastAt, ruleConfig.Step) + + summary.activeAt = now + int64(ruleConfig.Step) + rule.lastAt = now heap.Push(&p.heap, summary) continue @@ -134,8 +139,8 @@ func (p *manager) AddRule(rule *models.CollectRule) error { p.index[rule.Id] = ruleEntity heap.Push(&p.heap, &ruleSummary{ - id: rule.Id, - executeAt: time.Now().Unix() + int64(rule.Step), + id: rule.Id, + activeAt: time.Now().Unix() + int64(rule.Step), }) return nil } diff --git a/src/modules/prober/prober.go b/src/modules/prober/prober.go index 08127e0d..ecf16c42 100644 --- a/src/modules/prober/prober.go +++ b/src/modules/prober/prober.go @@ -73,6 +73,7 @@ func main() { gin.SetMode(gin.ReleaseMode) } + // for manager -> core.Push() core.InitRpcClients() manager.NewManager(cfg, cache.CollectRule).Start(ctx) @@ -120,6 +121,7 @@ func start() { func ending(cancel context.CancelFunc) { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + select { case <-c: fmt.Printf("stop signal caught, stopping... pid=%d\n", os.Getpid())