Merge pull request !81 from bandl/feat-storage-aof
This commit is contained in:
bandl 2021-11-01 16:00:57 +00:00 committed by Gitee
commit de59f13234
14 changed files with 780 additions and 28 deletions

View File

@ -33,6 +33,7 @@ func init() {
func setDefaultConfValue() {
// 设置一些默认值
viper.SetDefault("version", "base-01")
defaultStorage()
}
func LoadConf(path string) error {

11
conf/storage.go Normal file
View File

@ -0,0 +1,11 @@
package conf
import "github.com/spf13/viper"
func defaultStorage() {
// aof
viper.SetDefault("storage.aof-path", "/etc/wheat-cache/wheat.aof")
viper.SetDefault("storage.aof-flush-time", 5)
viper.SetDefault("storage.aof-check-time", 1)
viper.SetDefault("storage.aof-check-freq", 20)
}

View File

@ -6,6 +6,13 @@ storage:
host: '127.0.0.1'
port: 5890
timeOut: 2 # second
aof-codec: "b16" # 目前只实现了 b16 编码方案。
aof-path: "/etc/wheat-cache/wheat.aof"
aof-flush-time: 5 # second , 每 5 秒刷新缓冲区的内容到磁盘。
aof-check-time: 1 # 每 1 second 执行一次 io 检查
aof-check-freq: 20 # 在一个 aof-check-time 周期内,出现超过 aof-check-freq 的 IO 操作会刷新磁盘
# clearSize and maxSize must be Int
lruCache:

View File

@ -1,5 +1,9 @@
package proto
import (
"google.golang.org/protobuf/types/known/timestamppb"
)
type GetKeyBaseInterface interface {
GetKey() *BaseKey
}
@ -7,3 +11,25 @@ type GetKeyBaseInterface interface {
const (
BaseKeyMethodKey = "basekey"
)
// NewBaseKey
// keyttlexpire
func NewBaseKey(key string, t ...int64) *BaseKey {
var expire *timestamppb.Timestamp
var ttl int64
if len(t) > 1 {
expire = &timestamppb.Timestamp{
Seconds: t[1],
}
ttl = t[0]
} else if len(t) == 1 {
ttl = t[0]
}
return &BaseKey{
Key: key,
Expire: expire,
Ttl: ttl,
}
}

View File

@ -8,6 +8,7 @@ protoAddress = f"{sysPath}/protobuf"
tempPath = f"{sysPath}/storage/temp"
daoPath = f"{sysPath}/storage/dao"
servicePath = f"{sysPath}/storage/service"
aofPath = f"{sysPath}/storage/persisted"
class ProtoOption(object):
@ -133,16 +134,33 @@ def gen_single_service(proto):
with open(temp_path, 'w', encoding='utf-8') as f:
f.write(text)
def gen_aof(proto):
tem_text = load_template("aof.template")
template = Template(tem_text)
text = template.render(keys=proto)
temp_path = f"{aofPath}/codec.gen.go"
with open(temp_path, 'w', encoding='utf-8') as f:
f.write(text)
def format_code_go():
go_fmt(f"{daoPath}/interface.gen.go")
go_fmt(f"{servicePath}/single_service.gen.go")
go_fmt(f"{aofPath}/codec.gen.go")
def main():
# 加载 protobuf
protobuf = load_protobuf()
# 生成 dao 接口
gen_dao_interface(protobuf)
# 生成服务代码
gen_single_service(protobuf)
# 生成 aof 解码方案
gen_aof(protobuf)
format_code_go()
if __name__ == "__main__":

105
storage/persisted/aof.go Normal file
View File

@ -0,0 +1,105 @@
package persisted
import (
"bytes"
"os"
"sync/atomic"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/logx"
protobuf "google.golang.org/protobuf/proto"
)
type AOF struct {
codec AOFCodec
byteIO chan []byte
logPath string
ioFrequency int32
flushTime time.Duration
checkIOTime time.Duration
ioNum *int32
}
// NewAOF
// codec AOF 的编码解码器nil 时默认 protoCodec
// logPath AOF 持久化的地址
// flushTime 刷新到磁盘的间隔时间
// frequency, checkIOTime 使用一定频率检查,是否需要提前执行磁盘刷新
func NewAOF(codec AOFCodec, logPath string, flushTime time.Duration, frequency int, checkIOTime time.Duration) *AOF {
oneAOF.Do(func() {
if codec == nil {
codec = &codecProto{}
}
sysAOF = &AOF{
codec: codec,
ioFrequency: int32(frequency),
flushTime: flushTime,
logPath: logPath,
byteIO: make(chan []byte, defaultByteIONum),
ioNum: new(int32), // 统计当前的变更情况
checkIOTime: checkIOTime,
}
go sysAOF.flushLog()
})
return sysAOF
}
// 刷新数据到磁盘
func (a *AOF) flushLog() {
flushTc := time.NewTicker(a.flushTime)
frequencyTc := time.NewTicker(a.checkIOTime)
file, err := os.OpenFile(a.logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
logx.Errorln(err)
return
}
flush := func() {
num := len(a.byteIO)
for i := 0; i < num; i++ {
aof := <-a.byteIO
buffer := bytes.NewBuffer(aof)
buffer.WriteString("\n")
file.Write(buffer.Bytes())
}
file.Sync()
}
for {
select {
case <-flushTc.C:
flush()
atomic.SwapInt32(a.ioNum, 0)
case <-frequencyTc.C:
if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency {
flush()
}
atomic.SwapInt32(a.ioNum, 0)
}
}
}
func (a *AOF) DecodeAOF(buf []byte) (protobuf.Message, error) {
return a.codec.Decode(buf)
}
func (a *AOF) EncodeAOF(method string, m protobuf.Message) ([]byte, error) {
return a.codec.Encode(method, m)
}
func (a *AOF) SendRequest(method string, req interface{}) error {
if req, ok := req.(protobuf.Message); ok {
aof, err := a.EncodeAOF(method, req)
if err != nil {
return err
}
a.byteIO <- aof
atomic.AddInt32(a.ioNum, 1)
}
return errorx.New("type not is protobuf.message")
}

View File

@ -0,0 +1,51 @@
package persisted
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"testing"
"time"
"gitee.com/timedb/wheatCache/pkg/proto"
"github.com/stretchr/testify/require"
)
func TestAOF_SendRequest(t *testing.T) {
aof := NewAOF(nil, "./test.aof", time.Second*3, 5, time.Second*1)
for i := 0; i < 100; i++ {
req := &proto.ReduceRequest{
Key: proto.NewBaseKey(strconv.Itoa(i), int64(i)),
Renewal: int32(i),
}
aof.SendRequest("Reduce", req)
}
time.Sleep(4 * time.Second)
fmt.Println(len(aof.byteIO))
}
func TestAOF_ReadRequest(t *testing.T) {
aof := NewAOF(nil, "./test.aof", time.Second*2, 5, time.Second*1)
f, _ := os.Open("test.aof")
defer f.Close()
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err != nil {
if err != io.EOF {
require.NoError(t, err)
}
break
}
msg, err := aof.DecodeAOF(line[:len(line)-1])
require.NoError(t, err)
fmt.Println(msg)
}
}

View File

@ -0,0 +1,297 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-service generated
package persisted
import (
"bytes"
"encoding/hex"
"fmt"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/proto"
protobuf "google.golang.org/protobuf/proto"
)
type codecProto struct {
}
func (c *codecProto) Encode(method string, m protobuf.Message) ([]byte, error) {
buf, err := protobuf.Marshal(m)
if err != nil {
return nil, err
}
// 16 进制编码
dst := make([]byte, hex.EncodedLen(len(buf)))
n := hex.Encode(dst, buf)
buffer := bytes.NewBuffer(dst[:n])
buffer.WriteString(fmt.Sprint(":", method))
return buffer.Bytes(), nil
}
func (c *codecProto) Decode(buf []byte) (protobuf.Message, error) {
hexBuf := bytes.Split(buf, []byte(":"))
if len(hexBuf) != 2 {
return nil, errorx.New("decode aof err: not a valid aof")
}
method := string(hexBuf[1])
n, err := hex.Decode(buf, hexBuf[0])
if err != nil {
return nil, err
}
return decode(method, buf[:n])
}
func decode(method string, buf []byte) (protobuf.Message, error) {
switch method {
case "LIndex":
return decodeLIndex(buf)
case "LLen":
return decodeLLen(buf)
case "LPop":
return decodeLPop(buf)
case "LPush":
return decodeLPush(buf)
case "LPushX":
return decodeLPushX(buf)
case "LRange":
return decodeLRange(buf)
case "LRem":
return decodeLRem(buf)
case "LSet":
return decodeLSet(buf)
case "RPop":
return decodeRPop(buf)
case "LTrim":
return decodeLTrim(buf)
case "RPush":
return decodeRPush(buf)
case "RPushX":
return decodeRPushX(buf)
case "Set":
return decodeSet(buf)
case "Get":
return decodeGet(buf)
case "Add":
return decodeAdd(buf)
case "Reduce":
return decodeReduce(buf)
case "Setnx":
return decodeSetnx(buf)
case "SetBit":
return decodeSetBit(buf)
case "GetBit":
return decodeGetBit(buf)
case "GetRange":
return decodeGetRange(buf)
case "GetSet":
return decodeGetSet(buf)
case "StrLen":
return decodeStrLen(buf)
}
return nil, errorx.New("decode aof err, the method could not be resolved, method:%s", method)
}
func decodeLIndex(buf []byte) (*proto.LIndexRequest, error) {
req := &proto.LIndexRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLLen(buf []byte) (*proto.LLenRequest, error) {
req := &proto.LLenRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLPop(buf []byte) (*proto.LPopRequest, error) {
req := &proto.LPopRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLPush(buf []byte) (*proto.LPushRequest, error) {
req := &proto.LPushRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLPushX(buf []byte) (*proto.LPushXRequest, error) {
req := &proto.LPushXRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLRange(buf []byte) (*proto.LRangeRequest, error) {
req := &proto.LRangeRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLRem(buf []byte) (*proto.LRemRequest, error) {
req := &proto.LRemRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLSet(buf []byte) (*proto.LSetRequest, error) {
req := &proto.LSetRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeRPop(buf []byte) (*proto.RPopRequest, error) {
req := &proto.RPopRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeLTrim(buf []byte) (*proto.LTrimRequest, error) {
req := &proto.LTrimRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeRPush(buf []byte) (*proto.RPushRequest, error) {
req := &proto.RPushRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeRPushX(buf []byte) (*proto.RPushXRequest, error) {
req := &proto.RPushXRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeSet(buf []byte) (*proto.SetRequest, error) {
req := &proto.SetRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeGet(buf []byte) (*proto.GetRequest, error) {
req := &proto.GetRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeAdd(buf []byte) (*proto.AddRequest, error) {
req := &proto.AddRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeReduce(buf []byte) (*proto.ReduceRequest, error) {
req := &proto.ReduceRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeSetnx(buf []byte) (*proto.SetnxRequest, error) {
req := &proto.SetnxRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeSetBit(buf []byte) (*proto.SetBitRequest, error) {
req := &proto.SetBitRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeGetBit(buf []byte) (*proto.GetBitRequest, error) {
req := &proto.GetBitRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeGetRange(buf []byte) (*proto.GetRangeRequest, error) {
req := &proto.GetRangeRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeGetSet(buf []byte) (*proto.GetSetRequest, error) {
req := &proto.GetSetRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
func decodeStrLen(buf []byte) (*proto.StrLenRequest, error) {
req := &proto.StrLenRequest{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}

View File

@ -0,0 +1,21 @@
package persisted
import (
"sync"
protobuf "google.golang.org/protobuf/proto"
)
type AOFCodec interface {
Encode(method string, m protobuf.Message) ([]byte, error)
Decode(buf []byte) (protobuf.Message, error)
}
var (
oneAOF sync.Once
sysAOF *AOF
)
const (
defaultByteIONum = 20000
)

View File

@ -9,4 +9,6 @@ var (
sysSingleService *singleService
)
const timeOutDefault = 2
const (
timeOutDefault = 2
)

View File

@ -5,8 +5,10 @@ import (
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/lru"
"gitee.com/timedb/wheatCache/pkg/middle"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/storage/dao"
"gitee.com/timedb/wheatCache/storage/persisted"
"github.com/spf13/viper"
)
@ -16,6 +18,28 @@ type singleService struct {
timeOut time.Duration
lruCache *lru.SingleCache
dao dao.Interface
aof *persisted.AOF
}
func loadAOF() *persisted.AOF {
decName := viper.GetString("storage.aof-codec")
var aofCodec persisted.AOFCodec
switch decName {
case "":
return nil
case "b16":
aofCodec = nil
default:
aofCodec = nil
}
aofPath := viper.GetString("storage.aof-path")
aofFlush := viper.GetInt("storage.aof-flush-time")
aofCheckTime := viper.GetInt("storage.aof-check-time")
aofCheckFreq := viper.GetInt("storage.aof-check-freq")
return persisted.NewAOF(aofCodec, aofPath, time.Second*time.Duration(aofFlush),
aofCheckFreq, time.Second*time.Duration(aofCheckTime))
}
func NewSingleServer() proto.CommServerServer {
@ -27,11 +51,16 @@ func NewSingleServer() proto.CommServerServer {
lruCache := lru.NewLRUCache()
// 加载 aof
aof := loadAOF()
ser := &singleService{
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut) * time.Second,
dao: dao.NewDao(lruCache),
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut) * time.Second,
dao: dao.NewDao(lruCache),
middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()),
aof: aof,
}
sysSingleService = ser

View File

@ -15,7 +15,12 @@ func (s *singleService) LIndex(
req *proto.LIndexRequest,
) (*proto.LIndexResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LIndex(req.Key, req.Index)
resp, err := s.dao.LIndex(req.Key, req.Index)
if err != nil {
return nil, err
}
s.aof.SendRequest("LIndex", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -36,7 +41,12 @@ func (s *singleService) LLen(
req *proto.LLenRequest,
) (*proto.LLenResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LLen(req.Key)
resp, err := s.dao.LLen(req.Key)
if err != nil {
return nil, err
}
s.aof.SendRequest("LLen", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -57,7 +67,12 @@ func (s *singleService) LPop(
req *proto.LPopRequest,
) (*proto.LPopResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LPop(req.Key, req.Count)
resp, err := s.dao.LPop(req.Key, req.Count)
if err != nil {
return nil, err
}
s.aof.SendRequest("LPop", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -78,7 +93,12 @@ func (s *singleService) LPush(
req *proto.LPushRequest,
) (*proto.LPushResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LPush(req.Key, req.Values)
resp, err := s.dao.LPush(req.Key, req.Values)
if err != nil {
return nil, err
}
s.aof.SendRequest("LPush", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -99,7 +119,12 @@ func (s *singleService) LPushX(
req *proto.LPushXRequest,
) (*proto.LPushXResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LPushX(req.Key, req.Values)
resp, err := s.dao.LPushX(req.Key, req.Values)
if err != nil {
return nil, err
}
s.aof.SendRequest("LPushX", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -120,7 +145,12 @@ func (s *singleService) LRange(
req *proto.LRangeRequest,
) (*proto.LRangeResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LRange(req.Key, req.Start, req.End)
resp, err := s.dao.LRange(req.Key, req.Start, req.End)
if err != nil {
return nil, err
}
s.aof.SendRequest("LRange", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -141,7 +171,12 @@ func (s *singleService) LRem(
req *proto.LRemRequest,
) (*proto.LRemResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LRem(req.Key, req.Count, req.Value)
resp, err := s.dao.LRem(req.Key, req.Count, req.Value)
if err != nil {
return nil, err
}
s.aof.SendRequest("LRem", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -162,7 +197,12 @@ func (s *singleService) LSet(
req *proto.LSetRequest,
) (*proto.LSetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LSet(req.Key, req.Index, req.Value)
resp, err := s.dao.LSet(req.Key, req.Index, req.Value)
if err != nil {
return nil, err
}
s.aof.SendRequest("LSet", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -183,7 +223,12 @@ func (s *singleService) RPop(
req *proto.RPopRequest,
) (*proto.RPopResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.RPop(req.Key, req.Count)
resp, err := s.dao.RPop(req.Key, req.Count)
if err != nil {
return nil, err
}
s.aof.SendRequest("RPop", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -204,7 +249,12 @@ func (s *singleService) LTrim(
req *proto.LTrimRequest,
) (*proto.LTrimResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.LTrim(req.Key, req.Start, req.End)
resp, err := s.dao.LTrim(req.Key, req.Start, req.End)
if err != nil {
return nil, err
}
s.aof.SendRequest("LTrim", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -225,7 +275,12 @@ func (s *singleService) RPush(
req *proto.RPushRequest,
) (*proto.RPushResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.RPush(req.Key, req.Values)
resp, err := s.dao.RPush(req.Key, req.Values)
if err != nil {
return nil, err
}
s.aof.SendRequest("RPush", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -246,7 +301,12 @@ func (s *singleService) RPushX(
req *proto.RPushXRequest,
) (*proto.RPushXResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.RPushX(req.Key, req.Values)
resp, err := s.dao.RPushX(req.Key, req.Values)
if err != nil {
return nil, err
}
s.aof.SendRequest("RPushX", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -267,7 +327,12 @@ func (s *singleService) Set(
req *proto.SetRequest,
) (*proto.SetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Set(req.Key, req.Val)
resp, err := s.dao.Set(req.Key, req.Val)
if err != nil {
return nil, err
}
s.aof.SendRequest("Set", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -288,7 +353,12 @@ func (s *singleService) Get(
req *proto.GetRequest,
) (*proto.GetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Get(req.Key)
resp, err := s.dao.Get(req.Key)
if err != nil {
return nil, err
}
s.aof.SendRequest("Get", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -309,7 +379,12 @@ func (s *singleService) Add(
req *proto.AddRequest,
) (*proto.AddResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Add(req.Key, req.Renewal)
resp, err := s.dao.Add(req.Key, req.Renewal)
if err != nil {
return nil, err
}
s.aof.SendRequest("Add", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -330,7 +405,12 @@ func (s *singleService) Reduce(
req *proto.ReduceRequest,
) (*proto.ReduceResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Reduce(req.Key, req.Renewal)
resp, err := s.dao.Reduce(req.Key, req.Renewal)
if err != nil {
return nil, err
}
s.aof.SendRequest("Reduce", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -351,7 +431,12 @@ func (s *singleService) Setnx(
req *proto.SetnxRequest,
) (*proto.SetnxResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Setnx(req.Key, req.Val)
resp, err := s.dao.Setnx(req.Key, req.Val)
if err != nil {
return nil, err
}
s.aof.SendRequest("Setnx", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -372,7 +457,12 @@ func (s *singleService) SetBit(
req *proto.SetBitRequest,
) (*proto.SetBitResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.SetBit(req.Key, req.Val, req.Offer)
resp, err := s.dao.SetBit(req.Key, req.Val, req.Offer)
if err != nil {
return nil, err
}
s.aof.SendRequest("SetBit", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -393,7 +483,12 @@ func (s *singleService) GetBit(
req *proto.GetBitRequest,
) (*proto.GetBitResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.GetBit(req.Key, req.Offer)
resp, err := s.dao.GetBit(req.Key, req.Offer)
if err != nil {
return nil, err
}
s.aof.SendRequest("GetBit", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -414,7 +509,12 @@ func (s *singleService) GetRange(
req *proto.GetRangeRequest,
) (*proto.GetRangeResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.GetRange(req.Key, req.Start, req.End)
resp, err := s.dao.GetRange(req.Key, req.Start, req.End)
if err != nil {
return nil, err
}
s.aof.SendRequest("GetRange", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -435,7 +535,12 @@ func (s *singleService) GetSet(
req *proto.GetSetRequest,
) (*proto.GetSetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.GetSet(req.Key, req.Val)
resp, err := s.dao.GetSet(req.Key, req.Val)
if err != nil {
return nil, err
}
s.aof.SendRequest("GetSet", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)
@ -456,7 +561,12 @@ func (s *singleService) StrLen(
req *proto.StrLenRequest,
) (*proto.StrLenResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.StrLen(req.Key)
resp, err := s.dao.StrLen(req.Key)
if err != nil {
return nil, err
}
s.aof.SendRequest("StrLen", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)

69
storage/temp/aof.template Normal file
View File

@ -0,0 +1,69 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-service generated
package persisted
import (
"bytes"
"encoding/hex"
"fmt"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/proto"
protobuf "google.golang.org/protobuf/proto"
)
type codecProto struct {
}
func (c *codecProto) Encode(method string, m protobuf.Message) ([]byte, error) {
buf, err := protobuf.Marshal(m)
if err != nil {
return nil, err
}
// 16 进制编码
dst := make([]byte, hex.EncodedLen(len(buf)))
n := hex.Encode(dst, buf)
buffer := bytes.NewBuffer(dst[:n])
buffer.WriteString(fmt.Sprint(":", method))
return buffer.Bytes(), nil
}
func (c *codecProto) Decode(buf []byte) (protobuf.Message, error) {
hexBuf := bytes.Split(buf, []byte(":"))
if len(hexBuf) != 2 {
return nil, errorx.New("decode aof err: not a valid aof")
}
method := string(hexBuf[1])
n, err := hex.Decode(buf, hexBuf[0])
if err != nil {
return nil, err
}
return decode(method, buf[:n])
}
func decode(method string, buf []byte) (protobuf.Message, error) {
switch method {
{% for key in keys -%}
case "{{key.method}}":
return decode{{key.method}}(buf)
{% endfor %}
}
return nil, errorx.New("decode aof err, the method could not be resolved, method:%s", method)
}
{% for key in keys %}
func decode{{key.method}}(buf []byte) (*proto.{{key.method}}Request, error) {
req := &proto.{{key.method}}Request{}
err := protobuf.Unmarshal(buf, req)
if err != nil {
return nil, err
}
return req, nil
}
{% endfor %}

View File

@ -16,7 +16,12 @@ func (s *singleService) {{key.method}}(
req *proto.{{key.method}}Request,
) (*proto.{{key.method}}Response, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.{{key.method}}({% for opt in key.option %}req.{{opt[0]}}, {% endfor %})
resp, err := s.dao.{{key.method}}({% for opt in key.option %}req.{{opt[0]}}, {% endfor %})
if err != nil{
return nil, err
}
s.aof.SendRequest("{{key.method}}", req)
return resp, nil
})
lruEvent := s.lruProduce.NewEvent(lru.OptionEventName)