配置调整,连接检查 (#314)
* 1.rabbitmq 配置文件 2.连接检查 * 代码调整 * 启动协程 * rabbitmq 连接检查代码调优 * rabbitmq 连接检查代码调优 Co-authored-by: alickliming <alickliming@didi.global.com>
This commit is contained in:
parent
6e6771ff3b
commit
fe81c6cad7
|
@ -51,6 +51,11 @@ redis:
|
|||
read: 3000
|
||||
write: 3000
|
||||
|
||||
rabbitmq:
|
||||
enable: false
|
||||
addr: amqp://root:1234@127.0.0.1:5672/
|
||||
queue: test
|
||||
|
||||
sender:
|
||||
mail:
|
||||
# three choice: smtp|shell|api
|
||||
|
|
|
@ -76,6 +76,7 @@ type timeoutSection struct {
|
|||
}
|
||||
|
||||
type rabbitmqSection struct {
|
||||
Enable bool `yaml:"enable"`
|
||||
Addr string `yaml:"addr"`
|
||||
Queue string `yaml:"queue"`
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -19,6 +21,54 @@ func Init(url string) {
|
|||
}
|
||||
}
|
||||
|
||||
// ping 测试rabbitmq连接是否正常
|
||||
func ping() (err error) {
|
||||
if conn == nil {
|
||||
return fmt.Errorf("conn is nil")
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
close()
|
||||
return err
|
||||
}
|
||||
|
||||
defer ch.Close()
|
||||
|
||||
err = ch.ExchangeDeclare("ping.ping", "topic", false, true, false, true, nil)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
msgContent := "ping.ping"
|
||||
err = ch.Publish("ping.ping", "ping.ping", false, false, amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(msgContent),
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
err = ch.ExchangeDelete("ping.ping", false, false)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func close() {
|
||||
if conn != nil {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
conn = nil
|
||||
}
|
||||
}
|
||||
|
||||
func Shutdown() {
|
||||
conn.Close()
|
||||
exit <- true
|
||||
|
|
|
@ -3,26 +3,42 @@ package rabbitmq
|
|||
import (
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func Consume(queueName string) {
|
||||
go func(queueName string) {
|
||||
// Consume 消费消息
|
||||
func Consume(url, queueName string) {
|
||||
for {
|
||||
select {
|
||||
case <-exit:
|
||||
return
|
||||
default:
|
||||
if err := ping(); err != nil {
|
||||
logger.Error("rabbitmq conn failed")
|
||||
conn, err = amqp.Dial(url)
|
||||
if err != nil {
|
||||
conn = nil
|
||||
logger.Error(err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
sleep := consume(queueName)
|
||||
if sleep {
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
}
|
||||
|
||||
if _, ok := <-exit; ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(queueName)
|
||||
}
|
||||
|
||||
// 如果操作MQ出现问题,或者没有load到数据,就sleep一下
|
||||
func consume(queueName string) bool {
|
||||
if conn == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
|
@ -45,7 +61,7 @@ func consume(queueName string) bool {
|
|||
}
|
||||
|
||||
err = ch.Qos(
|
||||
0, // prefetch count
|
||||
80, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
|
|
|
@ -68,9 +68,9 @@ func main() {
|
|||
cron.InitWorker()
|
||||
|
||||
// 初始化 rabbitmq 处理部分异步逻辑
|
||||
if len(config.Config.RabbitMQ.Addr) > 0 {
|
||||
if config.Config.RabbitMQ.Enable {
|
||||
rabbitmq.Init(config.Config.RabbitMQ.Addr)
|
||||
rabbitmq.Consume(config.Config.RabbitMQ.Queue)
|
||||
go rabbitmq.Consume(config.Config.RabbitMQ.Addr, config.Config.RabbitMQ.Queue)
|
||||
}
|
||||
|
||||
go cron.ConsumeMail()
|
||||
|
@ -102,7 +102,7 @@ func endingProc() {
|
|||
http.Shutdown()
|
||||
redisc.CloseRedis()
|
||||
|
||||
if len(config.Config.RabbitMQ.Addr) > 0 {
|
||||
if config.Config.RabbitMQ.Enable {
|
||||
rabbitmq.Shutdown()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue