feat(storage): recver lru by aof

This commit is contained in:
bandl 2021-11-02 21:11:08 +08:00
parent 7d9081ce8e
commit 9fed552380
2 changed files with 49 additions and 1 deletions

View File

@ -103,3 +103,7 @@ func (a *AOF) SendRequest(method string, req interface{}) error {
}
return errorx.New("type not is protobuf.message")
}
func (a *AOF) GetAOFLogPath() string {
return a.logPath
}

View File

@ -1,9 +1,13 @@
package service
import (
"bufio"
"io"
"os"
"time"
"gitee.com/wheat-os/wheatCache/pkg/event"
"gitee.com/wheat-os/wheatCache/pkg/logx"
"gitee.com/wheat-os/wheatCache/pkg/lru"
"gitee.com/wheat-os/wheatCache/pkg/middle"
"gitee.com/wheat-os/wheatCache/pkg/proto"
@ -42,6 +46,44 @@ func loadAOF() *persisted.AOF {
aofCheckFreq, time.Second*time.Duration(aofCheckTime))
}
// 尝试使用 aof 进行恢复
func recoverByAOF(d dao.Interface, aof *persisted.AOF) {
if aof == nil || d == nil {
return
}
path := aof.GetAOFLogPath()
f, err := os.Open(path)
if err != nil {
return
}
defer f.Close()
rd := bufio.NewReader(f)
for {
std, err := rd.ReadBytes('\n')
if err != nil {
if err != io.EOF {
logx.Errorln(err)
}
break
}
message, err := aof.DecodeAOF(std[:len(std)-1])
if err != nil {
logx.Errorln(err)
return
}
// 执行 恢复
err = d.ExecMessage(message)
if err != nil {
logx.Errorln(err)
return
}
}
}
func NewSingleServer() proto.CommServerServer {
oneSingleServer.Do(func() {
timeOut := viper.GetInt("storage.timeOut")
@ -50,15 +92,17 @@ func NewSingleServer() proto.CommServerServer {
}
lruCache := lru.NewLRUCache()
dao := dao.NewDao(lruCache)
// 加载 aof
aof := loadAOF()
recoverByAOF(dao, aof)
ser := &singleService{
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut) * time.Second,
dao: dao.NewDao(lruCache),
dao: dao,
middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()),
aof: aof,
}