!63 完成 全部 single lru

Merge pull request !63 from bandl/feat-lru-ttl
This commit is contained in:
bandl 2021-10-12 13:18:32 +00:00 committed by Gitee
commit ef5c088a49
15 changed files with 733 additions and 193 deletions

View File

@ -13,17 +13,16 @@ lruCache:
maxSize: "1GB"
eventDriverSize: 2000
workTime: 1
detachNum: 300
logPrint:
stath: [
"debug",
"error"
]
stath: [ "debug", "error" ]
middleware-driver:
driverCount: 1000
middleConsumerCount: 5
plugins-control:
logcontext: ["logMiddle"]
logcontext: [ "logMiddle" ]

View File

@ -1,31 +0,0 @@
package lru
import (
"context"
"gitee.com/timedb/wheatCache/pkg/event"
"time"
)
func (lru *SingleCache) cleanWork() {
cxt := context.Background()
for {
time.Sleep(2 * time.Second)
if lru.clearSize < lru.nowSize {
lruCleanEvent := event.NewEvent(CleanEventName)
lruCleanEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
err := lru.DelToClearSize()
return nil, err
})
lruCleanEvent.SetValue(WorkFuncEventKey, work)
lru.lruCleanProduce.Call(cxt, lruCleanEvent)
_, err := lruCleanEvent.StartWaitEvent(defaultWaitTime)
if err != nil {
//logx.With(cxt, ).Error("cleanWork err: %v", err)
}
}
}
}

View File

@ -12,6 +12,7 @@ type SingleWorkFunc func() interface{}
const (
OptionEventName = "operateEvent"
CleanEventName = "clearEvent"
TtlEventName = "ttlEvent"
WorkFuncEventKey = "workFunc"
)
@ -35,5 +36,11 @@ type CacheInterface interface {
Add(key *proto.BaseKey, val structure.KeyBaseInterface)
UpdateLruSize(length structure.UpdateLength)
DelByKey(key *proto.BaseKey) error
DelToClearSize() error
DelToClearSize() error
}
// TTL
const (
defaultDetachNum = 300
defaultTtlMaxLevel = 18
)

View File

@ -2,19 +2,22 @@ package lru
import (
"container/list"
"sync/atomic"
_ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/middle"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"gitee.com/timedb/wheatCache/pkg/util"
"github.com/spf13/viper"
"sync/atomic"
)
type keyBaseValue struct {
key string
val structure.KeyBaseInterface
key string
val structure.KeyBaseInterface
expire int64 // 过期时间戳
}
type SingleCache struct {
@ -24,9 +27,13 @@ type SingleCache struct {
li *list.List
lruMap map[string]*list.Element
lruMaxDiverSize int
lruTtlManage *lruTTl // 定时清理器
lruDriver event.DriverInterface
lruConsumer event.ConsumerInterface
lruCleanProduce event.ProduceInterface // 发送清理事件
middleProduce event.ProduceInterface // 中间件驱动
}
// UpdateLruSize 更新现在的长度
@ -34,11 +41,11 @@ func (lru *SingleCache) UpdateLruSize(length structure.UpdateLength) {
atomic.AddInt64(&lru.nowSize, int64(length))
}
func cacheInit() (int64, int64, int) {
func cacheInit() (int64, int64, int, int) {
maxSize := viper.GetString("lruCache.maxSize")
retMaxSize, maxErr := util.ParseSizeToBit(maxSize)
if maxErr != nil {
return 0, 0, 0
return 0, 0, 0, 0
}
if retMaxSize == 0 {
retMaxSize = defaultLruMaxSize
@ -47,7 +54,7 @@ func cacheInit() (int64, int64, int) {
clearSize := viper.GetString("lruCache.clearSize")
retClearSize, clearErr := util.ParseSizeToBit(clearSize)
if clearErr != nil {
return 0, 0, 0
return 0, 0, 0, 0
}
if retClearSize == 0 {
retClearSize = defaultLruClearSize
@ -57,12 +64,18 @@ func cacheInit() (int64, int64, int) {
if maxDriver == 0 {
maxDriver = defaultLruEventDriver
}
return retMaxSize, retClearSize, maxDriver
detachNum := viper.GetInt("lruCache.detachNum")
if detachNum == 0 {
detachNum = defaultDetachNum
}
return retMaxSize, retClearSize, maxDriver, detachNum
}
// NewLRUCache lru初始化
func NewLRUCache() *SingleCache {
maxSize, clearSize, maxDriverSize := cacheInit()
maxSize, clearSize, maxDriverSize, detachNum := cacheInit()
lruDriver := event.NewDriver(maxDriverSize)
lruCacheOnce.Do(func() {
lru := &SingleCache{
@ -75,9 +88,16 @@ func NewLRUCache() *SingleCache {
lruDriver: lruDriver,
lruConsumer: event.NewConsumer(lruDriver),
lruCleanProduce: event.NewProduce(lruDriver),
middleProduce: event.NewProduce(middle.NewMiddleWare().GetEventDriver()),
lruTtlManage: newLruTTl(detachNum),
}
lruCache = lru
// 启动 lru 事件驱动
go lru.lruSingleWork()
go lru.lruTtlWork()
go lru.cleanWork()
})
return lruCache
}
@ -90,9 +110,11 @@ func (lru *SingleCache) GetDriver() event.DriverInterface {
//Add 增加
func (lru *SingleCache) Add(key *proto.BaseKey, val structure.KeyBaseInterface) {
exp := lru.lruTtlManage.setKeys(key)
keyBaseVal := &keyBaseValue{
key: key.Key,
val: val,
key: key.Key,
val: val,
expire: exp,
}
if elVal, ok := lru.lruMap[key.Key]; ok {
lru.li.MoveToFront(elVal)
@ -108,9 +130,6 @@ func (lru *SingleCache) Add(key *proto.BaseKey, val structure.KeyBaseInterface)
// Get 查找key对应的value
func (lru *SingleCache) Get(key *proto.BaseKey) (structure.KeyBaseInterface, bool) {
if lru.lruMap == nil {
return nil, false
}
if elVal, ok := lru.lruMap[key.Key]; ok {
lru.li.MoveToFront(elVal)
return elVal.Value.(*keyBaseValue).val, true
@ -136,24 +155,49 @@ func (lru *SingleCache) DelByKey(key *proto.BaseKey) error {
if lru.lruMap == nil {
return errorx.New("lru is nil")
}
if _, ok := lru.lruMap[key.Key]; ok {
if el, ok := lru.lruMap[key.Key]; ok {
delete(lru.lruMap, key.Key)
lru.li.Remove(el)
lru.UpdateLruSize(structure.UpdateLength(-1 * el.Value.(*keyBaseValue).val.SizeByte()))
return nil
}
return errorx.New("lru no this key")
}
//DelByKeyAndExTtl 根据key(string)删除已经过期的 key
func (lru *SingleCache) delByKeyAndExTtl(key string, beforeTime int64) {
if elVal, ok := lru.lruMap[key]; ok {
exp := elVal.Value.(*keyBaseValue).expire
if exp <= beforeTime {
delete(lru.lruMap, key)
lru.li.Remove(elVal)
lru.UpdateLruSize(structure.UpdateLength(-1 * elVal.Value.(*keyBaseValue).val.SizeByte()))
}
}
}
func (lru *SingleCache) DelToClearSize() error {
if lru.lruMap == nil {
return errorx.New("lru is nil")
}
for {
if lru.nowSize > lru.clearSize {
//del自动给nowSize进行大小的改变
lru.Del()
} else {
break
for lru.nowSize > lru.clearSize {
//del自动给nowSize进行大小的改变
err := lru.Del()
if err != nil {
return err
}
}
return nil
}
// 更新过期时间
func (lru *SingleCache) UpdateTTl(key *proto.BaseKey) error {
if elVal, ok := lru.lruMap[key.Key]; ok {
expire := lru.lruTtlManage.setKeys(key)
elVal.Value.(*keyBaseValue).expire = expire
}
return errorx.New("the key is not in lru cache, key:%s", key.Key)
}

View File

@ -2,10 +2,12 @@ package lru
import (
"fmt"
"testing"
"time"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure/stringx"
"github.com/stretchr/testify/require"
"testing"
)
func TestNewLRUCache(t *testing.T) {
@ -55,4 +57,46 @@ func TestNewLRUCache2(t *testing.T) {
_, ok := cache.Get(&key1)
require.Equal(t, ok, false)
require.Error(t, cache.DelByKey(&key1))
}
}
func TestLruProcess(t *testing.T) {
lru := NewLRUCache()
lru.clearSize = 1000
for i := 100; i < 200; i++ {
lru.Add(&proto.BaseKey{
Key: fmt.Sprint(i),
Ttl: 20 << 2,
}, stringx.NewStringSingle())
}
// mock LruKey
for i := 0; i < 100; i++ {
lru.Add(&proto.BaseKey{
Key: fmt.Sprint(i),
Ttl: 4,
}, stringx.NewStringSingle())
}
require.Equal(t, lru.nowSize, int64(200*8))
// 自动清理测试
time.Sleep(3 * time.Second)
fmt.Println(lru.nowSize)
require.Less(t, lru.nowSize, lru.clearSize+1)
// TTL 测试
time.Sleep(2 * time.Second)
require.Equal(t, lru.li.Len(), 25)
// 过期全部的 Key
for i := 100; i < 200; i++ {
lru.UpdateTTl(&proto.BaseKey{
Key: fmt.Sprint(i),
Ttl: -1,
})
}
time.Sleep(2 * time.Second)
require.Equal(t, lru.nowSize, int64(0))
}

51
pkg/lru/ttl.go Normal file
View File

@ -0,0 +1,51 @@
package lru
import (
"sync"
"time"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/util/skiplist"
)
// lru 的 ttl 管理器
type lruTTl struct {
sk *skiplist.SkipList
memoryKey chan string // 缓存过期的 key
detachNum int // 每次移除的数量
mu sync.Mutex
}
func (l *lruTTl) setKeys(key *proto.BaseKey) int64 {
l.mu.Lock()
defer l.mu.Unlock()
ttlTime := time.Now().Unix()
if key.Expire != nil {
ttlTime = key.Expire.GetSeconds()
}
ttlTime += key.GetTtl()
l.sk.Insert(float64(ttlTime), key.GetKey())
return ttlTime
}
// 加载过期的 Key 到 Memory
func (l *lruTTl) ttlKeyToMemoryBySecond() {
t := time.Now()
values := l.sk.PopLeft(float64(t.Unix()))
for _, val := range values {
l.memoryKey <- val.(string)
}
}
func newLruTTl(detachNum int) *lruTTl {
return &lruTTl{
sk: skiplist.NewSkipList(defaultTtlMaxLevel),
// 默认 10000 个 Key
memoryKey: make(chan string, 10000),
detachNum: detachNum,
}
}

35
pkg/lru/ttl_test.go Normal file
View File

@ -0,0 +1,35 @@
package lru
import (
"fmt"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure/stringx"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestTTlCup(t *testing.T) {
k := make([]string, 100, 3000)
fmt.Println(cap(k))
p := k[:50]
fmt.Println(cap(p))
}
func Test_LruTTl(t *testing.T) {
lru := NewLRUCache()
s := stringx.NewStringSingle()
lru.Add(&proto.BaseKey{
Key: "k8s",
Ttl: 1,
}, s)
lru.Add(&proto.BaseKey{
Key: "990",
Ttl: 10,
}, s)
require.Equal(t, lru.nowSize, int64(16))
time.Sleep(4 * time.Second)
require.Equal(t, lru.nowSize, int64(8))
}

View File

@ -53,6 +53,6 @@ func TestSingleCache_DelToClearSize(t *testing.T) {
workEvent.StartWaitEvent(2 * time.Second)
}
time.Sleep(5 * time.Second)
time.Sleep(5<<10 * time.Second)
logx.Info("end size is %d", lru.nowSize)
}

View File

@ -4,30 +4,27 @@ import (
"context"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/logx"
"time"
)
func (lru *SingleCache) lruSingleWork() interface{} {
func (lru *SingleCache) lruSingleWork() {
ctx := context.Background()
for {
workEvent := lru.lruConsumer.Receive(ctx)
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
if !ok {
workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr())
continue
}
switch workEvent.GetEventName() {
case OptionEventName:
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
if !ok {
workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr())
continue
}
if work, ok := workFunc.(event.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work)
}
case CleanEventName:
workFunc, ok := workEvent.GetValue(WorkFuncEventKey)
if !ok {
workEvent.SetResultErr(errorx.LruNotWorkFuncEventErr())
continue
}
// 对当前的io数量进行判断
ioNum := lru.GetDriver().GetLength()
if ioNum > lru.lruMaxDiverSize*1/2 {
@ -37,8 +34,85 @@ func (lru *SingleCache) lruSingleWork() interface{} {
if work, ok := workFunc.(event.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work)
}
default:
return errorx.New("no this name")
case TtlEventName:
if work, ok := workFunc.(event.EventWorkFunc); ok {
workEvent.ExecWorkAndSendResult(work)
}
}
}
}
// 执行过期事件
func (lru *SingleCache) lruTtlWork() {
ttlEvent := event.NewEvent(TtlEventName)
ctx := context.Background()
work := event.EventWorkFunc(func() (interface{}, error) {
beforeTime := time.Now().Unix()
cle := lru.lruTtlManage.detachNum
if cle > len(lru.lruTtlManage.memoryKey) {
cle = len(lru.lruTtlManage.memoryKey)
}
// TODO send keys to middle
keys := make([]string, 0)
for i := 0; i < cle; i++ {
key := <-lru.lruTtlManage.memoryKey
keys = append(keys, key)
lru.delByKeyAndExTtl(key, beforeTime)
}
return nil, nil
})
ttlEvent.SetValue(WorkFuncEventKey, work)
cleanTTlTicker := time.NewTicker(500 * time.Millisecond)
defer cleanTTlTicker.Stop()
gatherTTlTicker := time.NewTicker(1 * time.Millisecond)
defer gatherTTlTicker.Stop()
for {
select {
// 清理事件
case <-cleanTTlTicker.C:
if len(lru.lruTtlManage.memoryKey) == 0 {
continue
}
ttlEvent.InitWaitEvent()
lru.lruCleanProduce.Call(ctx, ttlEvent)
_, err := ttlEvent.StartWaitEvent(time.Second * 2)
if err != nil {
logx.With(ctx, lru.middleProduce).Errorln(err)
}
case <-gatherTTlTicker.C:
lru.lruTtlManage.ttlKeyToMemoryBySecond()
}
}
}
func (lru *SingleCache) cleanWork() {
cxt := context.Background()
lruCleanEvent := event.NewEvent(CleanEventName)
work := event.EventWorkFunc(func() (interface{}, error) {
err := lru.DelToClearSize()
return nil, err
})
lruCleanEvent.SetValue(WorkFuncEventKey, work)
for {
time.Sleep(2 * time.Second)
if lru.clearSize < lru.nowSize {
lruCleanEvent.InitWaitEvent()
lru.lruCleanProduce.Call(cxt, lruCleanEvent)
_, err := lruCleanEvent.StartWaitEvent(defaultWaitTime)
if err != nil {
logx.With(cxt, lru.middleProduce).Errorln(err)
}
}
}
}

View File

@ -2,13 +2,20 @@ package middle_msg
import "time"
var LruCleanContextName = "lru-clean-context"
const LruCleanContextName = "lru-clean-context"
type LruCleanContext struct {
Keys []string
BeforeCleanSize int64
BehindCleanSize int64
StartTime time.Duration
EndTime time.Duration
StartTime time.Time
EndTime time.Time
}
const LruTTlContextName = "lru-ttl-context"
type LruTTlContext struct {
Keys []string
CleanTime time.Time
}

View File

@ -52,13 +52,13 @@ func (m *MiddleWare) loadPlugins() {
pluginsContext := make(map[string][]plugins.MiddleToolsInterface)
for middleMsg, pluNames := range plug {
for msg, pluNames := range plug {
pulgSingle := make([]plugins.MiddleToolsInterface, 0)
for _, name := range pluNames {
pulgSingle = append(pulgSingle, pluginsMap[name])
}
pluginsContext[middleMsg] = pulgSingle
pluginsContext[msg] = pulgSingle
}
m.plugins = pluginsContext

View File

@ -80,8 +80,7 @@ type SetResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
UpdateSize int64 `protobuf:"varint,1,opt,name=update_size,json=updateSize,proto3" json:"update_size,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
}
func (x *SetResponse) Reset() {
@ -116,13 +115,6 @@ func (*SetResponse) Descriptor() ([]byte, []int) {
return file_stringx_proto_rawDescGZIP(), []int{1}
}
func (x *SetResponse) GetUpdateSize() int64 {
if x != nil {
return x.UpdateSize
}
return 0
}
func (x *SetResponse) GetResult() string {
if x != nil {
return x.Result
@ -182,8 +174,7 @@ type GetResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
UpdateSize int64 `protobuf:"varint,1,opt,name=update_size,json=updateSize,proto3" json:"update_size,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
}
func (x *GetResponse) Reset() {
@ -218,13 +209,6 @@ func (*GetResponse) Descriptor() ([]byte, []int) {
return file_stringx_proto_rawDescGZIP(), []int{3}
}
func (x *GetResponse) GetUpdateSize() int64 {
if x != nil {
return x.UpdateSize
}
return 0
}
func (x *GetResponse) GetResult() string {
if x != nil {
return x.Result
@ -292,8 +276,7 @@ type AddResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
UpdateSize int64 `protobuf:"varint,1,opt,name=update_size,json=updateSize,proto3" json:"update_size,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
}
func (x *AddResponse) Reset() {
@ -328,13 +311,6 @@ func (*AddResponse) Descriptor() ([]byte, []int) {
return file_stringx_proto_rawDescGZIP(), []int{5}
}
func (x *AddResponse) GetUpdateSize() int64 {
if x != nil {
return x.UpdateSize
}
return 0
}
func (x *AddResponse) GetResult() string {
if x != nil {
return x.Result
@ -402,8 +378,7 @@ type ReduceResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
UpdateSize int64 `protobuf:"varint,1,opt,name=update_size,json=updateSize,proto3" json:"update_size,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"`
}
func (x *ReduceResponse) Reset() {
@ -438,13 +413,6 @@ func (*ReduceResponse) Descriptor() ([]byte, []int) {
return file_stringx_proto_rawDescGZIP(), []int{7}
}
func (x *ReduceResponse) GetUpdateSize() int64 {
if x != nil {
return x.UpdateSize
}
return 0
}
func (x *ReduceResponse) GetResult() string {
if x != nil {
return x.Result
@ -519,8 +487,6 @@ type SetbitResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
UpdateSize int64 `protobuf:"varint,1,opt,name=update_size,json=updateSize,proto3" json:"update_size,omitempty"`
}
func (x *SetbitResponse) Reset() {
@ -555,13 +521,6 @@ func (*SetbitResponse) Descriptor() ([]byte, []int) {
return file_stringx_proto_rawDescGZIP(), []int{9}
}
func (x *SetbitResponse) GetUpdateSize() int64 {
if x != nil {
return x.UpdateSize
}
return 0
}
type GetbitRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -622,8 +581,7 @@ type GetbitResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
UpdateSize int64 `protobuf:"varint,1,opt,name=update_size,json=updateSize,proto3" json:"update_size,omitempty"`
Val bool `protobuf:"varint,2,opt,name=val,proto3" json:"val,omitempty"`
Val bool `protobuf:"varint,2,opt,name=val,proto3" json:"val,omitempty"`
}
func (x *GetbitResponse) Reset() {
@ -658,13 +616,6 @@ func (*GetbitResponse) Descriptor() ([]byte, []int) {
return file_stringx_proto_rawDescGZIP(), []int{11}
}
func (x *GetbitResponse) GetUpdateSize() int64 {
if x != nil {
return x.UpdateSize
}
return 0
}
func (x *GetbitResponse) GetVal() bool {
if x != nil {
return x.Val
@ -680,55 +631,42 @@ var file_stringx_proto_rawDesc = []byte{
0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x03, 0x6b, 0x65, 0x79,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61, 0x73, 0x65, 0x4b, 0x65, 0x79,
0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x76, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x22, 0x46, 0x0a, 0x0b, 0x53, 0x65, 0x74, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65,
0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x70, 0x64,
0x61, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c,
0x28, 0x09, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x22, 0x25, 0x0a, 0x0b, 0x53, 0x65, 0x74, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x28,
0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61, 0x73, 0x65,
0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x25, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c,
0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22,
0x28, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a,
0x42, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a,
0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61, 0x73,
0x65, 0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x46, 0x0a, 0x0b, 0x47, 0x65, 0x74,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x64, 0x61,
0x74, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75,
0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73,
0x75, 0x6c, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c,
0x74, 0x22, 0x42, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x1a, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42,
0x61, 0x73, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72,
0x65, 0x6e, 0x65, 0x77, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65,
0x6e, 0x65, 0x77, 0x61, 0x6c, 0x22, 0x46, 0x0a, 0x0b, 0x41, 0x64, 0x64, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x73,
0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74,
0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x45, 0x0a,
0x0d, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a,
0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61,
0x73, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65,
0x6e, 0x65, 0x77, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x6e,
0x65, 0x77, 0x61, 0x6c, 0x22, 0x49, 0x0a, 0x0e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65,
0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x70, 0x64,
0x61, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c,
0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22,
0x53, 0x0a, 0x0d, 0x53, 0x65, 0x74, 0x62, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x1a, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e,
0x42, 0x61, 0x73, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03,
0x76, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x12, 0x14,
0x0a, 0x05, 0x6f, 0x66, 0x66, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6f,
0x66, 0x66, 0x65, 0x72, 0x22, 0x31, 0x0a, 0x0e, 0x53, 0x65, 0x74, 0x62, 0x69, 0x74, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65,
0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x75, 0x70, 0x64,
0x61, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x41, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x62, 0x69,
0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61, 0x73, 0x65, 0x4b, 0x65, 0x79, 0x52,
0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x66, 0x66, 0x65, 0x72, 0x18, 0x03, 0x20,
0x01, 0x28, 0x05, 0x52, 0x05, 0x6f, 0x66, 0x66, 0x65, 0x72, 0x22, 0x43, 0x0a, 0x0e, 0x47, 0x65,
0x74, 0x62, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b,
0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x03, 0x52, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x10, 0x0a,
0x03, 0x76, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x42,
0x0b, 0x5a, 0x09, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
0x65, 0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x6e,
0x65, 0x77, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x6e, 0x65,
0x77, 0x61, 0x6c, 0x22, 0x25, 0x0a, 0x0b, 0x41, 0x64, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x45, 0x0a, 0x0d, 0x52, 0x65,
0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x03, 0x6b,
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61, 0x73, 0x65, 0x4b,
0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x6e, 0x65, 0x77,
0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x6e, 0x65, 0x77, 0x61,
0x6c, 0x22, 0x28, 0x0a, 0x0e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x53, 0x0a, 0x0d, 0x53,
0x65, 0x74, 0x62, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x42, 0x61, 0x73, 0x65,
0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x76, 0x61, 0x6c, 0x18,
0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x66,
0x66, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6f, 0x66, 0x66, 0x65, 0x72,
0x22, 0x10, 0x0a, 0x0e, 0x53, 0x65, 0x74, 0x62, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x41, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x62, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x08, 0x2e, 0x42, 0x61, 0x73, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
0x14, 0x0a, 0x05, 0x6f, 0x66, 0x66, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05,
0x6f, 0x66, 0x66, 0x65, 0x72, 0x22, 0x22, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x62, 0x69, 0x74, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x76, 0x61, 0x6c, 0x18, 0x02,
0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x42, 0x0b, 0x5a, 0x09, 0x70, 0x6b, 0x67,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -5,35 +5,32 @@ package structure
const (
DEFAULT_KEY = iota
STRING_X
)
const (
DEFAULT_COMM = iota
SET
DEFAULT_COMM = iota
SET
GET
ADD
REDUCE
SETBIT
GETBIT
)
var CommKeyString = map[string]int {"set": STRING_X,
"get": STRING_X,
"add": STRING_X,
var CommKeyString = map[string]int{"set": STRING_X,
"get": STRING_X,
"add": STRING_X,
"reduce": STRING_X,
"setbit": STRING_X,
"getbit": STRING_X,
}
var CommKey = map[int]int {SET: STRING_X,
GET: STRING_X,
ADD: STRING_X,
var CommKey = map[int]int{SET: STRING_X,
GET: STRING_X,
ADD: STRING_X,
REDUCE: STRING_X,
SETBIT: STRING_X,
GETBIT: STRING_X,
}
}

View File

@ -0,0 +1,307 @@
package skiplist
import (
"crypto/rand"
"fmt"
"math"
"math/big"
)
type skipListNode struct {
score float64
val interface{}
next *skipListNode
down *skipListNode
up *skipListNode
pre *skipListNode
}
func newSkipListNode(score float64, val interface{}) *skipListNode {
if val == nil {
return nil
}
return &skipListNode{
score: score,
val: val,
}
}
type SkipList struct {
head *skipListNode
tail *skipListNode
length int
levels int // 单前层级
maxLevels int // 最大层级
}
func NewSkipList(maxLevel int) *SkipList {
skl := new(SkipList)
skl.head = &skipListNode{score: math.MinInt64}
skl.tail = &skipListNode{score: math.MaxInt64}
skl.head.next = skl.tail
skl.tail.pre = skl.head
skl.length = 0
skl.levels = 1
skl.maxLevels = maxLevel
return skl
}
func (s *SkipList) Length() int {
return s.length
}
func (s *SkipList) Levels() int {
return s.levels
}
func (s *SkipList) MaxLength() int {
return s.maxLevels
}
// Insert 插入数据
func (s *SkipList) Insert(score float64, val interface{}) {
node := newSkipListNode(score, val)
f := s.searchNode(score)
s.insertAfter(f, node)
// 随机上升
for newLevel := 1; ; newLevel++ {
rander, _ := rand.Int(rand.Reader, big.NewInt(2))
if rander.Int64() == 0 || newLevel >= s.maxLevels {
break
}
// 上升层级
if newLevel >= s.levels && s.levels < s.maxLevels {
s.newLevels()
}
for f.up == nil {
f = f.pre
}
f = f.up
tmpNode := &skipListNode{score: score}
node.up = tmpNode
tmpNode.down = node
s.insertAfter(f, tmpNode)
node = tmpNode
}
s.length += 1
}
// Pop 弹出随机一个 score 值的 val
func (s *SkipList) Pop(score float64) interface{} {
f := s.searchNode(score)
if f.score == score {
return nil
}
v := f.val
s.delSkipListNode(f)
return v
}
// Get 获取 随机一个 score 值的 val
func (s *SkipList) Get(score float64) interface{} {
node := s.searchNode(score)
if node != nil && node.score == score {
return node.val
}
return nil
}
// GetAll 获取全部的 ALL
func (s *SkipList) GetAll(score float64) []interface{} {
node := s.searchNode(score)
values := make([]interface{}, 0)
p := node
// pre
for p.score == score {
values = append(values, p.val)
p = p.pre
}
// next
p = node.next
for p.score == score {
values = append(values, p.val)
p = p.next
}
return values
}
// RemoveAll 删除全部
func (s *SkipList) RemoveAll(score float64) {
node := s.searchNode(score)
p := node
delNode := make([]*skipListNode, 0)
// pre
for p.score == score {
delNode = append(delNode, p)
p = p.pre
}
// next
p = node.next
for p.score == score {
delNode = append(delNode, p)
p = p.next
}
s.delSkipListNode(delNode...)
}
// ClearLeft 删除 小等于 score 的全部节点
func (s *SkipList) ClearLeft(score float64) *skipListNode {
head := s.head
for head.down != nil {
p := head
for p.score <= score {
p = p.next
}
head.next = p
p.pre = head
head = head.down
}
node := s.searchNode(score)
for node.score <= score {
node = node.next
}
oldNode := node.pre
head.next = node
node.pre = head
return oldNode
}
func (s *SkipList) PopLeft(score float64) []interface{} {
node := s.ClearLeft(score)
values := make([]interface{}, 0, 10)
for node.score != math.MinInt64 {
values = append(values, node.val)
node = node.pre
}
return values
}
// 查找节点位置
func (s *SkipList) searchNode(score float64) *skipListNode {
p := s.head
for p != nil {
// 尾部是极大极小值
if score == p.score {
if p.down == nil {
return p
}
p = p.down
continue
}
if score > p.score && score < p.next.score {
if p.down == nil {
return p
}
p = p.down
continue
}
p = p.next
}
return p
}
// 在节点后插入新节点
func (s *SkipList) insertAfter(pNode *skipListNode, newNode *skipListNode) {
newNode.next = pNode.next
newNode.pre = pNode
pNode.next.pre = newNode
pNode.next = newNode
}
// 添加 一个新的 levels 层
func (s *SkipList) newLevels() {
newHead := &skipListNode{score: math.MinInt64}
newTail := &skipListNode{score: math.MaxInt64}
newHead.next = newTail
newTail.pre = newHead
s.head.up = newHead
newHead.down = s.head
s.tail.up = newTail
newTail.down = s.tail
s.head = newHead
s.tail = newTail
s.levels++
}
func (s *SkipList) debugPrint() {
mapScore := make(map[float64]int)
p := s.head
for p.down != nil {
p = p.down
}
index := 0
for p != nil {
mapScore[p.score] = index
p = p.next
index++
}
p = s.head
for i := 0; i < s.levels; i++ {
q := p
preIndex := 0
for q != nil {
s := q.score
if s == math.MinInt64 {
fmt.Printf("%s", "BEGIN")
q = q.next
continue
}
index := mapScore[s]
c := (index - preIndex - 1) * 6
for m := 0; m < c; m++ {
fmt.Print("-")
}
if s == math.MaxInt64 {
fmt.Printf("-->%s\n", "END")
} else {
fmt.Printf("-->%3d", int(s))
preIndex = index
}
q = q.next
}
p = p.down
}
}
func (s *SkipList) delSkipListNode(sKm ...*skipListNode) {
for _, f := range sKm {
for f != nil {
f.pre.next = f.next
f.next.pre = f.pre
f = f.up
}
}
}

View File

@ -0,0 +1,68 @@
package skiplist
import (
"fmt"
"gitee.com/timedb/wheatCache/pkg/logx"
"github.com/stretchr/testify/require"
"math/rand"
"testing"
"time"
)
// 时间测试
func TestSkiplist_InsertTimeB(t *testing.T) {
// 测试 达到 maxLevel = 18 时性能 OK
list := NewSkipList(18)
for i := 0; i < 100; i++ {
list.Insert(float64(i), i)
}
list.debugPrint()
}
func TestSkiplist_InsertTimeA(t *testing.T) {
// 测试 达到 maxLevel = 18 时性能 OK
list := NewSkipList(20)
for i := 0; i < 200; i++ {
list.Insert(float64(rand.Intn(100)), i)
}
list.debugPrint()
start := time.Now()
list.Insert(7890, 1)
end := time.Now()
fmt.Println(end.UnixNano() - start.UnixNano())
}
func TestSkipList_Insert(t *testing.T) {
s := NewSkipList(18)
s.Insert(20, 1)
s.Insert(30, 2)
s.Insert(11, 3)
s.Insert(20, 11)
val := s.Get(30)
require.Equal(t, 2, val)
val = s.Get(11)
require.Equal(t, 3, val)
require.Equal(t, s.GetAll(20), []interface{}{1, 11})
s.RemoveAll(20)
require.Equal(t, s.GetAll(20), []interface{}{})
}
func Test_skipList_ClearLeft(t *testing.T) {
s := NewSkipList(18)
for i := 0; i < 100; i++ {
s.Insert(float64(rand.Intn(100)), i)
}
val := s.PopLeft(50)
logx.Debug("val:%v", val)
s.debugPrint()
require.Equal(t, s.Get(20), nil)
}