forked from p53841790/wheat-cache
feat(aof): add aof log
This commit is contained in:
parent
170ba44d2d
commit
4ffa91ac6a
|
@ -0,0 +1,104 @@
|
|||
package persisted
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitee.com/timedb/wheatCache/pkg/errorx"
|
||||
"gitee.com/timedb/wheatCache/pkg/logx"
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type AOF struct {
|
||||
codec AOFCodec
|
||||
byteIO chan []byte
|
||||
logPath string
|
||||
ioFrequency int32
|
||||
flushTime time.Duration
|
||||
checkIOTime time.Duration
|
||||
ioNum *int32
|
||||
}
|
||||
|
||||
// NewAOF
|
||||
// codec AOF 的编码解码器,nil 时默认 protoCodec
|
||||
// logPath AOF 持久化的地址
|
||||
// flushTime 刷新到磁盘的间隔时间
|
||||
// frequency, checkIOTime 使用一定频率检查,是否需要提前执行磁盘刷新
|
||||
func NewAOF(codec AOFCodec, logPath string, flushTime time.Duration, frequency int, checkIOTime time.Duration) *AOF {
|
||||
oneAOF.Do(func() {
|
||||
if codec == nil {
|
||||
codec = &codecProto{}
|
||||
}
|
||||
sysAOF = &AOF{
|
||||
codec: codec,
|
||||
ioFrequency: int32(frequency),
|
||||
flushTime: flushTime,
|
||||
logPath: logPath,
|
||||
byteIO: make(chan []byte, defaultByteIONum),
|
||||
ioNum: new(int32), // 统计当前的变更情况
|
||||
checkIOTime: checkIOTime,
|
||||
}
|
||||
go sysAOF.flushLog()
|
||||
})
|
||||
|
||||
return sysAOF
|
||||
}
|
||||
|
||||
// 刷新数据到磁盘
|
||||
func (a *AOF) flushLog() {
|
||||
flushTc := time.NewTicker(a.flushTime)
|
||||
frequencyTc := time.NewTicker(a.checkIOTime)
|
||||
|
||||
file, err := os.OpenFile(a.logPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
logx.Errorln(err)
|
||||
return
|
||||
}
|
||||
|
||||
flush := func() {
|
||||
num := len(a.byteIO)
|
||||
for i := 0; i < num; i++ {
|
||||
aof := <-a.byteIO
|
||||
buffer := bytes.NewBuffer(aof)
|
||||
buffer.WriteString("\n")
|
||||
file.Write(buffer.Bytes())
|
||||
}
|
||||
|
||||
file.Sync()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-flushTc.C:
|
||||
flush()
|
||||
case <-frequencyTc.C:
|
||||
if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency {
|
||||
flush()
|
||||
atomic.SwapInt32(a.ioNum, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (a *AOF) DecodeAOF(buf []byte) (protobuf.Message, error) {
|
||||
return a.codec.Decode(buf)
|
||||
}
|
||||
|
||||
func (a *AOF) EncodeAOF(method string, m protobuf.Message) ([]byte, error) {
|
||||
return a.codec.Encode(method, m)
|
||||
}
|
||||
|
||||
func (a *AOF) SendRequest(method string, req interface{}) error {
|
||||
if req, ok := req.(protobuf.Message); ok {
|
||||
aof, err := a.EncodeAOF(method, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.byteIO <- aof
|
||||
atomic.AddInt32(a.ioNum, 1)
|
||||
}
|
||||
return errorx.New("type not is protobuf.message")
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package persisted
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
protobuf "google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type AOFCodec interface {
|
||||
Encode(method string, m protobuf.Message) ([]byte, error)
|
||||
Decode(buf []byte) (protobuf.Message, error)
|
||||
}
|
||||
|
||||
var (
|
||||
oneAOF sync.Once
|
||||
sysAOF *AOF
|
||||
)
|
||||
|
||||
const (
|
||||
defaultByteIONum = 20000
|
||||
)
|
Loading…
Reference in New Issue