feat(middle): update middle driver

This commit is contained in:
Sodesnei 2021-10-10 14:09:09 +08:00
parent febd5501c4
commit 95f165fb55
2 changed files with 21 additions and 13 deletions

View File

@ -18,22 +18,27 @@ type MiddleWare struct {
eventProduce event.ProduceInterface
plugins map[string][]plugins.MiddleToolsInterface
consumerCount int
driverCount int
}
func NewMiddleWare() *MiddleWare {
oneMiddle.Do(func() {
_, driverCount := loadConfigAndDefault()
consumerCount, driverCount := loadConfigAndDefault()
driver := event.NewDriver(driverCount)
MiddleWareDriver = &MiddleWare{
middleWareDriver = &MiddleWare{
eventDriver: driver,
eventConsumer: event.NewConsumer(driver),
eventProduce: event.NewProduce(driver),
driverCount: driverCount,
consumerCount: consumerCount,
}
MiddleWareDriver.loadPlugins()
middleWareDriver.loadPlugins()
middleWareDriver.startWork()
})
return MiddleWareDriver
return middleWareDriver
}
func (m *MiddleWare) GetEventDriver() event.DriverInterface {
@ -61,17 +66,15 @@ func (m *MiddleWare) loadPlugins() {
func loadConfigAndDefault() (int, int) {
// 加载 consumerCount
consumerCount := viper.GetInt("middleware-driver.middleConsumerCount")
consumerCount := viper.GetInt("middle-driver.middleConsumerCount")
if consumerCount == 0 {
consumerCount = defaultConsumerCount
}
driverCount := viper.GetInt("middleware-driver.driverCount")
driverCount := viper.GetInt("middle-driver.driverCount")
if driverCount == 0 {
driverCount = defaultDriverCount
}
return consumerCount, driverCount
}
func (m *MiddleWare) startWork() {
@ -82,18 +85,15 @@ func (m *MiddleWare) startWork() {
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
for _, val := range plugs {
val.Exec(msg)
}
}
}()
}
}

View File

@ -1,23 +1,31 @@
package middle
import (
"context"
"testing"
"time"
"gitee.com/timedb/wheatCache/pkg/event"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"github.com/stretchr/testify/require"
)
func Test_middleware_driver(t *testing.T) {
ctx := context.Background()
middleware := NewMiddleWare()
require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle")
event := event.NewEvent("logcontext")
event.SetValue(middleMsg.MiddleMsgKey, "123")
middleware.eventProduce.Call(ctx, event)
require.Equal(t, middleware.consumerCount, 5)
time.Sleep(1 * time.Second)
}
func TestWorker(t *testing.T) {
// ctx := context.Background()
event := event.NewEvent("logcontext")
m := NewMiddleWare()
m.eventDriver.Put(event)
m.startWork()
}