package lru import ( "context" "gitee.com/timedb/wheatCache/pkg/errorx" "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/pkg/logx" "time" ) func (lru *SingleCache) lruSingleWork() { ctx := context.Background() for { workEvent := lru.lruConsumer.Receive(ctx) workFunc, ok := workEvent.GetValue(WorkFuncEventKey) if !ok { workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) continue } switch workEvent.GetEventName() { case OptionEventName: if work, ok := workFunc.(event.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } case CleanEventName: // 对当前的io数量进行判断 ioNum := lru.GetDriver().GetLength() if ioNum > lru.lruMaxDiverSize*1/2 { lru.lruCleanProduce.Call(ctx, workEvent) continue } if work, ok := workFunc.(event.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } case TtlEventName: if work, ok := workFunc.(event.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } } } } // 执行过期事件 func (lru *SingleCache) lruTtlWork() { ttlEvent := event.NewEvent(TtlEventName) ctx := context.Background() work := event.EventWorkFunc(func() (interface{}, error) { beforeTime := time.Now().Unix() cle := lru.lruTtlManage.detachNum if cle > len(lru.lruTtlManage.memoryKey) { cle = len(lru.lruTtlManage.memoryKey) } // TODO send keys to middle keys := make([]string, 0) for i := 0; i < cle; i++ { key := <-lru.lruTtlManage.memoryKey keys = append(keys, key) lru.delByKeyAndExTtl(key, beforeTime) } return nil, nil }) ttlEvent.SetValue(WorkFuncEventKey, work) cleanTTlTicker := time.NewTicker(500 * time.Millisecond) defer cleanTTlTicker.Stop() gatherTTlTicker := time.NewTicker(1 * time.Millisecond) defer gatherTTlTicker.Stop() for { select { // 清理事件 case <-cleanTTlTicker.C: if len(lru.lruTtlManage.memoryKey) == 0 { continue } ttlEvent.InitWaitEvent() lru.lruCleanProduce.Call(ctx, ttlEvent) _, err := ttlEvent.StartWaitEvent(time.Second * 2) if err != nil { logx.With(ctx, lru.middleProduce).Errorln(err) } case <-gatherTTlTicker.C: lru.lruTtlManage.ttlKeyToMemoryBySecond() } } } func (lru *SingleCache) cleanWork() { cxt := context.Background() lruCleanEvent := event.NewEvent(CleanEventName) work := event.EventWorkFunc(func() (interface{}, error) { err := lru.DelToClearSize() return nil, err }) lruCleanEvent.SetValue(WorkFuncEventKey, work) for { time.Sleep(2 * time.Second) if lru.clearSize < lru.nowSize { lruCleanEvent.InitWaitEvent() lru.lruCleanProduce.Call(cxt, lruCleanEvent) _, err := lruCleanEvent.StartWaitEvent(defaultWaitTime) if err != nil { logx.With(cxt, lru.middleProduce).Errorln(err) } } } }