From 929f931cd64f9a436633aab1c17dae76173f59d1 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:35:03 +0800 Subject: [PATCH 01/19] feat(worker): feat get the io --- pkg/event/driver.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/event/driver.go b/pkg/event/driver.go index b5c5636..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,6 +137,10 @@ func (d *Driver) Put(event *Event) { d.queue <- event } +func (d *Driver) GetLength() int { + return len(d.queue) +} + // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ From c45c2682e0d24282f46239e7e18af615dff4f4c3 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:47:17 +0800 Subject: [PATCH 02/19] feat(event): feat the get the io length --- pkg/event/define.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/event/define.go b/pkg/event/define.go index 529d7e3..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,6 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) + GetLength() int } type ProduceInterface interface { From f1d2cbb0eeef734b6046737c5e5202acb8892bcc Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:55:12 +0800 Subject: [PATCH 03/19] feat(event): feat the io of judgement function --- pkg/event/define.go | 2 +- pkg/event/driver.go | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 8921901..3604c9c 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,7 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) - GetLength() int + GetLengthJudge() bool } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 764ff22..78797ea 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,8 +137,12 @@ func (d *Driver) Put(event *Event) { d.queue <- event } -func (d *Driver) GetLength() int { - return len(d.queue) +func (d *Driver) GetLengthJudge() bool { + // 自动对当前的最大io数量进行判断 + if len(d.queue) > d.maxQueueSize * 1/2{ + return true + } + return false } // NewDriver 新建 Driver From a251cdb844a2f360087baa4cbbea5dacd008532f Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:04:47 +0800 Subject: [PATCH 04/19] feat(errorx): feat the LruNotWorkFuncEvent error --- pkg/errorx/lru.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 pkg/errorx/lru.go diff --git a/pkg/errorx/lru.go b/pkg/errorx/lru.go new file mode 100644 index 0000000..569e563 --- /dev/null +++ b/pkg/errorx/lru.go @@ -0,0 +1,5 @@ +package errorx + +func LruNotWorkFuncEventErr() error { + return New("the event haven't work of function") +} From 843fcd27caec2b042bb7e613e67dc4480de4fe7a Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:05:39 +0800 Subject: [PATCH 05/19] feat(event): feat the function of getting the queue length --- pkg/event/define.go | 2 +- pkg/event/driver.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 3604c9c..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,7 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) - GetLengthJudge() bool + GetLength() int } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 78797ea..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,12 +137,8 @@ func (d *Driver) Put(event *Event) { d.queue <- event } -func (d *Driver) GetLengthJudge() bool { - // 自动对当前的最大io数量进行判断 - if len(d.queue) > d.maxQueueSize * 1/2{ - return true - } - return false +func (d *Driver) GetLength() int { + return len(d.queue) } // NewDriver 新建 Driver From 64b38044c8af20da3f6decda111c97c7dd885fca Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:06:38 +0800 Subject: [PATCH 06/19] feat(lru): feat the function of cleanEvent --- pkg/lru/define.go | 7 ++++--- pkg/lru/lru.go | 38 +++++++++++++++++++++++++++----------- pkg/lru/worker.go | 20 +++++++++++++++++--- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 4d05af5..67f54a2 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -20,9 +20,9 @@ var ( ) const ( - lruMaxSize = 1 * 1024 * 1024 * 1024 * 8 - lruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 - lruEventDriver = 2000 + defaultLruMaxSize = 1 * 1024 * 1024 * 1024 * 8 + defaultLruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 + defaultLruEventDriver = 2000 ) type CacheInterface interface { @@ -31,4 +31,5 @@ type CacheInterface interface { Add(key *proto.BaseKey, val structure.KeyBaseInterface) UpdateLruSize(length structure.UpdateLength) DelByKey(key *proto.BaseKey) error + DelToClearSize() error } diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 7ad8f41..2455a56 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -18,12 +18,12 @@ type keyBaseValue struct { } type SingleCache struct { - maxsize int64 //最大的长度 - clearSize int64 // 清理长度 - nowSize int64 // 现在的长度 - li *list.List - lruMap map[string]*list.Element - + maxsize int64 //最大的长度 + clearSize int64 // 清理长度 + nowSize int64 // 现在的长度 + li *list.List + lruMap map[string]*list.Element + lruMaxDiverSize int lruDriver event.DriverInterface lruConsumer event.ConsumerInterface lruCleanProduce event.ProduceInterface // 发送清理事件 @@ -41,7 +41,7 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retMaxSize == 0 { - retMaxSize = lruMaxSize + retMaxSize = defaultLruMaxSize } clearSize := viper.GetString("lruCache.clearSize") @@ -50,12 +50,12 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retClearSize == 0 { - retClearSize = lruClearSize + retClearSize = defaultLruClearSize } maxDriver := viper.GetInt("lruCache.eventDriverSize") if maxDriver == 0 { - maxDriver = lruEventDriver + maxDriver = defaultLruEventDriver } return retMaxSize, retClearSize, maxDriver } @@ -71,6 +71,7 @@ func NewLRUCache() *SingleCache { nowSize: 0, li: list.New(), lruMap: make(map[string]*list.Element), + lruMaxDiverSize: maxDriverSize, lruDriver: lruDriver, lruConsumer: event.NewConsumer(lruDriver), lruCleanProduce: event.NewProduce(lruDriver), @@ -131,7 +132,7 @@ func (lru *SingleCache) Del() error { } //DelByKey 根据key删除 -func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { +func (lru *SingleCache) DelByKey(key *proto.BaseKey) error { if lru.lruMap == nil { return errorx.New("lru is nil") } @@ -140,4 +141,19 @@ func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { return nil } return errorx.New("lru no this key") -} \ No newline at end of file +} + +func (lru *SingleCache) DelToClearSize() error { + if lru.lruMap == nil { + return errorx.New("lru is nil") + } + for { + if lru.nowSize > lru.clearSize*1/3 { + //del自动给nowSize进行大小的改变 + lru.Del() + } else { + break + } + } + return nil +} diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 1120ca7..74d5705 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -15,9 +15,7 @@ func (lru *SingleCache) lruSingleWork() interface{} { case OptionEventName: workFunc, ok := workEvent.GetValue(WorkFuncEventKey) if !ok { - workEvent.ExecWorkAndSendResult(func() (interface{}, error) { - return nil, errorx.New("the event haven't work of function") - }) + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) continue } @@ -25,6 +23,22 @@ func (lru *SingleCache) lruSingleWork() interface{} { workEvent.ExecWorkAndSendResult(work) } case CleanEventName: + workFunc, ok := workEvent.GetValue(WorkFuncEventKey) + if !ok { + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) + continue + } + // 对当前的io数量进行判断 + ioNum := lru.GetDriver().GetLength() + if ioNum > lru.lruMaxDiverSize*1/2 { + lru.lruCleanProduce.Call(ctx, workEvent) + continue + } + if work, ok := workFunc.(event.EventWorkFunc); ok { + workEvent.ExecWorkAndSendResult(work) + } + default: + return errorx.New("no this name") } } } From 8a6db79fd64cb0786dea00a04cf9e77be6de7787 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:08:09 +0800 Subject: [PATCH 07/19] feat(lru): feat the function of launching the cleanWorkFunc to driver --- pkg/lru/cleanwork.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 pkg/lru/cleanwork.go diff --git a/pkg/lru/cleanwork.go b/pkg/lru/cleanwork.go new file mode 100644 index 0000000..af787dc --- /dev/null +++ b/pkg/lru/cleanwork.go @@ -0,0 +1,28 @@ +package lru + +import ( + "context" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/proto" + "time" +) + +type cleanWorkSingle struct { + timeOut time.Duration + lru *SingleCache +} + +func (clean *cleanWorkSingle) cleanWork() (*proto.ReduceResponse, error) { + cxt := context.Background() + lruCleanEvent := event.NewEvent(CleanEventName) + lruCleanEvent.InitWaitEvent() + lruCleanEvent.SetValue(WorkFuncEventKey, clean.lru.DelToClearSize()) + clean.lru.lruCleanProduce.Call(cxt, lruCleanEvent) + resp, err := lruCleanEvent.StartWaitEvent(clean.timeOut) + if err != nil{ + return nil, err + } + return &proto.ReduceResponse{ + Result: resp.(string), + }, nil +} \ No newline at end of file From cf1c90442b5b0c7622b475b54ab697f24c00050c Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:35:03 +0800 Subject: [PATCH 08/19] feat(worker): feat get the io --- pkg/event/driver.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/event/driver.go b/pkg/event/driver.go index b5c5636..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,6 +137,10 @@ func (d *Driver) Put(event *Event) { d.queue <- event } +func (d *Driver) GetLength() int { + return len(d.queue) +} + // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ From 83bf49ec17b86724a95e5d2ea593fa9fcd90d496 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:47:17 +0800 Subject: [PATCH 09/19] feat(event): feat the get the io length --- pkg/event/define.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/event/define.go b/pkg/event/define.go index 529d7e3..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,6 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) + GetLength() int } type ProduceInterface interface { From 5275be2b8e8166f1f3f2d502185b4ae92546be85 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:55:12 +0800 Subject: [PATCH 10/19] feat(event): feat the io of judgement function --- pkg/event/define.go | 2 +- pkg/event/driver.go | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 8921901..3604c9c 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,7 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) - GetLength() int + GetLengthJudge() bool } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 764ff22..78797ea 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,8 +137,12 @@ func (d *Driver) Put(event *Event) { d.queue <- event } -func (d *Driver) GetLength() int { - return len(d.queue) +func (d *Driver) GetLengthJudge() bool { + // 自动对当前的最大io数量进行判断 + if len(d.queue) > d.maxQueueSize * 1/2{ + return true + } + return false } // NewDriver 新建 Driver From f119557eec22af5b36954ee14c10163cd82ed34a Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:04:47 +0800 Subject: [PATCH 11/19] feat(errorx): feat the LruNotWorkFuncEvent error --- pkg/errorx/lru.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 pkg/errorx/lru.go diff --git a/pkg/errorx/lru.go b/pkg/errorx/lru.go new file mode 100644 index 0000000..569e563 --- /dev/null +++ b/pkg/errorx/lru.go @@ -0,0 +1,5 @@ +package errorx + +func LruNotWorkFuncEventErr() error { + return New("the event haven't work of function") +} From 29b9c8f3b6f716ccf7284f2f2ad193460db8729a Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:05:39 +0800 Subject: [PATCH 12/19] feat(event): feat the function of getting the queue length --- pkg/event/define.go | 2 +- pkg/event/driver.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 3604c9c..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,7 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) - GetLengthJudge() bool + GetLength() int } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 78797ea..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,12 +137,8 @@ func (d *Driver) Put(event *Event) { d.queue <- event } -func (d *Driver) GetLengthJudge() bool { - // 自动对当前的最大io数量进行判断 - if len(d.queue) > d.maxQueueSize * 1/2{ - return true - } - return false +func (d *Driver) GetLength() int { + return len(d.queue) } // NewDriver 新建 Driver From 2500251699d46a0a02a9b8e3dc511880ef5e4e97 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:06:38 +0800 Subject: [PATCH 13/19] feat(lru): feat the function of cleanEvent --- pkg/lru/define.go | 7 ++++--- pkg/lru/lru.go | 38 +++++++++++++++++++++++++++----------- pkg/lru/worker.go | 20 +++++++++++++++++--- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 4d05af5..67f54a2 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -20,9 +20,9 @@ var ( ) const ( - lruMaxSize = 1 * 1024 * 1024 * 1024 * 8 - lruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 - lruEventDriver = 2000 + defaultLruMaxSize = 1 * 1024 * 1024 * 1024 * 8 + defaultLruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 + defaultLruEventDriver = 2000 ) type CacheInterface interface { @@ -31,4 +31,5 @@ type CacheInterface interface { Add(key *proto.BaseKey, val structure.KeyBaseInterface) UpdateLruSize(length structure.UpdateLength) DelByKey(key *proto.BaseKey) error + DelToClearSize() error } diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 7ad8f41..2455a56 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -18,12 +18,12 @@ type keyBaseValue struct { } type SingleCache struct { - maxsize int64 //最大的长度 - clearSize int64 // 清理长度 - nowSize int64 // 现在的长度 - li *list.List - lruMap map[string]*list.Element - + maxsize int64 //最大的长度 + clearSize int64 // 清理长度 + nowSize int64 // 现在的长度 + li *list.List + lruMap map[string]*list.Element + lruMaxDiverSize int lruDriver event.DriverInterface lruConsumer event.ConsumerInterface lruCleanProduce event.ProduceInterface // 发送清理事件 @@ -41,7 +41,7 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retMaxSize == 0 { - retMaxSize = lruMaxSize + retMaxSize = defaultLruMaxSize } clearSize := viper.GetString("lruCache.clearSize") @@ -50,12 +50,12 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retClearSize == 0 { - retClearSize = lruClearSize + retClearSize = defaultLruClearSize } maxDriver := viper.GetInt("lruCache.eventDriverSize") if maxDriver == 0 { - maxDriver = lruEventDriver + maxDriver = defaultLruEventDriver } return retMaxSize, retClearSize, maxDriver } @@ -71,6 +71,7 @@ func NewLRUCache() *SingleCache { nowSize: 0, li: list.New(), lruMap: make(map[string]*list.Element), + lruMaxDiverSize: maxDriverSize, lruDriver: lruDriver, lruConsumer: event.NewConsumer(lruDriver), lruCleanProduce: event.NewProduce(lruDriver), @@ -131,7 +132,7 @@ func (lru *SingleCache) Del() error { } //DelByKey 根据key删除 -func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { +func (lru *SingleCache) DelByKey(key *proto.BaseKey) error { if lru.lruMap == nil { return errorx.New("lru is nil") } @@ -140,4 +141,19 @@ func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { return nil } return errorx.New("lru no this key") -} \ No newline at end of file +} + +func (lru *SingleCache) DelToClearSize() error { + if lru.lruMap == nil { + return errorx.New("lru is nil") + } + for { + if lru.nowSize > lru.clearSize*1/3 { + //del自动给nowSize进行大小的改变 + lru.Del() + } else { + break + } + } + return nil +} diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 1120ca7..74d5705 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -15,9 +15,7 @@ func (lru *SingleCache) lruSingleWork() interface{} { case OptionEventName: workFunc, ok := workEvent.GetValue(WorkFuncEventKey) if !ok { - workEvent.ExecWorkAndSendResult(func() (interface{}, error) { - return nil, errorx.New("the event haven't work of function") - }) + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) continue } @@ -25,6 +23,22 @@ func (lru *SingleCache) lruSingleWork() interface{} { workEvent.ExecWorkAndSendResult(work) } case CleanEventName: + workFunc, ok := workEvent.GetValue(WorkFuncEventKey) + if !ok { + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) + continue + } + // 对当前的io数量进行判断 + ioNum := lru.GetDriver().GetLength() + if ioNum > lru.lruMaxDiverSize*1/2 { + lru.lruCleanProduce.Call(ctx, workEvent) + continue + } + if work, ok := workFunc.(event.EventWorkFunc); ok { + workEvent.ExecWorkAndSendResult(work) + } + default: + return errorx.New("no this name") } } } From dcce00a32d71011ca9adf5a39d923391cff37991 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:08:09 +0800 Subject: [PATCH 14/19] feat(lru): feat the function of launching the cleanWorkFunc to driver --- pkg/lru/cleanwork.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 pkg/lru/cleanwork.go diff --git a/pkg/lru/cleanwork.go b/pkg/lru/cleanwork.go new file mode 100644 index 0000000..af787dc --- /dev/null +++ b/pkg/lru/cleanwork.go @@ -0,0 +1,28 @@ +package lru + +import ( + "context" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/proto" + "time" +) + +type cleanWorkSingle struct { + timeOut time.Duration + lru *SingleCache +} + +func (clean *cleanWorkSingle) cleanWork() (*proto.ReduceResponse, error) { + cxt := context.Background() + lruCleanEvent := event.NewEvent(CleanEventName) + lruCleanEvent.InitWaitEvent() + lruCleanEvent.SetValue(WorkFuncEventKey, clean.lru.DelToClearSize()) + clean.lru.lruCleanProduce.Call(cxt, lruCleanEvent) + resp, err := lruCleanEvent.StartWaitEvent(clean.timeOut) + if err != nil{ + return nil, err + } + return &proto.ReduceResponse{ + Result: resp.(string), + }, nil +} \ No newline at end of file From 73e91b3ff00ae8f4177f1ae69edd76878ac04b88 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sun, 10 Oct 2021 20:53:34 +0800 Subject: [PATCH 15/19] test --- conf/wheat-cache.yaml | 2 +- pkg/lru/clean_work.go | 32 ++++++++++++++++++++++++++++++++ pkg/lru/cleanwork.go | 28 ---------------------------- pkg/lru/lru.go | 3 ++- pkg/lru/woker_test.go | 26 ++++++++++++++++++++++++++ pkg/middle-msg/lru.go | 3 +++ 6 files changed, 64 insertions(+), 30 deletions(-) create mode 100644 pkg/lru/clean_work.go delete mode 100644 pkg/lru/cleanwork.go create mode 100644 pkg/middle-msg/lru.go diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 33925a8..dcf83ee 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -9,7 +9,7 @@ storage: # clearSize and maxSize must be Int lruCache: - clearSize: "512MB" + clearSize: "512mb" maxSize: "1GB" eventDriverSize: 2000 workTime: 1 diff --git a/pkg/lru/clean_work.go b/pkg/lru/clean_work.go new file mode 100644 index 0000000..6b08301 --- /dev/null +++ b/pkg/lru/clean_work.go @@ -0,0 +1,32 @@ +package lru + +import ( + "context" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" + "time" +) + +func (lru *SingleCache) cleanWork() { + cxt := context.Background() + for { + time.Sleep(2 * time.Second) + if lru.clearSize < lru.nowSize { + lruCleanEvent := event.NewEvent(CleanEventName) + lruCleanEvent.InitWaitEvent() + work := event.EventWorkFunc(func() (interface{}, error) { + err := lru.DelToClearSize() + return nil, err + }) + + lruCleanEvent.SetValue(WorkFuncEventKey, work) + + lru.lruCleanProduce.Call(cxt, lruCleanEvent) + _, err := lruCleanEvent.StartWaitEvent(20 * time.Minute) //常量 + if err != nil { + logx.With(cxt, ).Error("cleanWork err: %v", err) + } + + } + } +} diff --git a/pkg/lru/cleanwork.go b/pkg/lru/cleanwork.go deleted file mode 100644 index af787dc..0000000 --- a/pkg/lru/cleanwork.go +++ /dev/null @@ -1,28 +0,0 @@ -package lru - -import ( - "context" - "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "time" -) - -type cleanWorkSingle struct { - timeOut time.Duration - lru *SingleCache -} - -func (clean *cleanWorkSingle) cleanWork() (*proto.ReduceResponse, error) { - cxt := context.Background() - lruCleanEvent := event.NewEvent(CleanEventName) - lruCleanEvent.InitWaitEvent() - lruCleanEvent.SetValue(WorkFuncEventKey, clean.lru.DelToClearSize()) - clean.lru.lruCleanProduce.Call(cxt, lruCleanEvent) - resp, err := lruCleanEvent.StartWaitEvent(clean.timeOut) - if err != nil{ - return nil, err - } - return &proto.ReduceResponse{ - Result: resp.(string), - }, nil -} \ No newline at end of file diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 2455a56..e109339 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -78,6 +78,7 @@ func NewLRUCache() *SingleCache { } lruCache = lru go lru.lruSingleWork() + go lru.cleanWork() }) return lruCache } @@ -148,7 +149,7 @@ func (lru *SingleCache) DelToClearSize() error { return errorx.New("lru is nil") } for { - if lru.nowSize > lru.clearSize*1/3 { + if lru.nowSize > lru.clearSize { //del自动给nowSize进行大小的改变 lru.Del() } else { diff --git a/pkg/lru/woker_test.go b/pkg/lru/woker_test.go index 77a6ad8..9e8a21b 100644 --- a/pkg/lru/woker_test.go +++ b/pkg/lru/woker_test.go @@ -3,6 +3,7 @@ package lru import ( "context" "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/structure/stringx" "github.com/stretchr/testify/require" @@ -30,3 +31,28 @@ func TestWorker(t *testing.T) { require.NoError(t, err) require.Equal(t, res, "123") } + +func TestSingleCache_DelToClearSize(t *testing.T) { + ctx := context.Background() + lru := NewLRUCache() + produce := event.NewProduce(lru.GetDriver()) + + for i := int32(20000); i >= 1; i-- { + workEvent := event.NewEvent(OptionEventName) + workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { + v1 := stringx.NewStringSingle() + key := proto.BaseKey{ + Key: string(i), + } + u := v1.Setbit(i, true) + lru.Add(&key, v1) + return u, nil + })) + workEvent.InitWaitEvent() + produce.Call(ctx, workEvent) + workEvent.StartWaitEvent(2 * time.Second) + } + + time.Sleep(5 * time.Second) + logx.Info("end size is %d", lru.nowSize) +} diff --git a/pkg/middle-msg/lru.go b/pkg/middle-msg/lru.go new file mode 100644 index 0000000..b5aa092 --- /dev/null +++ b/pkg/middle-msg/lru.go @@ -0,0 +1,3 @@ +package middle_msg + + From 922fae0f781ed69b263c02cfd51f7dda43810da4 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Mon, 11 Oct 2021 19:01:24 +0800 Subject: [PATCH 16/19] feat(lru): update lru clean work --- pkg/lru/clean_work.go | 5 ++--- pkg/lru/define.go | 5 ++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/lru/clean_work.go b/pkg/lru/clean_work.go index 6b08301..228ca19 100644 --- a/pkg/lru/clean_work.go +++ b/pkg/lru/clean_work.go @@ -3,7 +3,6 @@ package lru import ( "context" "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/logx" "time" ) @@ -22,9 +21,9 @@ func (lru *SingleCache) cleanWork() { lruCleanEvent.SetValue(WorkFuncEventKey, work) lru.lruCleanProduce.Call(cxt, lruCleanEvent) - _, err := lruCleanEvent.StartWaitEvent(20 * time.Minute) //常量 + _, err := lruCleanEvent.StartWaitEvent(defaultWaitTime) if err != nil { - logx.With(cxt, ).Error("cleanWork err: %v", err) + //logx.With(cxt, ).Error("cleanWork err: %v", err) } } diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 67f54a2..a5bb2ae 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -4,6 +4,7 @@ import ( "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/structure" "sync" + "time" ) type SingleWorkFunc func() interface{} @@ -24,7 +25,9 @@ const ( defaultLruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 defaultLruEventDriver = 2000 ) - +const ( + defaultWaitTime = 20 * time.Minute +) type CacheInterface interface { Del() error Get(key *proto.BaseKey) (structure.KeyBaseInterface, bool) From b39899e5d6e3b76a1c0429a54e01a650310970b3 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Mon, 11 Oct 2021 19:02:01 +0800 Subject: [PATCH 17/19] feat(middle-msg): update lru msg --- pkg/middle-msg/logx.go | 2 +- pkg/middle-msg/lru.go | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index 90b2340..1dd9fe1 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,7 +3,7 @@ package middle_msg import "time" var ( - EventNameLog = "logcontext" + EventNameLog = "log-context" ) type LogContext struct { diff --git a/pkg/middle-msg/lru.go b/pkg/middle-msg/lru.go index b5aa092..b566510 100644 --- a/pkg/middle-msg/lru.go +++ b/pkg/middle-msg/lru.go @@ -1,3 +1,14 @@ package middle_msg +import "time" +var LruCleanContextName = "lru-clean-context" + +type LruCleanContext struct { + Keys []string + BeforeCleanSize int64 + BehindCleanSize int64 + + StartTime time.Duration + EndTime time.Duration +} From 5459c7710c7cc6acb93e83d3475d11777502e20d Mon Sep 17 00:00:00 2001 From: K-on <1820799930@qq.com> Date: Mon, 11 Oct 2021 11:09:59 +0000 Subject: [PATCH 18/19] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=96=87=E4=BB=B6=20pk?= =?UTF-8?q?g/lru/cleanwork.go?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/lru/cleanwork.go | 28 ---------------------------- 1 file changed, 28 deletions(-) delete mode 100644 pkg/lru/cleanwork.go diff --git a/pkg/lru/cleanwork.go b/pkg/lru/cleanwork.go deleted file mode 100644 index af787dc..0000000 --- a/pkg/lru/cleanwork.go +++ /dev/null @@ -1,28 +0,0 @@ -package lru - -import ( - "context" - "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "time" -) - -type cleanWorkSingle struct { - timeOut time.Duration - lru *SingleCache -} - -func (clean *cleanWorkSingle) cleanWork() (*proto.ReduceResponse, error) { - cxt := context.Background() - lruCleanEvent := event.NewEvent(CleanEventName) - lruCleanEvent.InitWaitEvent() - lruCleanEvent.SetValue(WorkFuncEventKey, clean.lru.DelToClearSize()) - clean.lru.lruCleanProduce.Call(cxt, lruCleanEvent) - resp, err := lruCleanEvent.StartWaitEvent(clean.timeOut) - if err != nil{ - return nil, err - } - return &proto.ReduceResponse{ - Result: resp.(string), - }, nil -} \ No newline at end of file From d2866256dd4be0a339125c1dbf4e7de8523acbe2 Mon Sep 17 00:00:00 2001 From: K-on <1820799930@qq.com> Date: Mon, 11 Oct 2021 11:19:11 +0000 Subject: [PATCH 19/19] update pkg/lru/define.go. --- pkg/lru/define.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 5198552..6b8dd2b 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -28,10 +28,12 @@ const ( const ( defaultWaitTime = 20 * time.Minute ) + type CacheInterface interface { Del() error Get(key *proto.BaseKey) (structure.KeyBaseInterface, bool) Add(key *proto.BaseKey, val structure.KeyBaseInterface) UpdateLruSize(length structure.UpdateLength) DelByKey(key *proto.BaseKey) error + DelToClearSize() error }