diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index d29ec38..d0a7950 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -17,7 +17,7 @@ lruCache: logPrint: - stath: [ "debug", "error" ] + stath: ["error"] middleware-driver: @@ -47,4 +47,5 @@ plugins-control: gateway: host: '127.0.0.1' port: 5891 + target: ["127.0.0.1:5890"] \ No newline at end of file diff --git a/gateway/cmd/root.go b/gateway/cmd/root.go index d98e590..cb68456 100644 --- a/gateway/cmd/root.go +++ b/gateway/cmd/root.go @@ -7,6 +7,7 @@ import ( _ "gitee.com/timedb/wheatCache/conf" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" "gitee.com/timedb/wheatCache/gateway/proxy" + "gitee.com/timedb/wheatCache/gateway/transport" "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/util/server" "github.com/spf13/cobra" @@ -49,11 +50,19 @@ func Execute() { } func GetGatewayServer() *grpc.Server { + + targets := viper.GetStringSlice("gateway.target") + + logx.Debug("service target in %v", targets) + + stream := proxy.GetDirectorByServiceHash() + transport := transport.NewHashTransport(transport.HashReplicasDefault, nil, targets...) + opts := make([]grpc.ServerOption, 0) opts = append( opts, grpc.ForceServerCodec(wheatCodec.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())), + grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, transport)), ) return grpc.NewServer(opts...) diff --git a/gateway/proxy/define.go b/gateway/proxy/define.go index 0167d65..42101fa 100644 --- a/gateway/proxy/define.go +++ b/gateway/proxy/define.go @@ -3,10 +3,11 @@ package proxy import ( "context" + "gitee.com/timedb/wheatCache/gateway/transport" "google.golang.org/grpc" ) -type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) var ( clientStreamDescForProxying = &grpc.StreamDesc{ diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go index d610c13..7fcd937 100644 --- a/gateway/proxy/director.go +++ b/gateway/proxy/director.go @@ -4,7 +4,7 @@ import ( "context" "gitee.com/timedb/wheatCache/gateway/codec" - "gitee.com/timedb/wheatCache/pkg/logx" + "gitee.com/timedb/wheatCache/gateway/transport" "gitee.com/timedb/wheatCache/pkg/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -13,7 +13,7 @@ import ( ) func GetDirectorByServiceHash() StreamDirector { - return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + return func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -26,10 +26,12 @@ func GetDirectorByServiceHash() StreamDirector { "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) } - logx.Infoln(baseKey[0]) + target, err := transport.GetTargetAddr(baseKey...) + if err != nil { + return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err) + } - // TODO hash, mock 直接转发到 storage dev 上 - cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) + cli, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) return ctx, cli, err } } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 9171d0d..e419433 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -5,6 +5,7 @@ import ( "io" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" + "gitee.com/timedb/wheatCache/gateway/transport" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -15,13 +16,17 @@ import ( // backends. It should be used as a `grpc.UnknownServiceHandler`. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} +func TransparentHandler(director StreamDirector, tranport transport.TransPortInterface) grpc.StreamHandler { + streamer := &handler{ + director, + tranport, + } return streamer.handler } type handler struct { - director StreamDirector + director StreamDirector + transport transport.TransPortInterface } // handler is where the real magic of proxying happens. @@ -33,7 +38,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } - outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) + outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.transport) if err != nil { return err } diff --git a/gateway/transport/define.go b/gateway/transport/define.go new file mode 100644 index 0000000..57c7942 --- /dev/null +++ b/gateway/transport/define.go @@ -0,0 +1,11 @@ +package transport + +type TransPortInterface interface { + GetTargetAddr(...string) (string, error) + IsEmpty() bool + AddTarget(targets ...string) +} + +const ( + HashReplicasDefault = 3 +) diff --git a/gateway/transport/hash.go b/gateway/transport/hash.go new file mode 100644 index 0000000..8c0e74d --- /dev/null +++ b/gateway/transport/hash.go @@ -0,0 +1,85 @@ +package transport + +import ( + "hash/crc32" + "sort" + "strconv" + + "gitee.com/timedb/wheatCache/pkg/errorx" +) + +type HashFunc func(data []byte) uint32 + +// 实现 sort +type UInt32Slice []uint32 + +func (s UInt32Slice) Len() int { + return len(s) +} + +func (s UInt32Slice) Less(i, j int) bool { + return s[i] < s[j] +} + +func (s UInt32Slice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +type HashTransport struct { + hash HashFunc + replicas int // 复制因子 + keys UInt32Slice + hashMap map[uint32]string // taraget 隐射 +} + +func NewHashTransport(replicas int, fn HashFunc, target ...string) TransPortInterface { + transport := &HashTransport{ + replicas: replicas, + hash: fn, + hashMap: make(map[uint32]string, len(target)), + } + + if transport.hash == nil { + transport.hash = crc32.ChecksumIEEE // 默认使用 CRC32 算法 + } + + transport.AddTarget(target...) + + return transport +} + +func (h *HashTransport) IsEmpty() bool { + return len(h.keys) == 0 +} + +func (h *HashTransport) AddTarget(targets ...string) { + for _, tar := range targets { + + for i := 0; i < h.replicas; i++ { + hash := h.hash([]byte(strconv.Itoa(i) + tar)) + h.keys = append(h.keys, hash) + h.hashMap[hash] = tar + } + } + + // 虚拟值排序,方便查找 + sort.Sort(h.keys) +} + +func (h *HashTransport) GetTargetAddr(str ...string) (string, error) { + if h.IsEmpty() { + return "", errorx.New("gateway not register transport") + } + + if len(str) != 1 { + return "", errorx.New("must give key") + + } + hash := h.hash([]byte(str[0])) + idx := sort.Search(len(h.keys), func(i int) bool { return h.keys[i] >= hash }) + if idx == len(h.keys) { + return h.hashMap[h.keys[0]], nil + } + + return h.hashMap[h.keys[idx]], nil +} diff --git a/gateway/transport/hash_test.go b/gateway/transport/hash_test.go new file mode 100644 index 0000000..f6e9774 --- /dev/null +++ b/gateway/transport/hash_test.go @@ -0,0 +1,17 @@ +package transport + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHashTransport_GetTargetAddr(t *testing.T) { + tran := NewHashTransport(3, nil, "127.0.0.1:5581", "127.0.0.1:5582", "127.0.0.1:5583") + + key := "test" + + target, err := tran.GetTargetAddr(key) + require.NoError(t, err) + require.Equal(t, target, "127.0.0.1:5582") +}