From 2f81eab9819ed77e1542f7f3438925e9edb5eed6 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Thu, 11 Nov 2021 22:39:50 +0800 Subject: [PATCH] feat(lru, single service): update event2 --- pkg/event2/driver.go | 9 +++++++++ pkg/lru/worker.go | 19 ++++++++++--------- storage/service/single.go | 5 +++-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/pkg/event2/driver.go b/pkg/event2/driver.go index 49d57fc..db61c83 100644 --- a/pkg/event2/driver.go +++ b/pkg/event2/driver.go @@ -82,6 +82,15 @@ func (e *event) InitWaitEvent() { atomic.SwapInt32(&e.eventStatus, waitEventState) } +func (e *event) SetResultErr(err error) { + if !atomic.CompareAndSwapInt32(&e.eventStatus, waitEventState, workEventState) { + return + } + + e.err = err + e.waitResult <- nil +} + // StartWaitEvent 开始一个等待任务 func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) { if e.ttlManage == nil { diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 674d5e1..bdcde92 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -6,6 +6,7 @@ import ( "gitee.com/wheat-os/wheatCache/pkg/errorx" "gitee.com/wheat-os/wheatCache/pkg/event" + "gitee.com/wheat-os/wheatCache/pkg/event2" "gitee.com/wheat-os/wheatCache/pkg/logx" mMsg "gitee.com/wheat-os/wheatCache/pkg/middle-msg" ) @@ -14,7 +15,7 @@ func (lru *SingleCache) lruSingleWork() { ctx := context.Background() for { workEvent := lru.lruConsumer.Receive(ctx) - workFunc, ok := workEvent.GetValue(WorkFuncEventKey) + workFunc, ok := workEvent.GetValue(event2.WorkFuncEventKey) if !ok { workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) continue @@ -22,7 +23,7 @@ func (lru *SingleCache) lruSingleWork() { switch workEvent.GetEventName() { case OptionEventName: - if work, ok := workFunc.(event.EventWorkFunc); ok { + if work, ok := workFunc.(event2.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } @@ -33,12 +34,12 @@ func (lru *SingleCache) lruSingleWork() { lru.lruCleanProduce.Call(ctx, workEvent) continue } - if work, ok := workFunc.(event.EventWorkFunc); ok { + if work, ok := workFunc.(event2.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } case TtlEventName: - if work, ok := workFunc.(event.EventWorkFunc); ok { + if work, ok := workFunc.(event2.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } } @@ -52,7 +53,7 @@ func (lru *SingleCache) lruTtlWork() { // 清理事件 go func() { - work := event.EventWorkFunc(func() (interface{}, error) { + work := event2.EventWorkFunc(func() (interface{}, error) { beforeTime := time.Now().Unix() cle := lru.lruTtlManage.detachNum @@ -80,12 +81,12 @@ func (lru *SingleCache) lruTtlWork() { } ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName) - ttlEvent.SetValue(WorkFuncEventKey, work) + ttlEvent.SetValue(event2.WorkFuncEventKey, work) ttlEvent.InitWaitEvent() lru.lruCleanProduce.Call(ctx, ttlEvent) keys, err := ttlEvent.StartWaitEvent(time.Second * 2) - lru.lruCleanProduce.Recovery(ttlEvent) + ttlEvent.Recovery() mMsg.SendMiddleMsg(ctx, lru.middleProduce, mMsg.LruTTlContext{ Keys: keys.([]string), @@ -118,7 +119,7 @@ func (lru *SingleCache) cleanWork() { time.Sleep(2 * time.Second) if lru.clearSize < lru.nowSize { lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName) - lruCleanEvent.SetValue(WorkFuncEventKey, work) + lruCleanEvent.SetValue(event2.WorkFuncEventKey, work) lruCleanEvent.InitWaitEvent() lru.lruCleanProduce.Call(cxt, lruCleanEvent) @@ -128,7 +129,7 @@ func (lru *SingleCache) cleanWork() { } // 归还 - lru.lruCleanProduce.Recovery(lruCleanEvent) + lruCleanEvent.Recovery() } } } diff --git a/storage/service/single.go b/storage/service/single.go index aca1922..2c96d59 100644 --- a/storage/service/single.go +++ b/storage/service/single.go @@ -7,6 +7,7 @@ import ( "time" "gitee.com/wheat-os/wheatCache/pkg/event" + "gitee.com/wheat-os/wheatCache/pkg/event2" "gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/lru" "gitee.com/wheat-os/wheatCache/pkg/middle" @@ -18,7 +19,7 @@ import ( type singleService struct { middleProduce event.ProduceInterface - lruProduce event.ProduceInterface + lruProduce event2.ProduceInterface timeOut time.Duration lruCache *lru.SingleCache dao dao.Interface @@ -100,7 +101,7 @@ func NewSingleServer() proto.CommServerServer { ser := &singleService{ lruCache: lruCache, - lruProduce: event.NewProduce(lruCache.GetDriver()), + lruProduce: event2.NewProduce(lruCache.GetDriver()), timeOut: time.Duration(timeOut) * time.Second, dao: dao, middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()),