diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 33925a8..dcf83ee 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -9,7 +9,7 @@ storage: # clearSize and maxSize must be Int lruCache: - clearSize: "512MB" + clearSize: "512mb" maxSize: "1GB" eventDriverSize: 2000 workTime: 1 diff --git a/pkg/errorx/lru.go b/pkg/errorx/lru.go new file mode 100644 index 0000000..569e563 --- /dev/null +++ b/pkg/errorx/lru.go @@ -0,0 +1,5 @@ +package errorx + +func LruNotWorkFuncEventErr() error { + return New("the event haven't work of function") +} diff --git a/pkg/event/define.go b/pkg/event/define.go index 529d7e3..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,6 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) + GetLength() int } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index b5c5636..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,6 +137,10 @@ func (d *Driver) Put(event *Event) { d.queue <- event } +func (d *Driver) GetLength() int { + return len(d.queue) +} + // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ diff --git a/pkg/lru/clean_work.go b/pkg/lru/clean_work.go new file mode 100644 index 0000000..228ca19 --- /dev/null +++ b/pkg/lru/clean_work.go @@ -0,0 +1,31 @@ +package lru + +import ( + "context" + "gitee.com/timedb/wheatCache/pkg/event" + "time" +) + +func (lru *SingleCache) cleanWork() { + cxt := context.Background() + for { + time.Sleep(2 * time.Second) + if lru.clearSize < lru.nowSize { + lruCleanEvent := event.NewEvent(CleanEventName) + lruCleanEvent.InitWaitEvent() + work := event.EventWorkFunc(func() (interface{}, error) { + err := lru.DelToClearSize() + return nil, err + }) + + lruCleanEvent.SetValue(WorkFuncEventKey, work) + + lru.lruCleanProduce.Call(cxt, lruCleanEvent) + _, err := lruCleanEvent.StartWaitEvent(defaultWaitTime) + if err != nil { + //logx.With(cxt, ).Error("cleanWork err: %v", err) + } + + } + } +} diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 4d05af5..6b8dd2b 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -4,6 +4,7 @@ import ( "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/structure" "sync" + "time" ) type SingleWorkFunc func() interface{} @@ -20,9 +21,12 @@ var ( ) const ( - lruMaxSize = 1 * 1024 * 1024 * 1024 * 8 - lruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 - lruEventDriver = 2000 + defaultLruMaxSize = 1 * 1024 * 1024 * 1024 * 8 + defaultLruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 + defaultLruEventDriver = 2000 +) +const ( + defaultWaitTime = 20 * time.Minute ) type CacheInterface interface { @@ -31,4 +35,5 @@ type CacheInterface interface { Add(key *proto.BaseKey, val structure.KeyBaseInterface) UpdateLruSize(length structure.UpdateLength) DelByKey(key *proto.BaseKey) error + DelToClearSize() error } diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 7ad8f41..29a5ae7 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -18,12 +18,12 @@ type keyBaseValue struct { } type SingleCache struct { - maxsize int64 //最大的长度 - clearSize int64 // 清理长度 - nowSize int64 // 现在的长度 - li *list.List - lruMap map[string]*list.Element - + maxsize int64 //最大的长度 + clearSize int64 // 清理长度 + nowSize int64 // 现在的长度 + li *list.List + lruMap map[string]*list.Element + lruMaxDiverSize int lruDriver event.DriverInterface lruConsumer event.ConsumerInterface lruCleanProduce event.ProduceInterface // 发送清理事件 @@ -41,7 +41,7 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retMaxSize == 0 { - retMaxSize = lruMaxSize + retMaxSize = defaultLruMaxSize } clearSize := viper.GetString("lruCache.clearSize") @@ -50,12 +50,12 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retClearSize == 0 { - retClearSize = lruClearSize + retClearSize = defaultLruClearSize } maxDriver := viper.GetInt("lruCache.eventDriverSize") if maxDriver == 0 { - maxDriver = lruEventDriver + maxDriver = defaultLruEventDriver } return retMaxSize, retClearSize, maxDriver } @@ -71,6 +71,7 @@ func NewLRUCache() *SingleCache { nowSize: 0, li: list.New(), lruMap: make(map[string]*list.Element), + lruMaxDiverSize: maxDriverSize, lruDriver: lruDriver, lruConsumer: event.NewConsumer(lruDriver), lruCleanProduce: event.NewProduce(lruDriver), @@ -131,7 +132,7 @@ func (lru *SingleCache) Del() error { } //DelByKey 根据key删除 -func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { +func (lru *SingleCache) DelByKey(key *proto.BaseKey) error { if lru.lruMap == nil { return errorx.New("lru is nil") } @@ -140,4 +141,19 @@ func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { return nil } return errorx.New("lru no this key") -} \ No newline at end of file +} + +func (lru *SingleCache) DelToClearSize() error { + if lru.lruMap == nil { + return errorx.New("lru is nil") + } + for { + if lru.nowSize > lru.clearSize { + //del自动给nowSize进行大小的改变 + lru.Del() + } else { + break + } + } + return nil +} diff --git a/pkg/lru/woker_test.go b/pkg/lru/woker_test.go index 77a6ad8..9e8a21b 100644 --- a/pkg/lru/woker_test.go +++ b/pkg/lru/woker_test.go @@ -3,6 +3,7 @@ package lru import ( "context" "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/structure/stringx" "github.com/stretchr/testify/require" @@ -30,3 +31,28 @@ func TestWorker(t *testing.T) { require.NoError(t, err) require.Equal(t, res, "123") } + +func TestSingleCache_DelToClearSize(t *testing.T) { + ctx := context.Background() + lru := NewLRUCache() + produce := event.NewProduce(lru.GetDriver()) + + for i := int32(20000); i >= 1; i-- { + workEvent := event.NewEvent(OptionEventName) + workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { + v1 := stringx.NewStringSingle() + key := proto.BaseKey{ + Key: string(i), + } + u := v1.Setbit(i, true) + lru.Add(&key, v1) + return u, nil + })) + workEvent.InitWaitEvent() + produce.Call(ctx, workEvent) + workEvent.StartWaitEvent(2 * time.Second) + } + + time.Sleep(5 * time.Second) + logx.Info("end size is %d", lru.nowSize) +} diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 1120ca7..74d5705 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -15,9 +15,7 @@ func (lru *SingleCache) lruSingleWork() interface{} { case OptionEventName: workFunc, ok := workEvent.GetValue(WorkFuncEventKey) if !ok { - workEvent.ExecWorkAndSendResult(func() (interface{}, error) { - return nil, errorx.New("the event haven't work of function") - }) + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) continue } @@ -25,6 +23,22 @@ func (lru *SingleCache) lruSingleWork() interface{} { workEvent.ExecWorkAndSendResult(work) } case CleanEventName: + workFunc, ok := workEvent.GetValue(WorkFuncEventKey) + if !ok { + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) + continue + } + // 对当前的io数量进行判断 + ioNum := lru.GetDriver().GetLength() + if ioNum > lru.lruMaxDiverSize*1/2 { + lru.lruCleanProduce.Call(ctx, workEvent) + continue + } + if work, ok := workFunc.(event.EventWorkFunc); ok { + workEvent.ExecWorkAndSendResult(work) + } + default: + return errorx.New("no this name") } } } diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index 90b2340..1dd9fe1 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,7 +3,7 @@ package middle_msg import "time" var ( - EventNameLog = "logcontext" + EventNameLog = "log-context" ) type LogContext struct { diff --git a/pkg/middle-msg/lru.go b/pkg/middle-msg/lru.go new file mode 100644 index 0000000..b566510 --- /dev/null +++ b/pkg/middle-msg/lru.go @@ -0,0 +1,14 @@ +package middle_msg + +import "time" + +var LruCleanContextName = "lru-clean-context" + +type LruCleanContext struct { + Keys []string + BeforeCleanSize int64 + BehindCleanSize int64 + + StartTime time.Duration + EndTime time.Duration +}