diff --git a/storage/persisted/aof.go b/storage/persisted/aof.go new file mode 100644 index 0000000..7a6d7df --- /dev/null +++ b/storage/persisted/aof.go @@ -0,0 +1,104 @@ +package persisted + +import ( + "bytes" + "os" + "sync/atomic" + "time" + + "gitee.com/timedb/wheatCache/pkg/errorx" + "gitee.com/timedb/wheatCache/pkg/logx" + protobuf "google.golang.org/protobuf/proto" +) + +type AOF struct { + codec AOFCodec + byteIO chan []byte + logPath string + ioFrequency int32 + flushTime time.Duration + checkIOTime time.Duration + ioNum *int32 +} + +// NewAOF +// codec AOF 的编码解码器,nil 时默认 protoCodec +// logPath AOF 持久化的地址 +// flushTime 刷新到磁盘的间隔时间 +// frequency, checkIOTime 使用一定频率检查,是否需要提前执行磁盘刷新 +func NewAOF(codec AOFCodec, logPath string, flushTime time.Duration, frequency int, checkIOTime time.Duration) *AOF { + oneAOF.Do(func() { + if codec == nil { + codec = &codecProto{} + } + sysAOF = &AOF{ + codec: codec, + ioFrequency: int32(frequency), + flushTime: flushTime, + logPath: logPath, + byteIO: make(chan []byte, defaultByteIONum), + ioNum: new(int32), // 统计当前的变更情况 + checkIOTime: checkIOTime, + } + go sysAOF.flushLog() + }) + + return sysAOF +} + +// 刷新数据到磁盘 +func (a *AOF) flushLog() { + flushTc := time.NewTicker(a.flushTime) + frequencyTc := time.NewTicker(a.checkIOTime) + + file, err := os.OpenFile(a.logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + logx.Errorln(err) + return + } + + flush := func() { + num := len(a.byteIO) + for i := 0; i < num; i++ { + aof := <-a.byteIO + buffer := bytes.NewBuffer(aof) + buffer.WriteString("\n") + file.Write(buffer.Bytes()) + } + + file.Sync() + } + + for { + select { + case <-flushTc.C: + flush() + case <-frequencyTc.C: + if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency { + flush() + atomic.SwapInt32(a.ioNum, 0) + } + } + } + +} + +func (a *AOF) DecodeAOF(buf []byte) (protobuf.Message, error) { + return a.codec.Decode(buf) +} + +func (a *AOF) EncodeAOF(method string, m protobuf.Message) ([]byte, error) { + return a.codec.Encode(method, m) +} + +func (a *AOF) SendRequest(method string, req interface{}) error { + if req, ok := req.(protobuf.Message); ok { + aof, err := a.EncodeAOF(method, req) + if err != nil { + return err + } + a.byteIO <- aof + atomic.AddInt32(a.ioNum, 1) + } + return errorx.New("type not is protobuf.message") +} diff --git a/storage/persisted/define.go b/storage/persisted/define.go new file mode 100644 index 0000000..ba182cb --- /dev/null +++ b/storage/persisted/define.go @@ -0,0 +1,21 @@ +package persisted + +import ( + "sync" + + protobuf "google.golang.org/protobuf/proto" +) + +type AOFCodec interface { + Encode(method string, m protobuf.Message) ([]byte, error) + Decode(buf []byte) (protobuf.Message, error) +} + +var ( + oneAOF sync.Once + sysAOF *AOF +) + +const ( + defaultByteIONum = 20000 +)