From 758f1bfec22ae2eec0b8b980ee882304dabb6dc2 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 12 Oct 2021 15:11:30 +0800 Subject: [PATCH] feat(lru): update lru ttl and worker --- pkg/lru/clean_work.go | 31 ------------- pkg/lru/define.go | 9 +++- pkg/lru/lru.go | 84 +++++++++++++++++++++++++--------- pkg/lru/ttl.go | 51 +++++++++++++++++++++ pkg/lru/worker.go | 102 ++++++++++++++++++++++++++++++++++++------ pkg/middle-msg/lru.go | 13 ++++-- 6 files changed, 221 insertions(+), 69 deletions(-) delete mode 100644 pkg/lru/clean_work.go create mode 100644 pkg/lru/ttl.go diff --git a/pkg/lru/clean_work.go b/pkg/lru/clean_work.go deleted file mode 100644 index 228ca19..0000000 --- a/pkg/lru/clean_work.go +++ /dev/null @@ -1,31 +0,0 @@ -package lru - -import ( - "context" - "gitee.com/timedb/wheatCache/pkg/event" - "time" -) - -func (lru *SingleCache) cleanWork() { - cxt := context.Background() - for { - time.Sleep(2 * time.Second) - if lru.clearSize < lru.nowSize { - lruCleanEvent := event.NewEvent(CleanEventName) - lruCleanEvent.InitWaitEvent() - work := event.EventWorkFunc(func() (interface{}, error) { - err := lru.DelToClearSize() - return nil, err - }) - - lruCleanEvent.SetValue(WorkFuncEventKey, work) - - lru.lruCleanProduce.Call(cxt, lruCleanEvent) - _, err := lruCleanEvent.StartWaitEvent(defaultWaitTime) - if err != nil { - //logx.With(cxt, ).Error("cleanWork err: %v", err) - } - - } - } -} diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 6b8dd2b..6c8ede4 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -12,6 +12,7 @@ type SingleWorkFunc func() interface{} const ( OptionEventName = "operateEvent" CleanEventName = "clearEvent" + TtlEventName = "ttlEvent" WorkFuncEventKey = "workFunc" ) @@ -35,5 +36,11 @@ type CacheInterface interface { Add(key *proto.BaseKey, val structure.KeyBaseInterface) UpdateLruSize(length structure.UpdateLength) DelByKey(key *proto.BaseKey) error - DelToClearSize() error + DelToClearSize() error } + +// TTL +const ( + defaultDetachNum = 300 + defaultTtlMaxLevel = 18 +) diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 29a5ae7..a8682ad 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -2,19 +2,22 @@ package lru import ( "container/list" + "sync/atomic" + _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/errorx" "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/middle" "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/structure" "gitee.com/timedb/wheatCache/pkg/util" "github.com/spf13/viper" - "sync/atomic" ) type keyBaseValue struct { - key string - val structure.KeyBaseInterface + key string + val structure.KeyBaseInterface + expire int64 // 过期时间戳 } type SingleCache struct { @@ -24,9 +27,13 @@ type SingleCache struct { li *list.List lruMap map[string]*list.Element lruMaxDiverSize int + lruTtlManage *lruTTl // 定时清理器 + lruDriver event.DriverInterface lruConsumer event.ConsumerInterface lruCleanProduce event.ProduceInterface // 发送清理事件 + + middleProduce event.ProduceInterface // 中间件驱动 } // UpdateLruSize 更新现在的长度 @@ -34,11 +41,11 @@ func (lru *SingleCache) UpdateLruSize(length structure.UpdateLength) { atomic.AddInt64(&lru.nowSize, int64(length)) } -func cacheInit() (int64, int64, int) { +func cacheInit() (int64, int64, int, int) { maxSize := viper.GetString("lruCache.maxSize") retMaxSize, maxErr := util.ParseSizeToBit(maxSize) if maxErr != nil { - return 0, 0, 0 + return 0, 0, 0, 0 } if retMaxSize == 0 { retMaxSize = defaultLruMaxSize @@ -47,7 +54,7 @@ func cacheInit() (int64, int64, int) { clearSize := viper.GetString("lruCache.clearSize") retClearSize, clearErr := util.ParseSizeToBit(clearSize) if clearErr != nil { - return 0, 0, 0 + return 0, 0, 0, 0 } if retClearSize == 0 { retClearSize = defaultLruClearSize @@ -57,12 +64,18 @@ func cacheInit() (int64, int64, int) { if maxDriver == 0 { maxDriver = defaultLruEventDriver } - return retMaxSize, retClearSize, maxDriver + + detachNum := viper.GetInt("lruCache.detachNum") + if detachNum == 0 { + detachNum = defaultDetachNum + } + + return retMaxSize, retClearSize, maxDriver, detachNum } // NewLRUCache lru初始化 func NewLRUCache() *SingleCache { - maxSize, clearSize, maxDriverSize := cacheInit() + maxSize, clearSize, maxDriverSize, detachNum := cacheInit() lruDriver := event.NewDriver(maxDriverSize) lruCacheOnce.Do(func() { lru := &SingleCache{ @@ -75,9 +88,16 @@ func NewLRUCache() *SingleCache { lruDriver: lruDriver, lruConsumer: event.NewConsumer(lruDriver), lruCleanProduce: event.NewProduce(lruDriver), + middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()), + lruTtlManage: newLruTTl(detachNum), } lruCache = lru + + // 启动 lru 事件驱动 go lru.lruSingleWork() + go lru.lruTtlWork() + go lru.cleanWork() + }) return lruCache } @@ -90,9 +110,11 @@ func (lru *SingleCache) GetDriver() event.DriverInterface { //Add 增加 func (lru *SingleCache) Add(key *proto.BaseKey, val structure.KeyBaseInterface) { + exp := lru.lruTtlManage.setKeys(key) keyBaseVal := &keyBaseValue{ - key: key.Key, - val: val, + key: key.Key, + val: val, + expire: exp, } if elVal, ok := lru.lruMap[key.Key]; ok { lru.li.MoveToFront(elVal) @@ -108,9 +130,6 @@ func (lru *SingleCache) Add(key *proto.BaseKey, val structure.KeyBaseInterface) // Get 查找key对应的value func (lru *SingleCache) Get(key *proto.BaseKey) (structure.KeyBaseInterface, bool) { - if lru.lruMap == nil { - return nil, false - } if elVal, ok := lru.lruMap[key.Key]; ok { lru.li.MoveToFront(elVal) return elVal.Value.(*keyBaseValue).val, true @@ -136,24 +155,49 @@ func (lru *SingleCache) DelByKey(key *proto.BaseKey) error { if lru.lruMap == nil { return errorx.New("lru is nil") } - if _, ok := lru.lruMap[key.Key]; ok { + if el, ok := lru.lruMap[key.Key]; ok { delete(lru.lruMap, key.Key) + lru.li.Remove(el) + lru.UpdateLruSize(structure.UpdateLength(-1 * el.Value.(*keyBaseValue).val.SizeByte())) return nil } return errorx.New("lru no this key") } +//DelByKeyAndExTtl 根据key(string)删除已经过期的 key +func (lru *SingleCache) delByKeyAndExTtl(key string, beforeTime int64) { + if elVal, ok := lru.lruMap[key]; ok { + exp := elVal.Value.(*keyBaseValue).expire + if exp <= beforeTime { + delete(lru.lruMap, key) + lru.li.Remove(elVal) + lru.UpdateLruSize(structure.UpdateLength(-1 * elVal.Value.(*keyBaseValue).val.SizeByte())) + } + } +} + func (lru *SingleCache) DelToClearSize() error { if lru.lruMap == nil { return errorx.New("lru is nil") } - for { - if lru.nowSize > lru.clearSize { - //del自动给nowSize进行大小的改变 - lru.Del() - } else { - break + for lru.nowSize > lru.clearSize { + //del自动给nowSize进行大小的改变 + err := lru.Del() + if err != nil { + return err } + } return nil } + +// 更新过期时间 +func (lru *SingleCache) UpdateTTl(key *proto.BaseKey) error { + + if elVal, ok := lru.lruMap[key.Key]; ok { + expire := lru.lruTtlManage.setKeys(key) + elVal.Value.(*keyBaseValue).expire = expire + } + + return errorx.New("the key is not in lru cache, key:%s", key.Key) +} diff --git a/pkg/lru/ttl.go b/pkg/lru/ttl.go new file mode 100644 index 0000000..a0933d8 --- /dev/null +++ b/pkg/lru/ttl.go @@ -0,0 +1,51 @@ +package lru + +import ( + "sync" + "time" + + "gitee.com/timedb/wheatCache/pkg/proto" + "gitee.com/timedb/wheatCache/pkg/util/skiplist" +) + +// lru 的 ttl 管理器 +type lruTTl struct { + sk *skiplist.SkipList + memoryKey chan string // 缓存过期的 key + detachNum int // 每次移除的数量 + mu sync.Mutex +} + +func (l *lruTTl) setKeys(key *proto.BaseKey) int64 { + l.mu.Lock() + defer l.mu.Unlock() + + ttlTime := time.Now().Unix() + if key.Expire != nil { + ttlTime = key.Expire.GetSeconds() + } + + ttlTime += key.GetTtl() + l.sk.Insert(float64(ttlTime), key.GetKey()) + + return ttlTime +} + +// 加载过期的 Key 到 Memory +func (l *lruTTl) ttlKeyToMemoryBySecond() { + t := time.Now() + values := l.sk.PopLeft(float64(t.Unix())) + + for _, val := range values { + l.memoryKey <- val.(string) + } +} + +func newLruTTl(detachNum int) *lruTTl { + return &lruTTl{ + sk: skiplist.NewSkipList(defaultTtlMaxLevel), + // 默认 10000 个 Key + memoryKey: make(chan string, 10000), + detachNum: detachNum, + } +} diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 74d5705..71c7c23 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -4,30 +4,27 @@ import ( "context" "gitee.com/timedb/wheatCache/pkg/errorx" "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" + "time" ) -func (lru *SingleCache) lruSingleWork() interface{} { +func (lru *SingleCache) lruSingleWork() { ctx := context.Background() for { workEvent := lru.lruConsumer.Receive(ctx) + workFunc, ok := workEvent.GetValue(WorkFuncEventKey) + if !ok { + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) + continue + } switch workEvent.GetEventName() { case OptionEventName: - workFunc, ok := workEvent.GetValue(WorkFuncEventKey) - if !ok { - workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) - continue - } - if work, ok := workFunc.(event.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } + case CleanEventName: - workFunc, ok := workEvent.GetValue(WorkFuncEventKey) - if !ok { - workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) - continue - } // 对当前的io数量进行判断 ioNum := lru.GetDriver().GetLength() if ioNum > lru.lruMaxDiverSize*1/2 { @@ -37,8 +34,85 @@ func (lru *SingleCache) lruSingleWork() interface{} { if work, ok := workFunc.(event.EventWorkFunc); ok { workEvent.ExecWorkAndSendResult(work) } - default: - return errorx.New("no this name") + + case TtlEventName: + if work, ok := workFunc.(event.EventWorkFunc); ok { + workEvent.ExecWorkAndSendResult(work) + } + } + } +} + +// 执行过期事件 +func (lru *SingleCache) lruTtlWork() { + + ttlEvent := event.NewEvent(TtlEventName) + ctx := context.Background() + work := event.EventWorkFunc(func() (interface{}, error) { + + beforeTime := time.Now().Unix() + cle := lru.lruTtlManage.detachNum + if cle > len(lru.lruTtlManage.memoryKey) { + cle = len(lru.lruTtlManage.memoryKey) + } + + // TODO send keys to middle + keys := make([]string, 0) + for i := 0; i < cle; i++ { + key := <-lru.lruTtlManage.memoryKey + keys = append(keys, key) + lru.delByKeyAndExTtl(key, beforeTime) + } + + return nil, nil + }) + ttlEvent.SetValue(WorkFuncEventKey, work) + + cleanTTlTicker := time.NewTicker(500 * time.Millisecond) + defer cleanTTlTicker.Stop() + gatherTTlTicker := time.NewTicker(1 * time.Millisecond) + defer gatherTTlTicker.Stop() + + for { + select { + // 清理事件 + case <-cleanTTlTicker.C: + if len(lru.lruTtlManage.memoryKey) == 0 { + continue + } + ttlEvent.InitWaitEvent() + lru.lruCleanProduce.Call(ctx, ttlEvent) + _, err := ttlEvent.StartWaitEvent(time.Second * 2) + if err != nil { + logx.With(ctx, lru.middleProduce).Errorln(err) + } + + case <-gatherTTlTicker.C: + lru.lruTtlManage.ttlKeyToMemoryBySecond() + } + } +} + +func (lru *SingleCache) cleanWork() { + cxt := context.Background() + lruCleanEvent := event.NewEvent(CleanEventName) + work := event.EventWorkFunc(func() (interface{}, error) { + err := lru.DelToClearSize() + return nil, err + }) + + lruCleanEvent.SetValue(WorkFuncEventKey, work) + + for { + time.Sleep(2 * time.Second) + + if lru.clearSize < lru.nowSize { + lruCleanEvent.InitWaitEvent() + lru.lruCleanProduce.Call(cxt, lruCleanEvent) + _, err := lruCleanEvent.StartWaitEvent(defaultWaitTime) + if err != nil { + logx.With(cxt, lru.middleProduce).Errorln(err) + } } } } diff --git a/pkg/middle-msg/lru.go b/pkg/middle-msg/lru.go index b566510..4df0a47 100644 --- a/pkg/middle-msg/lru.go +++ b/pkg/middle-msg/lru.go @@ -2,13 +2,20 @@ package middle_msg import "time" -var LruCleanContextName = "lru-clean-context" +const LruCleanContextName = "lru-clean-context" type LruCleanContext struct { Keys []string BeforeCleanSize int64 BehindCleanSize int64 - StartTime time.Duration - EndTime time.Duration + StartTime time.Time + EndTime time.Time +} + +const LruTTlContextName = "lru-ttl-context" + +type LruTTlContext struct { + Keys []string + CleanTime time.Time }