106 lines
2.3 KiB
Go
106 lines
2.3 KiB
Go
package persisted
|
||
|
||
import (
|
||
"bytes"
|
||
"os"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||
"gitee.com/timedb/wheatCache/pkg/logx"
|
||
protobuf "google.golang.org/protobuf/proto"
|
||
)
|
||
|
||
type AOF struct {
|
||
codec AOFCodec
|
||
byteIO chan []byte
|
||
logPath string
|
||
ioFrequency int32
|
||
flushTime time.Duration
|
||
checkIOTime time.Duration
|
||
ioNum *int32
|
||
}
|
||
|
||
// NewAOF
|
||
// codec AOF 的编码解码器,nil 时默认 protoCodec
|
||
// logPath AOF 持久化的地址
|
||
// flushTime 刷新到磁盘的间隔时间
|
||
// frequency, checkIOTime 使用一定频率检查,是否需要提前执行磁盘刷新
|
||
func NewAOF(codec AOFCodec, logPath string, flushTime time.Duration, frequency int, checkIOTime time.Duration) *AOF {
|
||
oneAOF.Do(func() {
|
||
if codec == nil {
|
||
codec = &codecProto{}
|
||
}
|
||
sysAOF = &AOF{
|
||
codec: codec,
|
||
ioFrequency: int32(frequency),
|
||
flushTime: flushTime,
|
||
logPath: logPath,
|
||
byteIO: make(chan []byte, defaultByteIONum),
|
||
ioNum: new(int32), // 统计当前的变更情况
|
||
checkIOTime: checkIOTime,
|
||
}
|
||
go sysAOF.flushLog()
|
||
})
|
||
|
||
return sysAOF
|
||
}
|
||
|
||
// 刷新数据到磁盘
|
||
func (a *AOF) flushLog() {
|
||
flushTc := time.NewTicker(a.flushTime)
|
||
frequencyTc := time.NewTicker(a.checkIOTime)
|
||
|
||
file, err := os.OpenFile(a.logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
|
||
if err != nil {
|
||
logx.Errorln(err)
|
||
return
|
||
}
|
||
|
||
flush := func() {
|
||
num := len(a.byteIO)
|
||
for i := 0; i < num; i++ {
|
||
aof := <-a.byteIO
|
||
buffer := bytes.NewBuffer(aof)
|
||
buffer.WriteString("\n")
|
||
file.Write(buffer.Bytes())
|
||
}
|
||
|
||
file.Sync()
|
||
}
|
||
|
||
for {
|
||
select {
|
||
case <-flushTc.C:
|
||
flush()
|
||
atomic.SwapInt32(a.ioNum, 0)
|
||
case <-frequencyTc.C:
|
||
if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency {
|
||
flush()
|
||
}
|
||
atomic.SwapInt32(a.ioNum, 0)
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
func (a *AOF) DecodeAOF(buf []byte) (protobuf.Message, error) {
|
||
return a.codec.Decode(buf)
|
||
}
|
||
|
||
func (a *AOF) EncodeAOF(method string, m protobuf.Message) ([]byte, error) {
|
||
return a.codec.Encode(method, m)
|
||
}
|
||
|
||
func (a *AOF) SendRequest(method string, req interface{}) error {
|
||
if req, ok := req.(protobuf.Message); ok {
|
||
aof, err := a.EncodeAOF(method, req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
a.byteIO <- aof
|
||
atomic.AddInt32(a.ioNum, 1)
|
||
}
|
||
return errorx.New("type not is protobuf.message")
|
||
}
|