forked from p53841790/wheat-cache
commit
310e11c65b
|
@ -0,0 +1,22 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"gitee.com/timedb/wheatCache/client/middle"
|
||||
"gitee.com/timedb/wheatCache/pkg/proto"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func newWheatClient(targer string, opt ...middle.ClientMiddle) (proto.CommServerClient, error) {
|
||||
|
||||
interceptor := middle.GetUnaryInterceptor(opt...)
|
||||
comm, err := grpc.Dial(targer, grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return proto.NewCommServerClient(comm), nil
|
||||
}
|
||||
|
||||
func NewWheatClient(targer string, opt ...middle.ClientMiddle) (proto.CommServerClient, error) {
|
||||
return newWheatClient(targer, opt...)
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"gitee.com/timedb/wheatCache/client/middle"
|
||||
"gitee.com/timedb/wheatCache/pkg/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
cli, err := NewWheatClient("127.0.0.1:5891", middle.WithUnaryColonyClient)
|
||||
require.NoError(t, err)
|
||||
ctx := context.Background()
|
||||
_, err = cli.Set(ctx, &proto.SetRequest{
|
||||
Key: &proto.BaseKey{
|
||||
Key: "apple",
|
||||
},
|
||||
Val: "yyyy",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package middle
|
||||
|
||||
import "context"
|
||||
|
||||
// type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
|
||||
type ClientMiddle func(ctx context.Context, method string, req, reply interface{}, header map[string]string) error
|
|
@ -0,0 +1,53 @@
|
|||
package middle
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gitee.com/timedb/wheatCache/pkg/proto"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func WithUnaryColonyClient(ctx context.Context, method string, req, reply interface{}, header map[string]string) error {
|
||||
key, ok := req.(proto.GetKeyBaseInterface)
|
||||
if !ok {
|
||||
return status.Errorf(codes.Unknown, "key base err")
|
||||
}
|
||||
if header == nil {
|
||||
return nil
|
||||
}
|
||||
// meta 解析会出现 全部小写问题
|
||||
header[proto.BaseKeyMethodKey] = key.GetKey().Key
|
||||
return nil
|
||||
}
|
||||
|
||||
func getKeyByKeyMapvalue(m map[string]string) []string {
|
||||
l := make([]string, 0)
|
||||
for key, value := range m {
|
||||
l = append(l, key, value)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func GetUnaryInterceptor(middleOpts ...ClientMiddle) grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
// 加载中间件
|
||||
header := make(map[string]string)
|
||||
for _, mid := range middleOpts {
|
||||
err := mid(ctx, method, req, reply, header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
lm := getKeyByKeyMapvalue(header)
|
||||
|
||||
headerData := metadata.Pairs(lm...)
|
||||
ctxH := metadata.NewOutgoingContext(ctx, headerData)
|
||||
|
||||
return invoker(ctxH, method, req, reply, cc, opts...)
|
||||
}
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"gitee.com/timedb/wheatCache/pkg/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func TestContextChannel(t *testing.T) {
|
||||
cxt := context.Background()
|
||||
_, ctxChannel := context.WithCancel(cxt)
|
||||
ctxChannel()
|
||||
ctxChannel()
|
||||
}
|
||||
|
||||
func TestGateway_Master(t *testing.T) {
|
||||
comm, err := grpc.Dial("127.0.0.1:5891", grpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
ctx := context.Background()
|
||||
cli := proto.NewCommServerClient(comm)
|
||||
resp, err := cli.Set(ctx, &proto.SetRequest{
|
||||
Key: &proto.BaseKey{
|
||||
Key: "aa",
|
||||
},
|
||||
Val: "awd",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
fmt.Printf(resp.Result)
|
||||
|
||||
}
|
|
@ -26,7 +26,7 @@ func GetDirectorByServiceHash() StreamDirector {
|
|||
"grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey)
|
||||
}
|
||||
|
||||
logx.Infoln(baseKey)
|
||||
logx.Infoln(baseKey[0])
|
||||
|
||||
// TODO hash, mock 直接转发到 storage dev 上
|
||||
cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
|
||||
|
|
|
@ -5,5 +5,5 @@ type GetKeyBaseInterface interface {
|
|||
}
|
||||
|
||||
const (
|
||||
BaseKeyMethodKey = "BaseKey"
|
||||
BaseKeyMethodKey = "basekey"
|
||||
)
|
||||
|
|
|
@ -61,7 +61,7 @@ func updateValueNotString(s *StringSingle, val int32) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
s.val.SetFloat64(f + float64(val))
|
||||
return strconv.FormatFloat(f+1, 'f', 2, 64), nil
|
||||
return strconv.FormatFloat(f+float64(val), 'f', 2, 64), nil
|
||||
case structure.DynamicInt:
|
||||
i, err := s.val.ToInt()
|
||||
if err != nil {
|
||||
|
|
|
@ -4,12 +4,24 @@ import (
|
|||
"testing"
|
||||
|
||||
_ "gitee.com/timedb/wheatCache/conf"
|
||||
"gitee.com/timedb/wheatCache/pkg/logx"
|
||||
"gitee.com/timedb/wheatCache/pkg/lru"
|
||||
"gitee.com/timedb/wheatCache/pkg/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func mockData(t *testing.T, d *Dao) {
|
||||
|
||||
values := []string{"1", "1.3", "abcdefg"}
|
||||
|
||||
for _, val := range values {
|
||||
key := &proto.BaseKey{
|
||||
Key: val,
|
||||
}
|
||||
_, err := d.Set(key, val)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
}
|
||||
func TestDao_Set(t *testing.T) {
|
||||
lruCache := lru.NewLRUCache()
|
||||
dao := NewDao(lruCache)
|
||||
|
@ -17,8 +29,61 @@ func TestDao_Set(t *testing.T) {
|
|||
Key: "abc",
|
||||
}
|
||||
|
||||
res, err := dao.Get(key)
|
||||
res, err := dao.Set(key, "abc")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, res, "abc")
|
||||
|
||||
res, err = dao.Get(key)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, res, "abc")
|
||||
}
|
||||
|
||||
func TestDao_Add(t *testing.T) {
|
||||
lruCache := lru.NewLRUCache()
|
||||
dao := NewDao(lruCache)
|
||||
mockData(t, dao)
|
||||
|
||||
resp, err := dao.Add(&proto.BaseKey{Key: "1"}, 2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, resp, "3")
|
||||
|
||||
resp, err = dao.Add(&proto.BaseKey{Key: "1.3"}, 2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, resp, "3.30")
|
||||
|
||||
_, err = dao.Add(&proto.BaseKey{Key: "abcdefg"}, 2)
|
||||
require.Error(t, err)
|
||||
logx.Infoln(res)
|
||||
}
|
||||
|
||||
func TestDao_Reduce(t *testing.T) {
|
||||
lruCache := lru.NewLRUCache()
|
||||
dao := NewDao(lruCache)
|
||||
mockData(t, dao)
|
||||
|
||||
resp, err := dao.Reduce(&proto.BaseKey{Key: "1"}, 2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, resp, "-1")
|
||||
|
||||
resp, err = dao.Reduce(&proto.BaseKey{Key: "1.3"}, 2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, resp, "-0.70")
|
||||
|
||||
_, err = dao.Reduce(&proto.BaseKey{Key: "abcdefg"}, 2)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestDao_Setbit(t *testing.T) {
|
||||
lruCache := lru.NewLRUCache()
|
||||
dao := NewDao(lruCache)
|
||||
key := &proto.BaseKey{
|
||||
Key: "abc",
|
||||
}
|
||||
|
||||
err := dao.Setbit(key, true, 3089)
|
||||
require.NoError(t, err)
|
||||
|
||||
re, err := dao.GetBit(key, 3089)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, re, true)
|
||||
|
||||
}
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"gitee.com/timedb/wheatCache/pkg/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestServerSingle_Get(t *testing.T) {
|
||||
comm, err := grpc.Dial("127.0.0.1:5890", grpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background()
|
||||
cli := proto.NewCommServerClient(comm)
|
||||
resp, err := cli.Get(ctx, &proto.GetRequest{})
|
||||
require.NoError(t, err)
|
||||
fmt.Println(resp)
|
||||
}
|
Loading…
Reference in New Issue