wheat-cache/pkg/middle/worker.go

37 lines
800 B
Go
Raw Normal View History

2021-10-19 15:17:43 +08:00
package middle
import (
"context"
"gitee.com/timedb/wheatCache/pkg/logx"
middleMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
)
func (m *MiddleWare) startWork() {
for i := 0; i < m.consumerCount; i++ {
go func() {
ctx := context.Background()
for {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
2021-10-26 16:01:11 +08:00
m.eventConsumer.Recovery(workEvent)
2021-10-19 15:17:43 +08:00
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue
}
// 发送事件到 全部的 plugs 里
for _, val := range plugs {
_, err := val.Exec(msg)
if err != nil {
logx.With(ctx, m.eventProduce).Errorln(err)
}
}
}
}()
}
}