fix(hash-endpoint): tranport to endpoint

This commit is contained in:
bandl 2021-11-07 21:45:11 +08:00
parent f6eaae71b9
commit 51bd9b242f
7 changed files with 28 additions and 28 deletions

View File

@ -6,8 +6,8 @@ import (
_ "gitee.com/wheat-os/wheatCache/conf" _ "gitee.com/wheat-os/wheatCache/conf"
wheatCodec "gitee.com/wheat-os/wheatCache/gateway/codec" wheatCodec "gitee.com/wheat-os/wheatCache/gateway/codec"
"gitee.com/wheat-os/wheatCache/gateway/endpoint"
"gitee.com/wheat-os/wheatCache/gateway/proxy" "gitee.com/wheat-os/wheatCache/gateway/proxy"
"gitee.com/wheat-os/wheatCache/gateway/transport"
"gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/logx"
"gitee.com/wheat-os/wheatCache/pkg/util/server" "gitee.com/wheat-os/wheatCache/pkg/util/server"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -56,13 +56,13 @@ func GetGatewayServer() *grpc.Server {
logx.Debug("service target in %v", targets) logx.Debug("service target in %v", targets)
stream := proxy.GetDirectorByServiceHash() stream := proxy.GetDirectorByServiceHash()
transport := transport.NewHashTransport(transport.HashReplicasDefault, nil, targets...) endpoint := endpoint.NewHashEndpoint(endpoint.HashReplicasDefault, nil, targets...)
opts := make([]grpc.ServerOption, 0) opts := make([]grpc.ServerOption, 0)
opts = append( opts = append(
opts, opts,
grpc.ForceServerCodec(wheatCodec.Codec()), grpc.ForceServerCodec(wheatCodec.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, transport)), grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, endpoint)),
) )
return grpc.NewServer(opts...) return grpc.NewServer(opts...)

View File

@ -1,6 +1,6 @@
package transport package endpoint
type TransPortInterface interface { type EndpointInterface interface {
GetTargetAddr(...string) (string, error) GetTargetAddr(...string) (string, error)
IsEmpty() bool IsEmpty() bool
AddTarget(targets ...string) AddTarget(targets ...string)

View File

@ -1,4 +1,4 @@
package transport package endpoint
import ( import (
"hash/crc32" "hash/crc32"
@ -25,34 +25,34 @@ func (s UInt32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
type HashTransport struct { type HashEndpoint struct {
hash HashFunc hash HashFunc
replicas int // 复制因子 replicas int // 复制因子
keys UInt32Slice keys UInt32Slice
hashMap map[uint32]string // taraget 隐射 hashMap map[uint32]string // taraget 隐射
} }
func NewHashTransport(replicas int, fn HashFunc, target ...string) TransPortInterface { func NewHashEndpoint(replicas int, fn HashFunc, target ...string) EndpointInterface {
transport := &HashTransport{ endpoint := &HashEndpoint{
replicas: replicas, replicas: replicas,
hash: fn, hash: fn,
hashMap: make(map[uint32]string, len(target)), hashMap: make(map[uint32]string, len(target)),
} }
if transport.hash == nil { if endpoint.hash == nil {
transport.hash = crc32.ChecksumIEEE // 默认使用 CRC32 算法 endpoint.hash = crc32.ChecksumIEEE // 默认使用 CRC32 算法
} }
transport.AddTarget(target...) endpoint.AddTarget(target...)
return transport return endpoint
} }
func (h *HashTransport) IsEmpty() bool { func (h *HashEndpoint) IsEmpty() bool {
return len(h.keys) == 0 return len(h.keys) == 0
} }
func (h *HashTransport) AddTarget(targets ...string) { func (h *HashEndpoint) AddTarget(targets ...string) {
for _, tar := range targets { for _, tar := range targets {
for i := 0; i < h.replicas; i++ { for i := 0; i < h.replicas; i++ {
@ -66,7 +66,7 @@ func (h *HashTransport) AddTarget(targets ...string) {
sort.Sort(h.keys) sort.Sort(h.keys)
} }
func (h *HashTransport) GetTargetAddr(str ...string) (string, error) { func (h *HashEndpoint) GetTargetAddr(str ...string) (string, error) {
if h.IsEmpty() { if h.IsEmpty() {
return "", errorx.New("gateway not register transport") return "", errorx.New("gateway not register transport")
} }

View File

@ -1,4 +1,4 @@
package transport package endpoint
import ( import (
"testing" "testing"

View File

@ -3,11 +3,11 @@ package proxy
import ( import (
"context" "context"
"gitee.com/wheat-os/wheatCache/gateway/transport" "gitee.com/wheat-os/wheatCache/gateway/endpoint"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
type StreamDirector func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) type StreamDirector func(ctx context.Context, fullMethodName string, endpoint endpoint.EndpointInterface) (context.Context, *grpc.ClientConn, error)
var ( var (
clientStreamDescForProxying = &grpc.StreamDesc{ clientStreamDescForProxying = &grpc.StreamDesc{

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"gitee.com/wheat-os/wheatCache/gateway/codec" "gitee.com/wheat-os/wheatCache/gateway/codec"
"gitee.com/wheat-os/wheatCache/gateway/transport" "gitee.com/wheat-os/wheatCache/gateway/endpoint"
"gitee.com/wheat-os/wheatCache/pkg/proto" "gitee.com/wheat-os/wheatCache/pkg/proto"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -13,7 +13,7 @@ import (
) )
func GetDirectorByServiceHash() StreamDirector { func GetDirectorByServiceHash() StreamDirector {
return func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) { return func(ctx context.Context, fullMethodName string, endpoint endpoint.EndpointInterface) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx) md, ok := metadata.FromIncomingContext(ctx)
if !ok { if !ok {
@ -26,7 +26,7 @@ func GetDirectorByServiceHash() StreamDirector {
"grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey)
} }
target, err := transport.GetTargetAddr(baseKey...) target, err := endpoint.GetTargetAddr(baseKey...)
if err != nil { if err != nil {
return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err) return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err)
} }

View File

@ -5,7 +5,7 @@ import (
"io" "io"
wheatCodec "gitee.com/wheat-os/wheatCache/gateway/codec" wheatCodec "gitee.com/wheat-os/wheatCache/gateway/codec"
"gitee.com/wheat-os/wheatCache/gateway/transport" "gitee.com/wheat-os/wheatCache/gateway/endpoint"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -16,17 +16,17 @@ import (
// backends. It should be used as a `grpc.UnknownServiceHandler`. // backends. It should be used as a `grpc.UnknownServiceHandler`.
// //
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector, tranport transport.TransPortInterface) grpc.StreamHandler { func TransparentHandler(director StreamDirector, endpoint endpoint.EndpointInterface) grpc.StreamHandler {
streamer := &handler{ streamer := &handler{
director, director,
tranport, endpoint,
} }
return streamer.handler return streamer.handler
} }
type handler struct { type handler struct {
director StreamDirector director StreamDirector
transport transport.TransPortInterface endpoint endpoint.EndpointInterface
} }
// handler is where the real magic of proxying happens. // handler is where the real magic of proxying happens.
@ -38,7 +38,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
} }
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.transport) outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.endpoint)
if err != nil { if err != nil {
return err return err
} }