forked from p53841790/wheat-cache
119 lines
2.7 KiB
Go
119 lines
2.7 KiB
Go
package lru
|
|
|
|
import (
|
|
"context"
|
|
"gitee.com/timedb/wheatCache/pkg/errorx"
|
|
"gitee.com/timedb/wheatCache/pkg/event"
|
|
"gitee.com/timedb/wheatCache/pkg/logx"
|
|
"time"
|
|
)
|
|
|
|
func (lru *SingleCache) lruSingleWork() {
|
|
ctx := context.Background()
|
|
for {
|
|
workEvent := lru.lruConsumer.Receive(ctx)
|
|
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
|
|
if !ok {
|
|
workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr())
|
|
continue
|
|
}
|
|
|
|
switch workEvent.GetEventName() {
|
|
case OptionEventName:
|
|
if work, ok := workFunc.(event.EventWorkFunc); ok {
|
|
workEvent.ExecWorkAndSendResult(work)
|
|
}
|
|
|
|
case CleanEventName:
|
|
// 对当前的io数量进行判断
|
|
ioNum := lru.GetDriver().GetLength()
|
|
if ioNum > lru.lruMaxDiverSize*1/2 {
|
|
lru.lruCleanProduce.Call(ctx, workEvent)
|
|
continue
|
|
}
|
|
if work, ok := workFunc.(event.EventWorkFunc); ok {
|
|
workEvent.ExecWorkAndSendResult(work)
|
|
}
|
|
|
|
case TtlEventName:
|
|
if work, ok := workFunc.(event.EventWorkFunc); ok {
|
|
workEvent.ExecWorkAndSendResult(work)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 执行过期事件
|
|
func (lru *SingleCache) lruTtlWork() {
|
|
|
|
ttlEvent := event.NewEvent(TtlEventName)
|
|
ctx := context.Background()
|
|
work := event.EventWorkFunc(func() (interface{}, error) {
|
|
|
|
beforeTime := time.Now().Unix()
|
|
cle := lru.lruTtlManage.detachNum
|
|
if cle > len(lru.lruTtlManage.memoryKey) {
|
|
cle = len(lru.lruTtlManage.memoryKey)
|
|
}
|
|
|
|
// TODO send keys to middle
|
|
keys := make([]string, 0)
|
|
for i := 0; i < cle; i++ {
|
|
key := <-lru.lruTtlManage.memoryKey
|
|
keys = append(keys, key)
|
|
lru.delByKeyAndExTtl(key, beforeTime)
|
|
}
|
|
|
|
return nil, nil
|
|
})
|
|
ttlEvent.SetValue(WorkFuncEventKey, work)
|
|
|
|
cleanTTlTicker := time.NewTicker(500 * time.Millisecond)
|
|
defer cleanTTlTicker.Stop()
|
|
gatherTTlTicker := time.NewTicker(1 * time.Millisecond)
|
|
defer gatherTTlTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
// 清理事件
|
|
case <-cleanTTlTicker.C:
|
|
if len(lru.lruTtlManage.memoryKey) == 0 {
|
|
continue
|
|
}
|
|
ttlEvent.InitWaitEvent()
|
|
lru.lruCleanProduce.Call(ctx, ttlEvent)
|
|
_, err := ttlEvent.StartWaitEvent(time.Second * 2)
|
|
if err != nil {
|
|
logx.With(ctx, lru.middleProduce).Errorln(err)
|
|
}
|
|
|
|
case <-gatherTTlTicker.C:
|
|
lru.lruTtlManage.ttlKeyToMemoryBySecond()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (lru *SingleCache) cleanWork() {
|
|
cxt := context.Background()
|
|
lruCleanEvent := event.NewEvent(CleanEventName)
|
|
work := event.EventWorkFunc(func() (interface{}, error) {
|
|
err := lru.DelToClearSize()
|
|
return nil, err
|
|
})
|
|
|
|
lruCleanEvent.SetValue(WorkFuncEventKey, work)
|
|
|
|
for {
|
|
time.Sleep(2 * time.Second)
|
|
|
|
if lru.clearSize < lru.nowSize {
|
|
lruCleanEvent.InitWaitEvent()
|
|
lru.lruCleanProduce.Call(cxt, lruCleanEvent)
|
|
_, err := lruCleanEvent.StartWaitEvent(defaultWaitTime)
|
|
if err != nil {
|
|
logx.With(cxt, lru.middleProduce).Errorln(err)
|
|
}
|
|
}
|
|
}
|
|
}
|