From 2e045170660f741491714644e7e1c30db2de5f3c Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:49:16 +0800 Subject: [PATCH 01/10] feat(conf): update wheat-cache.yaml --- conf/wheat-cache.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 981dc43..33925a8 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -14,8 +14,16 @@ lruCache: eventDriverSize: 2000 workTime: 1 + logPrint: stath: [ "debug", "error" ] + +middleware-driver: + driverCount: 1000 + middleConsumerCount: 5 + +plugins-control: + logcontext: ["logMiddle"] From 329513bd98432cfc0e78ec812959433b0b3610d9 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:49:55 +0800 Subject: [PATCH 02/10] feat(middle-msg): updatae logx --- pkg/logx/logx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logx/logx.go b/pkg/logx/logx.go index a173a73..18b40d0 100644 --- a/pkg/logx/logx.go +++ b/pkg/logx/logx.go @@ -45,7 +45,7 @@ func (l *upLogger) Print(level string, format string, msg ...interface{}) { Print(level, format, msg...) eventMiddle := event.NewEvent(middleMsg.EventNameLog) - eventMiddle.SetValue(middleMsg.EventKeyLog, middleMsg.LogContext{ + eventMiddle.SetValue(middleMsg.MiddleMsgKey, middleMsg.LogContext{ Level: level, Data: time.Now(), Msg: fmt.Sprintf(format, msg...), From ce8f72040be3850242f63c02393ebaf30907cb19 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:51:52 +0800 Subject: [PATCH 03/10] feat(middleware): add middleware work function --- pkg/middle/define.go | 16 ++++-- pkg/middle/middleware.go | 99 +++++++++++++++++++++++++++++++++++ pkg/middle/middleware_test.go | 23 ++++++++ 3 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 pkg/middle/middleware.go create mode 100644 pkg/middle/middleware_test.go diff --git a/pkg/middle/define.go b/pkg/middle/define.go index 678c75b..2cb946a 100644 --- a/pkg/middle/define.go +++ b/pkg/middle/define.go @@ -1,7 +1,15 @@ package middle -import getMiddlewareMap "gitee.com/timedb/wheatCache/plugins/config" +import ( + "sync" +) -func Init() { - getMiddlewareMap.GetMiddlewareMap() -} +var ( + oneMiddle sync.Once + MiddleWareDriver *MiddleWare +) + +const ( + defaultConsumerCount = 5 + defaultDriverCount = 1000 +) diff --git a/pkg/middle/middleware.go b/pkg/middle/middleware.go new file mode 100644 index 0000000..eb5d615 --- /dev/null +++ b/pkg/middle/middleware.go @@ -0,0 +1,99 @@ +package middle + +import ( + "context" + + _ "gitee.com/timedb/wheatCache/conf" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" + "gitee.com/timedb/wheatCache/plugins" + "gitee.com/timedb/wheatCache/plugins/config" + "github.com/spf13/viper" +) + +type MiddleWare struct { + eventDriver event.DriverInterface + eventConsumer event.ConsumerInterface + eventProduce event.ProduceInterface + plugins map[string][]plugins.MiddleToolsInterface + consumerCount int +} + +func NewMiddleWare() *MiddleWare { + oneMiddle.Do(func() { + _, driverCount := loadConfigAndDefault() + + driver := event.NewDriver(driverCount) + MiddleWareDriver = &MiddleWare{ + eventDriver: driver, + eventConsumer: event.NewConsumer(driver), + eventProduce: event.NewProduce(driver), + } + MiddleWareDriver.loadPlugins() + + }) + return MiddleWareDriver +} + +func (m *MiddleWare) GetEventDriver() event.DriverInterface { + return m.eventDriver +} + +func (m *MiddleWare) loadPlugins() { + plug := viper.GetStringMapStringSlice("plugins-control") + + pluginsMap := config.GetMiddlewareMap() + + pluginsContext := make(map[string][]plugins.MiddleToolsInterface) + + for middleMsg, pluNames := range plug { + pulgSingle := make([]plugins.MiddleToolsInterface, 0) + for _, name := range pluNames { + pulgSingle = append(pulgSingle, pluginsMap[name]) + } + + pluginsContext[middleMsg] = pulgSingle + } + + m.plugins = pluginsContext +} + +func loadConfigAndDefault() (int, int) { + // 加载 consumerCount + consumerCount := viper.GetInt("middleware-driver.middleConsumerCount") + if consumerCount == 0 { + consumerCount = defaultConsumerCount + } + + driverCount := viper.GetInt("middleware-driver.driverCount") + if driverCount == 0 { + driverCount = defaultDriverCount + } + return consumerCount, driverCount + +} + +func (m *MiddleWare) startWork() { + + for i := 0; i < m.consumerCount; i++ { + go func() { + ctx := context.Background() + for { + workEvent := m.eventConsumer.Receive(ctx) + plugs := m.plugins[workEvent.GetEventName()] + + msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) + if !ok { + logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) + continue + } + + for _, val := range plugs { + val.Exec(msg) + } + } + + }() + } +} diff --git a/pkg/middle/middleware_test.go b/pkg/middle/middleware_test.go new file mode 100644 index 0000000..610459c --- /dev/null +++ b/pkg/middle/middleware_test.go @@ -0,0 +1,23 @@ +package middle + +import ( + "testing" + + "gitee.com/timedb/wheatCache/pkg/event" + "github.com/stretchr/testify/require" +) + +func Test_middleware_driver(t *testing.T) { + middleware := NewMiddleWare() + require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle") +} + +func TestWorker(t *testing.T) { + + event := event.NewEvent("logcontext") + + m := NewMiddleWare() + m.eventDriver.Put(event) + + m.startWork() +} From 81d3bb84345140c31d7ea1e5e87d6e82e9a5b050 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:54:26 +0800 Subject: [PATCH 04/10] feat(middle-msg): update base parameter --- pkg/middle-msg/define.go | 5 +++++ pkg/middle-msg/logx.go | 4 +--- plugins/define.go | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 pkg/middle-msg/define.go diff --git a/pkg/middle-msg/define.go b/pkg/middle-msg/define.go new file mode 100644 index 0000000..f7621e9 --- /dev/null +++ b/pkg/middle-msg/define.go @@ -0,0 +1,5 @@ +package middle_msg + +const ( + MiddleMsgKey = "middleMsgKey" +) diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index 67d887d..90b2340 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,9 +3,7 @@ package middle_msg import "time" var ( - EventNameLog = "LogContext" - - EventKeyLog = "LogContext" + EventNameLog = "logcontext" ) type LogContext struct { diff --git a/plugins/define.go b/plugins/define.go index 37538c9..922dc34 100644 --- a/plugins/define.go +++ b/plugins/define.go @@ -4,4 +4,5 @@ type MiddleToolsInterface interface { Init() // 初始化 Exec(interface{}) (interface{}, error) // 处理用户发送事件 Name() string // 获取中间件名称 + Describe() string // 描述 } From 99c3c56a5b4a0feb47606124b68e96ae267625df Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:55:16 +0800 Subject: [PATCH 05/10] feat(middle-msg): add pluginsInfo --- pkg/middle-msg/plugins.go | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 pkg/middle-msg/plugins.go diff --git a/pkg/middle-msg/plugins.go b/pkg/middle-msg/plugins.go new file mode 100644 index 0000000..5a80ec8 --- /dev/null +++ b/pkg/middle-msg/plugins.go @@ -0,0 +1,8 @@ +package middle_msg + +// []pulginsINfo +// 1. Version +// 2. desc +// 3. Name +// 4. 运行状态 +// 5. 运行时间 From b182d7602dec73cce7e7a09bb29cf5e30905cbef Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:55:39 +0800 Subject: [PATCH 06/10] feat(plugins): updata plugins --- plugins/log-middle/middleware.go | 9 +++++---- plugins/map-key/middleware.go | 6 +++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/plugins/log-middle/middleware.go b/plugins/log-middle/middleware.go index c6ff102..ae7c0bb 100644 --- a/plugins/log-middle/middleware.go +++ b/plugins/log-middle/middleware.go @@ -1,8 +1,6 @@ package log_middle import ( - "fmt" - "gitee.com/timedb/wheatCache/plugins" ) @@ -13,14 +11,17 @@ func (i *logMiddle) Init() { } func (i *logMiddle) Exec(interface{}) (interface{}, error) { - fmt.Println(1) - return nil, nil + + return "", nil } func (i *logMiddle) Name() string { return "logMiddle" } +func (i *logMiddle) Describe() string { + return "" +} func NewMiddleware() plugins.MiddleToolsInterface { return &logMiddle{} } diff --git a/plugins/map-key/middleware.go b/plugins/map-key/middleware.go index 31fa509..088c589 100644 --- a/plugins/map-key/middleware.go +++ b/plugins/map-key/middleware.go @@ -12,13 +12,17 @@ func (i *mapKey) Init() { func (i *mapKey) Exec(interface{}) (interface{}, error) { - return nil, nil + return "", nil } func (i *mapKey) Name() string { return "mapKey" } +func (i *mapKey) Describe() string { + return "" +} + func NewMiddleware() plugins.MiddleToolsInterface { return &mapKey{} } From 4d9558c21f504cca05468144127bdc4672cf5ff9 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 21:56:43 +0800 Subject: [PATCH 07/10] feat(conf): add conf single test --- conf/public_conf_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/conf/public_conf_test.go b/conf/public_conf_test.go index 9de01aa..263f438 100644 --- a/conf/public_conf_test.go +++ b/conf/public_conf_test.go @@ -1,6 +1,7 @@ package conf import ( + "fmt" "testing" "github.com/spf13/viper" @@ -8,6 +9,7 @@ import ( ) func TestConf(t *testing.T) { + // 外部导入 conf.yaml 需要导入 conf 包 // 每次迁移文件时, 使用 sudo make init-conf来将yam文件迁移到指定的文件夹下 // get 使用, 读取 public_conf 配置文件 @@ -22,3 +24,20 @@ func TestConf(t *testing.T) { host := viper.GetString("host") require.Equal(t, host, "1222") } + +func TestMiddleConf(t *testing.T) { + ct := viper.GetStringMap("plugins-control") + + fmt.Println(ct) + + d := viper.GetInt("middleware-driver.driverCount") + require.Equal(t, d, 1000) + c := viper.GetInt("middleware-driver.middleConsumerCount") + require.Equal(t, c, 5) + + p := viper.GetStringMap("plugins-control") + for key, val := range p { + fmt.Println(key, val) + } + // fmt.Println(p) +} From febd5501c4efa785c7c8770609fb32d0a0c49772 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sun, 10 Oct 2021 14:05:01 +0800 Subject: [PATCH 08/10] feat(conf): updata conf test --- conf/public_conf_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/conf/public_conf_test.go b/conf/public_conf_test.go index 263f438..9c519d2 100644 --- a/conf/public_conf_test.go +++ b/conf/public_conf_test.go @@ -1,7 +1,6 @@ package conf import ( - "fmt" "testing" "github.com/spf13/viper" @@ -26,18 +25,11 @@ func TestConf(t *testing.T) { } func TestMiddleConf(t *testing.T) { - ct := viper.GetStringMap("plugins-control") - - fmt.Println(ct) + ct := viper.GetStringSlice("plugins-control.logcontext") + require.Equal(t, ct, []string{"logMiddle"}) d := viper.GetInt("middleware-driver.driverCount") require.Equal(t, d, 1000) c := viper.GetInt("middleware-driver.middleConsumerCount") require.Equal(t, c, 5) - - p := viper.GetStringMap("plugins-control") - for key, val := range p { - fmt.Println(key, val) - } - // fmt.Println(p) } From 95f165fb55eb3c5dbd23be2d23161a5041d76de5 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sun, 10 Oct 2021 14:09:09 +0800 Subject: [PATCH 09/10] feat(middle): update middle driver --- pkg/middle/middleware.go | 22 +++++++++++----------- pkg/middle/middleware_test.go | 12 ++++++++++-- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/pkg/middle/middleware.go b/pkg/middle/middleware.go index eb5d615..69aa18d 100644 --- a/pkg/middle/middleware.go +++ b/pkg/middle/middleware.go @@ -18,22 +18,27 @@ type MiddleWare struct { eventProduce event.ProduceInterface plugins map[string][]plugins.MiddleToolsInterface consumerCount int + driverCount int } func NewMiddleWare() *MiddleWare { oneMiddle.Do(func() { - _, driverCount := loadConfigAndDefault() + consumerCount, driverCount := loadConfigAndDefault() driver := event.NewDriver(driverCount) - MiddleWareDriver = &MiddleWare{ + middleWareDriver = &MiddleWare{ eventDriver: driver, eventConsumer: event.NewConsumer(driver), eventProduce: event.NewProduce(driver), + driverCount: driverCount, + consumerCount: consumerCount, } - MiddleWareDriver.loadPlugins() + middleWareDriver.loadPlugins() + + middleWareDriver.startWork() }) - return MiddleWareDriver + return middleWareDriver } func (m *MiddleWare) GetEventDriver() event.DriverInterface { @@ -61,17 +66,15 @@ func (m *MiddleWare) loadPlugins() { func loadConfigAndDefault() (int, int) { // 加载 consumerCount - consumerCount := viper.GetInt("middleware-driver.middleConsumerCount") + consumerCount := viper.GetInt("middle-driver.middleConsumerCount") if consumerCount == 0 { consumerCount = defaultConsumerCount } - - driverCount := viper.GetInt("middleware-driver.driverCount") + driverCount := viper.GetInt("middle-driver.driverCount") if driverCount == 0 { driverCount = defaultDriverCount } return consumerCount, driverCount - } func (m *MiddleWare) startWork() { @@ -82,18 +85,15 @@ func (m *MiddleWare) startWork() { for { workEvent := m.eventConsumer.Receive(ctx) plugs := m.plugins[workEvent.GetEventName()] - msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) if !ok { logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) continue } - for _, val := range plugs { val.Exec(msg) } } - }() } } diff --git a/pkg/middle/middleware_test.go b/pkg/middle/middleware_test.go index 610459c..df94f50 100644 --- a/pkg/middle/middleware_test.go +++ b/pkg/middle/middleware_test.go @@ -1,23 +1,31 @@ package middle import ( + "context" "testing" + "time" "gitee.com/timedb/wheatCache/pkg/event" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" "github.com/stretchr/testify/require" ) func Test_middleware_driver(t *testing.T) { + ctx := context.Background() middleware := NewMiddleWare() require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle") + event := event.NewEvent("logcontext") + event.SetValue(middleMsg.MiddleMsgKey, "123") + middleware.eventProduce.Call(ctx, event) + require.Equal(t, middleware.consumerCount, 5) + time.Sleep(1 * time.Second) } func TestWorker(t *testing.T) { - + // ctx := context.Background() event := event.NewEvent("logcontext") m := NewMiddleWare() m.eventDriver.Put(event) - m.startWork() } From debde8a66a7bed22a8d25950952447493e9435fa Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sun, 10 Oct 2021 14:10:21 +0800 Subject: [PATCH 10/10] feat(middle): update middle define var --- pkg/middle/define.go | 2 +- plugins/log-middle/middleware.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/middle/define.go b/pkg/middle/define.go index 2cb946a..72caaa6 100644 --- a/pkg/middle/define.go +++ b/pkg/middle/define.go @@ -6,7 +6,7 @@ import ( var ( oneMiddle sync.Once - MiddleWareDriver *MiddleWare + middleWareDriver *MiddleWare ) const ( diff --git a/plugins/log-middle/middleware.go b/plugins/log-middle/middleware.go index ae7c0bb..dbdb6fe 100644 --- a/plugins/log-middle/middleware.go +++ b/plugins/log-middle/middleware.go @@ -1,6 +1,8 @@ package log_middle import ( + "fmt" + "gitee.com/timedb/wheatCache/plugins" ) @@ -11,7 +13,7 @@ func (i *logMiddle) Init() { } func (i *logMiddle) Exec(interface{}) (interface{}, error) { - + fmt.Println("logMiddle") return "", nil }