diff --git a/conf/public_conf.go b/conf/public_conf.go index d5f0791..9ce786b 100644 --- a/conf/public_conf.go +++ b/conf/public_conf.go @@ -33,6 +33,7 @@ func init() { func setDefaultConfValue() { // 设置一些默认值 viper.SetDefault("version", "base-01") + defaultStorage() } func LoadConf(path string) error { diff --git a/conf/storage.go b/conf/storage.go new file mode 100644 index 0000000..af0f30a --- /dev/null +++ b/conf/storage.go @@ -0,0 +1,11 @@ +package conf + +import "github.com/spf13/viper" + +func defaultStorage() { + // aof + viper.SetDefault("storage.aof-path", "/etc/wheat-cache/wheat.aof") + viper.SetDefault("storage.aof-flush-time", 5) + viper.SetDefault("storage.aof-check-time", 1) + viper.SetDefault("storage.aof-check-freq", 20) +} diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 9d89429..51687c3 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -6,6 +6,13 @@ storage: host: '127.0.0.1' port: 5890 timeOut: 2 # second + + aof-codec: "b16" # 目前只实现了 b16 编码方案。 + aof-path: "/etc/wheat-cache/wheat.aof" + aof-flush-time: 5 # second , 每 5 秒刷新缓冲区的内容到磁盘。 + aof-check-time: 1 # 每 1 second 执行一次 io 检查 + aof-check-freq: 20 # 在一个 aof-check-time 周期内,出现超过 aof-check-freq 的 IO 操作会刷新磁盘 + # clearSize and maxSize must be Int lruCache: diff --git a/pkg/proto/define.go b/pkg/proto/define.go index e333d8b..271091f 100644 --- a/pkg/proto/define.go +++ b/pkg/proto/define.go @@ -1,5 +1,9 @@ package proto +import ( + "google.golang.org/protobuf/types/known/timestamppb" +) + type GetKeyBaseInterface interface { GetKey() *BaseKey } @@ -7,3 +11,25 @@ type GetKeyBaseInterface interface { const ( BaseKeyMethodKey = "basekey" ) + +// NewBaseKey +// key,ttl,expire +func NewBaseKey(key string, t ...int64) *BaseKey { + var expire *timestamppb.Timestamp + var ttl int64 + + if len(t) > 1 { + expire = ×tamppb.Timestamp{ + Seconds: t[1], + } + ttl = t[0] + } else if len(t) == 1 { + ttl = t[0] + } + + return &BaseKey{ + Key: key, + Expire: expire, + Ttl: ttl, + } +} diff --git a/shell/make_service.py b/shell/make_service.py index 8ab2ea2..83923d8 100644 --- a/shell/make_service.py +++ b/shell/make_service.py @@ -8,6 +8,7 @@ protoAddress = f"{sysPath}/protobuf" tempPath = f"{sysPath}/storage/temp" daoPath = f"{sysPath}/storage/dao" servicePath = f"{sysPath}/storage/service" +aofPath = f"{sysPath}/storage/persisted" class ProtoOption(object): @@ -133,16 +134,33 @@ def gen_single_service(proto): with open(temp_path, 'w', encoding='utf-8') as f: f.write(text) +def gen_aof(proto): + tem_text = load_template("aof.template") + template = Template(tem_text) + + text = template.render(keys=proto) + + temp_path = f"{aofPath}/codec.gen.go" + with open(temp_path, 'w', encoding='utf-8') as f: + f.write(text) + def format_code_go(): go_fmt(f"{daoPath}/interface.gen.go") go_fmt(f"{servicePath}/single_service.gen.go") + go_fmt(f"{aofPath}/codec.gen.go") def main(): # 加载 protobuf protobuf = load_protobuf() + # 生成 dao 接口 gen_dao_interface(protobuf) + + # 生成服务代码 gen_single_service(protobuf) + + # 生成 aof 解码方案 + gen_aof(protobuf) format_code_go() if __name__ == "__main__": diff --git a/storage/persisted/aof.go b/storage/persisted/aof.go new file mode 100644 index 0000000..60b6cd4 --- /dev/null +++ b/storage/persisted/aof.go @@ -0,0 +1,105 @@ +package persisted + +import ( + "bytes" + "os" + "sync/atomic" + "time" + + "gitee.com/timedb/wheatCache/pkg/errorx" + "gitee.com/timedb/wheatCache/pkg/logx" + protobuf "google.golang.org/protobuf/proto" +) + +type AOF struct { + codec AOFCodec + byteIO chan []byte + logPath string + ioFrequency int32 + flushTime time.Duration + checkIOTime time.Duration + ioNum *int32 +} + +// NewAOF +// codec AOF 的编码解码器,nil 时默认 protoCodec +// logPath AOF 持久化的地址 +// flushTime 刷新到磁盘的间隔时间 +// frequency, checkIOTime 使用一定频率检查,是否需要提前执行磁盘刷新 +func NewAOF(codec AOFCodec, logPath string, flushTime time.Duration, frequency int, checkIOTime time.Duration) *AOF { + oneAOF.Do(func() { + if codec == nil { + codec = &codecProto{} + } + sysAOF = &AOF{ + codec: codec, + ioFrequency: int32(frequency), + flushTime: flushTime, + logPath: logPath, + byteIO: make(chan []byte, defaultByteIONum), + ioNum: new(int32), // 统计当前的变更情况 + checkIOTime: checkIOTime, + } + go sysAOF.flushLog() + }) + + return sysAOF +} + +// 刷新数据到磁盘 +func (a *AOF) flushLog() { + flushTc := time.NewTicker(a.flushTime) + frequencyTc := time.NewTicker(a.checkIOTime) + + file, err := os.OpenFile(a.logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + logx.Errorln(err) + return + } + + flush := func() { + num := len(a.byteIO) + for i := 0; i < num; i++ { + aof := <-a.byteIO + buffer := bytes.NewBuffer(aof) + buffer.WriteString("\n") + file.Write(buffer.Bytes()) + } + + file.Sync() + } + + for { + select { + case <-flushTc.C: + flush() + atomic.SwapInt32(a.ioNum, 0) + case <-frequencyTc.C: + if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency { + flush() + } + atomic.SwapInt32(a.ioNum, 0) + } + } + +} + +func (a *AOF) DecodeAOF(buf []byte) (protobuf.Message, error) { + return a.codec.Decode(buf) +} + +func (a *AOF) EncodeAOF(method string, m protobuf.Message) ([]byte, error) { + return a.codec.Encode(method, m) +} + +func (a *AOF) SendRequest(method string, req interface{}) error { + if req, ok := req.(protobuf.Message); ok { + aof, err := a.EncodeAOF(method, req) + if err != nil { + return err + } + a.byteIO <- aof + atomic.AddInt32(a.ioNum, 1) + } + return errorx.New("type not is protobuf.message") +} diff --git a/storage/persisted/aof_test.go b/storage/persisted/aof_test.go new file mode 100644 index 0000000..7dbd997 --- /dev/null +++ b/storage/persisted/aof_test.go @@ -0,0 +1,51 @@ +package persisted + +import ( + "bufio" + "fmt" + "io" + "os" + "strconv" + "testing" + "time" + + "gitee.com/timedb/wheatCache/pkg/proto" + "github.com/stretchr/testify/require" +) + +func TestAOF_SendRequest(t *testing.T) { + aof := NewAOF(nil, "./test.aof", time.Second*3, 5, time.Second*1) + for i := 0; i < 100; i++ { + req := &proto.ReduceRequest{ + Key: proto.NewBaseKey(strconv.Itoa(i), int64(i)), + Renewal: int32(i), + } + + aof.SendRequest("Reduce", req) + } + + time.Sleep(4 * time.Second) + fmt.Println(len(aof.byteIO)) +} + +func TestAOF_ReadRequest(t *testing.T) { + + aof := NewAOF(nil, "./test.aof", time.Second*2, 5, time.Second*1) + f, _ := os.Open("test.aof") + defer f.Close() + + rd := bufio.NewReader(f) + for { + line, err := rd.ReadBytes('\n') + + if err != nil { + if err != io.EOF { + require.NoError(t, err) + } + break + } + msg, err := aof.DecodeAOF(line[:len(line)-1]) + require.NoError(t, err) + fmt.Println(msg) + } +} diff --git a/storage/persisted/codec.gen.go b/storage/persisted/codec.gen.go new file mode 100644 index 0000000..7cafce2 --- /dev/null +++ b/storage/persisted/codec.gen.go @@ -0,0 +1,297 @@ +// Code generated by gen-struct. DO NOT EDIT. +// make gen-service generated +package persisted + +import ( + "bytes" + "encoding/hex" + "fmt" + + "gitee.com/timedb/wheatCache/pkg/errorx" + "gitee.com/timedb/wheatCache/pkg/proto" + protobuf "google.golang.org/protobuf/proto" +) + +type codecProto struct { +} + +func (c *codecProto) Encode(method string, m protobuf.Message) ([]byte, error) { + buf, err := protobuf.Marshal(m) + if err != nil { + return nil, err + } + + // 16 进制编码 + dst := make([]byte, hex.EncodedLen(len(buf))) + n := hex.Encode(dst, buf) + buffer := bytes.NewBuffer(dst[:n]) + buffer.WriteString(fmt.Sprint(":", method)) + + return buffer.Bytes(), nil +} + +func (c *codecProto) Decode(buf []byte) (protobuf.Message, error) { + hexBuf := bytes.Split(buf, []byte(":")) + if len(hexBuf) != 2 { + return nil, errorx.New("decode aof err: not a valid aof") + } + + method := string(hexBuf[1]) + n, err := hex.Decode(buf, hexBuf[0]) + if err != nil { + return nil, err + } + + return decode(method, buf[:n]) + +} + +func decode(method string, buf []byte) (protobuf.Message, error) { + switch method { + case "LIndex": + return decodeLIndex(buf) + case "LLen": + return decodeLLen(buf) + case "LPop": + return decodeLPop(buf) + case "LPush": + return decodeLPush(buf) + case "LPushX": + return decodeLPushX(buf) + case "LRange": + return decodeLRange(buf) + case "LRem": + return decodeLRem(buf) + case "LSet": + return decodeLSet(buf) + case "RPop": + return decodeRPop(buf) + case "LTrim": + return decodeLTrim(buf) + case "RPush": + return decodeRPush(buf) + case "RPushX": + return decodeRPushX(buf) + case "Set": + return decodeSet(buf) + case "Get": + return decodeGet(buf) + case "Add": + return decodeAdd(buf) + case "Reduce": + return decodeReduce(buf) + case "Setnx": + return decodeSetnx(buf) + case "SetBit": + return decodeSetBit(buf) + case "GetBit": + return decodeGetBit(buf) + case "GetRange": + return decodeGetRange(buf) + case "GetSet": + return decodeGetSet(buf) + case "StrLen": + return decodeStrLen(buf) + + } + + return nil, errorx.New("decode aof err, the method could not be resolved, method:%s", method) +} + +func decodeLIndex(buf []byte) (*proto.LIndexRequest, error) { + req := &proto.LIndexRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLLen(buf []byte) (*proto.LLenRequest, error) { + req := &proto.LLenRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLPop(buf []byte) (*proto.LPopRequest, error) { + req := &proto.LPopRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLPush(buf []byte) (*proto.LPushRequest, error) { + req := &proto.LPushRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLPushX(buf []byte) (*proto.LPushXRequest, error) { + req := &proto.LPushXRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLRange(buf []byte) (*proto.LRangeRequest, error) { + req := &proto.LRangeRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLRem(buf []byte) (*proto.LRemRequest, error) { + req := &proto.LRemRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLSet(buf []byte) (*proto.LSetRequest, error) { + req := &proto.LSetRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeRPop(buf []byte) (*proto.RPopRequest, error) { + req := &proto.RPopRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeLTrim(buf []byte) (*proto.LTrimRequest, error) { + req := &proto.LTrimRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeRPush(buf []byte) (*proto.RPushRequest, error) { + req := &proto.RPushRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeRPushX(buf []byte) (*proto.RPushXRequest, error) { + req := &proto.RPushXRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeSet(buf []byte) (*proto.SetRequest, error) { + req := &proto.SetRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeGet(buf []byte) (*proto.GetRequest, error) { + req := &proto.GetRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeAdd(buf []byte) (*proto.AddRequest, error) { + req := &proto.AddRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeReduce(buf []byte) (*proto.ReduceRequest, error) { + req := &proto.ReduceRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeSetnx(buf []byte) (*proto.SetnxRequest, error) { + req := &proto.SetnxRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeSetBit(buf []byte) (*proto.SetBitRequest, error) { + req := &proto.SetBitRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeGetBit(buf []byte) (*proto.GetBitRequest, error) { + req := &proto.GetBitRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeGetRange(buf []byte) (*proto.GetRangeRequest, error) { + req := &proto.GetRangeRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeGetSet(buf []byte) (*proto.GetSetRequest, error) { + req := &proto.GetSetRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} + +func decodeStrLen(buf []byte) (*proto.StrLenRequest, error) { + req := &proto.StrLenRequest{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} diff --git a/storage/persisted/define.go b/storage/persisted/define.go new file mode 100644 index 0000000..ba182cb --- /dev/null +++ b/storage/persisted/define.go @@ -0,0 +1,21 @@ +package persisted + +import ( + "sync" + + protobuf "google.golang.org/protobuf/proto" +) + +type AOFCodec interface { + Encode(method string, m protobuf.Message) ([]byte, error) + Decode(buf []byte) (protobuf.Message, error) +} + +var ( + oneAOF sync.Once + sysAOF *AOF +) + +const ( + defaultByteIONum = 20000 +) diff --git a/storage/service/define.go b/storage/service/define.go index a6bdfaa..b8a23dd 100644 --- a/storage/service/define.go +++ b/storage/service/define.go @@ -9,4 +9,6 @@ var ( sysSingleService *singleService ) -const timeOutDefault = 2 +const ( + timeOutDefault = 2 +) diff --git a/storage/service/single.go b/storage/service/single.go index 11b59f9..dc8ac99 100644 --- a/storage/service/single.go +++ b/storage/service/single.go @@ -5,8 +5,10 @@ import ( "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/pkg/middle" "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/storage/dao" + "gitee.com/timedb/wheatCache/storage/persisted" "github.com/spf13/viper" ) @@ -16,6 +18,28 @@ type singleService struct { timeOut time.Duration lruCache *lru.SingleCache dao dao.Interface + aof *persisted.AOF +} + +func loadAOF() *persisted.AOF { + decName := viper.GetString("storage.aof-codec") + var aofCodec persisted.AOFCodec + switch decName { + case "": + return nil + case "b16": + aofCodec = nil + default: + aofCodec = nil + } + + aofPath := viper.GetString("storage.aof-path") + aofFlush := viper.GetInt("storage.aof-flush-time") + aofCheckTime := viper.GetInt("storage.aof-check-time") + aofCheckFreq := viper.GetInt("storage.aof-check-freq") + + return persisted.NewAOF(aofCodec, aofPath, time.Second*time.Duration(aofFlush), + aofCheckFreq, time.Second*time.Duration(aofCheckTime)) } func NewSingleServer() proto.CommServerServer { @@ -27,11 +51,16 @@ func NewSingleServer() proto.CommServerServer { lruCache := lru.NewLRUCache() + // 加载 aof + aof := loadAOF() + ser := &singleService{ - lruCache: lruCache, - lruProduce: event.NewProduce(lruCache.GetDriver()), - timeOut: time.Duration(timeOut) * time.Second, - dao: dao.NewDao(lruCache), + lruCache: lruCache, + lruProduce: event.NewProduce(lruCache.GetDriver()), + timeOut: time.Duration(timeOut) * time.Second, + dao: dao.NewDao(lruCache), + middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()), + aof: aof, } sysSingleService = ser diff --git a/storage/service/single_service.gen.go b/storage/service/single_service.gen.go index 341c61b..33b784c 100644 --- a/storage/service/single_service.gen.go +++ b/storage/service/single_service.gen.go @@ -15,7 +15,12 @@ func (s *singleService) LIndex( req *proto.LIndexRequest, ) (*proto.LIndexResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LIndex(req.Key, req.Index) + resp, err := s.dao.LIndex(req.Key, req.Index) + if err != nil { + return nil, err + } + s.aof.SendRequest("LIndex", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -36,7 +41,12 @@ func (s *singleService) LLen( req *proto.LLenRequest, ) (*proto.LLenResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LLen(req.Key) + resp, err := s.dao.LLen(req.Key) + if err != nil { + return nil, err + } + s.aof.SendRequest("LLen", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -57,7 +67,12 @@ func (s *singleService) LPop( req *proto.LPopRequest, ) (*proto.LPopResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LPop(req.Key, req.Count) + resp, err := s.dao.LPop(req.Key, req.Count) + if err != nil { + return nil, err + } + s.aof.SendRequest("LPop", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -78,7 +93,12 @@ func (s *singleService) LPush( req *proto.LPushRequest, ) (*proto.LPushResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LPush(req.Key, req.Values) + resp, err := s.dao.LPush(req.Key, req.Values) + if err != nil { + return nil, err + } + s.aof.SendRequest("LPush", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -99,7 +119,12 @@ func (s *singleService) LPushX( req *proto.LPushXRequest, ) (*proto.LPushXResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LPushX(req.Key, req.Values) + resp, err := s.dao.LPushX(req.Key, req.Values) + if err != nil { + return nil, err + } + s.aof.SendRequest("LPushX", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -120,7 +145,12 @@ func (s *singleService) LRange( req *proto.LRangeRequest, ) (*proto.LRangeResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LRange(req.Key, req.Start, req.End) + resp, err := s.dao.LRange(req.Key, req.Start, req.End) + if err != nil { + return nil, err + } + s.aof.SendRequest("LRange", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -141,7 +171,12 @@ func (s *singleService) LRem( req *proto.LRemRequest, ) (*proto.LRemResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LRem(req.Key, req.Count, req.Value) + resp, err := s.dao.LRem(req.Key, req.Count, req.Value) + if err != nil { + return nil, err + } + s.aof.SendRequest("LRem", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -162,7 +197,12 @@ func (s *singleService) LSet( req *proto.LSetRequest, ) (*proto.LSetResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LSet(req.Key, req.Index, req.Value) + resp, err := s.dao.LSet(req.Key, req.Index, req.Value) + if err != nil { + return nil, err + } + s.aof.SendRequest("LSet", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -183,7 +223,12 @@ func (s *singleService) RPop( req *proto.RPopRequest, ) (*proto.RPopResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.RPop(req.Key, req.Count) + resp, err := s.dao.RPop(req.Key, req.Count) + if err != nil { + return nil, err + } + s.aof.SendRequest("RPop", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -204,7 +249,12 @@ func (s *singleService) LTrim( req *proto.LTrimRequest, ) (*proto.LTrimResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.LTrim(req.Key, req.Start, req.End) + resp, err := s.dao.LTrim(req.Key, req.Start, req.End) + if err != nil { + return nil, err + } + s.aof.SendRequest("LTrim", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -225,7 +275,12 @@ func (s *singleService) RPush( req *proto.RPushRequest, ) (*proto.RPushResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.RPush(req.Key, req.Values) + resp, err := s.dao.RPush(req.Key, req.Values) + if err != nil { + return nil, err + } + s.aof.SendRequest("RPush", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -246,7 +301,12 @@ func (s *singleService) RPushX( req *proto.RPushXRequest, ) (*proto.RPushXResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.RPushX(req.Key, req.Values) + resp, err := s.dao.RPushX(req.Key, req.Values) + if err != nil { + return nil, err + } + s.aof.SendRequest("RPushX", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -267,7 +327,12 @@ func (s *singleService) Set( req *proto.SetRequest, ) (*proto.SetResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.Set(req.Key, req.Val) + resp, err := s.dao.Set(req.Key, req.Val) + if err != nil { + return nil, err + } + s.aof.SendRequest("Set", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -288,7 +353,12 @@ func (s *singleService) Get( req *proto.GetRequest, ) (*proto.GetResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.Get(req.Key) + resp, err := s.dao.Get(req.Key) + if err != nil { + return nil, err + } + s.aof.SendRequest("Get", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -309,7 +379,12 @@ func (s *singleService) Add( req *proto.AddRequest, ) (*proto.AddResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.Add(req.Key, req.Renewal) + resp, err := s.dao.Add(req.Key, req.Renewal) + if err != nil { + return nil, err + } + s.aof.SendRequest("Add", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -330,7 +405,12 @@ func (s *singleService) Reduce( req *proto.ReduceRequest, ) (*proto.ReduceResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.Reduce(req.Key, req.Renewal) + resp, err := s.dao.Reduce(req.Key, req.Renewal) + if err != nil { + return nil, err + } + s.aof.SendRequest("Reduce", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -351,7 +431,12 @@ func (s *singleService) Setnx( req *proto.SetnxRequest, ) (*proto.SetnxResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.Setnx(req.Key, req.Val) + resp, err := s.dao.Setnx(req.Key, req.Val) + if err != nil { + return nil, err + } + s.aof.SendRequest("Setnx", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -372,7 +457,12 @@ func (s *singleService) SetBit( req *proto.SetBitRequest, ) (*proto.SetBitResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.SetBit(req.Key, req.Val, req.Offer) + resp, err := s.dao.SetBit(req.Key, req.Val, req.Offer) + if err != nil { + return nil, err + } + s.aof.SendRequest("SetBit", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -393,7 +483,12 @@ func (s *singleService) GetBit( req *proto.GetBitRequest, ) (*proto.GetBitResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.GetBit(req.Key, req.Offer) + resp, err := s.dao.GetBit(req.Key, req.Offer) + if err != nil { + return nil, err + } + s.aof.SendRequest("GetBit", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -414,7 +509,12 @@ func (s *singleService) GetRange( req *proto.GetRangeRequest, ) (*proto.GetRangeResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.GetRange(req.Key, req.Start, req.End) + resp, err := s.dao.GetRange(req.Key, req.Start, req.End) + if err != nil { + return nil, err + } + s.aof.SendRequest("GetRange", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -435,7 +535,12 @@ func (s *singleService) GetSet( req *proto.GetSetRequest, ) (*proto.GetSetResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.GetSet(req.Key, req.Val) + resp, err := s.dao.GetSet(req.Key, req.Val) + if err != nil { + return nil, err + } + s.aof.SendRequest("GetSet", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) @@ -456,7 +561,12 @@ func (s *singleService) StrLen( req *proto.StrLenRequest, ) (*proto.StrLenResponse, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.StrLen(req.Key) + resp, err := s.dao.StrLen(req.Key) + if err != nil { + return nil, err + } + s.aof.SendRequest("StrLen", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName) diff --git a/storage/temp/aof.template b/storage/temp/aof.template new file mode 100644 index 0000000..bececc4 --- /dev/null +++ b/storage/temp/aof.template @@ -0,0 +1,69 @@ +// Code generated by gen-struct. DO NOT EDIT. +// make gen-service generated +package persisted + +import ( + "bytes" + "encoding/hex" + "fmt" + + "gitee.com/timedb/wheatCache/pkg/errorx" + "gitee.com/timedb/wheatCache/pkg/proto" + protobuf "google.golang.org/protobuf/proto" +) + +type codecProto struct { +} + +func (c *codecProto) Encode(method string, m protobuf.Message) ([]byte, error) { + buf, err := protobuf.Marshal(m) + if err != nil { + return nil, err + } + + // 16 进制编码 + dst := make([]byte, hex.EncodedLen(len(buf))) + n := hex.Encode(dst, buf) + buffer := bytes.NewBuffer(dst[:n]) + buffer.WriteString(fmt.Sprint(":", method)) + + return buffer.Bytes(), nil +} + +func (c *codecProto) Decode(buf []byte) (protobuf.Message, error) { + hexBuf := bytes.Split(buf, []byte(":")) + if len(hexBuf) != 2 { + return nil, errorx.New("decode aof err: not a valid aof") + } + + method := string(hexBuf[1]) + n, err := hex.Decode(buf, hexBuf[0]) + if err != nil { + return nil, err + } + + return decode(method, buf[:n]) + +} + +func decode(method string, buf []byte) (protobuf.Message, error) { + switch method { + {% for key in keys -%} + case "{{key.method}}": + return decode{{key.method}}(buf) + {% endfor %} + } + + return nil, errorx.New("decode aof err, the method could not be resolved, method:%s", method) +} + +{% for key in keys %} +func decode{{key.method}}(buf []byte) (*proto.{{key.method}}Request, error) { + req := &proto.{{key.method}}Request{} + err := protobuf.Unmarshal(buf, req) + if err != nil { + return nil, err + } + return req, nil +} +{% endfor %} diff --git a/storage/temp/service.template b/storage/temp/service.template index e1eb81e..9d63598 100644 --- a/storage/temp/service.template +++ b/storage/temp/service.template @@ -16,7 +16,12 @@ func (s *singleService) {{key.method}}( req *proto.{{key.method}}Request, ) (*proto.{{key.method}}Response, error) { work := event.EventWorkFunc(func() (interface{}, error) { - return s.dao.{{key.method}}({% for opt in key.option %}req.{{opt[0]}}, {% endfor %}) + resp, err := s.dao.{{key.method}}({% for opt in key.option %}req.{{opt[0]}}, {% endfor %}) + if err != nil{ + return nil, err + } + s.aof.SendRequest("{{key.method}}", req) + return resp, nil }) lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)