wheat-cache/pkg/lru/worker.go

135 lines
3.0 KiB
Go
Raw Normal View History

2021-09-27 11:29:47 +08:00
package lru
import (
"context"
2021-10-26 15:41:16 +08:00
"time"
2021-11-02 14:45:08 +08:00
"gitee.com/wheat-os/wheatCache/pkg/errorx"
"gitee.com/wheat-os/wheatCache/pkg/event"
"gitee.com/wheat-os/wheatCache/pkg/logx"
mMsg "gitee.com/wheat-os/wheatCache/pkg/middle-msg"
2021-09-27 11:29:47 +08:00
)
2021-10-12 15:11:30 +08:00
func (lru *SingleCache) lruSingleWork() {
2021-09-27 11:29:47 +08:00
ctx := context.Background()
for {
2021-10-04 16:25:52 +08:00
workEvent := lru.lruConsumer.Receive(ctx)
2021-10-12 15:11:30 +08:00
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
if !ok {
workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr())
continue
}
2021-09-27 11:29:47 +08:00
2021-10-07 16:30:56 +08:00
switch workEvent.GetEventName() {
case OptionEventName:
if work, ok := workFunc.(event.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work)
}
2021-10-12 15:11:30 +08:00
2021-10-07 16:30:56 +08:00
case CleanEventName:
// 对当前的io数量进行判断
ioNum := lru.GetDriver().GetLength()
2021-10-26 15:41:16 +08:00
if ioNum > lru.lruMaxDiverSize/2 {
lru.lruCleanProduce.Call(ctx, workEvent)
continue
}
if work, ok := workFunc.(event.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work)
}
2021-10-12 15:11:30 +08:00
case TtlEventName:
if work, ok := workFunc.(event.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work)
}
}
}
}
// 执行过期事件
func (lru *SingleCache) lruTtlWork() {
ctx := context.Background()
// 清理事件
go func() {
work := event.EventWorkFunc(func() (interface{}, error) {
2021-10-12 15:11:30 +08:00
beforeTime := time.Now().Unix()
cle := lru.lruTtlManage.detachNum
if cle > len(lru.lruTtlManage.memoryKey) {
cle = len(lru.lruTtlManage.memoryKey)
}
2021-10-12 15:11:30 +08:00
keys := make([]string, 0)
for i := 0; i < cle; i++ {
key := <-lru.lruTtlManage.memoryKey
keys = append(keys, key)
lru.delByKeyAndExTtl(key, beforeTime)
}
return keys, nil
})
2021-10-28 11:56:05 +08:00
cleanTTlTicker := time.NewTicker(500 * time.Millisecond)
defer cleanTTlTicker.Stop()
2021-10-28 11:56:05 +08:00
for {
// 清理事件
<-cleanTTlTicker.C
2021-10-12 15:11:30 +08:00
if len(lru.lruTtlManage.memoryKey) == 0 {
continue
}
2021-10-28 11:56:05 +08:00
ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName)
ttlEvent.SetValue(WorkFuncEventKey, work)
2021-10-12 15:11:30 +08:00
ttlEvent.InitWaitEvent()
2021-10-28 11:56:05 +08:00
2021-10-12 15:11:30 +08:00
lru.lruCleanProduce.Call(ctx, ttlEvent)
keys, err := ttlEvent.StartWaitEvent(time.Second * 2)
2021-10-26 15:41:16 +08:00
lru.lruCleanProduce.Recovery(ttlEvent)
mMsg.SendMiddleMsg(ctx, lru.middleProduce, mMsg.LruTTlContext{
Keys: keys.([]string),
CleanTime: time.Now(),
})
2021-10-12 15:11:30 +08:00
if err != nil {
logx.With(ctx, lru.middleProduce).Errorln(err)
}
}
}()
// 收集事件
for {
time.Sleep(1 * time.Second)
lru.lruTtlManage.ttlKeyToMemoryBySecond()
2021-10-12 15:11:30 +08:00
}
2021-10-12 15:11:30 +08:00
}
func (lru *SingleCache) cleanWork() {
cxt := context.Background()
work := event.EventWorkFunc(func() (interface{}, error) {
err := lru.DelToClearSize()
return nil, err
})
for {
2021-10-26 15:41:16 +08:00
time.Sleep(2 * time.Second)
2021-10-12 15:11:30 +08:00
if lru.clearSize < lru.nowSize {
2021-10-26 15:41:16 +08:00
lruCleanEvent := lru.lruCleanProduce.NewEvent(CleanEventName)
lruCleanEvent.SetValue(WorkFuncEventKey, work)
2021-10-12 15:11:30 +08:00
lruCleanEvent.InitWaitEvent()
lru.lruCleanProduce.Call(cxt, lruCleanEvent)
_, err := lruCleanEvent.StartWaitEvent(defaultWaitTime)
if err != nil {
logx.With(cxt, lru.middleProduce).Errorln(err)
}
2021-10-26 15:41:16 +08:00
// 归还
lru.lruCleanProduce.Recovery(lruCleanEvent)
2021-09-27 11:29:47 +08:00
}
}
}