feat(gateway): add get keybase ctx

This commit is contained in:
bandl 2021-10-17 10:49:14 +08:00
parent 477613f0de
commit d6f1631d7b
3 changed files with 32 additions and 3 deletions

View File

@ -4,11 +4,30 @@ import (
"context" "context"
"gitee.com/timedb/wheatCache/gateway/codec" "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/pkg/logx"
"gitee.com/timedb/wheatCache/pkg/proto"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
) )
func GetDirectorByServiceHash() StreamDirector { func GetDirectorByServiceHash() StreamDirector {
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, nil, status.Errorf(codes.Unknown, "from FromIncomingContext err")
}
baseKey, ok := md[proto.BaseKeyMethodKey]
if !ok {
return nil, nil, status.Errorf(codes.Unknown,
"grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey)
}
logx.Infoln(baseKey)
// TODO hash, mock 直接转发到 storage dev 上 // TODO hash, mock 直接转发到 storage dev 上
cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
return ctx, cli, err return ctx, cli, err

View File

@ -7,6 +7,7 @@ import (
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. // TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
@ -29,7 +30,7 @@ type handler struct {
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
fullMethodName, ok := grpc.MethodFromServerStream(serverStream) fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok { if !ok {
return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
} }
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
@ -58,7 +59,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
} }
clientCancel() clientCancel()
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
case c2sErr := <-c2sErrChan: case c2sErr := <-c2sErrChan:
// 服务的没用在提供数据触发这个分支 // 服务的没用在提供数据触发这个分支
serverStream.SetTrailer(clientStream.Trailer()) serverStream.SetTrailer(clientStream.Trailer())
@ -71,7 +72,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
} }
} }
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
} }
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {

9
pkg/proto/define.go Normal file
View File

@ -0,0 +1,9 @@
package proto
type GetKeyBaseInterface interface {
GetKey() *BaseKey
}
const (
BaseKeyMethodKey = "BaseKey"
)