!78 优化 事件驱动

Merge pull request !78 from bandl/perf-event
This commit is contained in:
bandl 2021-10-26 09:13:08 +00:00 committed by Gitee
commit bbc4c27027
13 changed files with 296 additions and 196 deletions

View File

@ -6,10 +6,18 @@ type Consumer struct {
driver DriverInterface
}
func (c *Consumer) Receive(ctx context.Context) *Event {
func (c *Consumer) Receive(ctx context.Context) *event {
return c.driver.Get()
}
func (c *Consumer) NewEvent(name string) *event {
return c.driver.NewEvent(name)
}
func (c *Consumer) Recovery(e *event) {
c.driver.Recovery(e)
}
func NewConsumer(driver DriverInterface) ConsumerInterface {
return &Consumer{
driver: driver,

View File

@ -4,28 +4,30 @@ import (
"context"
)
type eventType int8
const (
defaultEventState = eventType(iota) //默认情况下的状态
waitEventState // 等待状态
workEventState //工作状态
closeEventState //事件关闭状态
defaultEventState = int32(iota) //默认情况下的状态
waitEventState // 等待状态
workEventState //工作状态
closeEventState //事件关闭状态
)
type EventWorkFunc func() (interface{}, error)
type DriverInterface interface {
Get() *Event
Put(event *Event)
Get() *event
Put(*event)
GetLength() int
NewEvent(string) *event
Recovery(*event)
}
type ProduceInterface interface {
Call(ctx context.Context, event *Event)
Call(context.Context, *event)
NewEvent(string) *event
Recovery(*event)
}
type ConsumerInterface interface {
Receive(ctx context.Context) *Event
Receive(ctx context.Context) *event
Recovery(*event)
}

View File

@ -1,139 +1,166 @@
package event
import (
"sync"
"sync/atomic"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
)
type Active func() ([]string, error) // 事件带函数
// 事件 poll 降低 new 对象的频率
type eventPoll struct {
poll chan *event
maxSize int32
nowSize *int32
}
type Event struct {
func (e *eventPoll) getEvent() *event {
issSize := atomic.LoadInt32(e.nowSize)
if issSize < e.maxSize {
atomic.AddInt32(e.nowSize, 1)
return newEvent()
}
return <-e.poll
}
func (e *eventPoll) recovery(rEvent *event) {
rEvent.Reset()
e.poll <- rEvent
}
func newEventPoll(maxSize int) *eventPoll {
return &eventPoll{
poll: make(chan *event, maxSize),
maxSize: int32(maxSize),
nowSize: new(int32),
}
}
type event struct {
msgCtx map[string]interface{}
eventName string
WorkTime time.Duration // 工作时间
msg map[string]string // 消息
waitResult chan interface{} // 等待返回
err error
ru sync.RWMutex
muClose sync.Mutex //关闭锁
eventStatus eventType
eventStatus *int32
ttlManage *time.Timer
}
func (e *Event) SetMsg(key string, val string) {
e.ru.Lock()
defer e.ru.Unlock()
func newEvent() *event {
status := defaultEventState
return &event{
eventStatus: &status,
}
}
func (e *event) Reset() {
if e.ttlManage != nil {
e.ttlManage.Stop()
}
e.err = nil
atomic.SwapInt32(e.eventStatus, defaultEventState)
}
func (e *event) SetMsg(key string, val string) {
if e.msg == nil {
e.msg = make(map[string]string)
}
e.msg[key] = val
}
func (e *Event) GetMsg(key string) string {
e.ru.RLock()
defer e.ru.RUnlock()
func (e *event) GetMsg(key string) string {
return e.msg[key]
}
func (e *Event) GetEventName() string {
func (e *event) GetEventName() string {
return e.eventName
}
// SetValue 写入 ctx 传递用参数
func (e *Event) SetValue(key string, value interface{}) {
e.ru.Lock()
defer e.ru.Unlock()
func (e *event) SetValue(key string, value interface{}) {
if e.msgCtx == nil {
e.msgCtx = make(map[string]interface{})
}
e.msgCtx[key] = value
}
func (e *Event) GetValue(key string) (interface{}, bool) {
e.ru.RLock()
defer e.ru.RUnlock()
func (e *event) GetValue(key string) (interface{}, bool) {
val, ok := e.msgCtx[key]
return val, ok
}
// InitWaitEvent 初始化 wait event 必须调用才拥有等待特性
func (e *Event) InitWaitEvent() {
e.muClose.Lock()
defer e.muClose.Unlock()
e.waitResult = make(chan interface{})
e.eventStatus = waitEventState
func (e *event) InitWaitEvent() {
if e.waitResult == nil || len(e.waitResult) > 0 {
e.waitResult = make(chan interface{})
}
// 清理残留
if e.ttlManage == nil {
e.ttlManage = time.NewTimer(0)
}
e.ttlManage.Stop()
if len(e.ttlManage.C) > 0 {
<-e.ttlManage.C
}
atomic.CompareAndSwapInt32(e.eventStatus, defaultEventState, waitEventState)
}
// StartWaitEvent 开始一个等待任务
func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
t := time.NewTimer(ttl)
select {
case <-t.C:
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus == workEventState {
return <-e.waitResult, e.err
func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
e.ttlManage.Reset(ttl)
for {
select {
case <-e.ttlManage.C:
if atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, closeEventState) {
return nil, errorx.TimeOutErr()
}
continue
case result := <-e.waitResult:
atomic.CompareAndSwapInt32(e.eventStatus, workEventState, closeEventState)
return result, e.err
}
e.eventStatus = closeEventState
return nil, errorx.TimeOutErr()
case result := <-e.waitResult:
return result, e.err
}
}
func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) {
if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) {
return nil, errorx.New("not wait status, exec err")
}
e.eventStatus = workEventState
res, err := work()
e.err = err
e.waitResult <- res
close(e.waitResult)
e.eventStatus = closeEventState
return res, err
}
func (e *Event) SetResultErr(err error) {
e.muClose.Lock()
defer e.muClose.Unlock()
if e.eventStatus != waitEventState {
func (e *event) SetResultErr(err error) {
if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) {
return
}
e.eventStatus = workEventState
e.err = err
e.waitResult <- nil
close(e.waitResult)
e.eventStatus = closeEventState
}
func NewEvent(eventName string) *Event {
return &Event{
eventName: eventName,
eventStatus: defaultEventState,
}
}
type Driver struct {
maxQueueSize int
queue chan *Event
queue chan *event
poll *eventPoll
}
// Get 获取驱动
func (d *Driver) Get() *Event {
func (d *Driver) Get() *event {
return <-d.queue
}
func (d *Driver) Put(event *Event) {
func (d *Driver) Put(event *event) {
d.queue <- event
}
@ -141,10 +168,22 @@ func (d *Driver) GetLength() int {
return len(d.queue)
}
func (d *Driver) NewEvent(name string) *event {
event := d.poll.getEvent()
event.eventName = name
return event
}
// 任何时候回收事件都应该由 最后使用者回收
func (d *Driver) Recovery(e *event) {
d.poll.recovery(e)
}
// NewDriver 新建 Driver
func NewDriver(maxSize int) DriverInterface {
return &Driver{
maxQueueSize: maxSize,
queue: make(chan *Event, maxSize),
queue: make(chan *event, maxSize),
poll: newEventPoll(maxSize),
}
}

View File

@ -3,7 +3,8 @@ package event
import (
"context"
"fmt"
"gitee.com/timedb/wheatCache/pkg/errorx"
"strconv"
"sync"
"testing"
"time"
@ -13,87 +14,76 @@ import (
const testEvent = "1001"
const waitTestEvent = "1002"
// 简单 非等待响应模式, 使用 event driver
func TestEvent_DriverEventTest(t *testing.T) {
ctx := context.Background()
driver := NewDriver(500)
// 简单的 单向 event 使用
func Test_EventDriver(t *testing.T) {
driver := NewDriver(2000)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
go produceEvent(t, ctx, produce)
consumerEvent(t, ctx, consumer)
}
func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ {
event := NewEvent(testEvent)
event.SetValue("test", i)
v.Call(ctx, event)
}
}
func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ {
event := v.Receive(ctx)
res, ok := event.GetValue("test")
require.True(t, ok)
fmt.Println(res)
require.Equal(t, res, i)
}
}
// 响应等待用法
func TestEvent_SpanWaitEvent(t *testing.T) {
ctx := context.Background()
driver := NewDriver(500)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
go waitConsumer(t, ctx, consumer)
wait := sync.WaitGroup{}
wait.Add(30000)
waitProduce(t, ctx, produce)
}
func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) {
for i := 0; i < 100; i++ {
event := NewEvent(waitTestEvent)
event.InitWaitEvent()
event.SetValue("test", i)
v.Call(ctx, event) // 推送给 consumer
res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("test:%v", i), res)
}
}
func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) {
for i := 0; i < 100; i++ {
event := v.Receive(ctx) // 接受 produce 的 event
res, ok := event.GetValue("test")
require.True(t, ok)
require.Equal(t, res, i)
// 发送返回值给 produce
event.ExecWorkAndSendResult(func() (interface{}, error) {
return fmt.Sprintf("test:%v", res), nil
})
}
}
func TestEvent_SetResultErr(t *testing.T) {
ctx := context.Background()
event := NewEvent("dddd")
driver := NewDriver(100)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
go func() {
event := consumer.Receive(ctx)
event.SetResultErr(errorx.New("err"))
for i := 0; i < 30000; i++ {
event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
produce.Call(ctx, event)
}
}()
event.InitWaitEvent()
produce.Call(ctx, event)
_, err := event.StartWaitEvent(2 * time.Second)
fmt.Println(err)
require.Error(t, err)
go func() {
for {
event := consumer.Receive(ctx)
fmt.Println(event.GetMsg("k"))
consumer.Recovery(event)
wait.Done()
}
}()
wait.Wait()
fmt.Println(*driver.(*Driver).poll.nowSize)
}
// 双向 event
func Test_WaitEventDriver(t *testing.T) {
driver := NewDriver(200)
produce := NewProduce(driver)
consumer := NewConsumer(driver)
ctx := context.Background()
wait := sync.WaitGroup{}
wait.Add(300000)
go func() {
for i := 0; i < 300000; i++ {
event := produce.NewEvent(testEvent)
event.SetMsg("k", strconv.Itoa(i))
event.InitWaitEvent()
produce.Call(ctx, event)
val, err := event.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
fmt.Println(val)
produce.Recovery(event)
wait.Done()
}
}()
go func() {
for {
event := consumer.Receive(ctx)
event.ExecWorkAndSendResult(func() (interface{}, error) {
msg := event.GetMsg("k")
return "hello: " + msg, nil
})
}
}()
wait.Wait()
fmt.Println(*driver.(*Driver).poll.nowSize)
}

View File

@ -6,8 +6,16 @@ type Produce struct {
driver DriverInterface
}
func (p *Produce) Call(ctx context.Context, event *Event) {
p.driver.Put(event)
func (p *Produce) NewEvent(name string) *event {
return p.driver.NewEvent(name)
}
func (p *Produce) Recovery(e *event) {
p.driver.Recovery(e)
}
func (p *Produce) Call(ctx context.Context, e *event) {
p.driver.Put(e)
}
func NewProduce(driver DriverInterface) ProduceInterface {

View File

@ -61,7 +61,7 @@ func TestNewLRUCache2(t *testing.T) {
func TestLruProcess(t *testing.T) {
lru := NewLRUCache()
lru.clearSize = 1000
lru.clearSize = 3600
for i := 100; i < 200; i++ {
lru.Add(&proto.BaseKey{
@ -78,16 +78,17 @@ func TestLruProcess(t *testing.T) {
}, stringx.NewStringSingle())
}
require.Equal(t, lru.nowSize, int64(200*8))
require.Equal(t, lru.nowSize, int64(200*24))
// 自动清理测试
fmt.Println(lru.clearSize)
require.Equal(t, lru.li.Len(), 200)
time.Sleep(3 * time.Second)
fmt.Println(lru.nowSize)
require.Less(t, lru.nowSize, lru.clearSize+1)
// TTL 测试
// TTL 测试, 100-200 key 发生自动清理 留下 50-100 共 1000-100 + 20个 key5s 后,前 0-100的 key 过期,剩下
time.Sleep(2 * time.Second)
require.Equal(t, lru.li.Len(), 25)
require.Equal(t, lru.li.Len(), 50)
// 过期全部的 Key
for i := 100; i < 200; i++ {

View File

@ -28,9 +28,9 @@ func Test_LruTTl(t *testing.T) {
Key: "990",
Ttl: 10,
}, s)
require.Equal(t, lru.nowSize, int64(16))
require.Equal(t, lru.nowSize, int64(48))
time.Sleep(4 * time.Second)
require.Equal(t, lru.nowSize, int64(8))
require.Equal(t, lru.nowSize, int64(24))
}

View File

@ -16,7 +16,7 @@ func TestWorker(t *testing.T) {
ctx := context.Background()
lru := NewLRUCache()
produce := event.NewProduce(lru.GetDriver())
workEvent := event.NewEvent(OptionEventName)
workEvent := produce.NewEvent(OptionEventName)
workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) {
v1 := stringx.NewStringSingle()
key := proto.BaseKey{
@ -39,7 +39,7 @@ func TestSingleCache_DelToClearSize(t *testing.T) {
produce := event.NewProduce(lru.GetDriver())
for i := int32(20000); i >= 1; i-- {
workEvent := event.NewEvent(OptionEventName)
workEvent := produce.NewEvent(OptionEventName)
workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) {
v1 := stringx.NewStringSingle()
key := proto.BaseKey{
@ -52,6 +52,7 @@ func TestSingleCache_DelToClearSize(t *testing.T) {
workEvent.InitWaitEvent()
produce.Call(ctx, workEvent)
workEvent.StartWaitEvent(2 * time.Second)
produce.Recovery(workEvent)
}
time.Sleep(5 * time.Second)

View File

@ -2,10 +2,11 @@ package lru
import (
"context"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/logx"
"time"
)
func (lru *SingleCache) lruSingleWork() {
@ -27,7 +28,7 @@ func (lru *SingleCache) lruSingleWork() {
case CleanEventName:
// 对当前的io数量进行判断
ioNum := lru.GetDriver().GetLength()
if ioNum > lru.lruMaxDiverSize*1/2 {
if ioNum > lru.lruMaxDiverSize/2 {
lru.lruCleanProduce.Call(ctx, workEvent)
continue
}
@ -46,7 +47,6 @@ func (lru *SingleCache) lruSingleWork() {
// 执行过期事件
func (lru *SingleCache) lruTtlWork() {
ttlEvent := event.NewEvent(TtlEventName)
ctx := context.Background()
work := event.EventWorkFunc(func() (interface{}, error) {
@ -66,7 +66,6 @@ func (lru *SingleCache) lruTtlWork() {
return nil, nil
})
ttlEvent.SetValue(WorkFuncEventKey, work)
cleanTTlTicker := time.NewTicker(500 * time.Millisecond)
defer cleanTTlTicker.Stop()
@ -77,16 +76,22 @@ func (lru *SingleCache) lruTtlWork() {
select {
// 清理事件
case <-cleanTTlTicker.C:
ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName)
ttlEvent.SetValue(WorkFuncEventKey, work)
if len(lru.lruTtlManage.memoryKey) == 0 {
continue
}
ttlEvent.InitWaitEvent()
lru.lruCleanProduce.Call(ctx, ttlEvent)
_, err := ttlEvent.StartWaitEvent(time.Second * 2)
lru.lruCleanProduce.Recovery(ttlEvent)
if err != nil {
logx.With(ctx, lru.middleProduce).Errorln(err)
}
// 收集过期的 key
case <-gatherTTlTicker.C:
lru.lruTtlManage.ttlKeyToMemoryBySecond()
}
@ -95,24 +100,27 @@ func (lru *SingleCache) lruTtlWork() {
func (lru *SingleCache) cleanWork() {
cxt := context.Background()
lruCleanEvent := event.NewEvent(CleanEventName)
work := event.EventWorkFunc(func() (interface{}, error) {
err := lru.DelToClearSize()
return nil, err
})
lruCleanEvent.SetValue(WorkFuncEventKey, work)
for {
time.Sleep(2 * time.Second)
time.Sleep(2 * time.Second)
if lru.clearSize < lru.nowSize {
lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName)
lruCleanEvent.SetValue(WorkFuncEventKey, work)
lruCleanEvent.InitWaitEvent()
lru.lruCleanProduce.Call(cxt, lruCleanEvent)
_, err := lruCleanEvent.StartWaitEvent(defaultWaitTime)
if err != nil {
logx.With(cxt, lru.middleProduce).Errorln(err)
}
// 归还
lru.lruCleanProduce.Recovery(lruCleanEvent)
}
}
}

View File

@ -33,7 +33,7 @@ func SendMiddleMsg(
eventName = PulginsInfosName
}
msgEvent := event.NewEvent(eventName)
msgEvent := middleProduce.NewEvent(eventName)
msgEvent.SetValue(MiddleMsgKey, val)
middleProduce.Call(ctx, msgEvent)
return nil

View File

@ -16,6 +16,8 @@ func (m *MiddleWare) startWork() {
workEvent := m.eventConsumer.Receive(ctx)
plugs := m.plugins[workEvent.GetEventName()]
msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey)
m.eventConsumer.Recovery(workEvent)
if !ok {
logx.With(ctx, m.eventProduce).Error("get event value errnot key:%s", middleMsg.MiddleMsgKey)
continue

View File

@ -16,11 +16,12 @@ func (s *serverSingle) LIndex(
return s.dao.LINdex(req.Key, req.Index)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -38,11 +39,12 @@ func (s *serverSingle) LLen(
return s.dao.LLen(req.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -60,11 +62,12 @@ func (s *serverSingle) LPop(
return s.dao.LPop(request.Key, request.Count)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -82,11 +85,13 @@ func (s *serverSingle) LPush(
return nil, s.dao.LPush(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -102,11 +107,13 @@ func (s *serverSingle) LPushX(
return nil, s.dao.LPush(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -122,11 +129,13 @@ func (s *serverSingle) LRange(
return s.dao.LRange(req.Key, req.Start, req.End)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -144,11 +153,13 @@ func (s *serverSingle) LRem(
return s.dao.LRemove(req.Key, req.Count, req.Value)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -166,11 +177,13 @@ func (s *serverSingle) LSet(
return nil, s.dao.LSet(req.Key, req.Index, req.Value)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -186,11 +199,13 @@ func (s *serverSingle) RPop(
return s.dao.RPop(req.Key, req.Count)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -208,11 +223,13 @@ func (s *serverSingle) LTrim(
return nil, s.dao.LTrim(req.Key, req.Start, req.End)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -228,11 +245,13 @@ func (s *serverSingle) RPush(
return nil, s.dao.RPush(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -248,11 +267,13 @@ func (s *serverSingle) RPushX(
return nil, s.dao.RPushX(req.Key, req.Values...)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}

View File

@ -17,11 +17,13 @@ func (s *serverSingle) Set(
return s.dao.Set(req.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -39,11 +41,13 @@ func (s *serverSingle) Get(
return s.dao.Get(req.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -60,11 +64,13 @@ func (s serverSingle) Add(
return s.dao.Add(req.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -81,11 +87,13 @@ func (s *serverSingle) Reduce(
return s.dao.Add(req.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -102,11 +110,13 @@ func (s *serverSingle) SetBit(
return nil, s.dao.Setbit(req.Key, req.Val, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -121,11 +131,13 @@ func (s *serverSingle) GetBit(
return s.dao.GetBit(req.Key, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -142,11 +154,13 @@ func (s *serverSingle) GetRange(
return s.dao.Getrange(req.Key, req.Start, req.End)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -163,11 +177,13 @@ func (s *serverSingle) GetSet(
return s.dao.Getset(req.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
result, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -184,11 +200,13 @@ func (s *serverSingle) StrLen(
return s.dao.Strlen(req.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}
@ -205,11 +223,13 @@ func (s *serverSingle) Setnx(
return nil, s.dao.Setnx(req.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(ctx, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
s.lruProduce.Recovery(lruEvent)
if err != nil {
return nil, err
}