From 929f931cd64f9a436633aab1c17dae76173f59d1 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:35:03 +0800 Subject: [PATCH 1/7] feat(worker): feat get the io --- pkg/event/driver.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/event/driver.go b/pkg/event/driver.go index b5c5636..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,6 +137,10 @@ func (d *Driver) Put(event *Event) { d.queue <- event } +func (d *Driver) GetLength() int { + return len(d.queue) +} + // NewDriver 新建 Driver func NewDriver(maxSize int) DriverInterface { return &Driver{ From c45c2682e0d24282f46239e7e18af615dff4f4c3 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:47:17 +0800 Subject: [PATCH 2/7] feat(event): feat the get the io length --- pkg/event/define.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/event/define.go b/pkg/event/define.go index 529d7e3..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,6 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) + GetLength() int } type ProduceInterface interface { From f1d2cbb0eeef734b6046737c5e5202acb8892bcc Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 10:55:12 +0800 Subject: [PATCH 3/7] feat(event): feat the io of judgement function --- pkg/event/define.go | 2 +- pkg/event/driver.go | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 8921901..3604c9c 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,7 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) - GetLength() int + GetLengthJudge() bool } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 764ff22..78797ea 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,8 +137,12 @@ func (d *Driver) Put(event *Event) { d.queue <- event } -func (d *Driver) GetLength() int { - return len(d.queue) +func (d *Driver) GetLengthJudge() bool { + // 自动对当前的最大io数量进行判断 + if len(d.queue) > d.maxQueueSize * 1/2{ + return true + } + return false } // NewDriver 新建 Driver From a251cdb844a2f360087baa4cbbea5dacd008532f Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:04:47 +0800 Subject: [PATCH 4/7] feat(errorx): feat the LruNotWorkFuncEvent error --- pkg/errorx/lru.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 pkg/errorx/lru.go diff --git a/pkg/errorx/lru.go b/pkg/errorx/lru.go new file mode 100644 index 0000000..569e563 --- /dev/null +++ b/pkg/errorx/lru.go @@ -0,0 +1,5 @@ +package errorx + +func LruNotWorkFuncEventErr() error { + return New("the event haven't work of function") +} From 843fcd27caec2b042bb7e613e67dc4480de4fe7a Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:05:39 +0800 Subject: [PATCH 5/7] feat(event): feat the function of getting the queue length --- pkg/event/define.go | 2 +- pkg/event/driver.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/event/define.go b/pkg/event/define.go index 3604c9c..8921901 100644 --- a/pkg/event/define.go +++ b/pkg/event/define.go @@ -18,7 +18,7 @@ type EventWorkFunc func() (interface{}, error) type DriverInterface interface { Get() *Event Put(event *Event) - GetLengthJudge() bool + GetLength() int } type ProduceInterface interface { diff --git a/pkg/event/driver.go b/pkg/event/driver.go index 78797ea..764ff22 100644 --- a/pkg/event/driver.go +++ b/pkg/event/driver.go @@ -137,12 +137,8 @@ func (d *Driver) Put(event *Event) { d.queue <- event } -func (d *Driver) GetLengthJudge() bool { - // 自动对当前的最大io数量进行判断 - if len(d.queue) > d.maxQueueSize * 1/2{ - return true - } - return false +func (d *Driver) GetLength() int { + return len(d.queue) } // NewDriver 新建 Driver From 64b38044c8af20da3f6decda111c97c7dd885fca Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:06:38 +0800 Subject: [PATCH 6/7] feat(lru): feat the function of cleanEvent --- pkg/lru/define.go | 7 ++++--- pkg/lru/lru.go | 38 +++++++++++++++++++++++++++----------- pkg/lru/worker.go | 20 +++++++++++++++++--- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 4d05af5..67f54a2 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -20,9 +20,9 @@ var ( ) const ( - lruMaxSize = 1 * 1024 * 1024 * 1024 * 8 - lruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 - lruEventDriver = 2000 + defaultLruMaxSize = 1 * 1024 * 1024 * 1024 * 8 + defaultLruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 + defaultLruEventDriver = 2000 ) type CacheInterface interface { @@ -31,4 +31,5 @@ type CacheInterface interface { Add(key *proto.BaseKey, val structure.KeyBaseInterface) UpdateLruSize(length structure.UpdateLength) DelByKey(key *proto.BaseKey) error + DelToClearSize() error } diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 7ad8f41..2455a56 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -18,12 +18,12 @@ type keyBaseValue struct { } type SingleCache struct { - maxsize int64 //最大的长度 - clearSize int64 // 清理长度 - nowSize int64 // 现在的长度 - li *list.List - lruMap map[string]*list.Element - + maxsize int64 //最大的长度 + clearSize int64 // 清理长度 + nowSize int64 // 现在的长度 + li *list.List + lruMap map[string]*list.Element + lruMaxDiverSize int lruDriver event.DriverInterface lruConsumer event.ConsumerInterface lruCleanProduce event.ProduceInterface // 发送清理事件 @@ -41,7 +41,7 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retMaxSize == 0 { - retMaxSize = lruMaxSize + retMaxSize = defaultLruMaxSize } clearSize := viper.GetString("lruCache.clearSize") @@ -50,12 +50,12 @@ func cacheInit() (int64, int64, int) { return 0, 0, 0 } if retClearSize == 0 { - retClearSize = lruClearSize + retClearSize = defaultLruClearSize } maxDriver := viper.GetInt("lruCache.eventDriverSize") if maxDriver == 0 { - maxDriver = lruEventDriver + maxDriver = defaultLruEventDriver } return retMaxSize, retClearSize, maxDriver } @@ -71,6 +71,7 @@ func NewLRUCache() *SingleCache { nowSize: 0, li: list.New(), lruMap: make(map[string]*list.Element), + lruMaxDiverSize: maxDriverSize, lruDriver: lruDriver, lruConsumer: event.NewConsumer(lruDriver), lruCleanProduce: event.NewProduce(lruDriver), @@ -131,7 +132,7 @@ func (lru *SingleCache) Del() error { } //DelByKey 根据key删除 -func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { +func (lru *SingleCache) DelByKey(key *proto.BaseKey) error { if lru.lruMap == nil { return errorx.New("lru is nil") } @@ -140,4 +141,19 @@ func (lru *SingleCache)DelByKey(key *proto.BaseKey) error { return nil } return errorx.New("lru no this key") -} \ No newline at end of file +} + +func (lru *SingleCache) DelToClearSize() error { + if lru.lruMap == nil { + return errorx.New("lru is nil") + } + for { + if lru.nowSize > lru.clearSize*1/3 { + //del自动给nowSize进行大小的改变 + lru.Del() + } else { + break + } + } + return nil +} diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 1120ca7..74d5705 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -15,9 +15,7 @@ func (lru *SingleCache) lruSingleWork() interface{} { case OptionEventName: workFunc, ok := workEvent.GetValue(WorkFuncEventKey) if !ok { - workEvent.ExecWorkAndSendResult(func() (interface{}, error) { - return nil, errorx.New("the event haven't work of function") - }) + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) continue } @@ -25,6 +23,22 @@ func (lru *SingleCache) lruSingleWork() interface{} { workEvent.ExecWorkAndSendResult(work) } case CleanEventName: + workFunc, ok := workEvent.GetValue(WorkFuncEventKey) + if !ok { + workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr()) + continue + } + // 对当前的io数量进行判断 + ioNum := lru.GetDriver().GetLength() + if ioNum > lru.lruMaxDiverSize*1/2 { + lru.lruCleanProduce.Call(ctx, workEvent) + continue + } + if work, ok := workFunc.(event.EventWorkFunc); ok { + workEvent.ExecWorkAndSendResult(work) + } + default: + return errorx.New("no this name") } } } From 8a6db79fd64cb0786dea00a04cf9e77be6de7787 Mon Sep 17 00:00:00 2001 From: HuangJiaLuo <1820799930@qq.com> Date: Sat, 9 Oct 2021 22:08:09 +0800 Subject: [PATCH 7/7] feat(lru): feat the function of launching the cleanWorkFunc to driver --- pkg/lru/cleanwork.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 pkg/lru/cleanwork.go diff --git a/pkg/lru/cleanwork.go b/pkg/lru/cleanwork.go new file mode 100644 index 0000000..af787dc --- /dev/null +++ b/pkg/lru/cleanwork.go @@ -0,0 +1,28 @@ +package lru + +import ( + "context" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/proto" + "time" +) + +type cleanWorkSingle struct { + timeOut time.Duration + lru *SingleCache +} + +func (clean *cleanWorkSingle) cleanWork() (*proto.ReduceResponse, error) { + cxt := context.Background() + lruCleanEvent := event.NewEvent(CleanEventName) + lruCleanEvent.InitWaitEvent() + lruCleanEvent.SetValue(WorkFuncEventKey, clean.lru.DelToClearSize()) + clean.lru.lruCleanProduce.Call(cxt, lruCleanEvent) + resp, err := lruCleanEvent.StartWaitEvent(clean.timeOut) + if err != nil{ + return nil, err + } + return &proto.ReduceResponse{ + Result: resp.(string), + }, nil +} \ No newline at end of file