!48 feat-storage-server-v2

Merge pull request !48 from bandl/fix-storage-structure
This commit is contained in:
bandl 2021-10-05 10:24:11 +00:00 committed by Gitee
commit 5292163b14
22 changed files with 472 additions and 619 deletions

9
pkg/errorx/dao.go Normal file
View File

@ -0,0 +1,9 @@
package errorx
func DaoTypeErr(typ string) error {
return New("the type is not: %s", typ)
}
func NotKeyErr(key string) error {
return New("the key is not exist, key:%s", key)
}

View File

@ -1,22 +1,32 @@
package lru
import "sync"
import (
"gitee.com/timedb/wheatCache/pkg/structure"
"sync"
)
type SingleWorkFunc func() interface{}
const (
OptionEventName = "operateEvent"
CleanEventName = "clearEvent"
CleanEventName = "clearEvent"
WorkFuncEventKey = "workFunc"
)
var (
lruCacheOnce sync.Once
lruCache *singleCache
lruCache *SingleCache
)
const (
lruMaxSize = 1*1024*1024*1024*8
lruClearSize = 0.5*1024*1024*1024*8
lruMaxSize = 1 * 1024 * 1024 * 1024 * 8
lruClearSize = 0.5 * 1024 * 1024 * 1024 * 8
lruEventDriver = 2000
)
type CacheInterface interface {
Del() error
Get(key string) (structure.KeyBaseInterface, bool)
Add(key string, val structure.KeyBaseInterface)
UpdateLruSize(length structure.UpdateLength)
}

View File

@ -16,64 +16,63 @@ type keyBaseValue struct {
val structure.KeyBaseInterface
}
type singleCache struct {
type SingleCache struct {
maxsize int64 //最大的长度
clearSize int64 // 清理长度
nowSize int64 // 现在的长度
li *list.List
lruMap map[string]*list.Element
lruDriver event.DriverInterface
lruConsumer event.ConsumerInterface
lruCleanProduce event.ProduceInterface // 发送清理事件
lruDriver event.DriverInterface
lruConsumer event.ConsumerInterface
lruCleanProduce event.ProduceInterface // 发送清理事件
}
// UpdateLruSize 更新现在的长度
func (lru *singleCache) UpdateLruSize(length int64) {
atomic.AddInt64(&lru.nowSize, length)
func (lru *SingleCache) UpdateLruSize(length structure.UpdateLength) {
atomic.AddInt64(&lru.nowSize, int64(length))
}
func cacheInit() (int64, int64, event.DriverInterface) {
func cacheInit() (int64, int64, int) {
maxSize := viper.GetString("lruCache.maxSize")
retMaxSize, maxErr:= util.ParseSizeToBit(maxSize)
if maxErr != nil{
return 0, 0, nil
retMaxSize, maxErr := util.ParseSizeToBit(maxSize)
if maxErr != nil {
return 0, 0, 0
}
if retMaxSize == 0{
if retMaxSize == 0 {
retMaxSize = lruMaxSize
}
clearSize := viper.GetString("lruCache.clearSize")
retClearSize, clearErr := util.ParseSizeToBit(clearSize)
if clearErr != nil{
return 0, 0, nil
if clearErr != nil {
return 0, 0, 0
}
if retClearSize == 0{
if retClearSize == 0 {
retClearSize = lruClearSize
}
maxDriver := viper.GetInt("lruCache.eventDriverSize")
if maxDriver == 0{
if maxDriver == 0 {
maxDriver = lruEventDriver
}
lruDriver := event.NewDriver(maxDriver)
return retMaxSize, retClearSize, lruDriver
return retMaxSize, retClearSize, maxDriver
}
// NewLRUCache lru初始化
func NewLRUCache() *singleCache {
maxSize, clearSize, lruDrivers := cacheInit()
func NewLRUCache() *SingleCache {
maxSize, clearSize, maxDriverSize := cacheInit()
lruDriver := event.NewDriver(maxDriverSize)
lruCacheOnce.Do(func() {
_, _, lruDriver := cacheInit()
lru := &singleCache{
maxsize: maxSize,
clearSize: clearSize,
nowSize: 0,
li: list.New(),
lruMap: make(map[string]*list.Element),
lruDriver: lruDriver,
lruConsumer: event.NewConsumer(lruDrivers),
lruCleanProduce: event.NewProduce(lruDrivers),
lru := &SingleCache{
maxsize: maxSize,
clearSize: clearSize,
nowSize: 0,
li: list.New(),
lruMap: make(map[string]*list.Element),
lruDriver: lruDriver,
lruConsumer: event.NewConsumer(lruDriver),
lruCleanProduce: event.NewProduce(lruDriver),
}
lruCache = lru
go lru.lruSingleWork()
@ -82,12 +81,12 @@ func NewLRUCache() *singleCache {
}
// GetDriver 获取驱动
func (lru *singleCache) GetDriver() event.DriverInterface {
func (lru *SingleCache) GetDriver() event.DriverInterface {
return lru.lruDriver
}
//Add 增加
func (lru *singleCache) Add(key string, val structure.KeyBaseInterface) {
func (lru *SingleCache) Add(key string, val structure.KeyBaseInterface) {
keyBaseVal := &keyBaseValue{
key: key,
@ -101,11 +100,11 @@ func (lru *singleCache) Add(key string, val structure.KeyBaseInterface) {
valEl := lru.li.PushFront(keyBaseVal)
lru.lruMap[key] = valEl
//增加大小
lru.UpdateLruSize(valEl.Value.(*keyBaseValue).val.SizeByte())
lru.UpdateLruSize(structure.UpdateLength(valEl.Value.(*keyBaseValue).val.SizeByte()))
}
// Get 查找key对应的value
func (lru *singleCache) Get(key string) (structure.KeyBaseInterface, bool) {
func (lru *SingleCache) Get(key string) (structure.KeyBaseInterface, bool) {
if lru.lruMap == nil {
return nil, false
@ -118,14 +117,14 @@ func (lru *singleCache) Get(key string) (structure.KeyBaseInterface, bool) {
}
//Del 删除机制
func (lru *singleCache) Del() error {
func (lru *SingleCache) Del() error {
if lru.lruMap == nil {
return errorx.New("lru is nil")
}
data := lru.li.Back()
delete(lru.lruMap, data.Value.(*keyBaseValue).key)
//删除大小
lru.UpdateLruSize(-1 * data.Value.(*keyBaseValue).val.SizeByte())
lru.UpdateLruSize(structure.UpdateLength(-1 * data.Value.(*keyBaseValue).val.SizeByte()))
lru.li.Remove(data)
return nil
}

View File

@ -2,10 +2,7 @@ package lru
import (
"context"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"gitee.com/timedb/wheatCache/pkg/structure/stringx"
"github.com/stretchr/testify/require"
"testing"
@ -20,66 +17,13 @@ func TestWorker(t *testing.T) {
workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) {
v1 := stringx.NewStringSingle()
key := "v1"
res, err := v1.Set(&proto.SetRequest{
Val: "123",
})
if err != nil {
return nil, err
}
res, _ := v1.Set("123")
lru.Add(key, v1)
return res.Result, nil
return res, nil
}))
workEvent.InitWaitEvent()
produce.Call(ctx,workEvent)
produce.Call(ctx, workEvent)
res, err := workEvent.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
require.Equal(t, res, "123")
workEvent.InitWaitEvent()
workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) {
v2, ok := lru.Get("v1")
if !ok{
return nil, errorx.New("no this key")
}
switch v2.(type) {
case structure.StringXInterface:
res, err := v2.(structure.StringXInterface).Get(&proto.GetRequest{
})
if err != nil {
return nil, err
}
return res.Result, nil
default:
return nil, errorx.New("no this type")
}
}))
produce.Call(ctx, workEvent)
res, err = workEvent.StartWaitEvent(2 * time.Second)
require.NoError(t, err)
require.Equal(t, res, "123")
workEvent.InitWaitEvent()
workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) {
lru.Del()
v2, ok := lru.Get("v1")
if !ok{
return nil, nil
}
switch v2.(type) {
case structure.StringXInterface:
res, err := v2.(structure.StringXInterface).Get(&proto.GetRequest{
})
if err != nil {
return nil, err
}
return res.Result, nil
default:
return nil, errorx.New("no this type")
}
}))
produce.Call(ctx, workEvent)
res, err = workEvent.StartWaitEvent(2 * time.Second)
require.Equal(t, err, nil)
require.Equal(t, res, nil)
}

View File

@ -6,11 +6,7 @@ import (
"log"
)
func lruCleanWork() {
}
func (lru *singleCache) lruSingleWork() interface{} {
func (lru *SingleCache) lruSingleWork() interface{} {
ctx := context.Background()
for {
workEvent := lru.lruConsumer.Receive(ctx)

View File

@ -5,6 +5,7 @@ package structure
const (
DEFAULT_KEY = iota
STRING_X
)

View File

@ -6,6 +6,8 @@ const (
type DynamicType int8
type UpdateLength int64
const (
DynamicNull = DynamicType(iota)
DynamicInt

View File

@ -1,74 +0,0 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-struct generated
package server
import (
context "context"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"time"
)
type serverSingle struct {
middleProduce event.ProduceInterface
lruProduce event.ProduceInterface
ttl time.Duration
}
func NewServer() proto.CommServerServer {
ser := &serverSingle{}
return ser
}
// TODO 移除
func mockLruValue() structure.KeyBaseInterface {
return stringx.NewStringSingle()
}
{% for opt in option %}
{% for fun in opt.option %}
func (s *serverSingle) {{ fun }} (
cxt context.Context,
req *proto.{{ fun }}Request,
) (*proto.{{ fun }}Response, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.{{ opt.key }}Interface:
resp, err := value.(structure.{{ opt.key }}Interface).{{ fun }}(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.{{ fun }}Response:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.{{ fun }}Response), nil
}
{% endfor %}
{% endfor %}

View File

@ -1,10 +1,5 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-struct generated
package structure
import "gitee.com/timedb/wheatCache/pkg/proto"
type KeyBaseInterface interface {
SizeByte() int64
@ -20,10 +15,10 @@ type KeyBaseInterface interface {
type StringXInterface interface {
KeyBaseInterface
Set(*proto.SetRequest) (*proto.SetResponse, error)
Get(*proto.GetRequest) (*proto.GetResponse, error)
Add(*proto.AddRequest) (*proto.AddResponse, error)
Reduce(*proto.ReduceRequest) (*proto.ReduceResponse, error)
Setbit(*proto.SetbitRequest) (*proto.SetbitResponse, error)
Getbit(*proto.GetbitRequest) (*proto.GetbitResponse, error)
Set(val string) (string, UpdateLength)
Get() string
Add(renewal int32) (string, error)
Reduce(renewal int32) (string, error)
Setbit(offer int32, val bool) UpdateLength
Getbit(offer int32) (bool, error)
}

View File

@ -2,7 +2,6 @@ package stringx
import (
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"strconv"
)
@ -40,90 +39,69 @@ func (s *StringSingle) Encode() ([]byte, error) {
panic("not implemented") // TODO: Implement
}
func (s *StringSingle) Set(req *proto.SetRequest) (*proto.SetResponse, error) {
func (s *StringSingle) Set(val string) (string, structure.UpdateLength) {
length := s.val.ChangeValueLength(func() {
s.val.InferValue(req.Val)
s.val.InferValue(val)
})
return &proto.SetResponse{
Result: req.Val,
UpdateSize: length,
}, nil
return val, length
}
func (s *StringSingle) Get(req *proto.GetRequest) (*proto.GetResponse, error) {
return &proto.GetResponse{
Result: s.val.ToString(),
UpdateSize: 0,
}, nil
func (s *StringSingle) Get() string {
return s.val.ToString()
}
func updateValueNotString(s *StringSingle, val int32) (string, int64, error) {
func updateValueNotString(s *StringSingle, val int32) (string, error) {
switch s.val.GetDynamicType() {
case structure.DynamicNull:
length := s.val.ChangeValueLength(func() {
s.val.SetInt(int64(val))
})
return strconv.Itoa(int(val)), length, nil
s.val.SetInt(int64(val))
return strconv.Itoa(int(val)), nil
case structure.DynamicFloat:
f, err := s.val.ToFloat64()
if err != nil {
return "", 0, err
return "", err
}
s.val.SetFloat64(f + float64(val))
return strconv.FormatFloat(f+1, 'f', 2, 64), 0, nil
return strconv.FormatFloat(f+1, 'f', 2, 64), nil
case structure.DynamicInt:
i, err := s.val.ToInt()
if err != nil {
return "", 0, err
return "", err
}
s.val.SetInt(int64(val) + i)
return strconv.Itoa(int(i + int64(val))), 0, nil
return strconv.Itoa(int(i + int64(val))), nil
default:
return "", 0, errorx.New("string cannot perform add operations")
return "", errorx.New("string cannot perform add operations")
}
}
func (s *StringSingle) Add(req *proto.AddRequest) (*proto.AddResponse, error) {
result, length, err := updateValueNotString(s, req.Renewal)
func (s *StringSingle) Add(renewal int32) (string, error) {
result, err := updateValueNotString(s, renewal)
if err != nil {
return nil, err
return "", err
}
return &proto.AddResponse{
UpdateSize: length,
Result: result,
}, nil
return result, nil
}
func (s *StringSingle) Reduce(req *proto.ReduceRequest) (*proto.ReduceResponse, error) {
result, length, err := updateValueNotString(s, req.Renewal*-1)
func (s *StringSingle) Reduce(renewal int32) (string, error) {
result, err := updateValueNotString(s, -1*renewal)
if err != nil {
return nil, err
return "", err
}
return &proto.ReduceResponse{
UpdateSize: length,
Result: result,
}, nil
return result, nil
}
func (s *StringSingle) Setbit(req *proto.SetbitRequest) (*proto.SetbitResponse, error) {
func (s *StringSingle) Setbit(offer int32, val bool) structure.UpdateLength {
length := s.val.ChangeValueLength(func() {
s.val.SetByte(int(req.Offer), req.Val)
s.val.SetByte(int(offer), val)
})
return &proto.SetbitResponse{
UpdateSize: length,
}, nil
return length
}
func (s *StringSingle) Getbit(req *proto.GetbitRequest) (*proto.GetbitResponse, error) {
b, err := s.val.GetByte(int(req.Offer))
func (s *StringSingle) Getbit(offer int32) (bool, error) {
b, err := s.val.GetByte(int(offer))
if err != nil {
return nil, err
return false, err
}
return &proto.GetbitResponse{
Val: b,
}, nil
return b, err
}

View File

@ -1,140 +1,82 @@
package stringx
import (
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"github.com/stretchr/testify/require"
"testing"
)
func TestStringSingle_Set(t *testing.T) {
s := NewStringSingle()
resp, err := s.Set(&proto.SetRequest{
Val: "189",
})
require.NoError(t, err)
require.Equal(t, resp.Result, "189")
i, err := s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, i.Result, "189")
s.Set(&proto.SetRequest{
Val: "1.25",
})
i, err = s.Get(&proto.GetRequest{})
require.Equal(t, i.Result, "1.25")
resp, length := s.Set("189")
require.Equal(t, resp, "189")
require.Equal(t, length, structure.UpdateLength(0))
i := s.Get()
require.Equal(t, i, "189")
s.Set(&proto.SetRequest{
Val: "awdgiugaiuwdhoawd",
})
i, err = s.Get(&proto.GetRequest{})
require.Equal(t, i.Result, "awdgiugaiuwdhoawd")
resp, length = s.Set("189.12")
require.Equal(t, resp, "189.12")
require.Equal(t, length, structure.UpdateLength(0))
i = s.Get()
require.Equal(t, i, "189.12")
resp, length = s.Set("awdawd")
require.Equal(t, resp, "awdawd")
require.Equal(t, length, structure.UpdateLength(-2))
i = s.Get()
require.Equal(t, i, "awdawd")
}
func TestStringSingle_Add(t *testing.T) {
s := NewStringSingle()
_, err := s.Set(&proto.SetRequest{
Val: "135",
})
require.NoError(t, err)
s.Add(&proto.AddRequest{
Renewal: 9,
})
resp, err := s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, resp.Result, "144")
s.Set("189")
s.Add(1)
i := s.Get()
require.Equal(t, i, "190")
// float
_, err = s.Set(&proto.SetRequest{
Val: "135.33",
})
require.NoError(t, err)
s.Add(&proto.AddRequest{
Renewal: 3,
})
resp, err = s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, resp.Result, "138.33")
s.Set("189.2")
s.Add(1)
i = s.Get()
require.Equal(t, i, "190.20")
// string
_, err = s.Set(&proto.SetRequest{
Val: "awjdwad",
})
require.NoError(t, err)
_, err = s.Add(&proto.AddRequest{
Renewal: 3,
})
s.Set("wad")
_, err := s.Add(1)
require.Error(t, err)
resp, err = s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, resp.Result, "awjdwad")
}
func TestStringSingle_Reduce(t *testing.T) {
s := NewStringSingle()
_, err := s.Set(&proto.SetRequest{
Val: "135",
})
require.NoError(t, err)
s.Reduce(&proto.ReduceRequest{
Renewal: 5,
})
resp, err := s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, resp.Result, "130")
s.Set("189")
s.Reduce(1)
i := s.Get()
require.Equal(t, i, "188")
// float
_, err = s.Set(&proto.SetRequest{
Val: "135.33",
})
require.NoError(t, err)
s.Reduce(&proto.ReduceRequest{
Renewal: 3,
})
resp, err = s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, resp.Result, "132.33")
s.Set("189.2")
s.Reduce(1)
i = s.Get()
require.Equal(t, i, "188.20")
// string
_, err = s.Set(&proto.SetRequest{
Val: "awjdwad",
})
require.NoError(t, err)
_, err = s.Reduce(&proto.ReduceRequest{
Renewal: 3,
})
s.Set("wad")
_, err := s.Reduce(1)
require.Error(t, err)
resp, err = s.Get(&proto.GetRequest{})
require.NoError(t, err)
require.Equal(t, resp.Result, "awjdwad")
}
func TestStringSingle_Getbit(t *testing.T) {
s := NewStringSingle()
_, err := s.Setbit(&proto.SetbitRequest{
Offer: 1009,
Val: true,
})
require.NoError(t, err)
length := s.Setbit(1009, true)
require.Equal(t, length, structure.UpdateLength(1002))
res, err := s.Getbit(&proto.GetbitRequest{
Offer: 1009,
})
res, err := s.Getbit(1009)
require.NoError(t, err)
require.Equal(t, res.Val, true)
res, err = s.Getbit(&proto.GetbitRequest{
Offer: 1008,
})
require.Equal(t, res, true)
res, err = s.Getbit(1008)
require.NoError(t, err)
require.Equal(t, res.Val, false)
require.Equal(t, res, false)
_, err = s.Setbit(&proto.SetbitRequest{
Offer: 1009,
Val: false,
})
res, err = s.Getbit(&proto.GetbitRequest{
Offer: 1009,
})
length = s.Setbit(1009, false)
require.Equal(t, length, structure.UpdateLength(0))
res, err = s.Getbit(1009)
require.NoError(t, err)
require.Equal(t, res.Val, false)
require.Equal(t, res, false)
}

View File

@ -128,10 +128,10 @@ func (v *Value) InferValue(str string) {
}
// ChangeValueLength 根据类型推断 change 的大小, 只用于 Set 操作不发生错误
func (v *Value) ChangeValueLength(f func()) int64 {
func (v *Value) ChangeValueLength(f func()) UpdateLength {
startLen := v.GetLength()
f()
return int64(v.GetLength() - startLen)
return UpdateLength(v.GetLength() - startLen)
}
func (v *Value) SetByte(offset int, val bool) {

View File

@ -8,7 +8,6 @@ message SetRequest {
}
message SetResponse {
int64 update_size = 1;
string result = 2;
}
@ -17,7 +16,6 @@ message GetRequest {
}
message GetResponse {
int64 update_size = 1;
string result = 2;
}
@ -27,7 +25,6 @@ message AddRequest {
}
message AddResponse {
int64 update_size = 1;
string result = 2;
}
@ -37,7 +34,6 @@ message ReduceRequest {
}
message ReduceResponse {
int64 update_size = 1;
string result = 2;
}
@ -48,7 +44,6 @@ message SetbitRequest {
}
message SetbitResponse {
int64 update_size = 1;
}
message GetbitRequest {
@ -57,6 +52,5 @@ message GetbitRequest {
}
message GetbitResponse {
int64 update_size = 1;
bool val = 2;
}

View File

@ -40,7 +40,7 @@ def mk_structure(cfg_camel):
file.write('syntax = "proto3";\nimport "base.proto";\noption go_package = "pkg/proto";\n')
for v in value:
file.write('\nmessage ' + v + 'Request ' + '{\n BaseKey key = 1;\n}\n')
file.write('\nmessage ' + v + 'Response ' + '{\n int64 update_size = 1;\n}\n')
file.write('\nmessage ' + v + 'Response ' + '{\n}\n')
file.close()
else: # 如果这个文件存在
@ -58,7 +58,7 @@ def mk_structure(cfg_camel):
if flag == 0:
file = open(proto_path, 'a')
file.write('\nmessage ' + v + 'Request ' + '{\n BaseKey key = 1;\n}\n')
file.write("\nmessage %sResponse {\n int64 update_size = 1;\n}\n" % v)
file.write("\nmessage %sResponse {\n}\n" % v)
file.close()
print(f"{key}.proto", "-> success")

View File

@ -113,7 +113,7 @@ def format_code_go():
if __name__ == "__main__":
conf, cfg_camel = load_conf()
set_structure_const_template(conf)
set_structure_interface(cfg_camel)
set_storage_server(cfg_camel)
# set_structure_interface(cfg_camel)
# set_storage_server(cfg_camel)
# 格式化代码
format_code_go()

View File

@ -3,6 +3,7 @@ package cmd
import (
"fmt"
"gitee.com/timedb/wheatCache/storage/server/single"
"log"
"net"
"os"
@ -11,7 +12,6 @@ import (
_ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/storage/server"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
@ -24,7 +24,7 @@ var rootCmd = &cobra.Command{
Short: "storage",
Long: `start storage server`,
Run: func(cmd *cobra.Command, args []string) {
storageServer := server.NewServer()
storageServer := single.NewServer()
// 先写死, 等配置文件
conf := viper.GetStringMap("storage")
host := conf["host"].(string)

132
storage/dao/dao.go Normal file
View File

@ -0,0 +1,132 @@
package dao
import (
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/lru"
"gitee.com/timedb/wheatCache/pkg/structure"
"gitee.com/timedb/wheatCache/pkg/structure/stringx"
)
type Dao struct {
lru lru.CacheInterface
}
func NewDao(lru lru.CacheInterface) *Dao {
return &Dao{
lru: lru,
}
}
func (d *Dao) Set(key string, strVal string) (string, error) {
value, ok := d.lru.Get(key)
if ok {
if val, ok := value.(structure.StringXInterface); ok {
res, length := val.Set(strVal)
d.lru.UpdateLruSize(length)
return res, nil
} else {
return "", errorx.New("the key:%s is not stringx type", key)
}
}
// 不存在新建
strValue := stringx.NewStringSingle()
result, length := strValue.Set(strVal)
d.lru.Add(key, strValue)
d.lru.UpdateLruSize(length)
return result, nil
}
func (d *Dao) Get(key string) (string, error) {
val, ok := d.lru.Get(key)
if !ok {
return "", errorx.NotKeyErr(key)
}
strVal, ok := val.(structure.StringXInterface)
if !ok {
return "", errorx.DaoTypeErr("stringx")
}
return strVal.Get(), nil
}
func (d *Dao) Add(key string, renewal int32) (string, error) {
value, lruOk := d.lru.Get(key)
if !lruOk {
val := stringx.NewStringSingle()
res, err := val.Add(renewal)
if err != nil {
return "", nil
}
d.lru.Add(key, val)
return res, nil
}
strVal, ok := value.(structure.StringXInterface)
if !ok {
return "", errorx.DaoTypeErr("stringx")
}
res, err := strVal.Add(renewal)
if err != nil {
return "", err
}
return res, nil
}
func (d *Dao) Reduce(key string, renewal int32) (string, error) {
value, lruOk := d.lru.Get(key)
if !lruOk {
val := stringx.NewStringSingle()
res, err := val.Reduce(renewal)
if err != nil {
return "", nil
}
d.lru.Add(key, val)
return res, nil
}
strVal, ok := value.(structure.StringXInterface)
if !ok {
return "", errorx.DaoTypeErr("stringx")
}
res, err := strVal.Reduce(renewal)
if err != nil {
return "", err
}
return res, nil
}
func (d *Dao) Setbit(key string, val bool, offer int32) error {
value, lruOk := d.lru.Get(key)
if !lruOk {
valStr := stringx.NewStringSingle()
length := valStr.Setbit(offer, val)
d.lru.UpdateLruSize(length)
d.lru.Add(key, valStr)
return nil
}
strVal, ok := value.(structure.StringXInterface)
if !ok {
return errorx.DaoTypeErr("stringx")
}
length := strVal.Setbit(offer, val)
d.lru.UpdateLruSize(length)
return nil
}
func (d *Dao) GetBit(key string, offer int32) (bool, error) {
value, lruOk := d.lru.Get(key)
if !lruOk {
return false, errorx.NotKeyErr(key)
}
strVal, ok := value.(structure.StringXInterface)
if !ok {
return false, errorx.DaoTypeErr("stringx")
}
return strVal.Getbit(offer)
}

1
storage/dao/define.go Normal file
View File

@ -0,0 +1 @@
package dao

View File

@ -1,260 +0,0 @@
// Code generated by gen-struct. DO NOT EDIT.
// make gen-struct generated
package server
import (
context "context"
"gitee.com/timedb/wheatCache/pkg/errorx"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/structure"
"gitee.com/timedb/wheatCache/pkg/structure/stringx"
"time"
)
type serverSingle struct {
middleProduce event.ProduceInterface
lruProduce event.ProduceInterface
ttl time.Duration
}
func NewServer() proto.CommServerServer {
ser := &serverSingle{}
return ser
}
// TODO 移除
func mockLruValue() structure.KeyBaseInterface {
return stringx.NewStringSingle()
}
func (s *serverSingle) Set(
cxt context.Context,
req *proto.SetRequest,
) (*proto.SetResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Set(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.SetResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.SetResponse), nil
}
func (s *serverSingle) Get(
cxt context.Context,
req *proto.GetRequest,
) (*proto.GetResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Get(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.GetResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.GetResponse), nil
}
func (s *serverSingle) Add(
cxt context.Context,
req *proto.AddRequest,
) (*proto.AddResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Add(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.AddResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.AddResponse), nil
}
func (s *serverSingle) Reduce(
cxt context.Context,
req *proto.ReduceRequest,
) (*proto.ReduceResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Reduce(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.ReduceResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.ReduceResponse), nil
}
func (s *serverSingle) Setbit(
cxt context.Context,
req *proto.SetbitRequest,
) (*proto.SetbitResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Setbit(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.SetbitResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.SetbitResponse), nil
}
func (s *serverSingle) Getbit(
cxt context.Context,
req *proto.GetbitRequest,
) (*proto.GetbitResponse, error) {
lruEvent := event.NewEvent("lru event")
lruEvent.InitWaitEvent()
work := event.EventWorkFunc(func() (interface{}, error) {
value := mockLruValue() // TODO 替换为从 lru 获取
switch value.(type) {
case structure.StringXInterface:
resp, err := value.(structure.StringXInterface).Getbit(req)
if err != nil {
return nil, err
}
return resp, nil
default:
return nil, errorx.New("value err")
}
})
lruEvent.SetValue("lru work", work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.ttl)
if err != nil {
return nil, err
}
switch resp.(type) {
case *proto.GetbitResponse:
default:
return nil, errorx.New("value err")
}
return resp.(*proto.GetbitResponse), nil
}

View File

@ -0,0 +1,12 @@
package single
import (
"sync"
)
var (
oneSingleServer sync.Once
sysSingleServer *serverSingle
)
const timeOutDefault = 2

View File

@ -0,0 +1,38 @@
package single
import (
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/lru"
"gitee.com/timedb/wheatCache/storage/dao"
"github.com/spf13/viper"
"time"
)
type serverSingle struct {
middleProduce event.ProduceInterface
lruProduce event.ProduceInterface
timeOut time.Duration
lruCache *lru.SingleCache
dao *dao.Dao
}
func NewServer() *serverSingle {
oneSingleServer.Do(func() {
timeOut := viper.GetInt("storage.timeOut")
if timeOut == 0 {
timeOut = timeOutDefault
}
lruCache := lru.NewLRUCache()
ser := &serverSingle{
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut),
dao: dao.NewDao(lruCache),
}
sysSingleServer = ser
})
return sysSingleServer
}

View File

@ -0,0 +1,134 @@
package single
import (
context "context"
"gitee.com/timedb/wheatCache/pkg/event"
"gitee.com/timedb/wheatCache/pkg/lru"
"gitee.com/timedb/wheatCache/pkg/proto"
)
func (s *serverSingle) Set(
cxt context.Context,
req *proto.SetRequest,
) (*proto.SetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Set(req.Key.Key, req.Val)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.SetResponse{
Result: resp.(string),
}, nil
}
func (s *serverSingle) Get(
cxt context.Context,
req *proto.GetRequest,
) (*proto.GetResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Get(req.Key.Key)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.GetResponse{
Result: resp.(string),
}, nil
}
func (s serverSingle) Add(
cxt context.Context,
req *proto.AddRequest,
) (*proto.AddResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Add(req.Key.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.AddResponse{
Result: resp.(string),
}, nil
}
func (s *serverSingle) Reduce(
cxt context.Context,
req *proto.ReduceRequest,
) (*proto.ReduceResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.Add(req.Key.Key, req.Renewal)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
resp, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.ReduceResponse{
Result: resp.(string),
}, nil
}
func (s *serverSingle) Setbit(
cxt context.Context,
req *proto.SetbitRequest,
) (*proto.SetbitResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return nil, s.dao.Setbit(req.Key.Key, req.Val, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
_, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.SetbitResponse{}, nil
}
func (s *serverSingle) Getbit(
cxt context.Context,
req *proto.GetbitRequest,
) (*proto.GetbitResponse, error) {
work := event.EventWorkFunc(func() (interface{}, error) {
return s.dao.GetBit(req.Key.Key, req.Offer)
})
lruEvent := event.NewEvent(lru.OptionEventName)
lruEvent.InitWaitEvent()
lruEvent.SetValue(lru.WorkFuncEventKey, work)
s.lruProduce.Call(cxt, lruEvent)
flag, err := lruEvent.StartWaitEvent(s.timeOut)
if err != nil {
return nil, err
}
return &proto.GetbitResponse{
Val: flag.(bool),
}, nil
}