!58 middle driver

Merge pull request !58 from Sodesnei/fix-middleware-driver
This commit is contained in:
Sodesnei 2021-10-10 06:14:02 +00:00 committed by Gitee
commit aac12f93f4
12 changed files with 187 additions and 11 deletions

View File

@ -8,6 +8,7 @@ import (
) )
func TestConf(t *testing.T) { func TestConf(t *testing.T) {
// 外部导入 conf.yaml 需要导入 conf 包 // 外部导入 conf.yaml 需要导入 conf 包
// 每次迁移文件时, 使用 sudo make init-conf来将yam文件迁移到指定的文件夹下 // 每次迁移文件时, 使用 sudo make init-conf来将yam文件迁移到指定的文件夹下
// get 使用, 读取 public_conf 配置文件 // get 使用, 读取 public_conf 配置文件
@ -22,3 +23,13 @@ func TestConf(t *testing.T) {
host := viper.GetString("host") host := viper.GetString("host")
require.Equal(t, host, "1222") require.Equal(t, host, "1222")
} }
func TestMiddleConf(t *testing.T) {
ct := viper.GetStringSlice("plugins-control.logcontext")
require.Equal(t, ct, []string{"logMiddle"})
d := viper.GetInt("middleware-driver.driverCount")
require.Equal(t, d, 1000)
c := viper.GetInt("middleware-driver.middleConsumerCount")
require.Equal(t, c, 5)
}

View File

@ -14,8 +14,16 @@ lruCache:
eventDriverSize: 2000 eventDriverSize: 2000
workTime: 1 workTime: 1
logPrint: logPrint:
stath: [ stath: [
"debug", "debug",
"error" "error"
] ]
middleware-driver:
driverCount: 1000
middleConsumerCount: 5
plugins-control:
logcontext: ["logMiddle"]

View File

@ -45,7 +45,7 @@ func (l *upLogger) Print(level string, format string, msg ...interface{}) {
Print(level, format, msg...) Print(level, format, msg...)
eventMiddle := event.NewEvent(middleMsg.EventNameLog) eventMiddle := event.NewEvent(middleMsg.EventNameLog)
eventMiddle.SetValue(middleMsg.EventKeyLog, middleMsg.LogContext{ eventMiddle.SetValue(middleMsg.MiddleMsgKey, middleMsg.LogContext{
Level: level, Level: level,
Data: time.Now(), Data: time.Now(),
Msg: fmt.Sprintf(format, msg...), Msg: fmt.Sprintf(format, msg...),

5
pkg/middle-msg/define.go Normal file
View File

@ -0,0 +1,5 @@
package middle_msg
const (
MiddleMsgKey = "middleMsgKey"
)

View File

@ -3,9 +3,7 @@ package middle_msg
import "time" import "time"
var ( var (
EventNameLog = "LogContext" EventNameLog = "logcontext"
EventKeyLog = "LogContext"
) )
type LogContext struct { type LogContext struct {

View File

@ -0,0 +1,8 @@
package middle_msg
// []pulginsINfo
// 1. Version
// 2. desc
// 3. Name
// 4. 运行状态
// 5. 运行时间

View File

@ -1,7 +1,15 @@
package middle package middle
import getMiddlewareMap "gitee.com/timedb/wheatCache/plugins/config" import (
"sync"
)
func Init() { var (
getMiddlewareMap.GetMiddlewareMap() oneMiddle sync.Once
} middleWareDriver *MiddleWare
)
const (
defaultConsumerCount = 5
defaultDriverCount = 1000
)

99
pkg/middle/middleware.go Normal file
View File

@ -0,0 +1,99 @@
package middle
import (
"context"
_ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/logx"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"gitee.com/timedb/wheatCache/plugins"
"gitee.com/timedb/wheatCache/plugins/config"
"github.com/spf13/viper"
)
type MiddleWare struct {
eventDriver event.DriverInterface
eventConsumer event.ConsumerInterface
eventProduce event.ProduceInterface
plugins map[string][]plugins.MiddleToolsInterface
consumerCount int
driverCount int
}
func NewMiddleWare() *MiddleWare {
oneMiddle.Do(func() {
consumerCount, driverCount := loadConfigAndDefault()
driver := event.NewDriver(driverCount)
middleWareDriver = &MiddleWare{
eventDriver: driver,
eventConsumer: event.NewConsumer(driver),
eventProduce: event.NewProduce(driver),
driverCount: driverCount,
consumerCount: consumerCount,
}
middleWareDriver.loadPlugins()
middleWareDriver.startWork()
})
return middleWareDriver
}
func (m *MiddleWare) GetEventDriver() event.DriverInterface {
return m.eventDriver
}
func (m *MiddleWare) loadPlugins() {
plug := viper.GetStringMapStringSlice("plugins-control")
pluginsMap := config.GetMiddlewareMap()
pluginsContext := make(map[string][]plugins.MiddleToolsInterface)
for middleMsg, pluNames := range plug {
pulgSingle := make([]plugins.MiddleToolsInterface, 0)
for _, name := range pluNames {
pulgSingle = append(pulgSingle, pluginsMap[name])
}
pluginsContext[middleMsg] = pulgSingle
}
m.plugins = pluginsContext
}
func loadConfigAndDefault() (int, int) {
// 加载 consumerCount
consumerCount := viper.GetInt("middle-driver.middleConsumerCount")
if consumerCount == 0 {
consumerCount = defaultConsumerCount
}
driverCount := viper.GetInt("middle-driver.driverCount")
if driverCount == 0 {
driverCount = defaultDriverCount
}
return consumerCount, driverCount
}
func (m *MiddleWare) startWork() {
for i := 0; i < m.consumerCount; i++ {
go func() {
ctx := context.Background()
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
for _, val := range plugs {
val.Exec(msg)
}
}
}()
}
}

View File

@ -0,0 +1,31 @@
package middle
import (
"context"
"testing"
"time"
"gitee.com/timedb/wheatCache/pkg/event"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"github.com/stretchr/testify/require"
)
func Test_middleware_driver(t *testing.T) {
ctx := context.Background()
middleware := NewMiddleWare()
require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle")
event := event.NewEvent("logcontext")
event.SetValue(middleMsg.MiddleMsgKey, "123")
middleware.eventProduce.Call(ctx, event)
require.Equal(t, middleware.consumerCount, 5)
time.Sleep(1 * time.Second)
}
func TestWorker(t *testing.T) {
// ctx := context.Background()
event := event.NewEvent("logcontext")
m := NewMiddleWare()
m.eventDriver.Put(event)
}

View File

@ -4,4 +4,5 @@ type MiddleToolsInterface interface {
Init() // 初始化 Init() // 初始化
Exec(interface{}) (interface{}, error) // 处理用户发送事件 Exec(interface{}) (interface{}, error) // 处理用户发送事件
Name() string // 获取中间件名称 Name() string // 获取中间件名称
Describe() string // 描述
} }

View File

@ -13,14 +13,17 @@ func (i *logMiddle) Init() {
} }
func (i *logMiddle) Exec(interface{}) (interface{}, error) { func (i *logMiddle) Exec(interface{}) (interface{}, error) {
fmt.Println(1) fmt.Println("logMiddle")
return nil, nil return "", nil
} }
func (i *logMiddle) Name() string { func (i *logMiddle) Name() string {
return "logMiddle" return "logMiddle"
} }
func (i *logMiddle) Describe() string {
return ""
}
func NewMiddleware() plugins.MiddleToolsInterface { func NewMiddleware() plugins.MiddleToolsInterface {
return &logMiddle{} return &logMiddle{}
} }

View File

@ -12,13 +12,17 @@ func (i *mapKey) Init() {
func (i *mapKey) Exec(interface{}) (interface{}, error) { func (i *mapKey) Exec(interface{}) (interface{}, error) {
return nil, nil return "", nil
} }
func (i *mapKey) Name() string { func (i *mapKey) Name() string {
return "mapKey" return "mapKey"
} }
func (i *mapKey) Describe() string {
return ""
}
func NewMiddleware() plugins.MiddleToolsInterface { func NewMiddleware() plugins.MiddleToolsInterface {
return &mapKey{} return &mapKey{}
} }