feat(event): update event

This commit is contained in:
bandl 2021-10-26 16:01:11 +08:00
parent db615609cd
commit 404dc1fbbc
4 changed files with 66 additions and 23 deletions

View File

@ -33,7 +33,7 @@ func SendMiddleMsg(
eventName = PulginsInfosName
}
msgEvent := event.NewEvent(eventName)
msgEvent := middleProduce.NewEvent(eventName)
msgEvent.SetValue(MiddleMsgKey, val)
middleProduce.Call(ctx, msgEvent)
return nil

View File

@ -16,6 +16,8 @@ func (m *MiddleWare) startWork() {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
m.eventConsumer.Recovery(workEvent)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue

View File

@ -16,11 +16,12 @@ func (s *serverSingle) LIndex(
return s.dao.LINdex(req.Key, req.Index)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -38,11 +39,12 @@ func (s *serverSingle) LLen(
return s.dao.LLen(req.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -60,11 +62,12 @@ func (s *serverSingle) LPop(
return s.dao.LPop(request.Key, request.Count)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -82,11 +85,13 @@ func (s *serverSingle) LPush(
return nil, s.dao.LPush(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -102,11 +107,13 @@ func (s *serverSingle) LPushX(
return nil, s.dao.LPush(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -122,11 +129,13 @@ func (s *serverSingle) LRange(
return s.dao.LRange(req.Key, req.Start, req.End)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -144,11 +153,13 @@ func (s *serverSingle) LRem(
return s.dao.LRemove(req.Key, req.Count, req.Value)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -166,11 +177,13 @@ func (s *serverSingle) LSet(
return nil, s.dao.LSet(req.Key, req.Index, req.Value)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -186,11 +199,13 @@ func (s *serverSingle) RPop(
return s.dao.RPop(req.Key, req.Count)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -208,11 +223,13 @@ func (s *serverSingle) LTrim(
return nil, s.dao.LTrim(req.Key, req.Start, req.End)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -228,11 +245,13 @@ func (s *serverSingle) RPush(
return nil, s.dao.RPush(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -248,11 +267,13 @@ func (s *serverSingle) RPushX(
return nil, s.dao.RPushX(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}

View File

@ -17,11 +17,13 @@ func (s *serverSingle) Set(
return s.dao.Set(req.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -39,11 +41,13 @@ func (s *serverSingle) Get(
return s.dao.Get(req.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -60,11 +64,13 @@ func (s serverSingle) Add(
return s.dao.Add(req.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -81,11 +87,13 @@ func (s *serverSingle) Reduce(
return s.dao.Add(req.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -102,11 +110,13 @@ func (s *serverSingle) SetBit(
return nil, s.dao.Setbit(req.Key, req.Val, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -121,11 +131,13 @@ func (s *serverSingle) GetBit(
return s.dao.GetBit(req.Key, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -142,11 +154,13 @@ func (s *serverSingle) GetRange(
return s.dao.Getrange(req.Key, req.Start, req.End)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -163,11 +177,13 @@ func (s *serverSingle) GetSet(
return s.dao.Getset(req.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
result, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -184,11 +200,13 @@ func (s *serverSingle) StrLen(
return s.dao.Strlen(req.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -205,11 +223,13 @@ func (s *serverSingle) Setnx(
return nil, s.dao.Setnx(req.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}