From 780cf7d276d1c18401da7b976b67cac508c7055c Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 19 Oct 2021 15:17:43 +0800 Subject: [PATCH] feat(middle-msg): update middle msg --- pkg/middle-msg/define.go | 10 ++++-- pkg/middle-msg/logx.go | 2 +- pkg/middle-msg/plugins.go | 8 +---- pkg/middle/middleware.go | 25 -------------- pkg/middle/worker.go | 34 +++++++++++++++++++ .../{middleware_test.go => worker_test.go} | 0 6 files changed, 44 insertions(+), 35 deletions(-) create mode 100644 pkg/middle/worker.go rename pkg/middle/{middleware_test.go => worker_test.go} (100%) diff --git a/pkg/middle-msg/define.go b/pkg/middle-msg/define.go index 5c01913..8e2e68d 100644 --- a/pkg/middle-msg/define.go +++ b/pkg/middle-msg/define.go @@ -23,8 +23,14 @@ func SendMiddleMsg( var eventName string switch val.(type) { - case LogContext: - eventName = EventNameLog + case *LogContext: + eventName = LogContextName + case *LruCleanContext: + eventName = LruCleanContextName + case *LruTTlContext: + eventName = LruTTlContextName + case *PulginsInfo: + eventName = PulginsInfoname } msgEvent := event.NewEvent(eventName) diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index b47434d..a3c1ded 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,7 +3,7 @@ package middlemsg import "time" var ( - EventNameLog = "log-context" + LogContextName = "log-context" ) type LogContext struct { diff --git a/pkg/middle-msg/plugins.go b/pkg/middle-msg/plugins.go index d57a9d2..ee92512 100644 --- a/pkg/middle-msg/plugins.go +++ b/pkg/middle-msg/plugins.go @@ -4,14 +4,8 @@ import ( "time" ) -// []pulginsINfo -// 1. Version -// 2. desc -// 3. Name -// 4. 运行状态 - const ( - EventNamePlug = "plugins-info" + PulginsInfoname = "plugins-info-context" ) type PulginsInfo struct { diff --git a/pkg/middle/middleware.go b/pkg/middle/middleware.go index 39b8c5a..065c615 100644 --- a/pkg/middle/middleware.go +++ b/pkg/middle/middleware.go @@ -1,12 +1,8 @@ package middle import ( - "context" - _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/logx" - middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" "gitee.com/timedb/wheatCache/plugins" "gitee.com/timedb/wheatCache/plugins/config" "github.com/spf13/viper" @@ -76,24 +72,3 @@ func loadConfigAndDefault() (int, int) { } return consumerCount, driverCount } - -func (m *MiddleWare) startWork() { - - for i := 0; i < m.consumerCount; i++ { - go func() { - ctx := context.Background() - for { - workEvent := m.eventConsumer.Receive(ctx) - plugs := m.plugins[workEvent.GetEventName()] - msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) - if !ok { - logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) - continue - } - for _, val := range plugs { - val.Exec(msg) - } - } - }() - } -} diff --git a/pkg/middle/worker.go b/pkg/middle/worker.go new file mode 100644 index 0000000..86a85da --- /dev/null +++ b/pkg/middle/worker.go @@ -0,0 +1,34 @@ +package middle + +import ( + "context" + + "gitee.com/timedb/wheatCache/pkg/logx" + middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg" +) + +func (m *MiddleWare) startWork() { + + for i := 0; i < m.consumerCount; i++ { + go func() { + ctx := context.Background() + for { + workEvent := m.eventConsumer.Receive(ctx) + plugs := m.plugins[workEvent.GetEventName()] + msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) + if !ok { + logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) + continue + } + + // 发送事件到 全部的 plugs 里 + for _, val := range plugs { + _, err := val.Exec(msg) + if err != nil { + logx.With(ctx, m.eventProduce).Errorln(err) + } + } + } + }() + } +} diff --git a/pkg/middle/middleware_test.go b/pkg/middle/worker_test.go similarity index 100% rename from pkg/middle/middleware_test.go rename to pkg/middle/worker_test.go