From 9fed552380bcaba043003e6b72b90b1b4187cffb Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 2 Nov 2021 21:11:08 +0800 Subject: [PATCH] feat(storage): recver lru by aof --- storage/persisted/aof.go | 4 ++++ storage/service/single.go | 46 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/storage/persisted/aof.go b/storage/persisted/aof.go index 49f7da4..8e5e7a3 100644 --- a/storage/persisted/aof.go +++ b/storage/persisted/aof.go @@ -103,3 +103,7 @@ func (a *AOF) SendRequest(method string, req interface{}) error { } return errorx.New("type not is protobuf.message") } + +func (a *AOF) GetAOFLogPath() string { + return a.logPath +} diff --git a/storage/service/single.go b/storage/service/single.go index 499e775..aca1922 100644 --- a/storage/service/single.go +++ b/storage/service/single.go @@ -1,9 +1,13 @@ package service import ( + "bufio" + "io" + "os" "time" "gitee.com/wheat-os/wheatCache/pkg/event" + "gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/lru" "gitee.com/wheat-os/wheatCache/pkg/middle" "gitee.com/wheat-os/wheatCache/pkg/proto" @@ -42,6 +46,44 @@ func loadAOF() *persisted.AOF { aofCheckFreq, time.Second*time.Duration(aofCheckTime)) } +// 尝试使用 aof 进行恢复 +func recoverByAOF(d dao.Interface, aof *persisted.AOF) { + if aof == nil || d == nil { + return + } + + path := aof.GetAOFLogPath() + f, err := os.Open(path) + if err != nil { + return + } + defer f.Close() + + rd := bufio.NewReader(f) + for { + std, err := rd.ReadBytes('\n') + if err != nil { + if err != io.EOF { + logx.Errorln(err) + } + break + } + + message, err := aof.DecodeAOF(std[:len(std)-1]) + if err != nil { + logx.Errorln(err) + return + } + + // 执行 恢复 + err = d.ExecMessage(message) + if err != nil { + logx.Errorln(err) + return + } + } +} + func NewSingleServer() proto.CommServerServer { oneSingleServer.Do(func() { timeOut := viper.GetInt("storage.timeOut") @@ -50,15 +92,17 @@ func NewSingleServer() proto.CommServerServer { } lruCache := lru.NewLRUCache() + dao := dao.NewDao(lruCache) // 加载 aof aof := loadAOF() + recoverByAOF(dao, aof) ser := &singleService{ lruCache: lruCache, lruProduce: event.NewProduce(lruCache.GetDriver()), timeOut: time.Duration(timeOut) * time.Second, - dao: dao.NewDao(lruCache), + dao: dao, middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()), aof: aof, }