diff --git a/pkg/errorx/dao.go b/pkg/errorx/dao.go new file mode 100644 index 0000000..1e28c49 --- /dev/null +++ b/pkg/errorx/dao.go @@ -0,0 +1,9 @@ +package errorx + +func DaoTypeErr(typ string) error { + return New("the type is not: %s", typ) +} + +func NotKeyErr(key string) error { + return New("the key is not exist, key:%s", key) +} diff --git a/pkg/lru/define.go b/pkg/lru/define.go index 84d56d1..98e82e2 100644 --- a/pkg/lru/define.go +++ b/pkg/lru/define.go @@ -1,22 +1,32 @@ package lru -import "sync" +import ( + "gitee.com/timedb/wheatCache/pkg/structure" + "sync" +) type SingleWorkFunc func() interface{} const ( OptionEventName = "operateEvent" - CleanEventName = "clearEvent" + CleanEventName = "clearEvent" WorkFuncEventKey = "workFunc" ) var ( lruCacheOnce sync.Once - lruCache *singleCache + lruCache *SingleCache ) const ( - lruMaxSize = 1*1024*1024*1024*8 - lruClearSize = 0.5*1024*1024*1024*8 + lruMaxSize = 1 * 1024 * 1024 * 1024 * 8 + lruClearSize = 0.5 * 1024 * 1024 * 1024 * 8 lruEventDriver = 2000 ) + +type CacheInterface interface { + Del() error + Get(key string) (structure.KeyBaseInterface, bool) + Add(key string, val structure.KeyBaseInterface) + UpdateLruSize(length structure.UpdateLength) +} \ No newline at end of file diff --git a/pkg/lru/lru.go b/pkg/lru/lru.go index 24522d2..40cc014 100644 --- a/pkg/lru/lru.go +++ b/pkg/lru/lru.go @@ -16,64 +16,63 @@ type keyBaseValue struct { val structure.KeyBaseInterface } -type singleCache struct { +type SingleCache struct { maxsize int64 //最大的长度 clearSize int64 // 清理长度 nowSize int64 // 现在的长度 li *list.List lruMap map[string]*list.Element - lruDriver event.DriverInterface - lruConsumer event.ConsumerInterface - lruCleanProduce event.ProduceInterface // 发送清理事件 + lruDriver event.DriverInterface + lruConsumer event.ConsumerInterface + lruCleanProduce event.ProduceInterface // 发送清理事件 } // UpdateLruSize 更新现在的长度 -func (lru *singleCache) UpdateLruSize(length int64) { - atomic.AddInt64(&lru.nowSize, length) +func (lru *SingleCache) UpdateLruSize(length structure.UpdateLength) { + atomic.AddInt64(&lru.nowSize, int64(length)) } -func cacheInit() (int64, int64, event.DriverInterface) { +func cacheInit() (int64, int64, int) { maxSize := viper.GetString("lruCache.maxSize") - retMaxSize, maxErr:= util.ParseSizeToBit(maxSize) - if maxErr != nil{ - return 0, 0, nil + retMaxSize, maxErr := util.ParseSizeToBit(maxSize) + if maxErr != nil { + return 0, 0, 0 } - if retMaxSize == 0{ + if retMaxSize == 0 { retMaxSize = lruMaxSize } clearSize := viper.GetString("lruCache.clearSize") retClearSize, clearErr := util.ParseSizeToBit(clearSize) - if clearErr != nil{ - return 0, 0, nil + if clearErr != nil { + return 0, 0, 0 } - if retClearSize == 0{ + if retClearSize == 0 { retClearSize = lruClearSize } maxDriver := viper.GetInt("lruCache.eventDriverSize") - if maxDriver == 0{ + if maxDriver == 0 { maxDriver = lruEventDriver } - lruDriver := event.NewDriver(maxDriver) - return retMaxSize, retClearSize, lruDriver + return retMaxSize, retClearSize, maxDriver } // NewLRUCache lru初始化 -func NewLRUCache() *singleCache { - maxSize, clearSize, lruDrivers := cacheInit() +func NewLRUCache() *SingleCache { + maxSize, clearSize, maxDriverSize := cacheInit() + lruDriver := event.NewDriver(maxDriverSize) lruCacheOnce.Do(func() { - _, _, lruDriver := cacheInit() - lru := &singleCache{ - maxsize: maxSize, - clearSize: clearSize, - nowSize: 0, - li: list.New(), - lruMap: make(map[string]*list.Element), - lruDriver: lruDriver, - lruConsumer: event.NewConsumer(lruDrivers), - lruCleanProduce: event.NewProduce(lruDrivers), + lru := &SingleCache{ + maxsize: maxSize, + clearSize: clearSize, + nowSize: 0, + li: list.New(), + lruMap: make(map[string]*list.Element), + lruDriver: lruDriver, + lruConsumer: event.NewConsumer(lruDriver), + lruCleanProduce: event.NewProduce(lruDriver), } lruCache = lru go lru.lruSingleWork() @@ -82,12 +81,12 @@ func NewLRUCache() *singleCache { } // GetDriver 获取驱动 -func (lru *singleCache) GetDriver() event.DriverInterface { +func (lru *SingleCache) GetDriver() event.DriverInterface { return lru.lruDriver } //Add 增加 -func (lru *singleCache) Add(key string, val structure.KeyBaseInterface) { +func (lru *SingleCache) Add(key string, val structure.KeyBaseInterface) { keyBaseVal := &keyBaseValue{ key: key, @@ -101,11 +100,11 @@ func (lru *singleCache) Add(key string, val structure.KeyBaseInterface) { valEl := lru.li.PushFront(keyBaseVal) lru.lruMap[key] = valEl //增加大小 - lru.UpdateLruSize(valEl.Value.(*keyBaseValue).val.SizeByte()) + lru.UpdateLruSize(structure.UpdateLength(valEl.Value.(*keyBaseValue).val.SizeByte())) } // Get 查找key对应的value -func (lru *singleCache) Get(key string) (structure.KeyBaseInterface, bool) { +func (lru *SingleCache) Get(key string) (structure.KeyBaseInterface, bool) { if lru.lruMap == nil { return nil, false @@ -118,14 +117,14 @@ func (lru *singleCache) Get(key string) (structure.KeyBaseInterface, bool) { } //Del 删除机制 -func (lru *singleCache) Del() error { +func (lru *SingleCache) Del() error { if lru.lruMap == nil { return errorx.New("lru is nil") } data := lru.li.Back() delete(lru.lruMap, data.Value.(*keyBaseValue).key) //删除大小 - lru.UpdateLruSize(-1 * data.Value.(*keyBaseValue).val.SizeByte()) + lru.UpdateLruSize(structure.UpdateLength(-1 * data.Value.(*keyBaseValue).val.SizeByte())) lru.li.Remove(data) return nil } diff --git a/pkg/lru/woker_test.go b/pkg/lru/woker_test.go index b278f20..ef1a757 100644 --- a/pkg/lru/woker_test.go +++ b/pkg/lru/woker_test.go @@ -2,10 +2,7 @@ package lru import ( "context" - "gitee.com/timedb/wheatCache/pkg/errorx" "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/pkg/structure" "gitee.com/timedb/wheatCache/pkg/structure/stringx" "github.com/stretchr/testify/require" "testing" @@ -20,66 +17,13 @@ func TestWorker(t *testing.T) { workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { v1 := stringx.NewStringSingle() key := "v1" - res, err := v1.Set(&proto.SetRequest{ - Val: "123", - }) - if err != nil { - return nil, err - } + res, _ := v1.Set("123") lru.Add(key, v1) - return res.Result, nil + return res, nil })) workEvent.InitWaitEvent() - produce.Call(ctx,workEvent) + produce.Call(ctx, workEvent) res, err := workEvent.StartWaitEvent(2 * time.Second) require.NoError(t, err) require.Equal(t, res, "123") - - workEvent.InitWaitEvent() - workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { - v2, ok := lru.Get("v1") - if !ok{ - return nil, errorx.New("no this key") - } - switch v2.(type) { - case structure.StringXInterface: - res, err := v2.(structure.StringXInterface).Get(&proto.GetRequest{ - }) - if err != nil { - return nil, err - } - return res.Result, nil - default: - return nil, errorx.New("no this type") - } - })) - produce.Call(ctx, workEvent) - res, err = workEvent.StartWaitEvent(2 * time.Second) - require.NoError(t, err) - require.Equal(t, res, "123") - - workEvent.InitWaitEvent() - workEvent.SetValue(WorkFuncEventKey, event.EventWorkFunc(func() (interface{}, error) { - lru.Del() - v2, ok := lru.Get("v1") - if !ok{ - return nil, nil - } - switch v2.(type) { - case structure.StringXInterface: - res, err := v2.(structure.StringXInterface).Get(&proto.GetRequest{ - }) - if err != nil { - return nil, err - } - return res.Result, nil - default: - return nil, errorx.New("no this type") - } - })) - produce.Call(ctx, workEvent) - res, err = workEvent.StartWaitEvent(2 * time.Second) - require.Equal(t, err, nil) - require.Equal(t, res, nil) - } diff --git a/pkg/lru/worker.go b/pkg/lru/worker.go index 2da7e4e..99eb362 100644 --- a/pkg/lru/worker.go +++ b/pkg/lru/worker.go @@ -6,11 +6,7 @@ import ( "log" ) -func lruCleanWork() { - -} - -func (lru *singleCache) lruSingleWork() interface{} { +func (lru *SingleCache) lruSingleWork() interface{} { ctx := context.Background() for { workEvent := lru.lruConsumer.Receive(ctx) diff --git a/pkg/structure/const.gen.go b/pkg/structure/const.gen.go index fa802ac..dce796a 100644 --- a/pkg/structure/const.gen.go +++ b/pkg/structure/const.gen.go @@ -5,6 +5,7 @@ package structure const ( DEFAULT_KEY = iota + STRING_X ) diff --git a/pkg/structure/define.go b/pkg/structure/define.go index cce5904..2563d9c 100644 --- a/pkg/structure/define.go +++ b/pkg/structure/define.go @@ -6,6 +6,8 @@ const ( type DynamicType int8 +type UpdateLength int64 + const ( DynamicNull = DynamicType(iota) DynamicInt diff --git a/pkg/structure/generate/storage.template b/pkg/structure/generate/storage.template deleted file mode 100644 index 5426397..0000000 --- a/pkg/structure/generate/storage.template +++ /dev/null @@ -1,74 +0,0 @@ -// Code generated by gen-struct. DO NOT EDIT. -// make gen-struct generated - -package server - -import ( - context "context" - "gitee.com/timedb/wheatCache/pkg/errorx" - "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/pkg/structure" - - "time" -) - -type serverSingle struct { - middleProduce event.ProduceInterface - lruProduce event.ProduceInterface - ttl time.Duration -} - -func NewServer() proto.CommServerServer { - ser := &serverSingle{} - return ser -} - -// TODO 移除 -func mockLruValue() structure.KeyBaseInterface { - return stringx.NewStringSingle() -} - -{% for opt in option %} - -{% for fun in opt.option %} -func (s *serverSingle) {{ fun }} ( - cxt context.Context, - req *proto.{{ fun }}Request, -) (*proto.{{ fun }}Response, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.{{ opt.key }}Interface: - resp, err := value.(structure.{{ opt.key }}Interface).{{ fun }}(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.{{ fun }}Response: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.{{ fun }}Response), nil -} -{% endfor %} - -{% endfor %} diff --git a/pkg/structure/interface.gen.go b/pkg/structure/interface.gen.go index 6aa2437..6ed2c1e 100644 --- a/pkg/structure/interface.gen.go +++ b/pkg/structure/interface.gen.go @@ -1,10 +1,5 @@ -// Code generated by gen-struct. DO NOT EDIT. -// make gen-struct generated - package structure -import "gitee.com/timedb/wheatCache/pkg/proto" - type KeyBaseInterface interface { SizeByte() int64 @@ -20,10 +15,10 @@ type KeyBaseInterface interface { type StringXInterface interface { KeyBaseInterface - Set(*proto.SetRequest) (*proto.SetResponse, error) - Get(*proto.GetRequest) (*proto.GetResponse, error) - Add(*proto.AddRequest) (*proto.AddResponse, error) - Reduce(*proto.ReduceRequest) (*proto.ReduceResponse, error) - Setbit(*proto.SetbitRequest) (*proto.SetbitResponse, error) - Getbit(*proto.GetbitRequest) (*proto.GetbitResponse, error) + Set(val string) (string, UpdateLength) + Get() string + Add(renewal int32) (string, error) + Reduce(renewal int32) (string, error) + Setbit(offer int32, val bool) UpdateLength + Getbit(offer int32) (bool, error) } diff --git a/pkg/structure/stringx/string.go b/pkg/structure/stringx/string.go index 08f6850..6efe942 100644 --- a/pkg/structure/stringx/string.go +++ b/pkg/structure/stringx/string.go @@ -2,7 +2,6 @@ package stringx import ( "gitee.com/timedb/wheatCache/pkg/errorx" - "gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/structure" "strconv" ) @@ -40,90 +39,69 @@ func (s *StringSingle) Encode() ([]byte, error) { panic("not implemented") // TODO: Implement } -func (s *StringSingle) Set(req *proto.SetRequest) (*proto.SetResponse, error) { +func (s *StringSingle) Set(val string) (string, structure.UpdateLength) { length := s.val.ChangeValueLength(func() { - s.val.InferValue(req.Val) + s.val.InferValue(val) }) - - return &proto.SetResponse{ - Result: req.Val, - UpdateSize: length, - }, nil + return val, length } -func (s *StringSingle) Get(req *proto.GetRequest) (*proto.GetResponse, error) { - return &proto.GetResponse{ - Result: s.val.ToString(), - UpdateSize: 0, - }, nil +func (s *StringSingle) Get() string { + return s.val.ToString() } -func updateValueNotString(s *StringSingle, val int32) (string, int64, error) { +func updateValueNotString(s *StringSingle, val int32) (string, error) { switch s.val.GetDynamicType() { case structure.DynamicNull: - length := s.val.ChangeValueLength(func() { - s.val.SetInt(int64(val)) - }) - return strconv.Itoa(int(val)), length, nil + s.val.SetInt(int64(val)) + return strconv.Itoa(int(val)), nil case structure.DynamicFloat: f, err := s.val.ToFloat64() if err != nil { - return "", 0, err + return "", err } s.val.SetFloat64(f + float64(val)) - return strconv.FormatFloat(f+1, 'f', 2, 64), 0, nil + return strconv.FormatFloat(f+1, 'f', 2, 64), nil case structure.DynamicInt: i, err := s.val.ToInt() if err != nil { - return "", 0, err + return "", err } s.val.SetInt(int64(val) + i) - return strconv.Itoa(int(i + int64(val))), 0, nil + return strconv.Itoa(int(i + int64(val))), nil default: - return "", 0, errorx.New("string cannot perform add operations") + return "", errorx.New("string cannot perform add operations") } } -func (s *StringSingle) Add(req *proto.AddRequest) (*proto.AddResponse, error) { - result, length, err := updateValueNotString(s, req.Renewal) +func (s *StringSingle) Add(renewal int32) (string, error) { + result, err := updateValueNotString(s, renewal) if err != nil { - return nil, err + return "", err } - - return &proto.AddResponse{ - UpdateSize: length, - Result: result, - }, nil + return result, nil } -func (s *StringSingle) Reduce(req *proto.ReduceRequest) (*proto.ReduceResponse, error) { - result, length, err := updateValueNotString(s, req.Renewal*-1) +func (s *StringSingle) Reduce(renewal int32) (string, error) { + result, err := updateValueNotString(s, -1*renewal) if err != nil { - return nil, err + return "", err } - - return &proto.ReduceResponse{ - UpdateSize: length, - Result: result, - }, nil + return result, nil } -func (s *StringSingle) Setbit(req *proto.SetbitRequest) (*proto.SetbitResponse, error) { +func (s *StringSingle) Setbit(offer int32, val bool) structure.UpdateLength { length := s.val.ChangeValueLength(func() { - s.val.SetByte(int(req.Offer), req.Val) + s.val.SetByte(int(offer), val) }) - return &proto.SetbitResponse{ - UpdateSize: length, - }, nil + return length } -func (s *StringSingle) Getbit(req *proto.GetbitRequest) (*proto.GetbitResponse, error) { - b, err := s.val.GetByte(int(req.Offer)) +func (s *StringSingle) Getbit(offer int32) (bool, error) { + b, err := s.val.GetByte(int(offer)) if err != nil { - return nil, err + return false, err } - return &proto.GetbitResponse{ - Val: b, - }, nil + return b, err } diff --git a/pkg/structure/stringx/string_test.go b/pkg/structure/stringx/string_test.go index 2ae2d85..978028b 100644 --- a/pkg/structure/stringx/string_test.go +++ b/pkg/structure/stringx/string_test.go @@ -1,140 +1,82 @@ package stringx import ( - "gitee.com/timedb/wheatCache/pkg/proto" + "gitee.com/timedb/wheatCache/pkg/structure" "github.com/stretchr/testify/require" "testing" ) func TestStringSingle_Set(t *testing.T) { s := NewStringSingle() - resp, err := s.Set(&proto.SetRequest{ - Val: "189", - }) - require.NoError(t, err) - require.Equal(t, resp.Result, "189") - i, err := s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, i.Result, "189") - s.Set(&proto.SetRequest{ - Val: "1.25", - }) - i, err = s.Get(&proto.GetRequest{}) - require.Equal(t, i.Result, "1.25") + resp, length := s.Set("189") + require.Equal(t, resp, "189") + require.Equal(t, length, structure.UpdateLength(0)) + i := s.Get() + require.Equal(t, i, "189") - s.Set(&proto.SetRequest{ - Val: "awdgiugaiuwdhoawd", - }) - i, err = s.Get(&proto.GetRequest{}) - require.Equal(t, i.Result, "awdgiugaiuwdhoawd") + resp, length = s.Set("189.12") + require.Equal(t, resp, "189.12") + require.Equal(t, length, structure.UpdateLength(0)) + i = s.Get() + require.Equal(t, i, "189.12") + + resp, length = s.Set("awdawd") + require.Equal(t, resp, "awdawd") + require.Equal(t, length, structure.UpdateLength(-2)) + i = s.Get() + require.Equal(t, i, "awdawd") } func TestStringSingle_Add(t *testing.T) { s := NewStringSingle() - _, err := s.Set(&proto.SetRequest{ - Val: "135", - }) - require.NoError(t, err) - s.Add(&proto.AddRequest{ - Renewal: 9, - }) - resp, err := s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, resp.Result, "144") + s.Set("189") + s.Add(1) + i := s.Get() + require.Equal(t, i, "190") - // float - _, err = s.Set(&proto.SetRequest{ - Val: "135.33", - }) - require.NoError(t, err) - s.Add(&proto.AddRequest{ - Renewal: 3, - }) - resp, err = s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, resp.Result, "138.33") + s.Set("189.2") + s.Add(1) + i = s.Get() + require.Equal(t, i, "190.20") - // string - _, err = s.Set(&proto.SetRequest{ - Val: "awjdwad", - }) - require.NoError(t, err) - _, err = s.Add(&proto.AddRequest{ - Renewal: 3, - }) + s.Set("wad") + _, err := s.Add(1) require.Error(t, err) - resp, err = s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, resp.Result, "awjdwad") } func TestStringSingle_Reduce(t *testing.T) { s := NewStringSingle() - _, err := s.Set(&proto.SetRequest{ - Val: "135", - }) - require.NoError(t, err) - s.Reduce(&proto.ReduceRequest{ - Renewal: 5, - }) - resp, err := s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, resp.Result, "130") + s.Set("189") + s.Reduce(1) + i := s.Get() + require.Equal(t, i, "188") - // float - _, err = s.Set(&proto.SetRequest{ - Val: "135.33", - }) - require.NoError(t, err) - s.Reduce(&proto.ReduceRequest{ - Renewal: 3, - }) - resp, err = s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, resp.Result, "132.33") + s.Set("189.2") + s.Reduce(1) + i = s.Get() + require.Equal(t, i, "188.20") - // string - _, err = s.Set(&proto.SetRequest{ - Val: "awjdwad", - }) - require.NoError(t, err) - _, err = s.Reduce(&proto.ReduceRequest{ - Renewal: 3, - }) + s.Set("wad") + _, err := s.Reduce(1) require.Error(t, err) - resp, err = s.Get(&proto.GetRequest{}) - require.NoError(t, err) - require.Equal(t, resp.Result, "awjdwad") } func TestStringSingle_Getbit(t *testing.T) { s := NewStringSingle() - _, err := s.Setbit(&proto.SetbitRequest{ - Offer: 1009, - Val: true, - }) - require.NoError(t, err) + length := s.Setbit(1009, true) + require.Equal(t, length, structure.UpdateLength(1002)) - res, err := s.Getbit(&proto.GetbitRequest{ - Offer: 1009, - }) + res, err := s.Getbit(1009) require.NoError(t, err) - require.Equal(t, res.Val, true) - res, err = s.Getbit(&proto.GetbitRequest{ - Offer: 1008, - }) + require.Equal(t, res, true) + + res, err = s.Getbit(1008) require.NoError(t, err) - require.Equal(t, res.Val, false) + require.Equal(t, res, false) - _, err = s.Setbit(&proto.SetbitRequest{ - Offer: 1009, - Val: false, - }) - - res, err = s.Getbit(&proto.GetbitRequest{ - Offer: 1009, - }) + length = s.Setbit(1009, false) + require.Equal(t, length, structure.UpdateLength(0)) + res, err = s.Getbit(1009) require.NoError(t, err) - require.Equal(t, res.Val, false) - + require.Equal(t, res, false) } diff --git a/pkg/structure/value.go b/pkg/structure/value.go index c56d998..6d65fac 100644 --- a/pkg/structure/value.go +++ b/pkg/structure/value.go @@ -128,10 +128,10 @@ func (v *Value) InferValue(str string) { } // ChangeValueLength 根据类型推断 change 的大小, 只用于 Set 操作不发生错误 -func (v *Value) ChangeValueLength(f func()) int64 { +func (v *Value) ChangeValueLength(f func()) UpdateLength { startLen := v.GetLength() f() - return int64(v.GetLength() - startLen) + return UpdateLength(v.GetLength() - startLen) } func (v *Value) SetByte(offset int, val bool) { diff --git a/protobuf/stringx.proto b/protobuf/stringx.proto index 665bc84..0a783d7 100644 --- a/protobuf/stringx.proto +++ b/protobuf/stringx.proto @@ -8,7 +8,6 @@ message SetRequest { } message SetResponse { - int64 update_size = 1; string result = 2; } @@ -17,7 +16,6 @@ message GetRequest { } message GetResponse { - int64 update_size = 1; string result = 2; } @@ -27,7 +25,6 @@ message AddRequest { } message AddResponse { - int64 update_size = 1; string result = 2; } @@ -37,7 +34,6 @@ message ReduceRequest { } message ReduceResponse { - int64 update_size = 1; string result = 2; } @@ -48,7 +44,6 @@ message SetbitRequest { } message SetbitResponse { - int64 update_size = 1; } message GetbitRequest { @@ -57,6 +52,5 @@ message GetbitRequest { } message GetbitResponse { - int64 update_size = 1; bool val = 2; } diff --git a/shell/gen_protobuf.py b/shell/gen_protobuf.py index 5eccf93..2954fd8 100644 --- a/shell/gen_protobuf.py +++ b/shell/gen_protobuf.py @@ -40,7 +40,7 @@ def mk_structure(cfg_camel): file.write('syntax = "proto3";\nimport "base.proto";\noption go_package = "pkg/proto";\n') for v in value: file.write('\nmessage ' + v + 'Request ' + '{\n BaseKey key = 1;\n}\n') - file.write('\nmessage ' + v + 'Response ' + '{\n int64 update_size = 1;\n}\n') + file.write('\nmessage ' + v + 'Response ' + '{\n}\n') file.close() else: # 如果这个文件存在 @@ -58,7 +58,7 @@ def mk_structure(cfg_camel): if flag == 0: file = open(proto_path, 'a') file.write('\nmessage ' + v + 'Request ' + '{\n BaseKey key = 1;\n}\n') - file.write("\nmessage %sResponse {\n int64 update_size = 1;\n}\n" % v) + file.write("\nmessage %sResponse {\n}\n" % v) file.close() print(f"{key}.proto", "-> success") diff --git a/shell/make-struct.py b/shell/make-struct.py index 86e9787..ec29f29 100644 --- a/shell/make-struct.py +++ b/shell/make-struct.py @@ -113,7 +113,7 @@ def format_code_go(): if __name__ == "__main__": conf, cfg_camel = load_conf() set_structure_const_template(conf) - set_structure_interface(cfg_camel) - set_storage_server(cfg_camel) + # set_structure_interface(cfg_camel) + # set_storage_server(cfg_camel) # 格式化代码 format_code_go() diff --git a/storage/cmd/root.go b/storage/cmd/root.go index b08b1c6..e011f95 100644 --- a/storage/cmd/root.go +++ b/storage/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" + "gitee.com/timedb/wheatCache/storage/server/single" "log" "net" "os" @@ -11,7 +12,6 @@ import ( _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/storage/server" "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/grpc" @@ -24,7 +24,7 @@ var rootCmd = &cobra.Command{ Short: "storage", Long: `start storage server`, Run: func(cmd *cobra.Command, args []string) { - storageServer := server.NewServer() + storageServer := single.NewServer() // 先写死, 等配置文件 conf := viper.GetStringMap("storage") host := conf["host"].(string) diff --git a/storage/dao/dao.go b/storage/dao/dao.go new file mode 100644 index 0000000..6e8eb70 --- /dev/null +++ b/storage/dao/dao.go @@ -0,0 +1,132 @@ +package dao + +import ( + "gitee.com/timedb/wheatCache/pkg/errorx" + "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/pkg/structure" + "gitee.com/timedb/wheatCache/pkg/structure/stringx" +) + +type Dao struct { + lru lru.CacheInterface +} + +func NewDao(lru lru.CacheInterface) *Dao { + return &Dao{ + lru: lru, + } +} + +func (d *Dao) Set(key string, strVal string) (string, error) { + value, ok := d.lru.Get(key) + if ok { + if val, ok := value.(structure.StringXInterface); ok { + res, length := val.Set(strVal) + d.lru.UpdateLruSize(length) + return res, nil + } else { + return "", errorx.New("the key:%s is not stringx type", key) + } + } + + // 不存在新建 + strValue := stringx.NewStringSingle() + result, length := strValue.Set(strVal) + d.lru.Add(key, strValue) + d.lru.UpdateLruSize(length) + return result, nil +} + +func (d *Dao) Get(key string) (string, error) { + val, ok := d.lru.Get(key) + if !ok { + return "", errorx.NotKeyErr(key) + } + + strVal, ok := val.(structure.StringXInterface) + if !ok { + return "", errorx.DaoTypeErr("stringx") + } + + return strVal.Get(), nil +} + +func (d *Dao) Add(key string, renewal int32) (string, error) { + value, lruOk := d.lru.Get(key) + if !lruOk { + val := stringx.NewStringSingle() + res, err := val.Add(renewal) + if err != nil { + return "", nil + } + d.lru.Add(key, val) + return res, nil + } + + strVal, ok := value.(structure.StringXInterface) + if !ok { + return "", errorx.DaoTypeErr("stringx") + } + + res, err := strVal.Add(renewal) + if err != nil { + return "", err + } + return res, nil +} + +func (d *Dao) Reduce(key string, renewal int32) (string, error) { + value, lruOk := d.lru.Get(key) + if !lruOk { + val := stringx.NewStringSingle() + res, err := val.Reduce(renewal) + if err != nil { + return "", nil + } + d.lru.Add(key, val) + return res, nil + } + + strVal, ok := value.(structure.StringXInterface) + if !ok { + return "", errorx.DaoTypeErr("stringx") + } + + res, err := strVal.Reduce(renewal) + if err != nil { + return "", err + } + return res, nil +} + +func (d *Dao) Setbit(key string, val bool, offer int32) error { + value, lruOk := d.lru.Get(key) + if !lruOk { + valStr := stringx.NewStringSingle() + length := valStr.Setbit(offer, val) + d.lru.UpdateLruSize(length) + d.lru.Add(key, valStr) + return nil + } + + strVal, ok := value.(structure.StringXInterface) + if !ok { + return errorx.DaoTypeErr("stringx") + } + + length := strVal.Setbit(offer, val) + d.lru.UpdateLruSize(length) + return nil +} + +func (d *Dao) GetBit(key string, offer int32) (bool, error) { + value, lruOk := d.lru.Get(key) + if !lruOk { + return false, errorx.NotKeyErr(key) + } + strVal, ok := value.(structure.StringXInterface) + if !ok { + return false, errorx.DaoTypeErr("stringx") + } + return strVal.Getbit(offer) +} diff --git a/storage/dao/define.go b/storage/dao/define.go new file mode 100644 index 0000000..07a0cc0 --- /dev/null +++ b/storage/dao/define.go @@ -0,0 +1 @@ +package dao diff --git a/storage/server/single.gen.go b/storage/server/single.gen.go deleted file mode 100644 index 851ab87..0000000 --- a/storage/server/single.gen.go +++ /dev/null @@ -1,260 +0,0 @@ -// Code generated by gen-struct. DO NOT EDIT. -// make gen-struct generated - -package server - -import ( - context "context" - "gitee.com/timedb/wheatCache/pkg/errorx" - "gitee.com/timedb/wheatCache/pkg/event" - "gitee.com/timedb/wheatCache/pkg/proto" - "gitee.com/timedb/wheatCache/pkg/structure" - - "gitee.com/timedb/wheatCache/pkg/structure/stringx" - - "time" -) - -type serverSingle struct { - middleProduce event.ProduceInterface - lruProduce event.ProduceInterface - ttl time.Duration -} - -func NewServer() proto.CommServerServer { - ser := &serverSingle{} - return ser -} - -// TODO 移除 -func mockLruValue() structure.KeyBaseInterface { - return stringx.NewStringSingle() -} - -func (s *serverSingle) Set( - cxt context.Context, - req *proto.SetRequest, -) (*proto.SetResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Set(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.SetResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.SetResponse), nil -} - -func (s *serverSingle) Get( - cxt context.Context, - req *proto.GetRequest, -) (*proto.GetResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Get(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.GetResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.GetResponse), nil -} - -func (s *serverSingle) Add( - cxt context.Context, - req *proto.AddRequest, -) (*proto.AddResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Add(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.AddResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.AddResponse), nil -} - -func (s *serverSingle) Reduce( - cxt context.Context, - req *proto.ReduceRequest, -) (*proto.ReduceResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Reduce(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.ReduceResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.ReduceResponse), nil -} - -func (s *serverSingle) Setbit( - cxt context.Context, - req *proto.SetbitRequest, -) (*proto.SetbitResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Setbit(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.SetbitResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.SetbitResponse), nil -} - -func (s *serverSingle) Getbit( - cxt context.Context, - req *proto.GetbitRequest, -) (*proto.GetbitResponse, error) { - lruEvent := event.NewEvent("lru event") - lruEvent.InitWaitEvent() - - work := event.EventWorkFunc(func() (interface{}, error) { - value := mockLruValue() // TODO 替换为从 lru 获取 - switch value.(type) { - case structure.StringXInterface: - resp, err := value.(structure.StringXInterface).Getbit(req) - if err != nil { - return nil, err - } - return resp, nil - - default: - return nil, errorx.New("value err") - } - - }) - - lruEvent.SetValue("lru work", work) - s.lruProduce.Call(cxt, lruEvent) - resp, err := lruEvent.StartWaitEvent(s.ttl) - if err != nil { - return nil, err - } - - switch resp.(type) { - case *proto.GetbitResponse: - default: - return nil, errorx.New("value err") - } - return resp.(*proto.GetbitResponse), nil -} diff --git a/storage/server/single/define.go b/storage/server/single/define.go new file mode 100644 index 0000000..8e650fd --- /dev/null +++ b/storage/server/single/define.go @@ -0,0 +1,12 @@ +package single + +import ( + "sync" +) + +var ( + oneSingleServer sync.Once + sysSingleServer *serverSingle +) + +const timeOutDefault = 2 diff --git a/storage/server/single/single.go b/storage/server/single/single.go new file mode 100644 index 0000000..13a6f18 --- /dev/null +++ b/storage/server/single/single.go @@ -0,0 +1,38 @@ +package single + +import ( + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/storage/dao" + "github.com/spf13/viper" + "time" +) + +type serverSingle struct { + middleProduce event.ProduceInterface + lruProduce event.ProduceInterface + timeOut time.Duration + lruCache *lru.SingleCache + dao *dao.Dao +} + +func NewServer() *serverSingle { + oneSingleServer.Do(func() { + timeOut := viper.GetInt("storage.timeOut") + if timeOut == 0 { + timeOut = timeOutDefault + } + + lruCache := lru.NewLRUCache() + + ser := &serverSingle{ + lruCache: lruCache, + lruProduce: event.NewProduce(lruCache.GetDriver()), + timeOut: time.Duration(timeOut), + dao: dao.NewDao(lruCache), + } + sysSingleServer = ser + + }) + return sysSingleServer +} diff --git a/storage/server/single/stringx.go b/storage/server/single/stringx.go new file mode 100644 index 0000000..66f6e36 --- /dev/null +++ b/storage/server/single/stringx.go @@ -0,0 +1,134 @@ +package single + +import ( + context "context" + "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/pkg/lru" + "gitee.com/timedb/wheatCache/pkg/proto" +) + +func (s *serverSingle) Set( + cxt context.Context, + req *proto.SetRequest, +) (*proto.SetResponse, error) { + + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Set(req.Key.Key, req.Val) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + + return &proto.SetResponse{ + Result: resp.(string), + }, nil +} + +func (s *serverSingle) Get( + cxt context.Context, + req *proto.GetRequest, +) (*proto.GetResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Get(req.Key.Key) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.GetResponse{ + Result: resp.(string), + }, nil +} + +func (s serverSingle) Add( + cxt context.Context, + req *proto.AddRequest, +) (*proto.AddResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Add(req.Key.Key, req.Renewal) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.AddResponse{ + Result: resp.(string), + }, nil +} + +func (s *serverSingle) Reduce( + cxt context.Context, + req *proto.ReduceRequest, +) (*proto.ReduceResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.Add(req.Key.Key, req.Renewal) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + resp, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.ReduceResponse{ + Result: resp.(string), + }, nil +} + +func (s *serverSingle) Setbit( + cxt context.Context, + req *proto.SetbitRequest, +) (*proto.SetbitResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return nil, s.dao.Setbit(req.Key.Key, req.Val, req.Offer) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + _, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.SetbitResponse{}, nil +} + +func (s *serverSingle) Getbit( + cxt context.Context, + req *proto.GetbitRequest, +) (*proto.GetbitResponse, error) { + work := event.EventWorkFunc(func() (interface{}, error) { + return s.dao.GetBit(req.Key.Key, req.Offer) + }) + + lruEvent := event.NewEvent(lru.OptionEventName) + lruEvent.InitWaitEvent() + lruEvent.SetValue(lru.WorkFuncEventKey, work) + s.lruProduce.Call(cxt, lruEvent) + flag, err := lruEvent.StartWaitEvent(s.timeOut) + if err != nil { + return nil, err + } + return &proto.GetbitResponse{ + Val: flag.(bool), + }, nil +}