!80 修复 skiplist 的 bug

Merge pull request !80 from bandl/fix-skiplist
This commit is contained in:
bandl 2021-10-28 06:52:59 +00:00 committed by Gitee
commit bed3f1893a
6 changed files with 106 additions and 55 deletions

View File

@ -13,8 +13,10 @@ func TestClient(t *testing.T) {
cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient) cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient)
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
bKey := &proto.BaseKey{ bKey := &proto.BaseKey{
Key: "apple", Key: "apple",
Ttl: 10,
} }
resp, err := cli.Set(ctx, &proto.SetRequest{ resp, err := cli.Set(ctx, &proto.SetRequest{
Key: bKey, Key: bKey,
@ -29,5 +31,20 @@ func TestClient(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, getResp.Result, "yyyy") require.Equal(t, getResp.Result, "yyyy")
}
func TestClientGet(t *testing.T) {
cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient)
require.NoError(t, err)
ctx := context.Background()
bKey := &proto.BaseKey{
Key: "apple",
}
getResp, err := cli.Get(ctx, &proto.GetRequest{
Key: bKey,
})
require.NoError(t, err)
require.Equal(t, getResp.Result, "yyyy")
} }

View File

@ -38,12 +38,13 @@ func TestSingleCache_DelToClearSize(t *testing.T) {
lru := NewLRUCache() lru := NewLRUCache()
produce := event.NewProduce(lru.GetDriver()) produce := event.NewProduce(lru.GetDriver())
for i := int32(20000); i >= 1; i-- { for i := int32(20000); i > 0; i-- {
workEvent := produce.NewEvent(OptionEventName) workEvent := produce.NewEvent(OptionEventName)
workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) {
v1 := stringx.NewStringSingle() v1 := stringx.NewStringSingle()
key := proto.BaseKey{ key := proto.BaseKey{
Key: string(i), Key: string(i),
Ttl: 1,
} }
u := v1.Setbit(i, true) u := v1.Setbit(i, true)
lru.Add(&key, v1) lru.Add(&key, v1)
@ -55,6 +56,7 @@ func TestSingleCache_DelToClearSize(t *testing.T) {
produce.Recovery(workEvent) produce.Recovery(workEvent)
} }
time.Sleep(5 * time.Second) logx.Info("start size is %d", lru.nowSize)
time.Sleep(10 * time.Second)
logx.Info("end size is %d", lru.nowSize) logx.Info("end size is %d", lru.nowSize)
} }

View File

@ -7,6 +7,7 @@ import (
"gitee.com/timedb/wheatCache/pkg/errorx" "gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/logx"
mMsg "gitee.com/timedb/wheatCache/pkg/middle-msg"
) )
func (lru *SingleCache) lruSingleWork() { func (lru *SingleCache) lruSingleWork() {
@ -48,54 +49,61 @@ func (lru *SingleCache) lruSingleWork() {
func (lru *SingleCache) lruTtlWork() { func (lru *SingleCache) lruTtlWork() {
ctx := context.Background() ctx := context.Background()
work := event.EventWorkFunc(func() (interface{}, error) {
beforeTime := time.Now().Unix() // 清理事件
cle := lru.lruTtlManage.detachNum go func() {
if cle > len(lru.lruTtlManage.memoryKey) { work := event.EventWorkFunc(func() (interface{}, error) {
cle = len(lru.lruTtlManage.memoryKey)
}
// TODO send keys to middle beforeTime := time.Now().Unix()
keys := make([]string, 0) cle := lru.lruTtlManage.detachNum
for i := 0; i < cle; i++ { if cle > len(lru.lruTtlManage.memoryKey) {
key := <-lru.lruTtlManage.memoryKey cle = len(lru.lruTtlManage.memoryKey)
keys = append(keys, key) }
lru.delByKeyAndExTtl(key, beforeTime)
}
return nil, nil keys := make([]string, 0)
}) for i := 0; i < cle; i++ {
key := <-lru.lruTtlManage.memoryKey
keys = append(keys, key)
lru.delByKeyAndExTtl(key, beforeTime)
}
return keys, nil
})
cleanTTlTicker := time.NewTicker(500 * time.Millisecond) cleanTTlTicker := time.NewTicker(500 * time.Millisecond)
defer cleanTTlTicker.Stop() defer cleanTTlTicker.Stop()
gatherTTlTicker := time.NewTicker(1 * time.Millisecond)
defer gatherTTlTicker.Stop()
for {
select {
// 清理事件
case <-cleanTTlTicker.C:
ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName)
ttlEvent.SetValue(WorkFuncEventKey, work)
for {
// 清理事件
<-cleanTTlTicker.C
if len(lru.lruTtlManage.memoryKey) == 0 { if len(lru.lruTtlManage.memoryKey) == 0 {
continue continue
} }
ttlEvent := lru.lruCleanProduce.NewEvent(TtlEventName)
ttlEvent.SetValue(WorkFuncEventKey, work)
ttlEvent.InitWaitEvent() ttlEvent.InitWaitEvent()
lru.lruCleanProduce.Call(ctx, ttlEvent) lru.lruCleanProduce.Call(ctx, ttlEvent)
_, err := ttlEvent.StartWaitEvent(time.Second * 2) keys, err := ttlEvent.StartWaitEvent(time.Second * 2)
lru.lruCleanProduce.Recovery(ttlEvent) lru.lruCleanProduce.Recovery(ttlEvent)
mMsg.SendMiddleMsg(ctx, lru.middleProduce, mMsg.LruTTlContext{
Keys: keys.([]string),
CleanTime: time.Now(),
})
if err != nil { if err != nil {
logx.With(ctx, lru.middleProduce).Errorln(err) logx.With(ctx, lru.middleProduce).Errorln(err)
} }
// 收集过期的 key
case <-gatherTTlTicker.C:
lru.lruTtlManage.ttlKeyToMemoryBySecond()
} }
}()
// 收集事件
for {
time.Sleep(1 * time.Second)
lru.lruTtlManage.ttlKeyToMemoryBySecond()
} }
} }
func (lru *SingleCache) cleanWork() { func (lru *SingleCache) cleanWork() {

View File

@ -91,7 +91,7 @@ func TestValue_ChangeValue(t *testing.T) {
chanageLen := value.ChangeValueLength(func() { chanageLen := value.ChangeValueLength(func() {
value.SetString("小葵花课堂开课了") value.SetString("小葵花课堂开课了")
}) })
require.Equal(t, chanageLen, int64(value.GetLength()-oldLen)) require.Equal(t, int64(chanageLen), int64(value.GetLength()-oldLen))
lens := value.ChangeValueLength(func() { lens := value.ChangeValueLength(func() {
value.SetInt(100) value.SetInt(100)

View File

@ -17,10 +17,6 @@ type skipListNode struct {
} }
func newSkipListNode(score float64, val interface{}) *skipListNode { func newSkipListNode(score float64, val interface{}) *skipListNode {
if val == nil {
return nil
}
return &skipListNode{ return &skipListNode{
score: score, score: score,
val: val, val: val,
@ -285,33 +281,50 @@ func (s *SkipList) PopRangeByScore(startScore float64, endScore float64) []inter
// 查找节点位置 // 查找节点位置
func (s *SkipList) searchNode(score float64) *skipListNode { func (s *SkipList) searchNode(score float64) *skipListNode {
const (
exitStatus = iota
initStatus
downStatus
nextStatus
)
status := initStatus
p := s.head p := s.head
for p != nil {
// 尾部是极大极小值 for {
if score == p.score { switch status {
case exitStatus:
return p
case initStatus:
if p.score == score || p.next.score > score {
status = downStatus
continue
}
status = nextStatus
case downStatus:
if p.down == nil { if p.down == nil {
return p status = exitStatus
continue
} }
p = p.down p = p.down
continue status = initStatus
}
if score > p.score && score < p.next.score { case nextStatus:
if p.down == nil { // 不知名的 bug 保护一下
return p if p.next.score != math.MaxInt64 {
p = p.next
} }
p = p.down status = initStatus
continue
} }
p = p.next
} }
return p
} }
// 在节点后插入新节点 // 在节点后插入新节点
func (s *SkipList) insertAfter(pNode *skipListNode, newNode *skipListNode) { func (s *SkipList) insertAfter(pNode *skipListNode, newNode *skipListNode) {
if pNode == nil || newNode == nil {
return
}
newNode.next = pNode.next newNode.next = pNode.next
newNode.pre = pNode newNode.pre = pNode
pNode.next.pre = newNode pNode.next.pre = newNode

View File

@ -44,12 +44,13 @@ func TestSkipList_Insert(t *testing.T) {
s.Insert(30, 2) s.Insert(30, 2)
s.Insert(11, 3) s.Insert(11, 3)
s.Insert(20, 11) s.Insert(20, 11)
s.debugPrint()
val := s.Get(30) val := s.Get(30)
require.Equal(t, 2, val) require.Equal(t, 2, val)
val = s.Get(11) val = s.Get(11)
require.Equal(t, 3, val) require.Equal(t, 3, val)
require.Equal(t, s.GetAll(20), []interface{}{1, 11}) require.Len(t, s.GetAll(20), 2)
s.RemoveAll(20) s.RemoveAll(20)
require.Equal(t, s.GetAll(20), []interface{}{}) require.Equal(t, s.GetAll(20), []interface{}{})
@ -103,3 +104,13 @@ func TestSkipList_PopRight(t *testing.T) {
s.debugPrint() s.debugPrint()
} }
} }
func Test_skipList_InsertTime(t *testing.T) {
now := time.Now()
s := NewSkipList(18)
for i := 0; i < 20; i++ {
s.Insert(float64(now.UnixMicro()), now)
}
s.debugPrint()
}