!64 添加 实现 网关的转发功能

Merge pull request !64 from bandl/feat-getway
This commit is contained in:
bandl 2021-10-16 14:55:54 +00:00 committed by Gitee
commit b97edc4cf1
15 changed files with 425 additions and 30 deletions

3
.gitignore vendored
View File

@ -17,4 +17,5 @@
# build file
/bin/storage
/bin/storage
/bin/gateway

View File

@ -26,3 +26,9 @@ middleware-driver:
plugins-control:
logcontext: [ "logMiddle" ]
gateway:
host: '127.0.0.1'
port: 5891

View File

@ -0,0 +1,11 @@
### Cache 分布式方案-Getway
![getway方案](https://gitee.com/timedb/img/raw/master/images/getway方案.svg)
1. single 集群分布式方案中,使用 getway 方向代理客户端的 grpc 请求, 通过 hash 环实现 分布式。
2. 集群模式中, 通过主从来 实现 cache 的备份问题,提高容灾性。

View File

@ -0,0 +1,34 @@
package cmd
import (
"context"
"fmt"
"testing"
"gitee.com/timedb/wheatCache/pkg/proto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
func TestContextChannel(t *testing.T) {
cxt := context.Background()
_, ctxChannel := context.WithCancel(cxt)
ctxChannel()
ctxChannel()
}
func TestGateway_Master(t *testing.T) {
comm, err := grpc.Dial("127.0.0.1:5891", grpc.WithInsecure())
require.NoError(t, err)
ctx := context.Background()
cli := proto.NewCommServerClient(comm)
resp, err := cli.Set(ctx, &proto.SetRequest{
Key: &proto.BaseKey{
Key: "aa",
},
Val: "awd",
})
require.NoError(t, err)
fmt.Printf(resp.Result)
}

60
gateway/cmd/root.go Normal file
View File

@ -0,0 +1,60 @@
package cmd
import (
"fmt"
"net"
_ "gitee.com/timedb/wheatCache/conf"
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"gitee.com/timedb/wheatCache/gateway/proxy"
"gitee.com/timedb/wheatCache/pkg/logx"
"gitee.com/timedb/wheatCache/pkg/util/server"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "getway",
Short: "getway",
Long: `start getway server`,
Run: func(cmd *cobra.Command, args []string) {
host := viper.GetString("gateway.host")
port := viper.GetInt("gateway.port")
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
logx.Panic("get gateway addr err:%v", err)
}
listen, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
logx.Panic("get gateway tcp conn err:%v", err)
}
gatewayServer := GetGatewayServer()
server.ElegantExitServer(gatewayServer)
logx.Info("start gateway in addr: %s", tcpAddr.String())
if err := gatewayServer.Serve(listen); err != nil {
logx.Errorln(err)
}
},
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
cobra.CheckErr(rootCmd.Execute())
}
func GetGatewayServer() *grpc.Server {
opts := make([]grpc.ServerOption, 0)
opts = append(
opts,
grpc.ForceServerCodec(wheatCodec.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())),
)
return grpc.NewServer(opts...)
}

60
gateway/codec/codce.go Normal file
View File

@ -0,0 +1,60 @@
package codec
import (
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/proto"
)
// protoCodec 用于 gateway 解析全部的 grpc 类型的消息
type protoCodec struct{}
func (protoCodec) Name() string {
return "wheat-cache-proto"
}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}
type Frame struct {
payload []byte
}
type proxyCodec struct {
baseCodec encoding.Codec
}
func (p *proxyCodec) Name() string {
return "wheat-cache-proxy"
}
func (p *proxyCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*Frame)
if !ok {
return p.Marshal(v)
}
return out.payload, nil
}
func (p *proxyCodec) Unmarshal(data []byte, v interface{}) error {
dst, ok := v.(*Frame)
if !ok {
return p.Unmarshal(data, v)
}
dst.payload = data
return nil
}
// CodeWithParent 生成基于 proto 的解码器
func CodeWithParent(parent encoding.Codec) encoding.Codec {
return &proxyCodec{parent}
}
func Codec() encoding.Codec {
return CodeWithParent(protoCodec{})
}

22
gateway/main.go Normal file
View File

@ -0,0 +1,22 @@
/*
Copyright © 2021 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import "gitee.com/timedb/wheatCache/gateway/cmd"
func main() {
cmd.Execute()
}

16
gateway/proxy/define.go Normal file
View File

@ -0,0 +1,16 @@
package proxy
import (
"context"
"google.golang.org/grpc"
)
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)
var (
clientStreamDescForProxying = &grpc.StreamDesc{
ServerStreams: true,
ClientStreams: true,
}
)

16
gateway/proxy/director.go Normal file
View File

@ -0,0 +1,16 @@
package proxy
import (
"context"
"gitee.com/timedb/wheatCache/gateway/codec"
"google.golang.org/grpc"
)
func GetDirectorByServiceHash() StreamDirector {
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
// TODO hash, mock 直接转发到 storage dev 上
cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec())))
return ctx, cli, err
}
}

125
gateway/proxy/proxy.go Normal file
View File

@ -0,0 +1,125 @@
package proxy
import (
"context"
"io"
wheatCodec "gitee.com/timedb/wheatCache/gateway/codec"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector) grpc.StreamHandler {
streamer := &handler{director}
return streamer.handler
}
type handler struct {
director StreamDirector
}
// handler is where the real magic of proxying happens.
// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
// forwarding it to a ClientStream established against the relevant ClientConn.
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok {
return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
}
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
if err != nil {
return err
}
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
defer clientCancel()
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
if err != nil {
return err
}
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// 客户端流发送完毕正常关闭结束, Proxy 关闭对 Backend 的连接
clientStream.CloseSend()
break
}
clientCancel()
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
case c2sErr := <-c2sErrChan:
// 服务的没用在提供数据触发这个分支
serverStream.SetTrailer(clientStream.Trailer())
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
f := &wheatCodec.Frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.Header()
if err != nil {
ret <- err
break
}
if err := dst.SendHeader(md); err != nil {
ret <- err
break
}
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &wheatCodec.Frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}

View File

@ -7,19 +7,28 @@ dcgen:
@python3 ./shell/proto.py
@python3 ./shell/make-struct.py
.PHONY : build
build:
.PHONY: build-storage
build-storage:
@cd storage && go build -o $(BASE_OUT)/storage
.PHONY: build-gateway
build-gateway:
@cd gateway && go build -o $(BASE_OUT)/gateway
.PHONY: install
install:
@make gen-middleware
@make build
@make build-storage
@make build-gateway
.PHONY: dev
dev:
.PHONY: storage
storage:
@./bin/storage storage
.PHONY: gateway
gateway:
@./bin/gateway gateway
.PHONY: gen-struct
gen-struct:
@python3 ./shell/make-struct.py

29
pkg/util/server/os.go Normal file
View File

@ -0,0 +1,29 @@
package server
import (
"os"
"os/signal"
"syscall"
"gitee.com/timedb/wheatCache/pkg/logx"
"google.golang.org/grpc"
)
func ElegantExitServer(s *grpc.Server) {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT)
go func() {
select {
case <-c:
s.Stop()
msg := `
|-------Wheat tools---------|
| see you next time |
|thank you for your efforts |
|---------------------------|
`
logx.Infoln(msg)
}
}()
}

View File

@ -3,15 +3,15 @@ package cmd
import (
"fmt"
"gitee.com/timedb/wheatCache/storage/server/single"
"log"
"net"
"os"
"os/signal"
"syscall"
"gitee.com/timedb/wheatCache/storage/server/single"
_ "gitee.com/timedb/wheatCache/conf"
"gitee.com/timedb/wheatCache/pkg/logx"
"gitee.com/timedb/wheatCache/pkg/proto"
"gitee.com/timedb/wheatCache/pkg/util/server"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc"
@ -25,7 +25,6 @@ var rootCmd = &cobra.Command{
Long: `start storage server`,
Run: func(cmd *cobra.Command, args []string) {
storageServer := single.NewServer()
// 先写死, 等配置文件
conf := viper.GetStringMap("storage")
host := conf["host"].(string)
port := conf["port"].(int)
@ -37,31 +36,17 @@ var rootCmd = &cobra.Command{
listen, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Panicln(err)
logx.Panicln(err)
}
s := grpc.NewServer()
proto.RegisterCommServerServer(s, storageServer)
reflection.Register(s)
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT)
go func() {
select {
case <-c:
s.Stop()
msg := `
|-------Wheat tools---------|
| see you next time |
|thank you for your efforts |
|---------------------------|
`
fmt.Println(msg)
}
}()
server.ElegantExitServer(s)
logx.Info("start gateway in addr: %s", tcpAddr.String())
if err := s.Serve(listen); err != nil {
log.Panicln(err)
logx.Errorln(err)
}
},
}

View File

@ -28,7 +28,7 @@ func NewServer() *serverSingle {
ser := &serverSingle{
lruCache: lruCache,
lruProduce: event.NewProduce(lruCache.GetDriver()),
timeOut: time.Duration(timeOut),
timeOut: time.Duration(timeOut) * time.Second,
dao: dao.NewDao(lruCache),
}
sysSingleServer = ser

View File

@ -0,0 +1,21 @@
package server
import (
"context"
"fmt"
"gitee.com/timedb/wheatCache/pkg/proto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"testing"
)
func TestServerSingle_Get(t *testing.T) {
comm, err := grpc.Dial("127.0.0.1:5890", grpc.WithInsecure())
require.NoError(t, err)
ctx := context.Background()
cli := proto.NewCommServerClient(comm)
resp, err := cli.Get(ctx, &proto.GetRequest{})
require.NoError(t, err)
fmt.Println(resp)
}