!75 gateway 添加 一致性 hash 环算法实现 分布式集群

Merge pull request !75 from bandl/feat-gateway-transpoart
This commit is contained in:
bandl 2021-10-24 12:07:30 +00:00 committed by Gitee
commit 3f4607fb58
8 changed files with 143 additions and 12 deletions

View File

@ -17,7 +17,7 @@ lruCache:
logPrint: logPrint:
stath: [ "debug", "error" ] stath: ["error"]
middleware-driver: middleware-driver:
@ -47,4 +47,5 @@ plugins-control:
gateway: gateway:
host: '127.0.0.1' host: '127.0.0.1'
port: 5891 port: 5891
target: ["127.0.0.1:5890"]

View File

@ -7,6 +7,7 @@ import (
_ "gitee.com/timedb/wheatCache/conf" _ "gitee.com/timedb/wheatCache/conf"
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/gateway/proxy" "gitee.com/timedb/wheatCache/gateway/proxy"
"gitee.com/timedb/wheatCache/gateway/transport"
"gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/logx"
"gitee.com/timedb/wheatCache/pkg/util/server" "gitee.com/timedb/wheatCache/pkg/util/server"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -49,11 +50,19 @@ func Execute() {
} }
func GetGatewayServer() *grpc.Server { func GetGatewayServer() *grpc.Server {
targets := viper.GetStringSlice("gateway.target")
logx.Debug("service target in %v", targets)
stream := proxy.GetDirectorByServiceHash()
transport := transport.NewHashTransport(transport.HashReplicasDefault, nil, targets...)
opts := make([]grpc.ServerOption, 0) opts := make([]grpc.ServerOption, 0)
opts = append( opts = append(
opts, opts,
grpc.ForceServerCodec(wheatCodec.Codec()), grpc.ForceServerCodec(wheatCodec.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())), grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, transport)),
) )
return grpc.NewServer(opts...) return grpc.NewServer(opts...)

View File

@ -3,10 +3,11 @@ package proxy
import ( import (
"context" "context"
"gitee.com/timedb/wheatCache/gateway/transport"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) type StreamDirector func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error)
var ( var (
clientStreamDescForProxying = &grpc.StreamDesc{ clientStreamDescForProxying = &grpc.StreamDesc{

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"gitee.com/timedb/wheatCache/gateway/codec" "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/gateway/transport"
"gitee.com/timedb/wheatCache/pkg/proto" "gitee.com/timedb/wheatCache/pkg/proto"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -13,7 +13,7 @@ import (
) )
func GetDirectorByServiceHash() StreamDirector { func GetDirectorByServiceHash() StreamDirector {
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { return func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx) md, ok := metadata.FromIncomingContext(ctx)
if !ok { if !ok {
@ -26,10 +26,12 @@ func GetDirectorByServiceHash() StreamDirector {
"grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey)
} }
logx.Infoln(baseKey[0]) target, err := transport.GetTargetAddr(baseKey...)
if err != nil {
return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err)
}
// TODO hash, mock 直接转发到 storage dev 上 cli, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
return ctx, cli, err return ctx, cli, err
} }
} }

View File

@ -5,6 +5,7 @@ import (
"io" "io"
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/gateway/transport"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -15,13 +16,17 @@ import (
// backends. It should be used as a `grpc.UnknownServiceHandler`. // backends. It should be used as a `grpc.UnknownServiceHandler`.
// //
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector) grpc.StreamHandler { func TransparentHandler(director StreamDirector, tranport transport.TransPortInterface) grpc.StreamHandler {
streamer := &handler{director} streamer := &handler{
director,
tranport,
}
return streamer.handler return streamer.handler
} }
type handler struct { type handler struct {
director StreamDirector director StreamDirector
transport transport.TransPortInterface
} }
// handler is where the real magic of proxying happens. // handler is where the real magic of proxying happens.
@ -33,7 +38,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
} }
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.transport)
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,11 @@
package transport
type TransPortInterface interface {
GetTargetAddr(...string) (string, error)
IsEmpty() bool
AddTarget(targets ...string)
}
const (
HashReplicasDefault = 3
)

85
gateway/transport/hash.go Normal file
View File

@ -0,0 +1,85 @@
package transport
import (
"hash/crc32"
"sort"
"strconv"
"gitee.com/timedb/wheatCache/pkg/errorx"
)
type HashFunc func(data []byte) uint32
// 实现 sort
type UInt32Slice []uint32
func (s UInt32Slice) Len() int {
return len(s)
}
func (s UInt32Slice) Less(i, j int) bool {
return s[i] < s[j]
}
func (s UInt32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type HashTransport struct {
hash HashFunc
replicas int // 复制因子
keys UInt32Slice
hashMap map[uint32]string // taraget 隐射
}
func NewHashTransport(replicas int, fn HashFunc, target ...string) TransPortInterface {
transport := &HashTransport{
replicas: replicas,
hash: fn,
hashMap: make(map[uint32]string, len(target)),
}
if transport.hash == nil {
transport.hash = crc32.ChecksumIEEE // 默认使用 CRC32 算法
}
transport.AddTarget(target...)
return transport
}
func (h *HashTransport) IsEmpty() bool {
return len(h.keys) == 0
}
func (h *HashTransport) AddTarget(targets ...string) {
for _, tar := range targets {
for i := 0; i < h.replicas; i++ {
hash := h.hash([]byte(strconv.Itoa(i) + tar))
h.keys = append(h.keys, hash)
h.hashMap[hash] = tar
}
}
// 虚拟值排序,方便查找
sort.Sort(h.keys)
}
func (h *HashTransport) GetTargetAddr(str ...string) (string, error) {
if h.IsEmpty() {
return "", errorx.New("gateway not register transport")
}
if len(str) != 1 {
return "", errorx.New("must give key")
}
hash := h.hash([]byte(str[0]))
idx := sort.Search(len(h.keys), func(i int) bool { return h.keys[i] >= hash })
if idx == len(h.keys) {
return h.hashMap[h.keys[0]], nil
}
return h.hashMap[h.keys[idx]], nil
}

View File

@ -0,0 +1,17 @@
package transport
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestHashTransport_GetTargetAddr(t *testing.T) {
tran := NewHashTransport(3, nil, "127.0.0.1:5581", "127.0.0.1:5582", "127.0.0.1:5583")
key := "test"
target, err := tran.GetTargetAddr(key)
require.NoError(t, err)
require.Equal(t, target, "127.0.0.1:5582")
}