diff --git a/conf/public_conf_test.go b/conf/public_conf_test.go index 9de01aa..9c519d2 100644 --- a/conf/public_conf_test.go +++ b/conf/public_conf_test.go @@ -8,6 +8,7 @@ import ( ) func TestConf(t *testing.T) { + // 外部导入 conf.yaml 需要导入 conf 包 // 每次迁移文件时, 使用 sudo make init-conf来将yam文件迁移到指定的文件夹下 // get 使用, 读取 public_conf 配置文件 @@ -22,3 +23,13 @@ func TestConf(t *testing.T) { host := viper.GetString("host") require.Equal(t, host, "1222") } + +func TestMiddleConf(t *testing.T) { + ct := viper.GetStringSlice("plugins-control.logcontext") + require.Equal(t, ct, []string{"logMiddle"}) + + d := viper.GetInt("middleware-driver.driverCount") + require.Equal(t, d, 1000) + c := viper.GetInt("middleware-driver.middleConsumerCount") + require.Equal(t, c, 5) +} diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 981dc43..33925a8 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -14,8 +14,16 @@ lruCache: eventDriverSize: 2000 workTime: 1 + logPrint: stath: [ "debug", "error" ] + +middleware-driver: + driverCount: 1000 + middleConsumerCount: 5 + +plugins-control: + logcontext: ["logMiddle"] diff --git a/pkg/logx/logx.go b/pkg/logx/logx.go index a173a73..18b40d0 100644 --- a/pkg/logx/logx.go +++ b/pkg/logx/logx.go @@ -45,7 +45,7 @@ func (l *upLogger) Print(level string, format string, msg ...interface{}) { Print(level, format, msg...) eventMiddle := event.NewEvent(middleMsg.EventNameLog) - eventMiddle.SetValue(middleMsg.EventKeyLog, middleMsg.LogContext{ + eventMiddle.SetValue(middleMsg.MiddleMsgKey, middleMsg.LogContext{ Level: level, Data: time.Now(), Msg: fmt.Sprintf(format, msg...), diff --git a/pkg/middle-msg/define.go b/pkg/middle-msg/define.go new file mode 100644 index 0000000..f7621e9 --- /dev/null +++ b/pkg/middle-msg/define.go @@ -0,0 +1,5 @@ +package middle_msg + +const ( + MiddleMsgKey = "middleMsgKey" +) diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index 67d887d..90b2340 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,9 +3,7 @@ package middle_msg import "time" var ( - EventNameLog = "LogContext" - - EventKeyLog = "LogContext" + EventNameLog = "logcontext" ) type LogContext struct { diff --git a/pkg/middle-msg/plugins.go b/pkg/middle-msg/plugins.go new file mode 100644 index 0000000..5a80ec8 --- /dev/null +++ b/pkg/middle-msg/plugins.go @@ -0,0 +1,8 @@ +package middle_msg + +// []pulginsINfo +// 1. Version +// 2. desc +// 3. Name +// 4. 运行状态 +// 5. 运行时间 diff --git a/pkg/middle/define.go b/pkg/middle/define.go index 678c75b..72caaa6 100644 --- a/pkg/middle/define.go +++ b/pkg/middle/define.go @@ -1,7 +1,15 @@ package middle -import getMiddlewareMap "gitee.com/timedb/wheatCache/plugins/config" +import ( + "sync" +) -func Init() { - getMiddlewareMap.GetMiddlewareMap() -} +var ( + oneMiddle sync.Once + middleWareDriver *MiddleWare +) + +const ( + defaultConsumerCount = 5 + defaultDriverCount = 1000 +) diff --git a/pkg/middle/middleware.go b/pkg/middle/middleware.go new file mode 100644 index 0000000..69aa18d --- /dev/null +++ b/pkg/middle/middleware.go @@ -0,0 +1,99 @@ +package middle + +import ( + "context" + + _ "gitee.com/timedb/wheatCache/conf" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" + "gitee.com/timedb/wheatCache/plugins" + "gitee.com/timedb/wheatCache/plugins/config" + "github.com/spf13/viper" +) + +type MiddleWare struct { + eventDriver event.DriverInterface + eventConsumer event.ConsumerInterface + eventProduce event.ProduceInterface + plugins map[string][]plugins.MiddleToolsInterface + consumerCount int + driverCount int +} + +func NewMiddleWare() *MiddleWare { + oneMiddle.Do(func() { + consumerCount, driverCount := loadConfigAndDefault() + + driver := event.NewDriver(driverCount) + middleWareDriver = &MiddleWare{ + eventDriver: driver, + eventConsumer: event.NewConsumer(driver), + eventProduce: event.NewProduce(driver), + driverCount: driverCount, + consumerCount: consumerCount, + } + middleWareDriver.loadPlugins() + + middleWareDriver.startWork() + + }) + return middleWareDriver +} + +func (m *MiddleWare) GetEventDriver() event.DriverInterface { + return m.eventDriver +} + +func (m *MiddleWare) loadPlugins() { + plug := viper.GetStringMapStringSlice("plugins-control") + + pluginsMap := config.GetMiddlewareMap() + + pluginsContext := make(map[string][]plugins.MiddleToolsInterface) + + for middleMsg, pluNames := range plug { + pulgSingle := make([]plugins.MiddleToolsInterface, 0) + for _, name := range pluNames { + pulgSingle = append(pulgSingle, pluginsMap[name]) + } + + pluginsContext[middleMsg] = pulgSingle + } + + m.plugins = pluginsContext +} + +func loadConfigAndDefault() (int, int) { + // 加载 consumerCount + consumerCount := viper.GetInt("middle-driver.middleConsumerCount") + if consumerCount == 0 { + consumerCount = defaultConsumerCount + } + driverCount := viper.GetInt("middle-driver.driverCount") + if driverCount == 0 { + driverCount = defaultDriverCount + } + return consumerCount, driverCount +} + +func (m *MiddleWare) startWork() { + + for i := 0; i < m.consumerCount; i++ { + go func() { + ctx := context.Background() + for { + workEvent := m.eventConsumer.Receive(ctx) + plugs := m.plugins[workEvent.GetEventName()] + msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) + if !ok { + logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) + continue + } + for _, val := range plugs { + val.Exec(msg) + } + } + }() + } +} diff --git a/pkg/middle/middleware_test.go b/pkg/middle/middleware_test.go new file mode 100644 index 0000000..df94f50 --- /dev/null +++ b/pkg/middle/middleware_test.go @@ -0,0 +1,31 @@ +package middle + +import ( + "context" + "testing" + "time" + + "gitee.com/timedb/wheatCache/pkg/event" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" + "github.com/stretchr/testify/require" +) + +func Test_middleware_driver(t *testing.T) { + ctx := context.Background() + middleware := NewMiddleWare() + require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle") + event := event.NewEvent("logcontext") + event.SetValue(middleMsg.MiddleMsgKey, "123") + middleware.eventProduce.Call(ctx, event) + require.Equal(t, middleware.consumerCount, 5) + time.Sleep(1 * time.Second) +} + +func TestWorker(t *testing.T) { + // ctx := context.Background() + event := event.NewEvent("logcontext") + + m := NewMiddleWare() + m.eventDriver.Put(event) + +} diff --git a/plugins/define.go b/plugins/define.go index 37538c9..922dc34 100644 --- a/plugins/define.go +++ b/plugins/define.go @@ -4,4 +4,5 @@ type MiddleToolsInterface interface { Init() // 初始化 Exec(interface{}) (interface{}, error) // 处理用户发送事件 Name() string // 获取中间件名称 + Describe() string // 描述 } diff --git a/plugins/log-middle/middleware.go b/plugins/log-middle/middleware.go index c6ff102..dbdb6fe 100644 --- a/plugins/log-middle/middleware.go +++ b/plugins/log-middle/middleware.go @@ -13,14 +13,17 @@ func (i *logMiddle) Init() { } func (i *logMiddle) Exec(interface{}) (interface{}, error) { - fmt.Println(1) - return nil, nil + fmt.Println("logMiddle") + return "", nil } func (i *logMiddle) Name() string { return "logMiddle" } +func (i *logMiddle) Describe() string { + return "" +} func NewMiddleware() plugins.MiddleToolsInterface { return &logMiddle{} } diff --git a/plugins/map-key/middleware.go b/plugins/map-key/middleware.go index 31fa509..088c589 100644 --- a/plugins/map-key/middleware.go +++ b/plugins/map-key/middleware.go @@ -12,13 +12,17 @@ func (i *mapKey) Init() { func (i *mapKey) Exec(interface{}) (interface{}, error) { - return nil, nil + return "", nil } func (i *mapKey) Name() string { return "mapKey" } +func (i *mapKey) Describe() string { + return "" +} + func NewMiddleware() plugins.MiddleToolsInterface { return &mapKey{} }