From 4ca09febb40130338f4bd134c0b152a2e6c6609e Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 2 Nov 2021 20:35:28 +0800 Subject: [PATCH 1/5] feat(shell): add recover aof --- shell/make_service.py | 11 +++++++++++ storage/temp/dao.template | 6 +++++- storage/temp/dao_aof.template | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 storage/temp/dao_aof.template diff --git a/shell/make_service.py b/shell/make_service.py index 83923d8..218ea0e 100644 --- a/shell/make_service.py +++ b/shell/make_service.py @@ -143,11 +143,22 @@ def gen_aof(proto): temp_path = f"{aofPath}/codec.gen.go" with open(temp_path, 'w', encoding='utf-8') as f: f.write(text) + + # 生成 AOF 恢复机制 + tem_text = load_template("dao_aof.template") + template = Template(tem_text) + + text = template.render(keys=proto) + + temp_path = f"{daoPath}/dao.gen.go" + with open(temp_path, 'w', encoding='utf-8') as f: + f.write(text) def format_code_go(): go_fmt(f"{daoPath}/interface.gen.go") go_fmt(f"{servicePath}/single_service.gen.go") go_fmt(f"{aofPath}/codec.gen.go") + go_fmt(f"{daoPath}/dao.gen.go") def main(): # 加载 protobuf diff --git a/storage/temp/dao.template b/storage/temp/dao.template index 55a2b90..0ea1134 100644 --- a/storage/temp/dao.template +++ b/storage/temp/dao.template @@ -3,10 +3,14 @@ package dao -import "gitee.com/wheat-os/wheatCache/pkg/proto" +import ( + "gitee.com/wheat-os/wheatCache/pkg/proto" + protobuf "google.golang.org/protobuf/proto" +) type Interface interface { {%for key in keys %} {{key.method}}({% for req in key.option %} {{req[1]}}, {% endfor %}) (*proto.{{key.method}}Response, error) {%- endfor %} + ExecMessage(message protobuf.Message) error } diff --git a/storage/temp/dao_aof.template b/storage/temp/dao_aof.template new file mode 100644 index 0000000..37dcac1 --- /dev/null +++ b/storage/temp/dao_aof.template @@ -0,0 +1,33 @@ +// Code generated by gen-struct. DO NOT EDIT. +// make gen-service generated +package dao + +import ( + "gitee.com/wheat-os/wheatCache/pkg/errorx" + "gitee.com/wheat-os/wheatCache/pkg/lru" + "gitee.com/wheat-os/wheatCache/pkg/proto" + protobuf "google.golang.org/protobuf/proto" +) + +type Dao struct { + lru lru.CacheInterface +} + +func NewDao(lru lru.CacheInterface) Interface { + return &Dao{ + lru: lru, + } +} + +// 执行 msg +func (d *Dao) ExecMessage(message protobuf.Message) error { + switch req := message.(type) { + {%for key in keys %} + case *proto.{{key.method}}Request: + _, err := d.{{key.method}}({% for req in key.option %} req.{{req[0]}}, {% endfor %}) + return err + {%- endfor %} + default: + return errorx.New("The type that is not registered exec err") + } +} From 7d9081ce8e8c553134ce926ef32298f68f4991c2 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 2 Nov 2021 20:35:45 +0800 Subject: [PATCH 2/5] chore(service): updater dao service --- storage/dao/dao.gen.go | 95 ++++++++++++++++++++++++++++++++++++ storage/dao/dao.go | 15 ------ storage/dao/interface.gen.go | 6 ++- 3 files changed, 100 insertions(+), 16 deletions(-) create mode 100644 storage/dao/dao.gen.go delete mode 100644 storage/dao/dao.go diff --git a/storage/dao/dao.gen.go b/storage/dao/dao.gen.go new file mode 100644 index 0000000..25d1c59 --- /dev/null +++ b/storage/dao/dao.gen.go @@ -0,0 +1,95 @@ +// Code generated by gen-struct. DO NOT EDIT. +// make gen-service generated +package dao + +import ( + "gitee.com/wheat-os/wheatCache/pkg/errorx" + "gitee.com/wheat-os/wheatCache/pkg/lru" + "gitee.com/wheat-os/wheatCache/pkg/proto" + protobuf "google.golang.org/protobuf/proto" +) + +type Dao struct { + lru lru.CacheInterface +} + +func NewDao(lru lru.CacheInterface) Interface { + return &Dao{ + lru: lru, + } +} + +// 执行 msg +func (d *Dao) ExecMessage(message protobuf.Message) error { + switch req := message.(type) { + + case *proto.LIndexRequest: + _, err := d.LIndex(req.Key, req.Index) + return err + case *proto.LLenRequest: + _, err := d.LLen(req.Key) + return err + case *proto.LPopRequest: + _, err := d.LPop(req.Key, req.Count) + return err + case *proto.LPushRequest: + _, err := d.LPush(req.Key, req.Values) + return err + case *proto.LPushXRequest: + _, err := d.LPushX(req.Key, req.Values) + return err + case *proto.LRangeRequest: + _, err := d.LRange(req.Key, req.Start, req.End) + return err + case *proto.LRemRequest: + _, err := d.LRem(req.Key, req.Count, req.Value) + return err + case *proto.LSetRequest: + _, err := d.LSet(req.Key, req.Index, req.Value) + return err + case *proto.RPopRequest: + _, err := d.RPop(req.Key, req.Count) + return err + case *proto.LTrimRequest: + _, err := d.LTrim(req.Key, req.Start, req.End) + return err + case *proto.RPushRequest: + _, err := d.RPush(req.Key, req.Values) + return err + case *proto.RPushXRequest: + _, err := d.RPushX(req.Key, req.Values) + return err + case *proto.SetRequest: + _, err := d.Set(req.Key, req.Val) + return err + case *proto.GetRequest: + _, err := d.Get(req.Key) + return err + case *proto.AddRequest: + _, err := d.Add(req.Key, req.Renewal) + return err + case *proto.ReduceRequest: + _, err := d.Reduce(req.Key, req.Renewal) + return err + case *proto.SetnxRequest: + _, err := d.Setnx(req.Key, req.Val) + return err + case *proto.SetBitRequest: + _, err := d.SetBit(req.Key, req.Val, req.Offer) + return err + case *proto.GetBitRequest: + _, err := d.GetBit(req.Key, req.Offer) + return err + case *proto.GetRangeRequest: + _, err := d.GetRange(req.Key, req.Start, req.End) + return err + case *proto.GetSetRequest: + _, err := d.GetSet(req.Key, req.Val) + return err + case *proto.StrLenRequest: + _, err := d.StrLen(req.Key) + return err + default: + return errorx.New("The type that is not registered exec err") + } +} diff --git a/storage/dao/dao.go b/storage/dao/dao.go deleted file mode 100644 index ccdf1bb..0000000 --- a/storage/dao/dao.go +++ /dev/null @@ -1,15 +0,0 @@ -package dao - -import ( - "gitee.com/wheat-os/wheatCache/pkg/lru" -) - -type Dao struct { - lru lru.CacheInterface -} - -func NewDao(lru lru.CacheInterface) Interface { - return &Dao{ - lru: lru, - } -} diff --git a/storage/dao/interface.gen.go b/storage/dao/interface.gen.go index 5914ce5..8d5a878 100644 --- a/storage/dao/interface.gen.go +++ b/storage/dao/interface.gen.go @@ -3,7 +3,10 @@ package dao -import "gitee.com/wheat-os/wheatCache/pkg/proto" +import ( + "gitee.com/wheat-os/wheatCache/pkg/proto" + protobuf "google.golang.org/protobuf/proto" +) type Interface interface { LIndex(*proto.BaseKey, int32) (*proto.LIndexResponse, error) @@ -28,4 +31,5 @@ type Interface interface { GetRange(*proto.BaseKey, int32, int32) (*proto.GetRangeResponse, error) GetSet(*proto.BaseKey, string) (*proto.GetSetResponse, error) StrLen(*proto.BaseKey) (*proto.StrLenResponse, error) + ExecMessage(message protobuf.Message) error } From 9fed552380bcaba043003e6b72b90b1b4187cffb Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 2 Nov 2021 21:11:08 +0800 Subject: [PATCH 3/5] feat(storage): recver lru by aof --- storage/persisted/aof.go | 4 ++++ storage/service/single.go | 46 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/storage/persisted/aof.go b/storage/persisted/aof.go index 49f7da4..8e5e7a3 100644 --- a/storage/persisted/aof.go +++ b/storage/persisted/aof.go @@ -103,3 +103,7 @@ func (a *AOF) SendRequest(method string, req interface{}) error { } return errorx.New("type not is protobuf.message") } + +func (a *AOF) GetAOFLogPath() string { + return a.logPath +} diff --git a/storage/service/single.go b/storage/service/single.go index 499e775..aca1922 100644 --- a/storage/service/single.go +++ b/storage/service/single.go @@ -1,9 +1,13 @@ package service import ( + "bufio" + "io" + "os" "time" "gitee.com/wheat-os/wheatCache/pkg/event" + "gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/lru" "gitee.com/wheat-os/wheatCache/pkg/middle" "gitee.com/wheat-os/wheatCache/pkg/proto" @@ -42,6 +46,44 @@ func loadAOF() *persisted.AOF { aofCheckFreq, time.Second*time.Duration(aofCheckTime)) } +// 尝试使用 aof 进行恢复 +func recoverByAOF(d dao.Interface, aof *persisted.AOF) { + if aof == nil || d == nil { + return + } + + path := aof.GetAOFLogPath() + f, err := os.Open(path) + if err != nil { + return + } + defer f.Close() + + rd := bufio.NewReader(f) + for { + std, err := rd.ReadBytes('\n') + if err != nil { + if err != io.EOF { + logx.Errorln(err) + } + break + } + + message, err := aof.DecodeAOF(std[:len(std)-1]) + if err != nil { + logx.Errorln(err) + return + } + + // 执行 恢复 + err = d.ExecMessage(message) + if err != nil { + logx.Errorln(err) + return + } + } +} + func NewSingleServer() proto.CommServerServer { oneSingleServer.Do(func() { timeOut := viper.GetInt("storage.timeOut") @@ -50,15 +92,17 @@ func NewSingleServer() proto.CommServerServer { } lruCache := lru.NewLRUCache() + dao := dao.NewDao(lruCache) // 加载 aof aof := loadAOF() + recoverByAOF(dao, aof) ser := &singleService{ lruCache: lruCache, lruProduce: event.NewProduce(lruCache.GetDriver()), timeOut: time.Duration(timeOut) * time.Second, - dao: dao.NewDao(lruCache), + dao: dao, middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()), aof: aof, } From 5ee502021da719b52d5d51803e6961079b7a65e1 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Tue, 2 Nov 2021 21:34:23 +0800 Subject: [PATCH 4/5] feat(eooorx): update errorx key base --- pkg/errorx/lru.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/errorx/lru.go b/pkg/errorx/lru.go index 95952eb..076ce8e 100644 --- a/pkg/errorx/lru.go +++ b/pkg/errorx/lru.go @@ -5,5 +5,5 @@ func LruNotWorkFuncEventErr() error { } func KeyBaseIsNilErr() error { - return New("key base not is nil") + return New("key base can't be nil") } From f5c3b0e1dbbaeb3b04f8acca861cf3bae8d9f2ad Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Wed, 3 Nov 2021 12:40:58 +0800 Subject: [PATCH 5/5] fat(ttl): add permanent storage --- pkg/lru/ttl.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/lru/ttl.go b/pkg/lru/ttl.go index 189aa3f..23319e2 100644 --- a/pkg/lru/ttl.go +++ b/pkg/lru/ttl.go @@ -20,6 +20,11 @@ func (l *lruTTl) setKeys(key *proto.BaseKey) int64 { l.mu.Lock() defer l.mu.Unlock() + // 永久存储 + if key.Expire == nil && key.Ttl == 0 { + return 0 + } + ttlTime := time.Now().Unix() if key.Expire != nil { ttlTime = key.Expire.GetSeconds()