forked from p93542168/wheat-cache
feat(lru-worker): event recovery
This commit is contained in:
parent
46b029b339
commit
db615609cd
|
@ -2,10 +2,11 @@ package lru
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||||
"gitee.com/timedb/wheatCache/pkg/event"
|
||||
"gitee.com/timedb/wheatCache/pkg/logx"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (lru *SingleCache) lruSingleWork() {
|
||||
|
@ -27,7 +28,7 @@ func (lru *SingleCache) lruSingleWork() {
|
|||
case CleanEventName:
|
||||
// 对当前的io数量进行判断
|
||||
ioNum := lru.GetDriver().GetLength()
|
||||
if ioNum > lru.lruMaxDiverSize*1/2 {
|
||||
if ioNum > lru.lruMaxDiverSize/2 {
|
||||
lru.lruCleanProduce.Call(ctx, workEvent)
|
||||
continue
|
||||
}
|
||||
|
@ -46,7 +47,6 @@ func (lru *SingleCache) lruSingleWork() {
|
|||
// 执行过期事件
|
||||
func (lru *SingleCache) lruTtlWork() {
|
||||
|
||||
ttlEvent := event.NewEvent(TtlEventName)
|
||||
ctx := context.Background()
|
||||
work := event.EventWorkFunc(func() (interface{}, error) {
|
||||
|
||||
|
@ -66,7 +66,6 @@ func (lru *SingleCache) lruTtlWork() {
|
|||
|
||||
return nil, nil
|
||||
})
|
||||
ttlEvent.SetValue(WorkFuncEventKey, work)
|
||||
|
||||
cleanTTlTicker := time.NewTicker(500 * time.Millisecond)
|
||||
defer cleanTTlTicker.Stop()
|
||||
|
@ -77,16 +76,22 @@ func (lru *SingleCache) lruTtlWork() {
|
|||
select {
|
||||
// 清理事件
|
||||
case <-cleanTTlTicker.C:
|
||||
ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName)
|
||||
ttlEvent.SetValue(WorkFuncEventKey, work)
|
||||
|
||||
if len(lru.lruTtlManage.memoryKey) == 0 {
|
||||
continue
|
||||
}
|
||||
ttlEvent.InitWaitEvent()
|
||||
lru.lruCleanProduce.Call(ctx, ttlEvent)
|
||||
_, err := ttlEvent.StartWaitEvent(time.Second * 2)
|
||||
lru.lruCleanProduce.Recovery(ttlEvent)
|
||||
|
||||
if err != nil {
|
||||
logx.With(ctx, lru.middleProduce).Errorln(err)
|
||||
}
|
||||
|
||||
// 收集过期的 key
|
||||
case <-gatherTTlTicker.C:
|
||||
lru.lruTtlManage.ttlKeyToMemoryBySecond()
|
||||
}
|
||||
|
@ -95,24 +100,27 @@ func (lru *SingleCache) lruTtlWork() {
|
|||
|
||||
func (lru *SingleCache) cleanWork() {
|
||||
cxt := context.Background()
|
||||
lruCleanEvent := event.NewEvent(CleanEventName)
|
||||
work := event.EventWorkFunc(func() (interface{}, error) {
|
||||
err := lru.DelToClearSize()
|
||||
return nil, err
|
||||
})
|
||||
|
||||
lruCleanEvent.SetValue(WorkFuncEventKey, work)
|
||||
|
||||
for {
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
if lru.clearSize < lru.nowSize {
|
||||
lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName)
|
||||
lruCleanEvent.SetValue(WorkFuncEventKey, work)
|
||||
|
||||
lruCleanEvent.InitWaitEvent()
|
||||
lru.lruCleanProduce.Call(cxt, lruCleanEvent)
|
||||
_, err := lruCleanEvent.StartWaitEvent(defaultWaitTime)
|
||||
if err != nil {
|
||||
logx.With(cxt, lru.middleProduce).Errorln(err)
|
||||
}
|
||||
|
||||
// 归还
|
||||
lru.lruCleanProduce.Recovery(lruCleanEvent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue