wheat-cache/storage/persisted/aof.go

106 lines
2.3 KiB
Go
Raw Permalink Normal View History

2021-11-01 21:56:23 +08:00
package persisted
import (
"bytes"
"os"
"sync/atomic"
"time"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/logx"
protobuf "google.golang.org/protobuf/proto"
)
type AOF struct {
codec AOFCodec
byteIO chan []byte
logPath string
ioFrequency int32
flushTime time.Duration
checkIOTime time.Duration
ioNum *int32
}
// NewAOF
// codec AOF 的编码解码器nil 时默认 protoCodec
// logPath AOF 持久化的地址
// flushTime 刷新到磁盘的间隔时间
// frequency, checkIOTime 使用一定频率检查,是否需要提前执行磁盘刷新
func NewAOF(codec AOFCodec, logPath string, flushTime time.Duration, frequency int, checkIOTime time.Duration) *AOF {
oneAOF.Do(func() {
if codec == nil {
codec = &codecProto{}
}
sysAOF = &AOF{
codec: codec,
ioFrequency: int32(frequency),
flushTime: flushTime,
logPath: logPath,
byteIO: make(chan []byte, defaultByteIONum),
ioNum: new(int32), // 统计当前的变更情况
checkIOTime: checkIOTime,
}
go sysAOF.flushLog()
})
return sysAOF
}
// 刷新数据到磁盘
func (a *AOF) flushLog() {
flushTc := time.NewTicker(a.flushTime)
frequencyTc := time.NewTicker(a.checkIOTime)
file, err := os.OpenFile(a.logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
logx.Errorln(err)
return
}
flush := func() {
num := len(a.byteIO)
for i := 0; i < num; i++ {
aof := <-a.byteIO
buffer := bytes.NewBuffer(aof)
buffer.WriteString("\n")
file.Write(buffer.Bytes())
}
file.Sync()
}
for {
select {
case <-flushTc.C:
flush()
2021-11-01 23:48:48 +08:00
atomic.SwapInt32(a.ioNum, 0)
2021-11-01 21:56:23 +08:00
case <-frequencyTc.C:
if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency {
flush()
}
2021-11-01 23:48:48 +08:00
atomic.SwapInt32(a.ioNum, 0)
2021-11-01 21:56:23 +08:00
}
}
}
func (a *AOF) DecodeAOF(buf []byte) (protobuf.Message, error) {
return a.codec.Decode(buf)
}
func (a *AOF) EncodeAOF(method string, m protobuf.Message) ([]byte, error) {
return a.codec.Encode(method, m)
}
func (a *AOF) SendRequest(method string, req interface{}) error {
if req, ok := req.(protobuf.Message); ok {
aof, err := a.EncodeAOF(method, req)
if err != nil {
return err
}
a.byteIO <- aof
atomic.AddInt32(a.ioNum, 1)
}
return errorx.New("type not is protobuf.message")
}