diff --git a/gateway/cmd/root.go b/gateway/cmd/root.go index a3deb3a..9300437 100644 --- a/gateway/cmd/root.go +++ b/gateway/cmd/root.go @@ -6,8 +6,8 @@ import ( _ "gitee.com/wheat-os/wheatCache/conf" wheatCodec "gitee.com/wheat-os/wheatCache/gateway/codec" + "gitee.com/wheat-os/wheatCache/gateway/endpoint" "gitee.com/wheat-os/wheatCache/gateway/proxy" - "gitee.com/wheat-os/wheatCache/gateway/transport" "gitee.com/wheat-os/wheatCache/pkg/logx" "gitee.com/wheat-os/wheatCache/pkg/util/server" "github.com/spf13/cobra" @@ -56,13 +56,13 @@ func GetGatewayServer() *grpc.Server { logx.Debug("service target in %v", targets) stream := proxy.GetDirectorByServiceHash() - transport := transport.NewHashTransport(transport.HashReplicasDefault, nil, targets...) + endpoint := endpoint.NewHashEndpoint(endpoint.HashReplicasDefault, nil, targets...) opts := make([]grpc.ServerOption, 0) opts = append( opts, grpc.ForceServerCodec(wheatCodec.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, transport)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(stream, endpoint)), ) return grpc.NewServer(opts...) diff --git a/gateway/transport/define.go b/gateway/endpoint/define.go similarity index 70% rename from gateway/transport/define.go rename to gateway/endpoint/define.go index 57c7942..9e5b6ec 100644 --- a/gateway/transport/define.go +++ b/gateway/endpoint/define.go @@ -1,6 +1,6 @@ -package transport +package endpoint -type TransPortInterface interface { +type EndpointInterface interface { GetTargetAddr(...string) (string, error) IsEmpty() bool AddTarget(targets ...string) diff --git a/gateway/transport/hash.go b/gateway/endpoint/hash.go similarity index 71% rename from gateway/transport/hash.go rename to gateway/endpoint/hash.go index 329de74..0b1e2c7 100644 --- a/gateway/transport/hash.go +++ b/gateway/endpoint/hash.go @@ -1,4 +1,4 @@ -package transport +package endpoint import ( "hash/crc32" @@ -25,34 +25,34 @@ func (s UInt32Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -type HashTransport struct { +type HashEndpoint struct { hash HashFunc replicas int // 复制因子 keys UInt32Slice hashMap map[uint32]string // taraget 隐射 } -func NewHashTransport(replicas int, fn HashFunc, target ...string) TransPortInterface { - transport := &HashTransport{ +func NewHashEndpoint(replicas int, fn HashFunc, target ...string) EndpointInterface { + endpoint := &HashEndpoint{ replicas: replicas, hash: fn, hashMap: make(map[uint32]string, len(target)), } - if transport.hash == nil { - transport.hash = crc32.ChecksumIEEE // 默认使用 CRC32 算法 + if endpoint.hash == nil { + endpoint.hash = crc32.ChecksumIEEE // 默认使用 CRC32 算法 } - transport.AddTarget(target...) + endpoint.AddTarget(target...) - return transport + return endpoint } -func (h *HashTransport) IsEmpty() bool { +func (h *HashEndpoint) IsEmpty() bool { return len(h.keys) == 0 } -func (h *HashTransport) AddTarget(targets ...string) { +func (h *HashEndpoint) AddTarget(targets ...string) { for _, tar := range targets { for i := 0; i < h.replicas; i++ { @@ -66,7 +66,7 @@ func (h *HashTransport) AddTarget(targets ...string) { sort.Sort(h.keys) } -func (h *HashTransport) GetTargetAddr(str ...string) (string, error) { +func (h *HashEndpoint) GetTargetAddr(str ...string) (string, error) { if h.IsEmpty() { return "", errorx.New("gateway not register transport") } diff --git a/gateway/transport/hash_test.go b/gateway/endpoint/hash_test.go similarity index 94% rename from gateway/transport/hash_test.go rename to gateway/endpoint/hash_test.go index f6e9774..b074a72 100644 --- a/gateway/transport/hash_test.go +++ b/gateway/endpoint/hash_test.go @@ -1,4 +1,4 @@ -package transport +package endpoint import ( "testing" diff --git a/gateway/proxy/define.go b/gateway/proxy/define.go index e100c5c..a20ec52 100644 --- a/gateway/proxy/define.go +++ b/gateway/proxy/define.go @@ -3,11 +3,11 @@ package proxy import ( "context" - "gitee.com/wheat-os/wheatCache/gateway/transport" + "gitee.com/wheat-os/wheatCache/gateway/endpoint" "google.golang.org/grpc" ) -type StreamDirector func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, endpoint endpoint.EndpointInterface) (context.Context, *grpc.ClientConn, error) var ( clientStreamDescForProxying = &grpc.StreamDesc{ diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go index 0215c8b..d473cfb 100644 --- a/gateway/proxy/director.go +++ b/gateway/proxy/director.go @@ -4,7 +4,7 @@ import ( "context" "gitee.com/wheat-os/wheatCache/gateway/codec" - "gitee.com/wheat-os/wheatCache/gateway/transport" + "gitee.com/wheat-os/wheatCache/gateway/endpoint" "gitee.com/wheat-os/wheatCache/pkg/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -13,7 +13,7 @@ import ( ) func GetDirectorByServiceHash() StreamDirector { - return func(ctx context.Context, fullMethodName string, transport transport.TransPortInterface) (context.Context, *grpc.ClientConn, error) { + return func(ctx context.Context, fullMethodName string, endpoint endpoint.EndpointInterface) (context.Context, *grpc.ClientConn, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { @@ -26,7 +26,7 @@ func GetDirectorByServiceHash() StreamDirector { "grpc header is not found %s, please check the client interceptor", proto.BaseKeyMethodKey) } - target, err := transport.GetTargetAddr(baseKey...) + target, err := endpoint.GetTargetAddr(baseKey...) if err != nil { return nil, nil, status.Errorf(codes.Unknown, "get transport err, err:%v", err) } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index e317a33..85fa532 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -5,7 +5,7 @@ import ( "io" wheatCodec "gitee.com/wheat-os/wheatCache/gateway/codec" - "gitee.com/wheat-os/wheatCache/gateway/transport" + "gitee.com/wheat-os/wheatCache/gateway/endpoint" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -16,17 +16,17 @@ import ( // backends. It should be used as a `grpc.UnknownServiceHandler`. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func TransparentHandler(director StreamDirector, tranport transport.TransPortInterface) grpc.StreamHandler { +func TransparentHandler(director StreamDirector, endpoint endpoint.EndpointInterface) grpc.StreamHandler { streamer := &handler{ director, - tranport, + endpoint, } return streamer.handler } type handler struct { - director StreamDirector - transport transport.TransPortInterface + director StreamDirector + endpoint endpoint.EndpointInterface } // handler is where the real magic of proxying happens. @@ -38,7 +38,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } - outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.transport) + outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, s.endpoint) if err != nil { return err }