diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go index 3379e17..19d546b 100644 --- a/gateway/proxy/director.go +++ b/gateway/proxy/director.go @@ -4,11 +4,30 @@ import ( "context" "gitee.com/timedb/wheatCache/gateway/codec" + "gitee.com/timedb/wheatCache/pkg/logx" + "gitee.com/timedb/wheatCache/pkg/proto" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) func GetDirectorByServiceHash() StreamDirector { return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, nil, status.Errorf(codes.Unknown, "from FromIncomingContext err") + } + + baseKey, ok := md[proto.BaseKeyMethodKey] + if !ok { + return nil, nil, status.Errorf(codes.Unknown, + "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) + } + + logx.Infoln(baseKey) + // TODO hash, mock 直接转发到 storage dev 上 cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) return ctx, cli, err diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index df475d5..9171d0d 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -7,6 +7,7 @@ import ( wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. @@ -29,7 +30,7 @@ type handler struct { func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { fullMethodName, ok := grpc.MethodFromServerStream(serverStream) if !ok { - return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) @@ -58,7 +59,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error } clientCancel() - return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) case c2sErr := <-c2sErrChan: // 服务的没用在提供数据触发这个分支 serverStream.SetTrailer(clientStream.Trailer()) @@ -71,7 +72,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error } } - return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") + return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") } func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { diff --git a/pkg/proto/define.go b/pkg/proto/define.go new file mode 100644 index 0000000..a82e599 --- /dev/null +++ b/pkg/proto/define.go @@ -0,0 +1,9 @@ +package proto + +type GetKeyBaseInterface interface { + GetKey() *BaseKey +} + +const ( + BaseKeyMethodKey = "BaseKey" +)