From fe81c6cad71c140e3f4e48f7cf1470511a548019 Mon Sep 17 00:00:00 2001 From: alick-liming Date: Mon, 28 Sep 2020 17:20:31 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=B0=83=E6=95=B4=EF=BC=8C?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=A3=80=E6=9F=A5=20(#314)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1.rabbitmq 配置文件 2.连接检查 * 代码调整 * 启动协程 * rabbitmq 连接检查代码调优 * rabbitmq 连接检查代码调优 Co-authored-by: alickliming --- etc/rdb.yml | 5 +++ src/modules/rdb/config/yaml.go | 5 ++- src/modules/rdb/rabbitmq/conn.go | 50 +++++++++++++++++++++++ src/modules/rdb/rabbitmq/queue_consume.go | 34 +++++++++++---- src/modules/rdb/rdb.go | 6 +-- 5 files changed, 86 insertions(+), 14 deletions(-) diff --git a/etc/rdb.yml b/etc/rdb.yml index 2ade7f7d..c1a6eebc 100644 --- a/etc/rdb.yml +++ b/etc/rdb.yml @@ -51,6 +51,11 @@ redis: read: 3000 write: 3000 +rabbitmq: + enable: false + addr: amqp://root:1234@127.0.0.1:5672/ + queue: test + sender: mail: # three choice: smtp|shell|api diff --git a/src/modules/rdb/config/yaml.go b/src/modules/rdb/config/yaml.go index 72b206c3..051e28ad 100644 --- a/src/modules/rdb/config/yaml.go +++ b/src/modules/rdb/config/yaml.go @@ -76,8 +76,9 @@ type timeoutSection struct { } type rabbitmqSection struct { - Addr string `yaml:"addr"` - Queue string `yaml:"queue"` + Enable bool `yaml:"enable"` + Addr string `yaml:"addr"` + Queue string `yaml:"queue"` } var Config *ConfigT diff --git a/src/modules/rdb/rabbitmq/conn.go b/src/modules/rdb/rabbitmq/conn.go index bee5089b..18606dbf 100644 --- a/src/modules/rdb/rabbitmq/conn.go +++ b/src/modules/rdb/rabbitmq/conn.go @@ -1,9 +1,11 @@ package rabbitmq import ( + "fmt" "log" "github.com/streadway/amqp" + "github.com/toolkits/pkg/logger" ) var ( @@ -19,6 +21,54 @@ func Init(url string) { } } +// ping 测试rabbitmq连接是否正常 +func ping() (err error) { + if conn == nil { + return fmt.Errorf("conn is nil") + } + + ch, err := conn.Channel() + if err != nil { + logger.Error(err) + close() + return err + } + + defer ch.Close() + + err = ch.ExchangeDeclare("ping.ping", "topic", false, true, false, true, nil) + if err != nil { + logger.Error(err) + return err + } + + msgContent := "ping.ping" + err = ch.Publish("ping.ping", "ping.ping", false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(msgContent), + }) + if err != nil { + logger.Error(err) + return err + } + err = ch.ExchangeDelete("ping.ping", false, false) + if err != nil { + logger.Error(err) + } + + return err +} + +func close() { + if conn != nil { + err := conn.Close() + if err != nil { + logger.Error(err) + } + conn = nil + } +} + func Shutdown() { conn.Close() exit <- true diff --git a/src/modules/rdb/rabbitmq/queue_consume.go b/src/modules/rdb/rabbitmq/queue_consume.go index 633f6ccb..48f50d5d 100644 --- a/src/modules/rdb/rabbitmq/queue_consume.go +++ b/src/modules/rdb/rabbitmq/queue_consume.go @@ -3,26 +3,42 @@ package rabbitmq import ( "time" + "github.com/streadway/amqp" "github.com/toolkits/pkg/logger" ) -func Consume(queueName string) { - go func(queueName string) { - for { +// Consume 消费消息 +func Consume(url, queueName string) { + for { + select { + case <-exit: + return + default: + if err := ping(); err != nil { + logger.Error("rabbitmq conn failed") + conn, err = amqp.Dial(url) + if err != nil { + conn = nil + logger.Error(err) + time.Sleep(500 * time.Millisecond) + continue + } + } + sleep := consume(queueName) if sleep { time.Sleep(300 * time.Millisecond) } - - if _, ok := <-exit; ok { - return - } } - }(queueName) + } } // 如果操作MQ出现问题,或者没有load到数据,就sleep一下 func consume(queueName string) bool { + if conn == nil { + return true + } + ch, err := conn.Channel() if err != nil { logger.Error(err) @@ -45,7 +61,7 @@ func consume(queueName string) bool { } err = ch.Qos( - 0, // prefetch count + 80, // prefetch count 0, // prefetch size false, // global ) diff --git a/src/modules/rdb/rdb.go b/src/modules/rdb/rdb.go index 4004da28..b9be37d6 100644 --- a/src/modules/rdb/rdb.go +++ b/src/modules/rdb/rdb.go @@ -68,9 +68,9 @@ func main() { cron.InitWorker() // 初始化 rabbitmq 处理部分异步逻辑 - if len(config.Config.RabbitMQ.Addr) > 0 { + if config.Config.RabbitMQ.Enable { rabbitmq.Init(config.Config.RabbitMQ.Addr) - rabbitmq.Consume(config.Config.RabbitMQ.Queue) + go rabbitmq.Consume(config.Config.RabbitMQ.Addr, config.Config.RabbitMQ.Queue) } go cron.ConsumeMail() @@ -102,7 +102,7 @@ func endingProc() { http.Shutdown() redisc.CloseRedis() - if len(config.Config.RabbitMQ.Addr) > 0 { + if config.Config.RabbitMQ.Enable { rabbitmq.Shutdown() }