From c381c573749ac7b8337e4791129c68537f780028 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Mon, 29 Nov 2021 00:16:04 +0800 Subject: [PATCH] feat(structure-channelx): add channelx --- pkg/structure/channelx/channelx.go | 59 ++++++++++++++++++++++++++++++ pkg/structure/define.go | 7 ++++ storage/dao/channelx.go | 33 +++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 pkg/structure/channelx/channelx.go create mode 100644 storage/dao/channelx.go diff --git a/pkg/structure/channelx/channelx.go b/pkg/structure/channelx/channelx.go new file mode 100644 index 0000000..fb27ae9 --- /dev/null +++ b/pkg/structure/channelx/channelx.go @@ -0,0 +1,59 @@ +package channelx + +import ( + "sync/atomic" + + "gitee.com/wheat-os/wheatCache/pkg/structure" +) + +type ChannelX struct { + channel chan *structure.Value + sizeByte int64 +} + +func MakeChannelX(length int) *ChannelX { + return &ChannelX{ + channel: make(chan *structure.Value, length), + sizeByte: 0, + } +} + +func (c *ChannelX) SizeByte() int64 { + return c.sizeByte +} + +// RollBack TODO 事务相关, V2 实现 +func (c *ChannelX) RollBack() error { + panic("not implemented") // TODO: Implement +} + +// Begin 事务相关, V2 实现 +func (c *ChannelX) Begin() error { + panic("not implemented") // TODO: Implement +} + +// Comment 事务相关, V2 实现 +func (c *ChannelX) Comment() error { + panic("not implemented") // TODO: Implement +} + +func (c *ChannelX) Encode() ([]byte, error) { + panic("not implemented") // TODO: Implement +} + +func (c *ChannelX) Push(value string) structure.UpdateLength { + val := structure.NewValue(value) + up := val.GetSize() + c.channel <- val + atomic.AddInt64(&c.sizeByte, int64(up)) + return structure.UpdateLength(up) +} + +func (c *ChannelX) Pop() (string, structure.UpdateLength) { + val := <-c.channel + return val.ToString(), structure.UpdateLength(val.GetSize()) +} + +func (c *ChannelX) Length() int { + return len(c.channel) +} diff --git a/pkg/structure/define.go b/pkg/structure/define.go index e3078bd..b726f97 100644 --- a/pkg/structure/define.go +++ b/pkg/structure/define.go @@ -67,3 +67,10 @@ type HashXInterface interface { Length() int Range(consur, count int, regex string) []string } + +type ChannelXInterface interface { + KeyBaseInterface + Push(value string) UpdateLength + Pop() (string, UpdateLength) + Length() int +} diff --git a/storage/dao/channelx.go b/storage/dao/channelx.go new file mode 100644 index 0000000..c681092 --- /dev/null +++ b/storage/dao/channelx.go @@ -0,0 +1,33 @@ +package dao + +import ( + "gitee.com/wheat-os/wheatCache/pkg/errorx" + "gitee.com/wheat-os/wheatCache/pkg/proto" + "gitee.com/wheat-os/wheatCache/pkg/structure" +) + + + +func (d *Dao) CPush(key *proto.BaseKey, Value []string) (interface{}, error) { + val, ok := d.lru.Get(key) + if !ok { + return nil, errorx.NotKeyErr(key.Key) + } + chanVal, ok := val.() +} + +func (d *Dao) CPop(_ *proto.BaseKey, _ int32) (interface{}, error) { + panic("not implemented") // TODO: Implement +} + +func (d *Dao) CMake(key *proto.BaseKey, length int32) (*proto.CMakeResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (d *Dao) CLen(_ *proto.BaseKey) (*proto.CLenResponse, error) { + panic("not implemented") // TODO: Implement +} + +func (d *Dao) CClean(_ *proto.BaseKey) (*proto.CCleanResponse, error) { + panic("not implemented") // TODO: Implement +}