diff --git a/storage/persisted/aof.go b/storage/persisted/aof.go index 7a6d7df..60b6cd4 100644 --- a/storage/persisted/aof.go +++ b/storage/persisted/aof.go @@ -73,11 +73,12 @@ func (a *AOF) flushLog() { select { case <-flushTc.C: flush() + atomic.SwapInt32(a.ioNum, 0) case <-frequencyTc.C: if num := atomic.LoadInt32(a.ioNum); num >= a.ioFrequency { flush() - atomic.SwapInt32(a.ioNum, 0) } + atomic.SwapInt32(a.ioNum, 0) } } diff --git a/storage/service/define.go b/storage/service/define.go index a6bdfaa..b8a23dd 100644 --- a/storage/service/define.go +++ b/storage/service/define.go @@ -9,4 +9,6 @@ var ( sysSingleService *singleService ) -const timeOutDefault = 2 +const ( + timeOutDefault = 2 +) diff --git a/storage/service/single.go b/storage/service/single.go index 11b59f9..dc8ac99 100644 --- a/storage/service/single.go +++ b/storage/service/single.go @@ -5,8 +5,10 @@ import ( "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/pkg/middle" "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/storage/dao" + "gitee.com/timedb/wheatCache/storage/persisted" "github.com/spf13/viper" ) @@ -16,6 +18,28 @@ type singleService struct { timeOut time.Duration lruCache *lru.SingleCache dao dao.Interface + aof *persisted.AOF +} + +func loadAOF() *persisted.AOF { + decName := viper.GetString("storage.aof-codec") + var aofCodec persisted.AOFCodec + switch decName { + case "": + return nil + case "b16": + aofCodec = nil + default: + aofCodec = nil + } + + aofPath := viper.GetString("storage.aof-path") + aofFlush := viper.GetInt("storage.aof-flush-time") + aofCheckTime := viper.GetInt("storage.aof-check-time") + aofCheckFreq := viper.GetInt("storage.aof-check-freq") + + return persisted.NewAOF(aofCodec, aofPath, time.Second*time.Duration(aofFlush), + aofCheckFreq, time.Second*time.Duration(aofCheckTime)) } func NewSingleServer() proto.CommServerServer { @@ -27,11 +51,16 @@ func NewSingleServer() proto.CommServerServer { lruCache := lru.NewLRUCache() + // 加载 aof + aof := loadAOF() + ser := &singleService{ - lruCache: lruCache, - lruProduce: event.NewProduce(lruCache.GetDriver()), - timeOut: time.Duration(timeOut) * time.Second, - dao: dao.NewDao(lruCache), + lruCache: lruCache, + lruProduce: event.NewProduce(lruCache.GetDriver()), + timeOut: time.Duration(timeOut) * time.Second, + dao: dao.NewDao(lruCache), + middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()), + aof: aof, } sysSingleService = ser