feat(lru, single service): update event2

This commit is contained in:
bandl 2021-11-11 22:39:50 +08:00
parent 3eb515325d
commit 2f81eab981
3 changed files with 22 additions and 11 deletions

View File

@ -82,6 +82,15 @@ func (e *event) InitWaitEvent() {
atomic.SwapInt32(&e.eventStatus, waitEventState) atomic.SwapInt32(&e.eventStatus, waitEventState)
} }
func (e *event) SetResultErr(err error) {
if !atomic.CompareAndSwapInt32(&e.eventStatus, waitEventState, workEventState) {
return
}
e.err = err
e.waitResult <- nil
}
// StartWaitEvent 开始一个等待任务 // StartWaitEvent 开始一个等待任务
func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) { func (e *event) StartWaitEvent(ttl time.Duration) (interface{}, error) {
if e.ttlManage == nil { if e.ttlManage == nil {

View File

@ -6,6 +6,7 @@ import (
"gitee.com/wheat-os/wheatCache/pkg/errorx" "gitee.com/wheat-os/wheatCache/pkg/errorx"
"gitee.com/wheat-os/wheatCache/pkg/event" "gitee.com/wheat-os/wheatCache/pkg/event"
"gitee.com/wheat-os/wheatCache/pkg/event2"
"gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/logx"
mMsg "gitee.com/wheat-os/wheatCache/pkg/middle-msg" mMsg "gitee.com/wheat-os/wheatCache/pkg/middle-msg"
) )
@ -14,7 +15,7 @@ func (lru *SingleCache) lruSingleWork() {
ctx := context.Background() ctx := context.Background()
for { for {
workEvent := lru.lruConsumer.Receive(ctx) workEvent := lru.lruConsumer.Receive(ctx)
workFunc, ok := workEvent.GetValue(WorkFuncEventKey) workFunc, ok := workEvent.GetValue(event2.WorkFuncEventKey)
if !ok { if !ok {
workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr())
continue continue
@ -22,7 +23,7 @@ func (lru *SingleCache) lruSingleWork() {
switch workEvent.GetEventName() { switch workEvent.GetEventName() {
case OptionEventName: case OptionEventName:
if work, ok := workFunc.(event.EventWorkFunc); ok { if work, ok := workFunc.(event2.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work) workEvent.ExecWorkAndSendResult(work)
} }
@ -33,12 +34,12 @@ func (lru *SingleCache) lruSingleWork() {
lru.lruCleanProduce.Call(ctx, workEvent) lru.lruCleanProduce.Call(ctx, workEvent)
continue continue
} }
if work, ok := workFunc.(event.EventWorkFunc); ok { if work, ok := workFunc.(event2.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work) workEvent.ExecWorkAndSendResult(work)
} }
case TtlEventName: case TtlEventName:
if work, ok := workFunc.(event.EventWorkFunc); ok { if work, ok := workFunc.(event2.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work) workEvent.ExecWorkAndSendResult(work)
} }
} }
@ -52,7 +53,7 @@ func (lru *SingleCache) lruTtlWork() {
// 清理事件 // 清理事件
go func() { go func() {
work := event.EventWorkFunc(func() (interface{}, error) { work := event2.EventWorkFunc(func() (interface{}, error) {
beforeTime := time.Now().Unix() beforeTime := time.Now().Unix()
cle := lru.lruTtlManage.detachNum cle := lru.lruTtlManage.detachNum
@ -80,12 +81,12 @@ func (lru *SingleCache) lruTtlWork() {
} }
ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName) ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName)
ttlEvent.SetValue(WorkFuncEventKey, work) ttlEvent.SetValue(event2.WorkFuncEventKey, work)
ttlEvent.InitWaitEvent() ttlEvent.InitWaitEvent()
lru.lruCleanProduce.Call(ctx, ttlEvent) lru.lruCleanProduce.Call(ctx, ttlEvent)
keys, err := ttlEvent.StartWaitEvent(time.Second * 2) keys, err := ttlEvent.StartWaitEvent(time.Second * 2)
lru.lruCleanProduce.Recovery(ttlEvent) ttlEvent.Recovery()
mMsg.SendMiddleMsg(ctx, lru.middleProduce, mMsg.LruTTlContext{ mMsg.SendMiddleMsg(ctx, lru.middleProduce, mMsg.LruTTlContext{
Keys: keys.([]string), Keys: keys.([]string),
@ -118,7 +119,7 @@ func (lru *SingleCache) cleanWork() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
if lru.clearSize < lru.nowSize { if lru.clearSize < lru.nowSize {
lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName) lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName)
lruCleanEvent.SetValue(WorkFuncEventKey, work) lruCleanEvent.SetValue(event2.WorkFuncEventKey, work)
lruCleanEvent.InitWaitEvent() lruCleanEvent.InitWaitEvent()
lru.lruCleanProduce.Call(cxt, lruCleanEvent) lru.lruCleanProduce.Call(cxt, lruCleanEvent)
@ -128,7 +129,7 @@ func (lru *SingleCache) cleanWork() {
} }
// 归还 // 归还
lru.lruCleanProduce.Recovery(lruCleanEvent) lruCleanEvent.Recovery()
} }
} }
} }

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"gitee.com/wheat-os/wheatCache/pkg/event" "gitee.com/wheat-os/wheatCache/pkg/event"
"gitee.com/wheat-os/wheatCache/pkg/event2"
"gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/logx"
"gitee.com/wheat-os/wheatCache/pkg/lru" "gitee.com/wheat-os/wheatCache/pkg/lru"
"gitee.com/wheat-os/wheatCache/pkg/middle" "gitee.com/wheat-os/wheatCache/pkg/middle"
@ -18,7 +19,7 @@ import (
type singleService struct { type singleService struct {
middleProduce event.ProduceInterface middleProduce event.ProduceInterface
lruProduce event.ProduceInterface lruProduce event2.ProduceInterface
timeOut time.Duration timeOut time.Duration
lruCache *lru.SingleCache lruCache *lru.SingleCache
dao dao.Interface dao dao.Interface
@ -100,7 +101,7 @@ func NewSingleServer() proto.CommServerServer {
ser := &singleService{ ser := &singleService{
lruCache: lruCache, lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()), lruProduce: event2.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut) * time.Second, timeOut: time.Duration(timeOut) * time.Second,
dao: dao, dao: dao,
middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()), middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()),