feat(gateway): add hash ring gateway

This commit is contained in:
bandl 2021-10-24 20:03:35 +08:00
parent cd1ba29f42
commit 09730e9ebf
5 changed files with 32 additions and 11 deletions

View File

@ -7,6 +7,7 @@ import (
_ "gitee.com/timedb/wheatCache/conf"
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/gateway/proxy"
"gitee.com/timedb/wheatCache/gateway/transport"
"gitee.com/timedb/wheatCache/pkg/logx"
"gitee.com/timedb/wheatCache/pkg/util/server"
"github.com/spf13/cobra"
@ -49,11 +50,19 @@ func Execute() {
}
func GetGatewayServer() *grpc.Server {
targets := viper.GetStringSlice("gateway.target")
logx.Debug("service target in %v", targets)
stream := proxy.GetDirectorByServiceHash()
transport := transport.NewHashTransport(transport.HashReplicasDefault, nil, targets...)
opts := make([]grpc.ServerOption, 0)
opts = append(
opts,
grpc.ForceServerCodec(wheatCodec.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())),
grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, transport)),
)
return grpc.NewServer(opts...)

View File

@ -3,10 +3,11 @@ package proxy
import (
"context"
"gitee.com/timedb/wheatCache/gateway/transport"
"google.golang.org/grpc"
)
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)
type StreamDirector func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error)
var (
clientStreamDescForProxying = &grpc.StreamDesc{

View File

@ -4,7 +4,7 @@ import (
"context"
"gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/pkg/logx"
"gitee.com/timedb/wheatCache/gateway/transport"
"gitee.com/timedb/wheatCache/pkg/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -13,7 +13,7 @@ import (
)
func GetDirectorByServiceHash() StreamDirector {
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
return func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
@ -26,10 +26,12 @@ func GetDirectorByServiceHash() StreamDirector {
"grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey)
}
logx.Infoln(baseKey[0])
target, err := transport.GetTargetAddr(baseKey...)
if err != nil {
return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err)
}
// TODO hash, mock 直接转发到 storage dev 上
cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
cli, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
return ctx, cli, err
}
}

View File

@ -5,6 +5,7 @@ import (
"io"
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/gateway/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -15,13 +16,17 @@ import (
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector) grpc.StreamHandler {
streamer := &handler{director}
func TransparentHandler(director StreamDirector, tranport transport.TransPortInterface) grpc.StreamHandler {
streamer := &handler{
director,
tranport,
}
return streamer.handler
}
type handler struct {
director StreamDirector
director StreamDirector
transport transport.TransPortInterface
}
// handler is where the real magic of proxying happens.
@ -33,7 +38,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
}
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.transport)
if err != nil {
return err
}

View File

@ -5,3 +5,7 @@ type TransPortInterface interface {
IsEmpty() bool
AddTarget(targets ...string)
}
const (
HashReplicasDefault = 3
)