From ce8f72040be3850242f63c02393ebaf30907cb19 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:51:52 +0800 Subject: [PATCH] feat(middleware): add middleware work function --- pkg/middle/define.go | 16 ++++-- pkg/middle/middleware.go | 99 +++++++++++++++++++++++++++++++++++ pkg/middle/middleware_test.go | 23 ++++++++ 3 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 pkg/middle/middleware.go create mode 100644 pkg/middle/middleware_test.go diff --git a/pkg/middle/define.go b/pkg/middle/define.go index 678c75b..2cb946a 100644 --- a/pkg/middle/define.go +++ b/pkg/middle/define.go @@ -1,7 +1,15 @@ package middle -import getMiddlewareMap "gitee.com/timedb/wheatCache/plugins/config" +import ( + "sync" +) -func Init() { - getMiddlewareMap.GetMiddlewareMap() -} +var ( + oneMiddle sync.Once + MiddleWareDriver *MiddleWare +) + +const ( + defaultConsumerCount = 5 + defaultDriverCount = 1000 +) diff --git a/pkg/middle/middleware.go b/pkg/middle/middleware.go new file mode 100644 index 0000000..eb5d615 --- /dev/null +++ b/pkg/middle/middleware.go @@ -0,0 +1,99 @@ +package middle + +import ( + "context" + + _ "gitee.com/timedb/wheatCache/conf" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" + "gitee.com/timedb/wheatCache/plugins" + "gitee.com/timedb/wheatCache/plugins/config" + "github.com/spf13/viper" +) + +type MiddleWare struct { + eventDriver event.DriverInterface + eventConsumer event.ConsumerInterface + eventProduce event.ProduceInterface + plugins map[string][]plugins.MiddleToolsInterface + consumerCount int +} + +func NewMiddleWare() *MiddleWare { + oneMiddle.Do(func() { + _, driverCount := loadConfigAndDefault() + + driver := event.NewDriver(driverCount) + MiddleWareDriver = &MiddleWare{ + eventDriver: driver, + eventConsumer: event.NewConsumer(driver), + eventProduce: event.NewProduce(driver), + } + MiddleWareDriver.loadPlugins() + + }) + return MiddleWareDriver +} + +func (m *MiddleWare) GetEventDriver() event.DriverInterface { + return m.eventDriver +} + +func (m *MiddleWare) loadPlugins() { + plug := viper.GetStringMapStringSlice("plugins-control") + + pluginsMap := config.GetMiddlewareMap() + + pluginsContext := make(map[string][]plugins.MiddleToolsInterface) + + for middleMsg, pluNames := range plug { + pulgSingle := make([]plugins.MiddleToolsInterface, 0) + for _, name := range pluNames { + pulgSingle = append(pulgSingle, pluginsMap[name]) + } + + pluginsContext[middleMsg] = pulgSingle + } + + m.plugins = pluginsContext +} + +func loadConfigAndDefault() (int, int) { + // 加载 consumerCount + consumerCount := viper.GetInt("middleware-driver.middleConsumerCount") + if consumerCount == 0 { + consumerCount = defaultConsumerCount + } + + driverCount := viper.GetInt("middleware-driver.driverCount") + if driverCount == 0 { + driverCount = defaultDriverCount + } + return consumerCount, driverCount + +} + +func (m *MiddleWare) startWork() { + + for i := 0; i < m.consumerCount; i++ { + go func() { + ctx := context.Background() + for { + workEvent := m.eventConsumer.Receive(ctx) + plugs := m.plugins[workEvent.GetEventName()] + + msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) + if !ok { + logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) + continue + } + + for _, val := range plugs { + val.Exec(msg) + } + } + + }() + } +} diff --git a/pkg/middle/middleware_test.go b/pkg/middle/middleware_test.go new file mode 100644 index 0000000..610459c --- /dev/null +++ b/pkg/middle/middleware_test.go @@ -0,0 +1,23 @@ +package middle + +import ( + "testing" + + "gitee.com/timedb/wheatCache/pkg/event" + "github.com/stretchr/testify/require" +) + +func Test_middleware_driver(t *testing.T) { + middleware := NewMiddleWare() + require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle") +} + +func TestWorker(t *testing.T) { + + event := event.NewEvent("logcontext") + + m := NewMiddleWare() + m.eventDriver.Put(event) + + m.startWork() +}