!71 feat-middle-worker

Merge pull request !71 from bandl/feat-middle-worker
This commit is contained in:
bandl 2021-10-19 08:40:06 +00:00 committed by Gitee
commit 1a884ce9f0
17 changed files with 167 additions and 159 deletions

View File

@ -13,11 +13,21 @@ func TestClient(t *testing.T) {
cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient) cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient)
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
_, err = cli.Set(ctx, &proto.SetRequest{ bKey := &proto.BaseKey{
Key: &proto.BaseKey{
Key: "apple", Key: "apple",
}, }
resp, err := cli.Set(ctx, &proto.SetRequest{
Key: bKey,
Val: "yyyy", Val: "yyyy",
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, resp.Result, "yyyy")
getResp, err := cli.Get(ctx, &proto.GetRequest{
Key: bKey,
})
require.NoError(t, err)
require.Equal(t, getResp.Result, "yyyy")
} }

View File

@ -8,9 +8,6 @@ import (
const ( const (
linuxPath = "/etc/wheat-cache/" linuxPath = "/etc/wheat-cache/"
devPath = "./conf"
devPathBin = "../conf"
) )
func init() { func init() {
@ -19,7 +16,7 @@ func init() {
switch err.(type) { switch err.(type) {
case nil: case nil:
case viper.ConfigFileNotFoundError: case viper.ConfigFileNotFoundError:
formatPath := []string{linuxPath, devPath, devPath} formatPath := []string{linuxPath}
log.Fatalf("the profile could not be read, read path:%v", formatPath) log.Fatalf("the profile could not be read, read path:%v", formatPath)
default: default:
log.Fatalf("the resolution of the profile failed, err: %v", err) log.Fatalf("the resolution of the profile failed, err: %v", err)
@ -42,10 +39,6 @@ func LoadConf(path string) error {
// linux // linux
viper.AddConfigPath(linuxPath) viper.AddConfigPath(linuxPath)
// 开发环境
viper.AddConfigPath(devPath)
viper.AddConfigPath(devPathBin)
viper.SetConfigType("yaml") viper.SetConfigType("yaml")
err := viper.ReadInConfig() err := viper.ReadInConfig()

View File

@ -24,9 +24,25 @@ middleware-driver:
driverCount: 1000 driverCount: 1000
middleConsumerCount: 5 middleConsumerCount: 5
# Register the message push type
# 在这里注册消息推送类型,
plugins-control: plugins-control:
logcontext: [ "logMiddle" ]
# log-context: Logs generated by storage or gateway are pushed through this message
# log-context: storage 或者 gateway 产生的日志通过这个消息推送
log-context: [ "mock-plugins" ]
# lru-clean-context: Lru is pushed through this message when data cleansing occurs
# lru-clean-context: Lru 发生数据清理时通过这个消息推送
lru-clean-context: ["mock-plugins"]
# lru-ttl-context: Lru is pushed through this message when data expires
# lru-ttl-context: Lru 发生数据过期时通过这个消息推送
lru-ttl-context: ["mock-plugins"]
# plugins-info-contextAll plugins information for the current project
# plugins-info-context: 当前项目全部的插件信息
plugins-infos-context: ["mock-plugins"]
gateway: gateway:
host: '127.0.0.1' host: '127.0.0.1'

View File

@ -3,12 +3,13 @@ package logx
import ( import (
"context" "context"
"fmt" "fmt"
"gitee.com/timedb/wheatCache/pkg/event"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"os" "os"
"runtime" "runtime"
"strings" "strings"
"time" "time"
"gitee.com/timedb/wheatCache/pkg/event"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
) )
func With(ctx context.Context, p event.ProduceInterface) *upLogger { func With(ctx context.Context, p event.ProduceInterface) *upLogger {
@ -41,15 +42,13 @@ func (l *upLogger) Panic(format string, msg ...interface{}) {
func (l *upLogger) Print(level string, format string, msg ...interface{}) { func (l *upLogger) Print(level string, format string, msg ...interface{}) {
logPrint(4, level, format, msg...) logPrint(4, level, format, msg...)
sendMsg := &middleMsg.LogContext{
eventMiddle := event.NewEvent(middleMsg.EventNameLog)
eventMiddle.SetValue(middleMsg.MiddleMsgKey, middleMsg.LogContext{
Level: level, Level: level,
Data: time.Now(), Data: time.Now(),
Msg: fmt.Sprintf(format, msg...), Msg: fmt.Sprintf(format, msg...),
Route: findPlace(4), Route: findPlace(4),
}) }
l.produce.Call(l.ctx, eventMiddle) middleMsg.SendMiddleMsg(l.ctx, l.produce, sendMsg)
} }
func (l *upLogger) Debugln(msg ...interface{}) { func (l *upLogger) Debugln(msg ...interface{}) {

View File

@ -23,8 +23,14 @@ func SendMiddleMsg(
var eventName string var eventName string
switch val.(type) { switch val.(type) {
case LogContext: case *LogContext:
eventName = EventNameLog eventName = LogContextName
case *LruCleanContext:
eventName = LruCleanContextName
case *LruTTlContext:
eventName = LruTTlContextName
case *PulginsInfos:
eventName = PulginsInfosName
} }
msgEvent := event.NewEvent(eventName) msgEvent := event.NewEvent(eventName)

View File

@ -3,7 +3,7 @@ package middlemsg
import "time" import "time"
var ( var (
EventNameLog = "log-context" LogContextName = "log-context"
) )
type LogContext struct { type LogContext struct {

View File

@ -4,14 +4,8 @@ import (
"time" "time"
) )
// []pulginsINfo
// 1. Version
// 2. desc
// 3. Name
// 4. 运行状态
const ( const (
EventNamePlug = "plugins-info" PulginsInfosName = "plugins-infos-context"
) )
type PulginsInfo struct { type PulginsInfo struct {
@ -21,3 +15,7 @@ type PulginsInfo struct {
Statux string Statux string
Time time.Duration Time time.Duration
} }
type PulginsInfos struct {
Infos []*PulginsInfo
}

View File

@ -1,12 +1,8 @@
package middle package middle
import ( import (
"context"
_ "gitee.com/timedb/wheatCache/conf" _ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/logx"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"gitee.com/timedb/wheatCache/plugins" "gitee.com/timedb/wheatCache/plugins"
"gitee.com/timedb/wheatCache/plugins/config" "gitee.com/timedb/wheatCache/plugins/config"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -16,7 +12,7 @@ type MiddleWare struct {
eventDriver event.DriverInterface eventDriver event.DriverInterface
eventConsumer event.ConsumerInterface eventConsumer event.ConsumerInterface
eventProduce event.ProduceInterface eventProduce event.ProduceInterface
plugins map[string][]plugins.MiddleToolsInterface plugins map[string][]plugins.PluginInterface
consumerCount int consumerCount int
driverCount int driverCount int
} }
@ -35,8 +31,8 @@ func NewMiddleWare() *MiddleWare {
} }
middleWareDriver.loadPlugins() middleWareDriver.loadPlugins()
// 多消费 middle
middleWareDriver.startWork() middleWareDriver.startWork()
}) })
return middleWareDriver return middleWareDriver
} }
@ -50,10 +46,10 @@ func (m *MiddleWare) loadPlugins() {
pluginsMap := config.GetMiddlewareMap() pluginsMap := config.GetMiddlewareMap()
pluginsContext := make(map[string][]plugins.MiddleToolsInterface) pluginsContext := make(map[string][]plugins.PluginInterface)
for msg, pluNames := range plug { for msg, pluNames := range plug {
pulgSingle := make([]plugins.MiddleToolsInterface, 0) pulgSingle := make([]plugins.PluginInterface, 0)
for _, name := range pluNames { for _, name := range pluNames {
pulgSingle = append(pulgSingle, pluginsMap[name]) pulgSingle = append(pulgSingle, pluginsMap[name])
} }
@ -76,24 +72,3 @@ func loadConfigAndDefault() (int, int) {
} }
return consumerCount, driverCount return consumerCount, driverCount
} }
func (m *MiddleWare) startWork() {
for i := 0; i < m.consumerCount; i++ {
go func() {
ctx := context.Background()
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
for _, val := range plugs {
val.Exec(msg)
}
}
}()
}
}

View File

@ -1,31 +0,0 @@
package middle
import (
"context"
"testing"
"time"
"gitee.com/timedb/wheatCache/pkg/event"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"github.com/stretchr/testify/require"
)
func Test_middleware_driver(t *testing.T) {
ctx := context.Background()
middleware := NewMiddleWare()
require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle")
event := event.NewEvent("logcontext")
event.SetValue(middleMsg.MiddleMsgKey, "123")
middleware.eventProduce.Call(ctx, event)
require.Equal(t, middleware.consumerCount, 5)
time.Sleep(1 * time.Second)
}
func TestWorker(t *testing.T) {
// ctx := context.Background()
event := event.NewEvent("logcontext")
m := NewMiddleWare()
m.eventDriver.Put(event)
}

34
pkg/middle/worker.go Normal file
View File

@ -0,0 +1,34 @@
package middle
import (
"context"
"gitee.com/timedb/wheatCache/pkg/logx"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
)
func (m *MiddleWare) startWork() {
for i := 0; i < m.consumerCount; i++ {
go func() {
ctx := context.Background()
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
// 发送事件到 全部的 plugs 里
for _, val := range plugs {
_, err := val.Exec(msg)
if err != nil {
logx.With(ctx, m.eventProduce).Errorln(err)
}
}
}
}()
}
}

42
pkg/middle/worker_test.go Normal file
View File

@ -0,0 +1,42 @@
package middle
import (
"context"
"fmt"
"testing"
"time"
"gitee.com/timedb/wheatCache/pkg/event"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
)
func Test_middleware_loadPlugins(t *testing.T) {
m := NewMiddleWare()
m.loadPlugins()
fmt.Println(m.plugins)
}
func TestWorker(t *testing.T) {
ctx := context.Background()
m := NewMiddleWare()
product := event.NewProduce(m.GetEventDriver())
middleMsg.SendMiddleMsg(ctx, product, &middleMsg.LogContext{
Msg: "debug msg",
})
middleMsg.SendMiddleMsg(ctx, product, &middleMsg.PulginsInfos{
Infos: []*middleMsg.PulginsInfo{
{
Desc: "miss",
},
},
})
middleMsg.SendMiddleMsg(ctx, product, &middleMsg.LruTTlContext{
Keys: []string{"1", "2", "3"},
})
time.Sleep(1 * time.Second)
}

View File

@ -6,17 +6,14 @@ package config
import ( import (
"gitee.com/timedb/wheatCache/plugins" "gitee.com/timedb/wheatCache/plugins"
logMiddle "gitee.com/timedb/wheatCache/plugins/log-middle" mockPlugin "gitee.com/timedb/wheatCache/plugins/mock-plugin"
mapKey "gitee.com/timedb/wheatCache/plugins/map-key"
) )
func GetMiddlewareMap() map[string]plugins.MiddleToolsInterface { func GetMiddlewareMap() map[string]plugins.PluginInterface {
logMiddle := logMiddle.NewMiddleware() mockPlugin := mockPlugin.NewPlugin()
mapKey := mapKey.NewMiddleware() return map[string]plugins.PluginInterface{
return map[string]plugins.MiddleToolsInterface{
logMiddle.Name(): logMiddle, mockPlugin.Name(): mockPlugin,
mapKey.Name(): mapKey,
} }
} }

View File

@ -11,11 +11,11 @@ import (
) )
func GetMiddlewareMap() map[string]plugins.MiddleToolsInterface { func GetMiddlewareMap() map[string]plugins.PluginInterface {
{%for dir in dirs %} {%for dir in dirs %}
{{dir[0]}}:={{dir[0]}}.NewMiddleware() {{dir[0]}}:={{dir[0]}}.NewPlugin()
{%- endfor%} {%- endfor%}
return map[string]plugins.MiddleToolsInterface{ return map[string]plugins.PluginInterface{
{%for dir in dirs %} {%for dir in dirs %}
{{dir[0]}}.Name():{{dir[0]}}, {{dir[0]}}.Name():{{dir[0]}},
{%- endfor%} {%- endfor%}

View File

@ -1,6 +1,6 @@
package plugins package plugins
type MiddleToolsInterface interface { type PluginInterface interface {
Init() // 初始化 Init() // 初始化
Exec(interface{}) (interface{}, error) // 处理用户发送事件 Exec(interface{}) (interface{}, error) // 处理用户发送事件
Name() string // 获取中间件名称 Name() string // 获取中间件名称

View File

@ -1,29 +0,0 @@
package logmiddle
import (
"fmt"
"gitee.com/timedb/wheatCache/plugins"
)
type logMiddle struct {
}
func (i *logMiddle) Init() {
}
func (i *logMiddle) Exec(interface{}) (interface{}, error) {
fmt.Println("logMiddle")
return "", nil
}
func (i *logMiddle) Name() string {
return "logMiddle"
}
func (i *logMiddle) Describe() string {
return ""
}
func NewMiddleware() plugins.MiddleToolsInterface {
return &logMiddle{}
}

View File

@ -1,28 +0,0 @@
package logmiddle
import (
"gitee.com/timedb/wheatCache/plugins"
)
type mapKey struct {
}
func (i *mapKey) Init() {
}
func (i *mapKey) Exec(interface{}) (interface{}, error) {
return "", nil
}
func (i *mapKey) Name() string {
return "mapKey"
}
func (i *mapKey) Describe() string {
return ""
}
func NewMiddleware() plugins.MiddleToolsInterface {
return &mapKey{}
}

View File

@ -0,0 +1,26 @@
package mockplugin
import "fmt"
type MockPlugin struct {
}
func (m *MockPlugin) Init() {
}
func (m *MockPlugin) Exec(msg interface{}) (interface{}, error) {
fmt.Println(msg)
return nil, nil
}
func (m *MockPlugin) Name() string {
return "mock-plugins"
}
func (m *MockPlugin) Describe() string {
return "这是一个测试用的插件"
}
func NewPlugin() *MockPlugin {
return &MockPlugin{}
}