feat(middle-msg): update middle msg

This commit is contained in:
bandl 2021-10-19 15:17:43 +08:00
parent e94a41ec73
commit 780cf7d276
6 changed files with 44 additions and 35 deletions

View File

@ -23,8 +23,14 @@ func SendMiddleMsg(
var eventName string
switch val.(type) {
case LogContext:
eventName = EventNameLog
case *LogContext:
eventName = LogContextName
case *LruCleanContext:
eventName = LruCleanContextName
case *LruTTlContext:
eventName = LruTTlContextName
case *PulginsInfo:
eventName = PulginsInfoname
}
msgEvent := event.NewEvent(eventName)

View File

@ -3,7 +3,7 @@ package middlemsg
import "time"
var (
EventNameLog = "log-context"
LogContextName = "log-context"
)
type LogContext struct {

View File

@ -4,14 +4,8 @@ import (
"time"
)
// []pulginsINfo
// 1. Version
// 2. desc
// 3. Name
// 4. 运行状态
const (
EventNamePlug = "plugins-info"
PulginsInfoname = "plugins-info-context"
)
type PulginsInfo struct {

View File

@ -1,12 +1,8 @@
package middle
import (
"context"
_ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/logx"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
"gitee.com/timedb/wheatCache/plugins"
"gitee.com/timedb/wheatCache/plugins/config"
"github.com/spf13/viper"
@ -76,24 +72,3 @@ func loadConfigAndDefault() (int, int) {
}
return consumerCount, driverCount
}
func (m *MiddleWare) startWork() {
for i := 0; i < m.consumerCount; i++ {
go func() {
ctx := context.Background()
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
for _, val := range plugs {
val.Exec(msg)
}
}
}()
}
}

34
pkg/middle/worker.go Normal file
View File

@ -0,0 +1,34 @@
package middle
import (
"context"
"gitee.com/timedb/wheatCache/pkg/logx"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
)
func (m *MiddleWare) startWork() {
for i := 0; i < m.consumerCount; i++ {
go func() {
ctx := context.Background()
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
// 发送事件到 全部的 plugs 里
for _, val := range plugs {
_, err := val.Exec(msg)
if err != nil {
logx.With(ctx, m.eventProduce).Errorln(err)
}
}
}
}()
}
}