2021-10-09 21:51:52 +08:00
|
|
|
|
package middle
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
|
|
|
|
|
_ "gitee.com/timedb/wheatCache/conf"
|
|
|
|
|
"gitee.com/timedb/wheatCache/pkg/event"
|
|
|
|
|
"gitee.com/timedb/wheatCache/pkg/logx"
|
|
|
|
|
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
|
|
|
|
|
"gitee.com/timedb/wheatCache/plugins"
|
|
|
|
|
"gitee.com/timedb/wheatCache/plugins/config"
|
|
|
|
|
"github.com/spf13/viper"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type MiddleWare struct {
|
|
|
|
|
eventDriver event.DriverInterface
|
|
|
|
|
eventConsumer event.ConsumerInterface
|
|
|
|
|
eventProduce event.ProduceInterface
|
|
|
|
|
plugins map[string][]plugins.MiddleToolsInterface
|
|
|
|
|
consumerCount int
|
2021-10-10 14:09:09 +08:00
|
|
|
|
driverCount int
|
2021-10-09 21:51:52 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewMiddleWare() *MiddleWare {
|
|
|
|
|
oneMiddle.Do(func() {
|
2021-10-10 14:09:09 +08:00
|
|
|
|
consumerCount, driverCount := loadConfigAndDefault()
|
2021-10-09 21:51:52 +08:00
|
|
|
|
|
|
|
|
|
driver := event.NewDriver(driverCount)
|
2021-10-10 14:09:09 +08:00
|
|
|
|
middleWareDriver = &MiddleWare{
|
2021-10-09 21:51:52 +08:00
|
|
|
|
eventDriver: driver,
|
|
|
|
|
eventConsumer: event.NewConsumer(driver),
|
|
|
|
|
eventProduce: event.NewProduce(driver),
|
2021-10-10 14:09:09 +08:00
|
|
|
|
driverCount: driverCount,
|
|
|
|
|
consumerCount: consumerCount,
|
2021-10-09 21:51:52 +08:00
|
|
|
|
}
|
2021-10-10 14:09:09 +08:00
|
|
|
|
middleWareDriver.loadPlugins()
|
|
|
|
|
|
|
|
|
|
middleWareDriver.startWork()
|
2021-10-09 21:51:52 +08:00
|
|
|
|
|
|
|
|
|
})
|
2021-10-10 14:09:09 +08:00
|
|
|
|
return middleWareDriver
|
2021-10-09 21:51:52 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MiddleWare) GetEventDriver() event.DriverInterface {
|
|
|
|
|
return m.eventDriver
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MiddleWare) loadPlugins() {
|
|
|
|
|
plug := viper.GetStringMapStringSlice("plugins-control")
|
|
|
|
|
|
|
|
|
|
pluginsMap := config.GetMiddlewareMap()
|
|
|
|
|
|
|
|
|
|
pluginsContext := make(map[string][]plugins.MiddleToolsInterface)
|
|
|
|
|
|
|
|
|
|
for middleMsg, pluNames := range plug {
|
|
|
|
|
pulgSingle := make([]plugins.MiddleToolsInterface, 0)
|
|
|
|
|
for _, name := range pluNames {
|
|
|
|
|
pulgSingle = append(pulgSingle, pluginsMap[name])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pluginsContext[middleMsg] = pulgSingle
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m.plugins = pluginsContext
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func loadConfigAndDefault() (int, int) {
|
|
|
|
|
// 加载 consumerCount
|
2021-10-10 14:09:09 +08:00
|
|
|
|
consumerCount := viper.GetInt("middle-driver.middleConsumerCount")
|
2021-10-09 21:51:52 +08:00
|
|
|
|
if consumerCount == 0 {
|
|
|
|
|
consumerCount = defaultConsumerCount
|
|
|
|
|
}
|
2021-10-10 14:09:09 +08:00
|
|
|
|
driverCount := viper.GetInt("middle-driver.driverCount")
|
2021-10-09 21:51:52 +08:00
|
|
|
|
if driverCount == 0 {
|
|
|
|
|
driverCount = defaultDriverCount
|
|
|
|
|
}
|
|
|
|
|
return consumerCount, driverCount
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MiddleWare) startWork() {
|
|
|
|
|
|
|
|
|
|
for i := 0; i < m.consumerCount; i++ {
|
|
|
|
|
go func() {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
for {
|
|
|
|
|
workEvent := m.eventConsumer.Receive(ctx)
|
|
|
|
|
plugs := m.plugins[workEvent.GetEventName()]
|
|
|
|
|
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
|
|
|
|
|
if !ok {
|
|
|
|
|
logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
for _, val := range plugs {
|
|
|
|
|
val.Exec(msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
}
|