diff --git a/go.mod b/go.mod index 63eb866d..270bcbd5 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,7 @@ require ( github.com/codegangsta/negroni v1.0.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe - github.com/ericchiang/k8s v1.2.0 github.com/garyburd/redigo v1.6.2 - github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/gin-contrib/pprof v1.3.0 github.com/gin-gonic/gin v1.6.3 github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663 @@ -24,12 +22,9 @@ require ( github.com/m3db/m3 v0.15.17 github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-sqlite3 v1.14.0 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/mojocn/base64Captcha v1.3.1 github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect - github.com/prometheus/client_model v0.2.0 - github.com/prometheus/common v0.9.1 github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 @@ -45,7 +40,6 @@ require ( gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/ldap.v3 v3.1.0 - gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce gopkg.in/square/go-jose.v2 v2.5.1 // indirect gopkg.in/yaml.v2 v2.3.0 xorm.io/core v0.7.3 diff --git a/src/models/mon_collect.go b/src/models/mon_collect.go index 85aaa265..a6719494 100644 --- a/src/models/mon_collect.go +++ b/src/models/mon_collect.go @@ -525,32 +525,32 @@ func (a *ApiCollect) Update() error { return err } -func CreateCollect(collectType, creator string, collect interface{}) error { +func CreateCollect(collectType, creator string, collect interface{}, dryRun bool) (err error) { session := DB["mon"].NewSession() - defer session.Close() - - err := session.Begin() - if err != nil { + if err = session.Begin(); err != nil { + session.Close() return err } + defer func() { + if err != nil || dryRun { + session.Rollback() + } else { + err = session.Commit() + } + session.Close() + }() - if _, err := session.Insert(collect); err != nil { - session.Rollback() - return err + if _, err = session.Insert(collect); err != nil { + return } - b, err := json.Marshal(collect) - if err != nil { - session.Rollback() - return err + var b []byte + if b, err = json.Marshal(collect); err != nil { + return } - if err := saveHistory(0, collectType, "create", creator, string(b), session); err != nil { - session.Rollback() - return err - } - - return session.Commit() + err = saveHistory(0, collectType, "create", creator, string(b), session) + return } func DeleteCollectById(collectType, creator string, cid int64) error { diff --git a/src/models/mon_collect_rule.go b/src/models/mon_collect_rule.go index e7240cf5..a0468523 100644 --- a/src/models/mon_collect_rule.go +++ b/src/models/mon_collect_rule.go @@ -21,6 +21,7 @@ type CollectRule struct { Name string `json:"name" describes:"customize name"` Region string `json:"region"` Comment string `json:"comment"` + DryRun bool `json:"dryrun" xorm:"-"` Data json.RawMessage `json:"data"` Tags string `json:"tags" description:"k1=v1,k2=v2,k3=v3,..."` Creator string `json:"creator" description:"just for output"` @@ -100,36 +101,32 @@ func GetCollectRules(typ string, nid int64, limit, offset int) (total int64, lis return } -func (p *CollectRule) Update() error { +func (p *CollectRule) Update() (err error) { session := DB["mon"].NewSession() - defer session.Close() - - err := session.Begin() - if err != nil { - return err + if err = session.Begin(); err != nil { + session.Close() + return } + defer func() { + if err != nil || p.DryRun { + session.Rollback() + } else { + err = session.Commit() + } + session.Close() + }() if _, err = session.Id(p.Id).AllCols().Update(p); err != nil { - session.Rollback() - return err + return } - b, err := json.Marshal(p) - if err != nil { - session.Rollback() - return err + var b []byte + if b, err = json.Marshal(p); err != nil { + return } - if err := saveHistory(p.Id, p.CollectType, "update", p.Creator, string(b), session); err != nil { - session.Rollback() - return err - } - - if err = session.Commit(); err != nil { - return err - } - - return err + err = saveHistory(p.Id, p.CollectType, "update", p.Creator, string(b), session) + return } func DeleteCollectRule(sid int64) error { diff --git a/src/models/sso_token.go b/src/models/sso_token.go index 46531288..05ccb174 100644 --- a/src/models/sso_token.go +++ b/src/models/sso_token.go @@ -25,10 +25,17 @@ type Token struct { } func TokenAll() (int64, error) { + if _, ok := DB["sso"]; !ok { + return 0, nil + } return DB["sso"].Count(new(Token)) } func TokenGet(token string) (*Token, error) { + if _, ok := DB["sso"]; !ok { + return nil, nil + } + var obj Token has, err := DB["sso"].Where("access_token=?", token).Get(&obj) if err != nil { @@ -54,16 +61,25 @@ func (p *Token) Session() *Session { } func (p *Token) Update(cols ...string) error { + if _, ok := DB["sso"]; !ok { + return nil + } _, err := DB["sso"].Where("access_token=?", p.AccessToken).Cols(cols...).Update(p) return err } func TokenDelete(token string) error { + if _, ok := DB["sso"]; !ok { + return nil + } _, err := DB["sso"].Where("access_token=?", token).Delete(new(Token)) return err } func TokenGets(where string, args ...interface{}) (tokens []Token, err error) { + if _, ok := DB["sso"]; !ok { + return + } if where != "" { err = DB["sso"].Where(where, args...).Find(&tokens) } else { diff --git a/src/modules/monapi/collector/basecollector.go b/src/modules/monapi/collector/basecollector.go index c39ec988..8fae9d67 100644 --- a/src/modules/monapi/collector/basecollector.go +++ b/src/modules/monapi/collector/basecollector.go @@ -1,11 +1,14 @@ package collector import ( + "bytes" "encoding/json" "fmt" "time" + "github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/models" + "github.com/didi/nightingale/src/modules/prober/manager/accumulator" "github.com/influxdata/telegraf" ) @@ -113,7 +116,52 @@ func (p BaseCollector) Create(data []byte, username string) error { if old != nil { return fmt.Errorf("同节点下策略名称 %s 已存在", collect.Name) } - return models.CreateCollect(p.name, username, collect) + + if err := models.CreateCollect(p.name, username, collect, collect.DryRun); err != nil { + return err + } + + if collect.DryRun { + return p.dryRun(rule) + } + + return nil +} + +func (p BaseCollector) dryRun(rule TelegrafPlugin) error { + input, err := rule.TelegrafInput() + if err != nil { + return err + } + + metrics := []*dataobj.MetricValue{} + + acc, err := accumulator.New(accumulator.Options{Name: "plugin-dryrun", Metrics: &metrics}) + if err != nil { + return err + } + + if err = input.Gather(acc); err != nil { + return err + } + + buf := &bytes.Buffer{} + for k, v := range metrics { + fmt.Fprintf(buf, "%d %s %s %f\n", k, v.CounterType, v.PK(), v.Value) + } + return NewDryRunError(buf.String()) +} + +type DryRun struct { + msg string +} + +func (p DryRun) Error() string { + return p.msg +} + +func NewDryRunError(msg string) error { + return DryRun{msg} } func (p BaseCollector) Update(data []byte, username string) error { @@ -153,7 +201,15 @@ func (p BaseCollector) Update(data []byte, username string) error { return fmt.Errorf("同节点下策略名称 %s 已存在", collect.Name) } - return collect.Update() + if err := collect.Update(); err != nil { + return err + } + + if collect.DryRun { + return p.dryRun(rule) + } + + return nil } func (p BaseCollector) Delete(id int64, username string) error { diff --git a/src/modules/monapi/http/router_collect.go b/src/modules/monapi/http/router_collect.go index 276c30a9..536593a0 100644 --- a/src/modules/monapi/http/router_collect.go +++ b/src/modules/monapi/http/router_collect.go @@ -1,6 +1,7 @@ package http import ( + "bytes" "encoding/json" "fmt" "regexp" @@ -24,17 +25,23 @@ func collectRulePost(c *gin.Context) { var recv []CollectRecv errors.Dangerous(c.ShouldBind(&recv)) + buf := &bytes.Buffer{} creator := loginUsername(c) for _, obj := range recv { cl, err := collector.GetCollector(obj.Type) errors.Dangerous(err) if err := cl.Create([]byte(obj.Data), creator); err != nil { - errors.Bomb("%s add rule err %s", obj.Type, err) + if _, ok := err.(collector.DryRun); ok { + fmt.Fprintf(buf, "%s\n", err) + } else { + errors.Bomb("%s add rule err %s", obj.Type, err) + } } } - renderData(c, "ok", nil) + buf.WriteString("ok") + renderData(c, buf.String(), nil) } func collectRulesGetByLocalEndpoint(c *gin.Context) { @@ -104,11 +111,17 @@ func collectRulePut(c *gin.Context) { cl, err := collector.GetCollector(recv.Type) errors.Dangerous(err) + buf := &bytes.Buffer{} creator := loginUsername(c) if err := cl.Update([]byte(recv.Data), creator); err != nil { - errors.Bomb("%s update rule err %s", recv.Type, err) + if _, ok := err.(collector.DryRun); ok { + fmt.Fprintf(buf, "%s\n", err) + } else { + errors.Bomb("%s update rule err %s", recv.Type, err) + } } - renderData(c, "ok", nil) + buf.WriteString("ok") + renderData(c, buf.String(), nil) } type CollectsDelRev struct { diff --git a/src/modules/monapi/monapi.go b/src/modules/monapi/monapi.go index af4efcd6..6dc7ba4a 100644 --- a/src/modules/monapi/monapi.go +++ b/src/modules/monapi/monapi.go @@ -12,6 +12,7 @@ import ( "github.com/didi/nightingale/src/models" "github.com/didi/nightingale/src/modules/monapi/acache" "github.com/didi/nightingale/src/modules/monapi/alarm" + "github.com/didi/nightingale/src/modules/monapi/collector" "github.com/didi/nightingale/src/modules/monapi/config" "github.com/didi/nightingale/src/modules/monapi/http" "github.com/didi/nightingale/src/modules/monapi/redisc" @@ -96,6 +97,8 @@ func main() { go alarm.CleanEventLoop() } + pluginInfo() + http.Start() ending() } @@ -140,3 +143,10 @@ func pconf() { os.Exit(1) } } + +func pluginInfo() { + fmt.Println("remote collector") + for k, v := range collector.GetRemoteCollectors() { + fmt.Printf(" %d %s\n", k, v) + } +} diff --git a/src/modules/monapi/plugins/log/log.go b/src/modules/monapi/plugins/log/log.go index 90fae047..df2f0092 100644 --- a/src/modules/monapi/plugins/log/log.go +++ b/src/modules/monapi/plugins/log/log.go @@ -82,7 +82,7 @@ func (p LogCollector) Create(data []byte, username string) error { if old != nil { return fmt.Errorf("同节点下策略名称 %s 已存在", name) } - return models.CreateCollect(p.Name(), username, collector) + return models.CreateCollect(p.Name(), username, collector, false) } func (p LogCollector) Update(data []byte, username string) error { diff --git a/src/modules/monapi/plugins/plugin/plugin.go b/src/modules/monapi/plugins/plugin/plugin.go index 8a745c25..419ce7cd 100644 --- a/src/modules/monapi/plugins/plugin/plugin.go +++ b/src/modules/monapi/plugins/plugin/plugin.go @@ -79,7 +79,7 @@ func (p PluginCollector) Create(data []byte, username string) error { if old != nil { return fmt.Errorf("同节点下策略名称 %s 已存在", name) } - return models.CreateCollect(p.Name(), username, collect) + return models.CreateCollect(p.Name(), username, collect, false) } func (p PluginCollector) Update(data []byte, username string) error { diff --git a/src/modules/monapi/plugins/port/port.go b/src/modules/monapi/plugins/port/port.go index e63b24ff..05377e3f 100644 --- a/src/modules/monapi/plugins/port/port.go +++ b/src/modules/monapi/plugins/port/port.go @@ -79,7 +79,7 @@ func (p PortCollector) Create(data []byte, username string) error { if old != nil { return fmt.Errorf("同节点下策略名称 %s 已存在", name) } - return models.CreateCollect(p.Name(), username, collect) + return models.CreateCollect(p.Name(), username, collect, false) } func (p PortCollector) Update(data []byte, username string) error { diff --git a/src/modules/monapi/plugins/proc/proc.go b/src/modules/monapi/plugins/proc/proc.go index 2303c437..a86b8c32 100644 --- a/src/modules/monapi/plugins/proc/proc.go +++ b/src/modules/monapi/plugins/proc/proc.go @@ -79,7 +79,7 @@ func (p ProcCollector) Create(data []byte, username string) error { if old != nil { return fmt.Errorf("同节点下策略名称 %s 已存在", name) } - return models.CreateCollect(p.Name(), username, collect) + return models.CreateCollect(p.Name(), username, collect, false) } func (p ProcCollector) Update(data []byte, username string) error { diff --git a/src/modules/monapi/plugins/util.go b/src/modules/monapi/plugins/util.go index 6f3cdd4e..1d76994d 100644 --- a/src/modules/monapi/plugins/util.go +++ b/src/modules/monapi/plugins/util.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/didi/nightingale/src/common/dataobj" - "github.com/didi/nightingale/src/modules/prober/manager" + "github.com/didi/nightingale/src/modules/prober/manager/accumulator" "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" ) @@ -83,7 +83,7 @@ func PluginTest(t *testing.T, plugin telegrafPlugin) telegraf.Input { func PluginInputTest(t *testing.T, input telegraf.Input) { metrics := []*dataobj.MetricValue{} - acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "plugin-test", Metrics: &metrics}) + acc, err := accumulator.New(accumulator.Options{Name: "plugin-test", Metrics: &metrics}) if err != nil { t.Error(err) } diff --git a/src/modules/prober/manager/accumulator.go b/src/modules/prober/manager/accumulator/accumulator.go similarity index 94% rename from src/modules/prober/manager/accumulator.go rename to src/modules/prober/manager/accumulator/accumulator.go index 46e8b9ed..7e094c27 100644 --- a/src/modules/prober/manager/accumulator.go +++ b/src/modules/prober/manager/accumulator/accumulator.go @@ -1,4 +1,4 @@ -package manager +package accumulator import ( "fmt" @@ -7,17 +7,18 @@ import ( "time" "github.com/didi/nightingale/src/common/dataobj" + "github.com/didi/nightingale/src/modules/prober/manager/metric" "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" ) -type AccumulatorOptions struct { +type Options struct { Name string Tags map[string]string Metrics *[]*dataobj.MetricValue } -func (p *AccumulatorOptions) Validate() error { +func (p *Options) Validate() error { if p.Name == "" { return fmt.Errorf("unable to get Name") } @@ -28,8 +29,8 @@ func (p *AccumulatorOptions) Validate() error { return nil } -// NewAccumulator return telegraf.Accumulator -func NewAccumulator(opt AccumulatorOptions) (telegraf.Accumulator, error) { +// New return telegraf.Accumulator +func New(opt Options) (telegraf.Accumulator, error) { if err := opt.Validate(); err != nil { return nil, err } @@ -126,7 +127,7 @@ func (p *accumulator) addFields( tp telegraf.ValueType, t ...time.Time, ) { - m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp) + m, err := metric.NewMetric(measurement, tags, fields, p.getTime(t), tp) if err != nil { return } diff --git a/src/modules/prober/manager/collectrule.go b/src/modules/prober/manager/collectrule.go index c5100325..421aef47 100644 --- a/src/modules/prober/manager/collectrule.go +++ b/src/modules/prober/manager/collectrule.go @@ -9,6 +9,7 @@ import ( "github.com/didi/nightingale/src/models" "github.com/didi/nightingale/src/modules/monapi/collector" "github.com/didi/nightingale/src/modules/prober/config" + "github.com/didi/nightingale/src/modules/prober/manager/accumulator" "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" ) @@ -43,7 +44,7 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) { metrics := []*dataobj.MetricValue{} - acc, err := NewAccumulator(AccumulatorOptions{ + acc, err := accumulator.New(accumulator.Options{ Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id), Tags: tags, Metrics: &metrics}) @@ -176,7 +177,7 @@ func (p *collectRule) update(rule *models.CollectRule) error { return err } - acc, err := NewAccumulator(AccumulatorOptions{ + acc, err := accumulator.New(accumulator.Options{ Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id), Tags: tags, Metrics: p.metrics}) diff --git a/src/modules/prober/manager/metric.go b/src/modules/prober/manager/metric/metric.go similarity index 99% rename from src/modules/prober/manager/metric.go rename to src/modules/prober/manager/metric/metric.go index 05e482e8..5d5e3af1 100644 --- a/src/modules/prober/manager/metric.go +++ b/src/modules/prober/manager/metric/metric.go @@ -1,4 +1,4 @@ -package manager +package metric import ( "fmt"