diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..9f6c3e6 --- /dev/null +++ b/client/client.go @@ -0,0 +1,22 @@ +package client + +import ( + "gitee.com/timedb/wheatCache/client/middle" + "gitee.com/timedb/wheatCache/pkg/proto" + "google.golang.org/grpc" +) + +func newWheatClient(targer string, opt ...middle.ClientMiddle) (proto.CommServerClient, error) { + + interceptor := middle.GetUnaryInterceptor(opt...) + comm, err := grpc.Dial(targer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor)) + if err != nil { + return nil, err + } + + return proto.NewCommServerClient(comm), nil +} + +func NewWheatClient(targer string, opt ...middle.ClientMiddle) (proto.CommServerClient, error) { + return newWheatClient(targer, opt...) +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..a317a40 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,23 @@ +package client + +import ( + "context" + "testing" + + "gitee.com/timedb/wheatCache/client/middle" + "gitee.com/timedb/wheatCache/pkg/proto" + "github.com/stretchr/testify/require" +) + +func TestClient(t *testing.T) { + cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient) + require.NoError(t, err) + ctx := context.Background() + _, err = cli.Set(ctx, &proto.SetRequest{ + Key: &proto.BaseKey{ + Key: "apple", + }, + Val: "yyyy", + }) + require.NoError(t, err) +} diff --git a/client/middle/define.go b/client/middle/define.go new file mode 100644 index 0000000..a0fc2ff --- /dev/null +++ b/client/middle/define.go @@ -0,0 +1,6 @@ +package middle + +import "context" + +// type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error +type ClientMiddle func(ctx context.Context, method string, req, reply interface{}, header map[string]string) error diff --git a/client/middle/middle.go b/client/middle/middle.go new file mode 100644 index 0000000..69f3ba4 --- /dev/null +++ b/client/middle/middle.go @@ -0,0 +1,53 @@ +package middle + +import ( + "context" + + "gitee.com/timedb/wheatCache/pkg/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +func WithUnaryColonyClient(ctx context.Context, method string, req, reply interface{}, header map[string]string) error { + key, ok := req.(proto.GetKeyBaseInterface) + if !ok { + return status.Errorf(codes.Unknown, "key base err") + } + if header == nil { + return nil + } + // meta 解析会出现 全部小写问题 + header[proto.BaseKeyMethodKey] = key.GetKey().Key + return nil +} + +func getKeyByKeyMapvalue(m map[string]string) []string { + l := make([]string, 0) + for key, value := range m { + l = append(l, key, value) + } + return l +} + +func GetUnaryInterceptor(middleOpts ...ClientMiddle) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + // 加载中间件 + header := make(map[string]string) + for _, mid := range middleOpts { + err := mid(ctx, method, req, reply, header) + if err != nil { + return err + } + } + + lm := getKeyByKeyMapvalue(header) + + headerData := metadata.Pairs(lm...) + ctxH := metadata.NewOutgoingContext(ctx, headerData) + + return invoker(ctxH, method, req, reply, cc, opts...) + } +} diff --git a/gateway/cmd/gateway_test.go b/gateway/cmd/gateway_test.go deleted file mode 100644 index fe25ac4..0000000 --- a/gateway/cmd/gateway_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "testing" - - "gitee.com/timedb/wheatCache/pkg/proto" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" -) - -func TestContextChannel(t *testing.T) { - cxt := context.Background() - _, ctxChannel := context.WithCancel(cxt) - ctxChannel() - ctxChannel() -} - -func TestGateway_Master(t *testing.T) { - comm, err := grpc.Dial("127.0.0.1:5891", grpc.WithInsecure()) - require.NoError(t, err) - ctx := context.Background() - cli := proto.NewCommServerClient(comm) - resp, err := cli.Set(ctx, &proto.SetRequest{ - Key: &proto.BaseKey{ - Key: "aa", - }, - Val: "awd", - }) - require.NoError(t, err) - fmt.Printf(resp.Result) - -} diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go index 19d546b..d610c13 100644 --- a/gateway/proxy/director.go +++ b/gateway/proxy/director.go @@ -26,7 +26,7 @@ func GetDirectorByServiceHash() StreamDirector { "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) } - logx.Infoln(baseKey) + logx.Infoln(baseKey[0]) // TODO hash, mock 直接转发到 storage dev 上 cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) diff --git a/pkg/proto/define.go b/pkg/proto/define.go index a82e599..e333d8b 100644 --- a/pkg/proto/define.go +++ b/pkg/proto/define.go @@ -5,5 +5,5 @@ type GetKeyBaseInterface interface { } const ( - BaseKeyMethodKey = "BaseKey" + BaseKeyMethodKey = "basekey" ) diff --git a/pkg/structure/stringx/string.go b/pkg/structure/stringx/string.go index 6efe942..9879237 100644 --- a/pkg/structure/stringx/string.go +++ b/pkg/structure/stringx/string.go @@ -61,7 +61,7 @@ func updateValueNotString(s *StringSingle, val int32) (string, error) { return "", err } s.val.SetFloat64(f + float64(val)) - return strconv.FormatFloat(f+1, 'f', 2, 64), nil + return strconv.FormatFloat(f+float64(val), 'f', 2, 64), nil case structure.DynamicInt: i, err := s.val.ToInt() if err != nil { diff --git a/storage/dao/dao_test.go b/storage/dao/dao_test.go index 47bb56e..438a4f9 100644 --- a/storage/dao/dao_test.go +++ b/storage/dao/dao_test.go @@ -4,12 +4,24 @@ import ( "testing" _ "gitee.com/timedb/wheatCache/conf" - "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/lru" "gitee.com/timedb/wheatCache/pkg/proto" "github.com/stretchr/testify/require" ) +func mockData(t *testing.T, d *Dao) { + + values := []string{"1", "1.3", "abcdefg"} + + for _, val := range values { + key := &proto.BaseKey{ + Key: val, + } + _, err := d.Set(key, val) + require.NoError(t, err) + } + +} func TestDao_Set(t *testing.T) { lruCache := lru.NewLRUCache() dao := NewDao(lruCache) @@ -17,8 +29,61 @@ func TestDao_Set(t *testing.T) { Key: "abc", } - res, err := dao.Get(key) + res, err := dao.Set(key, "abc") + require.NoError(t, err) + require.Equal(t, res, "abc") + + res, err = dao.Get(key) + require.NoError(t, err) + require.Equal(t, res, "abc") +} + +func TestDao_Add(t *testing.T) { + lruCache := lru.NewLRUCache() + dao := NewDao(lruCache) + mockData(t, dao) + + resp, err := dao.Add(&proto.BaseKey{Key: "1"}, 2) + require.NoError(t, err) + require.Equal(t, resp, "3") + + resp, err = dao.Add(&proto.BaseKey{Key: "1.3"}, 2) + require.NoError(t, err) + require.Equal(t, resp, "3.30") + + _, err = dao.Add(&proto.BaseKey{Key: "abcdefg"}, 2) require.Error(t, err) - logx.Infoln(res) +} + +func TestDao_Reduce(t *testing.T) { + lruCache := lru.NewLRUCache() + dao := NewDao(lruCache) + mockData(t, dao) + + resp, err := dao.Reduce(&proto.BaseKey{Key: "1"}, 2) + require.NoError(t, err) + require.Equal(t, resp, "-1") + + resp, err = dao.Reduce(&proto.BaseKey{Key: "1.3"}, 2) + require.NoError(t, err) + require.Equal(t, resp, "-0.70") + + _, err = dao.Reduce(&proto.BaseKey{Key: "abcdefg"}, 2) + require.Error(t, err) +} + +func TestDao_Setbit(t *testing.T) { + lruCache := lru.NewLRUCache() + dao := NewDao(lruCache) + key := &proto.BaseKey{ + Key: "abc", + } + + err := dao.Setbit(key, true, 3089) + require.NoError(t, err) + + re, err := dao.GetBit(key, 3089) + require.NoError(t, err) + require.Equal(t, re, true) } diff --git a/storage/server/single_test.go b/storage/server/single_test.go deleted file mode 100644 index 80ef1ee..0000000 --- a/storage/server/single_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package server - -import ( - "context" - "fmt" - "gitee.com/timedb/wheatCache/pkg/proto" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "testing" -) - -func TestServerSingle_Get(t *testing.T) { - comm, err := grpc.Dial("127.0.0.1:5890", grpc.WithInsecure()) - require.NoError(t, err) - - ctx := context.Background() - cli := proto.NewCommServerClient(comm) - resp, err := cli.Get(ctx, &proto.GetRequest{}) - require.NoError(t, err) - fmt.Println(resp) -}