diff --git a/pkg/event/consumer.go b/pkg/event/consumer.go index 995c123..e24155a 100644 --- a/pkg/event/consumer.go +++ b/pkg/event/consumer.go @@ -6,10 +6,18 @@ type Consumer struct { driver DriverInterface } -func (c *Consumer) Receive(ctx context.Context) *Event { +func (c *Consumer) Receive(ctx context.Context) *event { return c.driver.Get() } +func (c *Consumer) NewEvent(name string) *event { + return c.driver.NewEvent(name) +} + +func (c *Consumer) Recovery(e *event) { + c.driver.Recovery(e) +} + func NewConsumer(driver DriverInterface) ConsumerInterface { return &Consumer{ driver: driver, diff --git a/pkg/event/define.go b/pkg/event/define.go index 8921901..33faca5 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -4,28 +4,30 @@ import ( "context" ) -type eventType int8 - const ( - defaultEventState = eventType(iota) //默认情况下的状态 - waitEventState // 等待状态 - workEventState //工作状态 - closeEventState //事件关闭状态 + defaultEventState = int32(iota) //默认情况下的状态 + waitEventState // 等待状态 + workEventState //工作状态 + closeEventState //事件关闭状态 ) type EventWorkFunc func() (interface{}, error) type DriverInterface interface { - Get() *Event - Put(event *Event) + Get() *event + Put(*event) GetLength() int + NewEvent(string) *event + Recovery(*event) } type ProduceInterface interface { - Call(ctx context.Context, event *Event) + Call(context.Context, *event) + NewEvent(string) *event + Recovery(*event) } type ConsumerInterface interface { - Receive(ctx context.Context) *Event + Receive(ctx context.Context) *event + Recovery(*event) } - diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 764ff22..51244a4 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -1,139 +1,166 @@ package event import ( - "sync" + "sync/atomic" "time" "gitee.com/timedb/wheatCache/pkg/errorx" ) -type Active func() ([]string, error) // 事件带函数 +// 事件 poll 降低 new 对象的频率 +type eventPoll struct { + poll chan *event + maxSize int32 + nowSize *int32 +} -type Event struct { +func (e *eventPoll) getEvent() *event { + issSize := atomic.LoadInt32(e.nowSize) + if issSize < e.maxSize { + atomic.AddInt32(e.nowSize, 1) + return newEvent() + } + + return <-e.poll +} + +func (e *eventPoll) recovery(rEvent *event) { + rEvent.Reset() + e.poll <- rEvent +} + +func newEventPoll(maxSize int) *eventPoll { + return &eventPoll{ + poll: make(chan *event, maxSize), + maxSize: int32(maxSize), + nowSize: new(int32), + } +} + +type event struct { msgCtx map[string]interface{} eventName string - WorkTime time.Duration // 工作时间 msg map[string]string // 消息 waitResult chan interface{} // 等待返回 err error - ru sync.RWMutex - muClose sync.Mutex //关闭锁 - eventStatus eventType + eventStatus *int32 + ttlManage *time.Timer } -func (e *Event) SetMsg(key string, val string) { - e.ru.Lock() - defer e.ru.Unlock() +func newEvent() *event { + status := defaultEventState + return &event{ + eventStatus: &status, + } +} + +func (e *event) Reset() { + if e.ttlManage != nil { + e.ttlManage.Stop() + } + + e.err = nil + + atomic.SwapInt32(e.eventStatus, defaultEventState) +} + +func (e *event) SetMsg(key string, val string) { if e.msg == nil { e.msg = make(map[string]string) } e.msg[key] = val } -func (e *Event) GetMsg(key string) string { - e.ru.RLock() - defer e.ru.RUnlock() +func (e *event) GetMsg(key string) string { return e.msg[key] } -func (e *Event) GetEventName() string { +func (e *event) GetEventName() string { return e.eventName } // SetValue 写入 ctx 传递用参数 -func (e *Event) SetValue(key string, value interface{}) { - e.ru.Lock() - defer e.ru.Unlock() +func (e *event) SetValue(key string, value interface{}) { if e.msgCtx == nil { e.msgCtx = make(map[string]interface{}) } e.msgCtx[key] = value } -func (e *Event) GetValue(key string) (interface{}, bool) { - e.ru.RLock() - defer e.ru.RUnlock() +func (e *event) GetValue(key string) (interface{}, bool) { val, ok := e.msgCtx[key] return val, ok } // InitWaitEvent 初始化 wait event 必须调用才拥有等待特性 -func (e *Event) InitWaitEvent() { - e.muClose.Lock() - defer e.muClose.Unlock() - e.waitResult = make(chan interface{}) - e.eventStatus = waitEventState +func (e *event) InitWaitEvent() { + if e.waitResult == nil || len(e.waitResult) > 0 { + e.waitResult = make(chan interface{}) + } + + // 清理残留 + if e.ttlManage == nil { + e.ttlManage = time.NewTimer(0) + } + e.ttlManage.Stop() + if len(e.ttlManage.C) > 0 { + <-e.ttlManage.C + } + + atomic.CompareAndSwapInt32(e.eventStatus, defaultEventState, waitEventState) } // StartWaitEvent 开始一个等待任务 -func (e *Event) StartWaitEvent(ttl time.Duration) (interface{}, error) { - t := time.NewTimer(ttl) - select { - case <-t.C: - e.muClose.Lock() - defer e.muClose.Unlock() - if e.eventStatus == workEventState { - return <-e.waitResult, e.err +func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) { + e.ttlManage.Reset(ttl) + + for { + select { + case <-e.ttlManage.C: + if atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, closeEventState) { + return nil, errorx.TimeOutErr() + } + continue + + case result := <-e.waitResult: + atomic.CompareAndSwapInt32(e.eventStatus, workEventState, closeEventState) + return result, e.err } - - e.eventStatus = closeEventState - return nil, errorx.TimeOutErr() - - case result := <-e.waitResult: - return result, e.err } } -func (e *Event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { - e.muClose.Lock() - defer e.muClose.Unlock() - if e.eventStatus != waitEventState { +func (e *event) ExecWorkAndSendResult(work EventWorkFunc) (interface{}, error) { + if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) { return nil, errorx.New("not wait status, exec err") } - e.eventStatus = workEventState - res, err := work() e.err = err e.waitResult <- res - - close(e.waitResult) - e.eventStatus = closeEventState return res, err } -func (e *Event) SetResultErr(err error) { - e.muClose.Lock() - defer e.muClose.Unlock() - if e.eventStatus != waitEventState { +func (e *event) SetResultErr(err error) { + if !atomic.CompareAndSwapInt32(e.eventStatus, waitEventState, workEventState) { return } - e.eventStatus = workEventState e.err = err e.waitResult <- nil - close(e.waitResult) - e.eventStatus = closeEventState -} - -func NewEvent(eventName string) *Event { - return &Event{ - eventName: eventName, - eventStatus: defaultEventState, - } } type Driver struct { maxQueueSize int - queue chan *Event + queue chan *event + poll *eventPoll } // Get 获取驱动 -func (d *Driver) Get() *Event { +func (d *Driver) Get() *event { return <-d.queue } -func (d *Driver) Put(event *Event) { +func (d *Driver) Put(event *event) { d.queue <- event } @@ -141,10 +168,22 @@ func (d *Driver) GetLength() int { return len(d.queue) } +func (d *Driver) NewEvent(name string) *event { + event := d.poll.getEvent() + event.eventName = name + return event +} + +// 任何时候回收事件都应该由 最后使用者回收 +func (d *Driver) Recovery(e *event) { + d.poll.recovery(e) +} + // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ maxQueueSize: maxSize, - queue: make(chan *Event, maxSize), + queue: make(chan *event, maxSize), + poll: newEventPoll(maxSize), } } diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index 8da5b4c..bd61d84 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -3,7 +3,8 @@ package event import ( "context" "fmt" - "gitee.com/timedb/wheatCache/pkg/errorx" + "strconv" + "sync" "testing" "time" @@ -13,87 +14,76 @@ import ( const testEvent = "1001" const waitTestEvent = "1002" -// 简单 非等待响应模式, 使用 event driver -func TestEvent_DriverEventTest(t *testing.T) { - ctx := context.Background() - driver := NewDriver(500) +// 简单的 单向 event 使用 +func Test_EventDriver(t *testing.T) { + driver := NewDriver(2000) produce := NewProduce(driver) consumer := NewConsumer(driver) - go produceEvent(t, ctx, produce) - consumerEvent(t, ctx, consumer) -} - -func produceEvent(t *testing.T, ctx context.Context, v ProduceInterface) { - for i := 0; i < 100; i++ { - event := NewEvent(testEvent) - event.SetValue("test", i) - v.Call(ctx, event) - } -} - -func consumerEvent(t *testing.T, ctx context.Context, v ConsumerInterface) { - for i := 0; i < 100; i++ { - event := v.Receive(ctx) - res, ok := event.GetValue("test") - require.True(t, ok) - fmt.Println(res) - require.Equal(t, res, i) - } -} - -// 响应等待用法 -func TestEvent_SpanWaitEvent(t *testing.T) { ctx := context.Background() - driver := NewDriver(500) - produce := NewProduce(driver) - consumer := NewConsumer(driver) - go waitConsumer(t, ctx, consumer) + wait := sync.WaitGroup{} + wait.Add(30000) - waitProduce(t, ctx, produce) -} - -func waitProduce(t *testing.T, ctx context.Context, v ProduceInterface) { - for i := 0; i < 100; i++ { - event := NewEvent(waitTestEvent) - - event.InitWaitEvent() - event.SetValue("test", i) - v.Call(ctx, event) // 推送给 consumer - res, err := event.StartWaitEvent(2 * time.Second) // 最多等待 consumer 回复 2s - require.NoError(t, err) - require.Equal(t, fmt.Sprintf("test:%v", i), res) - } -} - -func waitConsumer(t *testing.T, ctx context.Context, v ConsumerInterface) { - for i := 0; i < 100; i++ { - event := v.Receive(ctx) // 接受 produce 的 event - res, ok := event.GetValue("test") - require.True(t, ok) - require.Equal(t, res, i) - - // 发送返回值给 produce - event.ExecWorkAndSendResult(func() (interface{}, error) { - return fmt.Sprintf("test:%v", res), nil - }) - } -} - -func TestEvent_SetResultErr(t *testing.T) { - ctx := context.Background() - event := NewEvent("dddd") - driver := NewDriver(100) - produce := NewProduce(driver) - consumer := NewConsumer(driver) go func() { - event := consumer.Receive(ctx) - event.SetResultErr(errorx.New("err")) + for i := 0; i < 30000; i++ { + event := produce.NewEvent(testEvent) + event.SetMsg("k", strconv.Itoa(i)) + produce.Call(ctx, event) + + } }() - event.InitWaitEvent() - produce.Call(ctx, event) - _, err := event.StartWaitEvent(2 * time.Second) - fmt.Println(err) - require.Error(t, err) + + go func() { + for { + event := consumer.Receive(ctx) + fmt.Println(event.GetMsg("k")) + consumer.Recovery(event) + wait.Done() + } + }() + + wait.Wait() + + fmt.Println(*driver.(*Driver).poll.nowSize) +} + +// 双向 event +func Test_WaitEventDriver(t *testing.T) { + driver := NewDriver(200) + produce := NewProduce(driver) + consumer := NewConsumer(driver) + + ctx := context.Background() + + wait := sync.WaitGroup{} + wait.Add(300000) + + go func() { + for i := 0; i < 300000; i++ { + event := produce.NewEvent(testEvent) + event.SetMsg("k", strconv.Itoa(i)) + event.InitWaitEvent() + produce.Call(ctx, event) + val, err := event.StartWaitEvent(2 * time.Second) + require.NoError(t, err) + fmt.Println(val) + produce.Recovery(event) + wait.Done() + } + }() + + go func() { + for { + event := consumer.Receive(ctx) + event.ExecWorkAndSendResult(func() (interface{}, error) { + msg := event.GetMsg("k") + return "hello: " + msg, nil + }) + } + }() + + wait.Wait() + + fmt.Println(*driver.(*Driver).poll.nowSize) } diff --git a/pkg/event/produce.go b/pkg/event/produce.go index e096188..7394c88 100644 --- a/pkg/event/produce.go +++ b/pkg/event/produce.go @@ -6,8 +6,16 @@ type Produce struct { driver DriverInterface } -func (p *Produce) Call(ctx context.Context, event *Event) { - p.driver.Put(event) +func (p *Produce) NewEvent(name string) *event { + return p.driver.NewEvent(name) +} + +func (p *Produce) Recovery(e *event) { + p.driver.Recovery(e) +} + +func (p *Produce) Call(ctx context.Context, e *event) { + p.driver.Put(e) } func NewProduce(driver DriverInterface) ProduceInterface { diff --git a/pkg/lru/lru_test.go b/pkg/lru/lru_test.go index 1a1e3c8..369e42a 100644 --- a/pkg/lru/lru_test.go +++ b/pkg/lru/lru_test.go @@ -61,7 +61,7 @@ func TestNewLRUCache2(t *testing.T) { func TestLruProcess(t *testing.T) { lru := NewLRUCache() - lru.clearSize = 1000 + lru.clearSize = 3600 for i := 100; i < 200; i++ { lru.Add(&proto.BaseKey{ @@ -78,16 +78,17 @@ func TestLruProcess(t *testing.T) { }, stringx.NewStringSingle()) } - require.Equal(t, lru.nowSize, int64(200*8)) + require.Equal(t, lru.nowSize, int64(200*24)) // 自动清理测试 + fmt.Println(lru.clearSize) + require.Equal(t, lru.li.Len(), 200) time.Sleep(3 * time.Second) - fmt.Println(lru.nowSize) require.Less(t, lru.nowSize, lru.clearSize+1) - // TTL 测试 + // TTL 测试, 100-200 key 发生自动清理 留下 50-100 共 100(0-100) + 20个 key,5s 后,前 0-100的 key 过期,剩下 time.Sleep(2 * time.Second) - require.Equal(t, lru.li.Len(), 25) + require.Equal(t, lru.li.Len(), 50) // 过期全部的 Key for i := 100; i < 200; i++ { diff --git a/pkg/lru/ttl_test.go b/pkg/lru/ttl_test.go index 76b8d15..2f67610 100644 --- a/pkg/lru/ttl_test.go +++ b/pkg/lru/ttl_test.go @@ -28,9 +28,9 @@ func Test_LruTTl(t *testing.T) { Key: "990", Ttl: 10, }, s) - require.Equal(t, lru.nowSize, int64(16)) + require.Equal(t, lru.nowSize, int64(48)) time.Sleep(4 * time.Second) - require.Equal(t, lru.nowSize, int64(8)) + require.Equal(t, lru.nowSize, int64(24)) } diff --git a/pkg/lru/woker_test.go b/pkg/lru/woker_test.go index 6009258..887c63c 100644 --- a/pkg/lru/woker_test.go +++ b/pkg/lru/woker_test.go @@ -16,7 +16,7 @@ func TestWorker(t *testing.T) { ctx := context.Background() lru := NewLRUCache() produce := event.NewProduce(lru.GetDriver()) - workEvent := event.NewEvent(OptionEventName) + workEvent := produce.NewEvent(OptionEventName) workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { v1 := stringx.NewStringSingle() key := proto.BaseKey{ @@ -39,7 +39,7 @@ func TestSingleCache_DelToClearSize(t *testing.T) { produce := event.NewProduce(lru.GetDriver()) for i := int32(20000); i >= 1; i-- { - workEvent := event.NewEvent(OptionEventName) + workEvent := produce.NewEvent(OptionEventName) workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { v1 := stringx.NewStringSingle() key := proto.BaseKey{ @@ -52,6 +52,7 @@ func TestSingleCache_DelToClearSize(t *testing.T) { workEvent.InitWaitEvent() produce.Call(ctx, workEvent) workEvent.StartWaitEvent(2 * time.Second) + produce.Recovery(workEvent) } time.Sleep(5 * time.Second) diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 71c7c23..e0bcebd 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -2,10 +2,11 @@ package lru import ( "context" + "time" + "gitee.com/timedb/wheatCache/pkg/errorx" "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/pkg/logx" - "time" ) func (lru *SingleCache) lruSingleWork() { @@ -27,7 +28,7 @@ func (lru *SingleCache) lruSingleWork() { case CleanEventName: // 对当前的io数量进行判断 ioNum := lru.GetDriver().GetLength() - if ioNum > lru.lruMaxDiverSize*1/2 { + if ioNum > lru.lruMaxDiverSize/2 { lru.lruCleanProduce.Call(ctx, workEvent) continue } @@ -46,7 +47,6 @@ func (lru *SingleCache) lruSingleWork() { // 执行过期事件 func (lru *SingleCache) lruTtlWork() { - ttlEvent := event.NewEvent(TtlEventName) ctx := context.Background() work := event.EventWorkFunc(func() (interface{}, error) { @@ -66,7 +66,6 @@ func (lru *SingleCache) lruTtlWork() { return nil, nil }) - ttlEvent.SetValue(WorkFuncEventKey, work) cleanTTlTicker := time.NewTicker(500 * time.Millisecond) defer cleanTTlTicker.Stop() @@ -77,16 +76,22 @@ func (lru *SingleCache) lruTtlWork() { select { // 清理事件 case <-cleanTTlTicker.C: + ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName) + ttlEvent.SetValue(WorkFuncEventKey, work) + if len(lru.lruTtlManage.memoryKey) == 0 { continue } ttlEvent.InitWaitEvent() lru.lruCleanProduce.Call(ctx, ttlEvent) _, err := ttlEvent.StartWaitEvent(time.Second * 2) + lru.lruCleanProduce.Recovery(ttlEvent) + if err != nil { logx.With(ctx, lru.middleProduce).Errorln(err) } + // 收集过期的 key case <-gatherTTlTicker.C: lru.lruTtlManage.ttlKeyToMemoryBySecond() } @@ -95,24 +100,27 @@ func (lru *SingleCache) lruTtlWork() { func (lru *SingleCache) cleanWork() { cxt := context.Background() - lruCleanEvent := event.NewEvent(CleanEventName) work := event.EventWorkFunc(func() (interface{}, error) { err := lru.DelToClearSize() return nil, err }) - lruCleanEvent.SetValue(WorkFuncEventKey, work) - for { - time.Sleep(2 * time.Second) + time.Sleep(2 * time.Second) if lru.clearSize < lru.nowSize { + lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName) + lruCleanEvent.SetValue(WorkFuncEventKey, work) + lruCleanEvent.InitWaitEvent() lru.lruCleanProduce.Call(cxt, lruCleanEvent) _, err := lruCleanEvent.StartWaitEvent(defaultWaitTime) if err != nil { logx.With(cxt, lru.middleProduce).Errorln(err) } + + // 归还 + lru.lruCleanProduce.Recovery(lruCleanEvent) } } } diff --git a/pkg/middle-msg/define.go b/pkg/middle-msg/define.go index aed2e29..dd3c3a9 100644 --- a/pkg/middle-msg/define.go +++ b/pkg/middle-msg/define.go @@ -33,7 +33,7 @@ func SendMiddleMsg( eventName = PulginsInfosName } - msgEvent := event.NewEvent(eventName) + msgEvent := middleProduce.NewEvent(eventName) msgEvent.SetValue(MiddleMsgKey, val) middleProduce.Call(ctx, msgEvent) return nil diff --git a/pkg/middle/worker.go b/pkg/middle/worker.go index 86a85da..6673e33 100644 --- a/pkg/middle/worker.go +++ b/pkg/middle/worker.go @@ -16,6 +16,8 @@ func (m *MiddleWare) startWork() { workEvent := m.eventConsumer.Receive(ctx) plugs := m.plugins[workEvent.GetEventName()] msg, ok := workEvent.GetValue(middleMsg.MiddleMsgKey) + m.eventConsumer.Recovery(workEvent) + if !ok { logx.With(ctx, m.eventProduce).Error("get event value err,not key:%s", middleMsg.MiddleMsgKey) continue diff --git a/storage/server/single/listx.go b/storage/server/single/listx.go index ffc505b..086d381 100644 --- a/storage/server/single/listx.go +++ b/storage/server/single/listx.go @@ -16,11 +16,12 @@ func (s *serverSingle) LIndex( return s.dao.LINdex(req.Key, req.Index) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) if err != nil { return nil, err } @@ -38,11 +39,12 @@ func (s *serverSingle) LLen( return s.dao.LLen(req.Key) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) if err != nil { return nil, err } @@ -60,11 +62,12 @@ func (s *serverSingle) LPop( return s.dao.LPop(request.Key, request.Count) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) if err != nil { return nil, err } @@ -82,11 +85,13 @@ func (s *serverSingle) LPush( return nil, s.dao.LPush(req.Key, req.Values...) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -102,11 +107,13 @@ func (s *serverSingle) LPushX( return nil, s.dao.LPush(req.Key, req.Values...) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -122,11 +129,13 @@ func (s *serverSingle) LRange( return s.dao.LRange(req.Key, req.Start, req.End) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -144,11 +153,13 @@ func (s *serverSingle) LRem( return s.dao.LRemove(req.Key, req.Count, req.Value) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -166,11 +177,13 @@ func (s *serverSingle) LSet( return nil, s.dao.LSet(req.Key, req.Index, req.Value) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -186,11 +199,13 @@ func (s *serverSingle) RPop( return s.dao.RPop(req.Key, req.Count) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -208,11 +223,13 @@ func (s *serverSingle) LTrim( return nil, s.dao.LTrim(req.Key, req.Start, req.End) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -228,11 +245,13 @@ func (s *serverSingle) RPush( return nil, s.dao.RPush(req.Key, req.Values...) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -248,11 +267,13 @@ func (s *serverSingle) RPushX( return nil, s.dao.RPushX(req.Key, req.Values...) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } diff --git a/storage/server/single/stringx.go b/storage/server/single/stringx.go index e5326a1..090fad5 100644 --- a/storage/server/single/stringx.go +++ b/storage/server/single/stringx.go @@ -17,11 +17,13 @@ func (s *serverSingle) Set( return s.dao.Set(req.Key, req.Val) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -39,11 +41,13 @@ func (s *serverSingle) Get( return s.dao.Get(req.Key) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(cxt, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -60,11 +64,13 @@ func (s serverSingle) Add( return s.dao.Add(req.Key, req.Renewal) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(cxt, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -81,11 +87,13 @@ func (s *serverSingle) Reduce( return s.dao.Add(req.Key, req.Renewal) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(cxt, lruEvent) resp, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -102,11 +110,13 @@ func (s *serverSingle) SetBit( return nil, s.dao.Setbit(req.Key, req.Val, req.Offer) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(cxt, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -121,11 +131,13 @@ func (s *serverSingle) GetBit( return s.dao.GetBit(req.Key, req.Offer) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(cxt, lruEvent) flag, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -142,11 +154,13 @@ func (s *serverSingle) GetRange( return s.dao.Getrange(req.Key, req.Start, req.End) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) flag, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -163,11 +177,13 @@ func (s *serverSingle) GetSet( return s.dao.Getset(req.Key, req.Val) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) result, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -184,11 +200,13 @@ func (s *serverSingle) StrLen( return s.dao.Strlen(req.Key) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) flag, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err } @@ -205,11 +223,13 @@ func (s *serverSingle) Setnx( return nil, s.dao.Setnx(req.Key, req.Val) }) - lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) lruEvent.InitWaitEvent() lruEvent.SetValue(lru.WorkFuncEventKey, work) s.lruProduce.Call(ctx, lruEvent) _, err := lruEvent.StartWaitEvent(s.timeOut) + s.lruProduce.Recovery(lruEvent) + if err != nil { return nil, err }