Merge pull request !84 from bandl/feat-recover-aof
This commit is contained in:
bandl 2021-11-03 04:42:21 +00:00 committed by Gitee
commit eccfd6a439
10 changed files with 204 additions and 19 deletions

View File

@ -5,5 +5,5 @@ func LruNotWorkFuncEventErr() error {
}
func KeyBaseIsNilErr() error {
return New("key base not is nil")
return New("key base can't be nil")
}

View File

@ -20,6 +20,11 @@ func (l *lruTTl) setKeys(key *proto.BaseKey) int64 {
l.mu.Lock()
defer l.mu.Unlock()
// 永久存储
if key.Expire == nil && key.Ttl == 0 {
return 0
}
ttlTime := time.Now().Unix()
if key.Expire != nil {
ttlTime = key.Expire.GetSeconds()

View File

@ -143,11 +143,22 @@ def gen_aof(proto):
temp_path = f"{aofPath}/codec.gen.go"
with open(temp_path, 'w', encoding='utf-8') as f:
f.write(text)
# 生成 AOF 恢复机制
tem_text = load_template("dao_aof.template")
template = Template(tem_text)
text = template.render(keys=proto)
temp_path = f"{daoPath}/dao.gen.go"
with open(temp_path, 'w', encoding='utf-8') as f:
f.write(text)
def format_code_go():
go_fmt(f"{daoPath}/interface.gen.go")
go_fmt(f"{servicePath}/single_service.gen.go")
go_fmt(f"{aofPath}/codec.gen.go")
go_fmt(f"{daoPath}/dao.gen.go")
def main():
# 加载 protobuf

95
storage/dao/dao.gen.go Normal file
View File

@ -0,0 +1,95 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-service generated
package dao
import (
"gitee.com/wheat-os/wheatCache/pkg/errorx"
"gitee.com/wheat-os/wheatCache/pkg/lru"
"gitee.com/wheat-os/wheatCache/pkg/proto"
protobuf "google.golang.org/protobuf/proto"
)
type Dao struct {
lru lru.CacheInterface
}
func NewDao(lru lru.CacheInterface) Interface {
return &Dao{
lru: lru,
}
}
// 执行 msg
func (d *Dao) ExecMessage(message protobuf.Message) error {
switch req := message.(type) {
case *proto.LIndexRequest:
_, err := d.LIndex(req.Key, req.Index)
return err
case *proto.LLenRequest:
_, err := d.LLen(req.Key)
return err
case *proto.LPopRequest:
_, err := d.LPop(req.Key, req.Count)
return err
case *proto.LPushRequest:
_, err := d.LPush(req.Key, req.Values)
return err
case *proto.LPushXRequest:
_, err := d.LPushX(req.Key, req.Values)
return err
case *proto.LRangeRequest:
_, err := d.LRange(req.Key, req.Start, req.End)
return err
case *proto.LRemRequest:
_, err := d.LRem(req.Key, req.Count, req.Value)
return err
case *proto.LSetRequest:
_, err := d.LSet(req.Key, req.Index, req.Value)
return err
case *proto.RPopRequest:
_, err := d.RPop(req.Key, req.Count)
return err
case *proto.LTrimRequest:
_, err := d.LTrim(req.Key, req.Start, req.End)
return err
case *proto.RPushRequest:
_, err := d.RPush(req.Key, req.Values)
return err
case *proto.RPushXRequest:
_, err := d.RPushX(req.Key, req.Values)
return err
case *proto.SetRequest:
_, err := d.Set(req.Key, req.Val)
return err
case *proto.GetRequest:
_, err := d.Get(req.Key)
return err
case *proto.AddRequest:
_, err := d.Add(req.Key, req.Renewal)
return err
case *proto.ReduceRequest:
_, err := d.Reduce(req.Key, req.Renewal)
return err
case *proto.SetnxRequest:
_, err := d.Setnx(req.Key, req.Val)
return err
case *proto.SetBitRequest:
_, err := d.SetBit(req.Key, req.Val, req.Offer)
return err
case *proto.GetBitRequest:
_, err := d.GetBit(req.Key, req.Offer)
return err
case *proto.GetRangeRequest:
_, err := d.GetRange(req.Key, req.Start, req.End)
return err
case *proto.GetSetRequest:
_, err := d.GetSet(req.Key, req.Val)
return err
case *proto.StrLenRequest:
_, err := d.StrLen(req.Key)
return err
default:
return errorx.New("The type that is not registered exec err")
}
}

View File

@ -1,15 +0,0 @@
package dao
import (
"gitee.com/wheat-os/wheatCache/pkg/lru"
)
type Dao struct {
lru lru.CacheInterface
}
func NewDao(lru lru.CacheInterface) Interface {
return &Dao{
lru: lru,
}
}

View File

@ -3,7 +3,10 @@
package dao
import "gitee.com/wheat-os/wheatCache/pkg/proto"
import (
"gitee.com/wheat-os/wheatCache/pkg/proto"
protobuf "google.golang.org/protobuf/proto"
)
type Interface interface {
LIndex(*proto.BaseKey, int32) (*proto.LIndexResponse, error)
@ -28,4 +31,5 @@ type Interface interface {
GetRange(*proto.BaseKey, int32, int32) (*proto.GetRangeResponse, error)
GetSet(*proto.BaseKey, string) (*proto.GetSetResponse, error)
StrLen(*proto.BaseKey) (*proto.StrLenResponse, error)
ExecMessage(message protobuf.Message) error
}

View File

@ -103,3 +103,7 @@ func (a *AOF) SendRequest(method string, req interface{}) error {
}
return errorx.New("type not is protobuf.message")
}
func (a *AOF) GetAOFLogPath() string {
return a.logPath
}

View File

@ -1,9 +1,13 @@
package service
import (
"bufio"
"io"
"os"
"time"
"gitee.com/wheat-os/wheatCache/pkg/event"
"gitee.com/wheat-os/wheatCache/pkg/logx"
"gitee.com/wheat-os/wheatCache/pkg/lru"
"gitee.com/wheat-os/wheatCache/pkg/middle"
"gitee.com/wheat-os/wheatCache/pkg/proto"
@ -42,6 +46,44 @@ func loadAOF() *persisted.AOF {
aofCheckFreq, time.Second*time.Duration(aofCheckTime))
}
// 尝试使用 aof 进行恢复
func recoverByAOF(d dao.Interface, aof *persisted.AOF) {
if aof == nil || d == nil {
return
}
path := aof.GetAOFLogPath()
f, err := os.Open(path)
if err != nil {
return
}
defer f.Close()
rd := bufio.NewReader(f)
for {
std, err := rd.ReadBytes('\n')
if err != nil {
if err != io.EOF {
logx.Errorln(err)
}
break
}
message, err := aof.DecodeAOF(std[:len(std)-1])
if err != nil {
logx.Errorln(err)
return
}
// 执行 恢复
err = d.ExecMessage(message)
if err != nil {
logx.Errorln(err)
return
}
}
}
func NewSingleServer() proto.CommServerServer {
oneSingleServer.Do(func() {
timeOut := viper.GetInt("storage.timeOut")
@ -50,15 +92,17 @@ func NewSingleServer() proto.CommServerServer {
}
lruCache := lru.NewLRUCache()
dao := dao.NewDao(lruCache)
// 加载 aof
aof := loadAOF()
recoverByAOF(dao, aof)
ser := &singleService{
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut) * time.Second,
dao: dao.NewDao(lruCache),
dao: dao,
middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()),
aof: aof,
}

View File

@ -3,10 +3,14 @@
package dao
import "gitee.com/wheat-os/wheatCache/pkg/proto"
import (
"gitee.com/wheat-os/wheatCache/pkg/proto"
protobuf "google.golang.org/protobuf/proto"
)
type Interface interface {
{%for key in keys %}
{{key.method}}({% for req in key.option %} {{req[1]}}, {% endfor %}) (*proto.{{key.method}}Response, error)
{%- endfor %}
ExecMessage(message protobuf.Message) error
}

View File

@ -0,0 +1,33 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-service generated
package dao
import (
"gitee.com/wheat-os/wheatCache/pkg/errorx"
"gitee.com/wheat-os/wheatCache/pkg/lru"
"gitee.com/wheat-os/wheatCache/pkg/proto"
protobuf "google.golang.org/protobuf/proto"
)
type Dao struct {
lru lru.CacheInterface
}
func NewDao(lru lru.CacheInterface) Interface {
return &Dao{
lru: lru,
}
}
// 执行 msg
func (d *Dao) ExecMessage(message protobuf.Message) error {
switch req := message.(type) {
{%for key in keys %}
case *proto.{{key.method}}Request:
_, err := d.{{key.method}}({% for req in key.option %} req.{{req[0]}}, {% endfor %})
return err
{%- endfor %}
default:
return errorx.New("The type that is not registered exec err")
}
}