use rabbitmq

This commit is contained in:
Ulric Qin 2020-09-26 23:09:03 +08:00
parent 4197cfba98
commit 9e881bc9a5
10 changed files with 410 additions and 68 deletions

1
go.mod
View File

@ -33,6 +33,7 @@ require (
github.com/shirou/gopsutil v2.20.7+incompatible
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
github.com/toolkits/pkg v1.1.2
github.com/ugorji/go/codec v1.1.7

2
go.sum
View File

@ -363,6 +363,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=

View File

@ -86,6 +86,10 @@ func NodeGet(where string, args ...interface{}) (*Node, error) {
return &obj, nil
}
func NodeGetById(id int64) (*Node, error) {
return NodeGet("id=?", id)
}
// NodeGets 在所有节点范围查询比如管理员看服务树就需要load所有数据
func NodeGets(where string, args ...interface{}) (nodes []Node, err error) {
if where != "" {

View File

@ -0,0 +1,133 @@
package models
import (
"fmt"
)
type ResourceRegisterItem struct {
UUID string `json:"uuid"`
Ident string `json:"ident"`
Name string `json:"name"`
Labels string `json:"labels"`
Extend string `json:"extend"`
Cate string `json:"cate"`
NID int64 `json:"nid"`
}
func (i ResourceRegisterItem) Validate() error {
if i.Cate == "" {
return fmt.Errorf("cate is blank")
}
if i.UUID == "" {
return fmt.Errorf("uuid is blank")
}
if i.Ident == "" {
return fmt.Errorf("ident is blank")
}
return nil
}
// ResourceRegisterFor3rd 用于第三方资源注册 errCode=400: 表示传入的参数有问题 errCode=500: 表示DB出了问题
// 之所以要通过errCode对错误做区分是因为这个方法同时被同步和异步两种方式调用上层需要依托这个信息做判断
func ResourceRegisterFor3rd(item ResourceRegisterItem) (errCode int, err error) {
err = item.Validate()
if err != nil {
return 400, err
}
node, err := NodeGetById(item.NID)
if err != nil {
return 500, err
}
if node == nil {
return 400, fmt.Errorf("node not found")
}
if node.Cate != "project" {
return 400, fmt.Errorf("node not project")
}
res, err := ResourceGet("uuid=?", item.UUID)
if err != nil {
return 500, err
}
if res != nil {
// 这个资源之前就已经存在过了,这次可能是更新了部分字段
res.Name = item.Name
res.Labels = item.Labels
res.Extend = item.Extend
err = res.Update("name", "labels", "extend")
if err != nil {
return 500, err
}
} else {
// 之前没有过这个资源在RDB注册这个资源
res = new(Resource)
res.UUID = item.UUID
res.Ident = item.Ident
res.Name = item.Name
res.Labels = item.Labels
res.Extend = item.Extend
res.Cate = item.Cate
res.Tenant = node.Tenant()
err = res.Save()
if err != nil {
return 500, err
}
}
// 检查这个资源是否有挂载过,没有的话就补齐挂载关系,这个动作是幂等的
leafPath := node.Path + "." + item.Cate
leafNode, err := NodeGet("path=?", leafPath)
if err != nil {
return 500, err
}
// 第一个挂载位置:项目下面的${cate}节点
if leafNode == nil {
leafNode, err = node.CreateChild(item.Cate, item.Cate, "", "resource", "system", 1, 1, []int64{})
if err != nil {
return 500, err
}
}
err = leafNode.Bind([]int64{res.Id})
if err != nil {
return 500, err
}
// 第二个挂载位置inner.${cate}
innerCatePath := "inner." + item.Cate
innerCateNode, err := NodeGet("path=?", innerCatePath)
if err != nil {
return 500, err
}
if innerCateNode == nil {
innerNode, err := NodeGet("path=?", "inner")
if err != nil {
return 500, err
}
if innerNode == nil {
return 500, fmt.Errorf("inner node not exists, maybe forget init system")
}
innerCateNode, err = innerNode.CreateChild(item.Cate, item.Cate, "", "resource", "system", 1, 1, []int64{})
if err != nil {
return 500, err
}
}
err = innerCateNode.Bind([]int64{res.Id})
if err != nil {
return 500, err
}
return 0, nil
}

View File

@ -9,13 +9,14 @@ import (
)
type ConfigT struct {
Logger loggeri.Config `yaml:"logger"`
HTTP httpSection `yaml:"http"`
LDAP ldapSection `yaml:"ldap"`
SSO ssoSection `yaml:"sso"`
Tokens []string `yaml:"tokens"`
Redis redisSection `yaml:"redis"`
Sender map[string]senderSection `yaml:"sender"`
Logger loggeri.Config `yaml:"logger"`
HTTP httpSection `yaml:"http"`
LDAP ldapSection `yaml:"ldap"`
SSO ssoSection `yaml:"sso"`
Tokens []string `yaml:"tokens"`
Redis redisSection `yaml:"redis"`
Sender map[string]senderSection `yaml:"sender"`
RabbitMQ rabbitmqSection `yaml:"rabbitmq"`
}
type ssoSection struct {
@ -74,6 +75,11 @@ type timeoutSection struct {
Write int `yaml:"write"`
}
type rabbitmqSection struct {
Addr string `yaml:"addr"`
Queue string `yaml:"queue"`
}
var Config *ConfigT
// Parse configuration file

View File

@ -330,7 +330,7 @@ func (f v1ResourcesRegisterItem) Validate() {
// 资源注册后面要用MQ的方式不能用HTTP接口RDB可能挂数据库可能挂如果RDB或数据库挂了子系统就会注册资源失败
// MQ的方式就不怕RDB挂掉了使用MQ的手工ack方式只有确认资源正常入库了才发送ack给MQ
func v1ResourcesRegisterPost(c *gin.Context) {
var items []v1ResourcesRegisterItem
var items []models.ResourceRegisterItem
bind(c, &items)
count := len(items)
@ -339,66 +339,10 @@ func v1ResourcesRegisterPost(c *gin.Context) {
}
for i := 0; i < count; i++ {
items[i].Validate()
node := Node(items[i].NID)
if node.Cate != "project" {
bomb("node not project")
}
res, err := models.ResourceGet("uuid=?", items[i].UUID)
dangerous(err)
if res != nil {
// 这个资源之前就已经存在过了,这次可能是更新了部分字段
res.Name = items[i].Name
res.Labels = items[i].Labels
res.Extend = items[i].Extend
dangerous(res.Update("name", "labels", "extend"))
} else {
// 之前没有过这个资源在RDB注册这个资源
res = new(models.Resource)
res.UUID = items[i].UUID
res.Ident = items[i].Ident
res.Name = items[i].Name
res.Labels = items[i].Labels
res.Extend = items[i].Extend
res.Cate = items[i].Cate
res.Tenant = node.Tenant()
dangerous(res.Save())
}
// 检查这个资源是否有挂载过,没有的话就补齐挂载关系,这个动作是幂等的
leafPath := node.Path + "." + items[i].Cate
leafNode, err := models.NodeGet("path=?", leafPath)
dangerous(err)
// 第一个挂载位置:项目下面的${cate}节点
if leafNode == nil {
leafNode, err = node.CreateChild(items[i].Cate, items[i].Cate, "", "resource", "system", 1, 1, []int64{})
errCode, err := models.ResourceRegisterFor3rd(items[i])
if errCode != 0 {
dangerous(err)
}
dangerous(leafNode.Bind([]int64{res.Id}))
// 第二个挂载位置inner.${cate}
innerCatePath := "inner." + items[i].Cate
innerCateNode, err := models.NodeGet("path=?", innerCatePath)
dangerous(err)
if innerCateNode == nil {
innerNode, err := models.NodeGet("path=?", "inner")
dangerous(err)
if innerNode == nil {
bomb("inner node not exists")
}
innerCateNode, err = innerNode.CreateChild(items[i].Cate, items[i].Cate, "", "resource", "system", 1, 1, []int64{})
dangerous(err)
}
dangerous(innerCateNode.Bind([]int64{res.Id}))
}
renderMessage(c, nil)

View File

@ -0,0 +1,25 @@
package rabbitmq
import (
"log"
"github.com/streadway/amqp"
)
var (
conn *amqp.Connection
exit = make(chan bool)
)
func Init(url string) {
var err error
conn, err = amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
}
func Shutdown() {
conn.Close()
exit <- true
}

View File

@ -0,0 +1,90 @@
package rabbitmq
import (
"time"
"github.com/toolkits/pkg/logger"
)
func Consume(queueName string) {
go func(queueName string) {
for {
sleep := consume(queueName)
if sleep {
time.Sleep(300 * time.Millisecond)
}
if _, ok := <-exit; ok {
return
}
}
}(queueName)
}
// 如果操作MQ出现问题或者没有load到数据就sleep一下
func consume(queueName string) bool {
ch, err := conn.Channel()
if err != nil {
logger.Error(err)
return true
}
defer ch.Close()
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logger.Error(err)
return true
}
err = ch.Qos(
0, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
logger.Error(err)
return true
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logger.Error(err)
return true
}
size := 0
for d := range msgs {
size++
logger.Infof("rabbitmq consume message: %s", d.Body)
if handleMessage(d.Body) {
d.Ack(true)
} else {
// 底层代码认为不应该ack说明处理的过程出现问题可能是DB有问题之类的sleep一下
return true
}
}
if size == 0 {
// MQ里没有消息就sleep一下否则上层代码一直在死循环空转浪费算力
return true
}
return false
}

View File

@ -0,0 +1,125 @@
package rabbitmq
import (
"encoding/json"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/src/models"
)
type MQRequest struct {
Method string `json:"method"`
Payload interface{} `json:"payload"`
}
// 返回的bool值代表是否让上层给mq发送ack
func handleMessage(msgBody []byte) bool {
if len(msgBody) <= 0 {
logger.Warning("msg body is blank")
// 这是个异常消息需要ack并丢弃
return true
}
var req MQRequest
err := json.Unmarshal(msgBody, &req)
if err != nil {
logger.Warning("unmarshal msg body fail")
return true
}
if req.Method == "" {
logger.Warning("mq_request.method is blank")
return true
}
logger.Infof("mq_request, method: %s, payload: %v", req.Method, req.Payload)
jsonBytes, err := json.Marshal(req.Payload)
if err != nil {
logger.Warning("mq_request.payload marshal fail: ", err)
return true
}
err = dispatchHandler(req.Method, jsonBytes)
if err != nil {
// 如果处理的有问题可能是后端DB挂了不能ack等DB恢复了还可以继续处理
return false
}
return true
}
func dispatchHandler(method string, jsonBytes []byte) error {
switch method {
case "oplog_add":
return oplogAdd(jsonBytes)
case "resource_register":
return resourceRegister(jsonBytes)
case "resource_unregister":
return resourceUnregister(jsonBytes)
default:
logger.Warning("mq_request.method not support")
return nil
}
}
// 第三方系统通过MQ把操作日志推给RDB保存
func oplogAdd(jsonBytes []byte) error {
var ol models.OperationLog
err := json.Unmarshal(jsonBytes, &ol)
if err != nil {
// 传入的数据不合理无法decode这种数据要被消费丢掉
logger.Error("cannot unmarshal OperationLog: ", err)
return nil
}
return ol.New()
}
// 第三方系统比如RDS、Redis等资源创建了要注册到RDB
func resourceRegister(jsonBytes []byte) error {
var item models.ResourceRegisterItem
err := json.Unmarshal(jsonBytes, &item)
if err != nil {
logger.Warning(err)
return nil
}
errCode, err := models.ResourceRegisterFor3rd(item)
if errCode == 0 {
return nil
}
if errCode == 400 {
logger.Warningf("item invalid: %v", err)
return nil
}
// errCode == 500
logger.Errorf("system internal error: %v", err)
return err
}
// 第三方系统比如RDS、Redis等资源销毁了要通知到RDB
func resourceUnregister(jsonBytes []byte) error {
var uuids []string
err := json.Unmarshal(jsonBytes, &uuids)
if err != nil {
logger.Error(err)
// 这种错误不需要重试所以也就不需要return err了
return nil
}
if len(uuids) == 0 {
return nil
}
err = models.ResourceUnregister(uuids)
if err != nil {
logger.Error(err)
return err
}
return nil
}

View File

@ -17,6 +17,7 @@ import (
"github.com/didi/nightingale/src/modules/rdb/config"
"github.com/didi/nightingale/src/modules/rdb/cron"
"github.com/didi/nightingale/src/modules/rdb/http"
"github.com/didi/nightingale/src/modules/rdb/rabbitmq"
"github.com/didi/nightingale/src/modules/rdb/redisc"
"github.com/didi/nightingale/src/modules/rdb/ssoc"
)
@ -62,10 +63,16 @@ func main() {
ssoc.InitSSO()
// 初始化redis用来发送邮件短信等
// 初始化 redis 用来发送邮件短信等
redisc.InitRedis()
cron.InitWorker()
// 初始化 rabbitmq 处理部分异步逻辑
if len(config.Config.RabbitMQ.Addr) > 0 {
rabbitmq.Init(config.Config.RabbitMQ.Addr)
rabbitmq.Consume(config.Config.RabbitMQ.Queue)
}
go cron.ConsumeMail()
go cron.ConsumeSms()
go cron.ConsumeVoice()
@ -94,5 +101,10 @@ func endingProc() {
logger.Close()
http.Shutdown()
redisc.CloseRedis()
fmt.Println("portal stopped successfully")
if len(config.Config.RabbitMQ.Addr) > 0 {
rabbitmq.Shutdown()
}
fmt.Println("stopped successfully")
}