diff --git a/client/client_test.go b/client/client_test.go index a317a40..04f82bc 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,11 +13,21 @@ func TestClient(t *testing.T) { cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient) require.NoError(t, err) ctx := context.Background() - _, err = cli.Set(ctx, &proto.SetRequest{ - Key: &proto.BaseKey{ - Key: "apple", - }, + bKey := &proto.BaseKey{ + Key: "apple", + } + resp, err := cli.Set(ctx, &proto.SetRequest{ + Key: bKey, Val: "yyyy", }) + require.NoError(t, err) + require.Equal(t, resp.Result, "yyyy") + + getResp, err := cli.Get(ctx, &proto.GetRequest{ + Key: bKey, + }) + require.NoError(t, err) + require.Equal(t, getResp.Result, "yyyy") + } diff --git a/conf/public_conf.go b/conf/public_conf.go index 088f4ea..533b64a 100644 --- a/conf/public_conf.go +++ b/conf/public_conf.go @@ -8,9 +8,6 @@ import ( const ( linuxPath = "/etc/wheat-cache/" - - devPath = "./conf" - devPathBin = "../conf" ) func init() { @@ -19,7 +16,7 @@ func init() { switch err.(type) { case nil: case viper.ConfigFileNotFoundError: - formatPath := []string{linuxPath, devPath, devPath} + formatPath := []string{linuxPath} log.Fatalf("the profile could not be read, read path:%v", formatPath) default: log.Fatalf("the resolution of the profile failed, err: %v", err) @@ -42,10 +39,6 @@ func LoadConf(path string) error { // linux viper.AddConfigPath(linuxPath) - // 开发环境 - viper.AddConfigPath(devPath) - viper.AddConfigPath(devPathBin) - viper.SetConfigType("yaml") err := viper.ReadInConfig() diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 08f2c02..d29ec38 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -24,9 +24,25 @@ middleware-driver: driverCount: 1000 middleConsumerCount: 5 +# Register the message push type +# 在这里注册消息推送类型, plugins-control: - logcontext: [ "logMiddle" ] + # log-context: Logs generated by storage or gateway are pushed through this message + # log-context: storage 或者 gateway 产生的日志通过这个消息推送 + log-context: [ "mock-plugins" ] + + # lru-clean-context: Lru is pushed through this message when data cleansing occurs + # lru-clean-context: Lru 发生数据清理时通过这个消息推送 + lru-clean-context: ["mock-plugins"] + + # lru-ttl-context: Lru is pushed through this message when data expires + # lru-ttl-context: Lru 发生数据过期时通过这个消息推送 + lru-ttl-context: ["mock-plugins"] + + # plugins-info-context:All plugins information for the current project + # plugins-info-context: 当前项目全部的插件信息 + plugins-infos-context: ["mock-plugins"] gateway: host: '127.0.0.1' diff --git a/pkg/logx/logx.go b/pkg/logx/logx.go index 3814ebe..17c5b62 100644 --- a/pkg/logx/logx.go +++ b/pkg/logx/logx.go @@ -3,12 +3,13 @@ package logx import ( "context" "fmt" - "gitee.com/timedb/wheatCache/pkg/event" - middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" "os" "runtime" "strings" "time" + + "gitee.com/timedb/wheatCache/pkg/event" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" ) func With(ctx context.Context, p event.ProduceInterface) *upLogger { @@ -41,15 +42,13 @@ func (l *upLogger) Panic(format string, msg ...interface{}) { func (l *upLogger) Print(level string, format string, msg ...interface{}) { logPrint(4, level, format, msg...) - - eventMiddle := event.NewEvent(middleMsg.EventNameLog) - eventMiddle.SetValue(middleMsg.MiddleMsgKey, middleMsg.LogContext{ + sendMsg := &middleMsg.LogContext{ Level: level, Data: time.Now(), Msg: fmt.Sprintf(format, msg...), Route: findPlace(4), - }) - l.produce.Call(l.ctx, eventMiddle) + } + middleMsg.SendMiddleMsg(l.ctx, l.produce, sendMsg) } func (l *upLogger) Debugln(msg ...interface{}) { diff --git a/pkg/middle-msg/define.go b/pkg/middle-msg/define.go index 5c01913..aed2e29 100644 --- a/pkg/middle-msg/define.go +++ b/pkg/middle-msg/define.go @@ -23,8 +23,14 @@ func SendMiddleMsg( var eventName string switch val.(type) { - case LogContext: - eventName = EventNameLog + case *LogContext: + eventName = LogContextName + case *LruCleanContext: + eventName = LruCleanContextName + case *LruTTlContext: + eventName = LruTTlContextName + case *PulginsInfos: + eventName = PulginsInfosName } msgEvent := event.NewEvent(eventName) diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index b47434d..a3c1ded 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,7 +3,7 @@ package middlemsg import "time" var ( - EventNameLog = "log-context" + LogContextName = "log-context" ) type LogContext struct { diff --git a/pkg/middle-msg/plugins.go b/pkg/middle-msg/plugins.go index d57a9d2..b452349 100644 --- a/pkg/middle-msg/plugins.go +++ b/pkg/middle-msg/plugins.go @@ -4,14 +4,8 @@ import ( "time" ) -// []pulginsINfo -// 1. Version -// 2. desc -// 3. Name -// 4. 运行状态 - const ( - EventNamePlug = "plugins-info" + PulginsInfosName = "plugins-infos-context" ) type PulginsInfo struct { @@ -21,3 +15,7 @@ type PulginsInfo struct { Statux string Time time.Duration } + +type PulginsInfos struct { + Infos []*PulginsInfo +} diff --git a/pkg/middle/middleware.go b/pkg/middle/middleware.go index 39b8c5a..44a1239 100644 --- a/pkg/middle/middleware.go +++ b/pkg/middle/middleware.go @@ -1,12 +1,8 @@ package middle import ( - "context" - _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/logx" - middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" "gitee.com/timedb/wheatCache/plugins" "gitee.com/timedb/wheatCache/plugins/config" "github.com/spf13/viper" @@ -16,7 +12,7 @@ type MiddleWare struct { eventDriver event.DriverInterface eventConsumer event.ConsumerInterface eventProduce event.ProduceInterface - plugins map[string][]plugins.MiddleToolsInterface + plugins map[string][]plugins.PluginInterface consumerCount int driverCount int } @@ -35,8 +31,8 @@ func NewMiddleWare() *MiddleWare { } middleWareDriver.loadPlugins() + // 多消费 middle middleWareDriver.startWork() - }) return middleWareDriver } @@ -50,10 +46,10 @@ func (m *MiddleWare) loadPlugins() { pluginsMap := config.GetMiddlewareMap() - pluginsContext := make(map[string][]plugins.MiddleToolsInterface) + pluginsContext := make(map[string][]plugins.PluginInterface) for msg, pluNames := range plug { - pulgSingle := make([]plugins.MiddleToolsInterface, 0) + pulgSingle := make([]plugins.PluginInterface, 0) for _, name := range pluNames { pulgSingle = append(pulgSingle, pluginsMap[name]) } @@ -76,24 +72,3 @@ func loadConfigAndDefault() (int, int) { } return consumerCount, driverCount } - -func (m *MiddleWare) startWork() { - - for i := 0; i < m.consumerCount; i++ { - go func() { - ctx := context.Background() - for { - workEvent := m.eventConsumer.Receive(ctx) - plugs := m.plugins[workEvent.GetEventName()] - msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) - if !ok { - logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) - continue - } - for _, val := range plugs { - val.Exec(msg) - } - } - }() - } -} diff --git a/pkg/middle/middleware_test.go b/pkg/middle/middleware_test.go deleted file mode 100644 index df94f50..0000000 --- a/pkg/middle/middleware_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package middle - -import ( - "context" - "testing" - "time" - - "gitee.com/timedb/wheatCache/pkg/event" - middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" - "github.com/stretchr/testify/require" -) - -func Test_middleware_driver(t *testing.T) { - ctx := context.Background() - middleware := NewMiddleWare() - require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle") - event := event.NewEvent("logcontext") - event.SetValue(middleMsg.MiddleMsgKey, "123") - middleware.eventProduce.Call(ctx, event) - require.Equal(t, middleware.consumerCount, 5) - time.Sleep(1 * time.Second) -} - -func TestWorker(t *testing.T) { - // ctx := context.Background() - event := event.NewEvent("logcontext") - - m := NewMiddleWare() - m.eventDriver.Put(event) - -} diff --git a/pkg/middle/worker.go b/pkg/middle/worker.go new file mode 100644 index 0000000..86a85da --- /dev/null +++ b/pkg/middle/worker.go @@ -0,0 +1,34 @@ +package middle + +import ( + "context" + + "gitee.com/timedb/wheatCache/pkg/logx" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" +) + +func (m *MiddleWare) startWork() { + + for i := 0; i < m.consumerCount; i++ { + go func() { + ctx := context.Background() + for { + workEvent := m.eventConsumer.Receive(ctx) + plugs := m.plugins[workEvent.GetEventName()] + msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) + if !ok { + logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) + continue + } + + // 发送事件到 全部的 plugs 里 + for _, val := range plugs { + _, err := val.Exec(msg) + if err != nil { + logx.With(ctx, m.eventProduce).Errorln(err) + } + } + } + }() + } +} diff --git a/pkg/middle/worker_test.go b/pkg/middle/worker_test.go new file mode 100644 index 0000000..1db20bf --- /dev/null +++ b/pkg/middle/worker_test.go @@ -0,0 +1,42 @@ +package middle + +import ( + "context" + "fmt" + "testing" + "time" + + "gitee.com/timedb/wheatCache/pkg/event" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" +) + +func Test_middleware_loadPlugins(t *testing.T) { + m := NewMiddleWare() + m.loadPlugins() + + fmt.Println(m.plugins) +} + +func TestWorker(t *testing.T) { + ctx := context.Background() + m := NewMiddleWare() + + product := event.NewProduce(m.GetEventDriver()) + middleMsg.SendMiddleMsg(ctx, product, &middleMsg.LogContext{ + Msg: "debug msg", + }) + + middleMsg.SendMiddleMsg(ctx, product, &middleMsg.PulginsInfos{ + Infos: []*middleMsg.PulginsInfo{ + { + Desc: "miss", + }, + }, + }) + + middleMsg.SendMiddleMsg(ctx, product, &middleMsg.LruTTlContext{ + Keys: []string{"1", "2", "3"}, + }) + + time.Sleep(1 * time.Second) +} diff --git a/plugins/config/middle.gen.go b/plugins/config/middle.gen.go index 15a055c..8030d69 100644 --- a/plugins/config/middle.gen.go +++ b/plugins/config/middle.gen.go @@ -6,17 +6,14 @@ package config import ( "gitee.com/timedb/wheatCache/plugins" - logMiddle "gitee.com/timedb/wheatCache/plugins/log-middle" - mapKey "gitee.com/timedb/wheatCache/plugins/map-key" + mockPlugin "gitee.com/timedb/wheatCache/plugins/mock-plugin" ) -func GetMiddlewareMap() map[string]plugins.MiddleToolsInterface { +func GetMiddlewareMap() map[string]plugins.PluginInterface { - logMiddle := logMiddle.NewMiddleware() - mapKey := mapKey.NewMiddleware() - return map[string]plugins.MiddleToolsInterface{ + mockPlugin := mockPlugin.NewPlugin() + return map[string]plugins.PluginInterface{ - logMiddle.Name(): logMiddle, - mapKey.Name(): mapKey, + mockPlugin.Name(): mockPlugin, } } diff --git a/plugins/config/middle.template b/plugins/config/middle.template index d904530..a11e1d6 100644 --- a/plugins/config/middle.template +++ b/plugins/config/middle.template @@ -11,11 +11,11 @@ import ( ) -func GetMiddlewareMap() map[string]plugins.MiddleToolsInterface { +func GetMiddlewareMap() map[string]plugins.PluginInterface { {%for dir in dirs %} - {{dir[0]}}:={{dir[0]}}.NewMiddleware() + {{dir[0]}}:={{dir[0]}}.NewPlugin() {%- endfor%} - return map[string]plugins.MiddleToolsInterface{ + return map[string]plugins.PluginInterface{ {%for dir in dirs %} {{dir[0]}}.Name():{{dir[0]}}, {%- endfor%} diff --git a/plugins/define.go b/plugins/define.go index 922dc34..37a2519 100644 --- a/plugins/define.go +++ b/plugins/define.go @@ -1,6 +1,6 @@ package plugins -type MiddleToolsInterface interface { +type PluginInterface interface { Init() // 初始化 Exec(interface{}) (interface{}, error) // 处理用户发送事件 Name() string // 获取中间件名称 diff --git a/plugins/log-middle/middleware.go b/plugins/log-middle/middleware.go deleted file mode 100644 index 1a07a47..0000000 --- a/plugins/log-middle/middleware.go +++ /dev/null @@ -1,29 +0,0 @@ -package logmiddle - -import ( - "fmt" - - "gitee.com/timedb/wheatCache/plugins" -) - -type logMiddle struct { -} - -func (i *logMiddle) Init() { -} - -func (i *logMiddle) Exec(interface{}) (interface{}, error) { - fmt.Println("logMiddle") - return "", nil -} - -func (i *logMiddle) Name() string { - return "logMiddle" -} - -func (i *logMiddle) Describe() string { - return "" -} -func NewMiddleware() plugins.MiddleToolsInterface { - return &logMiddle{} -} diff --git a/plugins/map-key/middleware.go b/plugins/map-key/middleware.go deleted file mode 100644 index a36588c..0000000 --- a/plugins/map-key/middleware.go +++ /dev/null @@ -1,28 +0,0 @@ -package logmiddle - -import ( - "gitee.com/timedb/wheatCache/plugins" -) - -type mapKey struct { -} - -func (i *mapKey) Init() { -} - -func (i *mapKey) Exec(interface{}) (interface{}, error) { - - return "", nil -} - -func (i *mapKey) Name() string { - return "mapKey" -} - -func (i *mapKey) Describe() string { - return "" -} - -func NewMiddleware() plugins.MiddleToolsInterface { - return &mapKey{} -} diff --git a/plugins/mock-plugin/mock.go b/plugins/mock-plugin/mock.go new file mode 100644 index 0000000..0c87df2 --- /dev/null +++ b/plugins/mock-plugin/mock.go @@ -0,0 +1,26 @@ +package mockplugin + +import "fmt" + +type MockPlugin struct { +} + +func (m *MockPlugin) Init() { +} + +func (m *MockPlugin) Exec(msg interface{}) (interface{}, error) { + fmt.Println(msg) + return nil, nil +} + +func (m *MockPlugin) Name() string { + return "mock-plugins" +} + +func (m *MockPlugin) Describe() string { + return "这是一个测试用的插件" +} + +func NewPlugin() *MockPlugin { + return &MockPlugin{} +}