From 09730e9ebfc84c8e324800af4bc68e0b7d04cb16 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sun, 24 Oct 2021 20:03:35 +0800 Subject: [PATCH] feat(gateway): add hash ring gateway --- gateway/cmd/root.go | 11 ++++++++++- gateway/proxy/define.go | 3 ++- gateway/proxy/director.go | 12 +++++++----- gateway/proxy/proxy.go | 13 +++++++++---- gateway/transport/define.go | 4 ++++ 5 files changed, 32 insertions(+), 11 deletions(-) diff --git a/gateway/cmd/root.go b/gateway/cmd/root.go index d98e590..cb68456 100644 --- a/gateway/cmd/root.go +++ b/gateway/cmd/root.go @@ -7,6 +7,7 @@ import ( _ "gitee.com/timedb/wheatCache/conf" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" "gitee.com/timedb/wheatCache/gateway/proxy" + "gitee.com/timedb/wheatCache/gateway/transport" "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/util/server" "github.com/spf13/cobra" @@ -49,11 +50,19 @@ func Execute() { } func GetGatewayServer() *grpc.Server { + + targets := viper.GetStringSlice("gateway.target") + + logx.Debug("service target in %v", targets) + + stream := proxy.GetDirectorByServiceHash() + transport := transport.NewHashTransport(transport.HashReplicasDefault, nil, targets...) + opts := make([]grpc.ServerOption, 0) opts = append( opts, grpc.ForceServerCodec(wheatCodec.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())), + grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, transport)), ) return grpc.NewServer(opts...) diff --git a/gateway/proxy/define.go b/gateway/proxy/define.go index 0167d65..42101fa 100644 --- a/gateway/proxy/define.go +++ b/gateway/proxy/define.go @@ -3,10 +3,11 @@ package proxy import ( "context" + "gitee.com/timedb/wheatCache/gateway/transport" "google.golang.org/grpc" ) -type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) var ( clientStreamDescForProxying = &grpc.StreamDesc{ diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go index d610c13..7fcd937 100644 --- a/gateway/proxy/director.go +++ b/gateway/proxy/director.go @@ -4,7 +4,7 @@ import ( "context" "gitee.com/timedb/wheatCache/gateway/codec" - "gitee.com/timedb/wheatCache/pkg/logx" + "gitee.com/timedb/wheatCache/gateway/transport" "gitee.com/timedb/wheatCache/pkg/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -13,7 +13,7 @@ import ( ) func GetDirectorByServiceHash() StreamDirector { - return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + return func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -26,10 +26,12 @@ func GetDirectorByServiceHash() StreamDirector { "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) } - logx.Infoln(baseKey[0]) + target, err := transport.GetTargetAddr(baseKey...) + if err != nil { + return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err) + } - // TODO hash, mock 直接转发到 storage dev 上 - cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) + cli, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) return ctx, cli, err } } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 9171d0d..e419433 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -5,6 +5,7 @@ import ( "io" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" + "gitee.com/timedb/wheatCache/gateway/transport" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -15,13 +16,17 @@ import ( // backends. It should be used as a `grpc.UnknownServiceHandler`. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} +func TransparentHandler(director StreamDirector, tranport transport.TransPortInterface) grpc.StreamHandler { + streamer := &handler{ + director, + tranport, + } return streamer.handler } type handler struct { - director StreamDirector + director StreamDirector + transport transport.TransPortInterface } // handler is where the real magic of proxying happens. @@ -33,7 +38,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } - outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) + outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.transport) if err != nil { return err } diff --git a/gateway/transport/define.go b/gateway/transport/define.go index a60126e..57c7942 100644 --- a/gateway/transport/define.go +++ b/gateway/transport/define.go @@ -5,3 +5,7 @@ type TransPortInterface interface { IsEmpty() bool AddTarget(targets ...string) } + +const ( + HashReplicasDefault = 3 +)