diff --git a/.gitignore b/.gitignore index 6c5c7d6..d66f960 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ # build file -/bin/storage \ No newline at end of file +/bin/storage +/bin/gateway \ No newline at end of file diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index f81e5bf..08f2c02 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -26,3 +26,9 @@ middleware-driver: plugins-control: logcontext: [ "logMiddle" ] + + +gateway: + host: '127.0.0.1' + port: 5891 + \ No newline at end of file diff --git a/doc/gateway/Cache 分布式方案-Getway.md b/doc/gateway/Cache 分布式方案-Getway.md new file mode 100644 index 0000000..cbee738 --- /dev/null +++ b/doc/gateway/Cache 分布式方案-Getway.md @@ -0,0 +1,11 @@ +### Cache 分布式方案-Getway + + + +![getway方案](https://gitee.com/timedb/img/raw/master/images/getway方案.svg) + + + +1. single 集群分布式方案中,使用 getway 方向代理客户端的 grpc 请求, 通过 hash 环实现 分布式。 +2. 集群模式中, 通过主从来 实现 cache 的备份问题,提高容灾性。 + diff --git a/gateway/cmd/gateway_test.go b/gateway/cmd/gateway_test.go new file mode 100644 index 0000000..fe25ac4 --- /dev/null +++ b/gateway/cmd/gateway_test.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "context" + "fmt" + "testing" + + "gitee.com/timedb/wheatCache/pkg/proto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestContextChannel(t *testing.T) { + cxt := context.Background() + _, ctxChannel := context.WithCancel(cxt) + ctxChannel() + ctxChannel() +} + +func TestGateway_Master(t *testing.T) { + comm, err := grpc.Dial("127.0.0.1:5891", grpc.WithInsecure()) + require.NoError(t, err) + ctx := context.Background() + cli := proto.NewCommServerClient(comm) + resp, err := cli.Set(ctx, &proto.SetRequest{ + Key: &proto.BaseKey{ + Key: "aa", + }, + Val: "awd", + }) + require.NoError(t, err) + fmt.Printf(resp.Result) + +} diff --git a/gateway/cmd/root.go b/gateway/cmd/root.go new file mode 100644 index 0000000..d98e590 --- /dev/null +++ b/gateway/cmd/root.go @@ -0,0 +1,60 @@ +package cmd + +import ( + "fmt" + "net" + + _ "gitee.com/timedb/wheatCache/conf" + wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" + "gitee.com/timedb/wheatCache/gateway/proxy" + "gitee.com/timedb/wheatCache/pkg/logx" + "gitee.com/timedb/wheatCache/pkg/util/server" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "google.golang.org/grpc" +) + +// rootCmd represents the base command when called without any subcommands +var rootCmd = &cobra.Command{ + Use: "getway", + Short: "getway", + Long: `start getway server`, + Run: func(cmd *cobra.Command, args []string) { + host := viper.GetString("gateway.host") + port := viper.GetInt("gateway.port") + + tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + logx.Panic("get gateway addr err:%v", err) + } + listen, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + logx.Panic("get gateway tcp conn err:%v", err) + } + + gatewayServer := GetGatewayServer() + server.ElegantExitServer(gatewayServer) + + logx.Info("start gateway in addr: %s", tcpAddr.String()) + if err := gatewayServer.Serve(listen); err != nil { + logx.Errorln(err) + } + }, +} + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute() { + cobra.CheckErr(rootCmd.Execute()) +} + +func GetGatewayServer() *grpc.Server { + opts := make([]grpc.ServerOption, 0) + opts = append( + opts, + grpc.ForceServerCodec(wheatCodec.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())), + ) + + return grpc.NewServer(opts...) +} diff --git a/gateway/codec/codce.go b/gateway/codec/codce.go new file mode 100644 index 0000000..f8ce557 --- /dev/null +++ b/gateway/codec/codce.go @@ -0,0 +1,60 @@ +package codec + +import ( + "google.golang.org/grpc/encoding" + "google.golang.org/protobuf/proto" +) + +// protoCodec 用于 gateway 解析全部的 grpc 类型的消息 +type protoCodec struct{} + +func (protoCodec) Name() string { + return "wheat-cache-proto" +} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +type Frame struct { + payload []byte +} + +type proxyCodec struct { + baseCodec encoding.Codec +} + +func (p *proxyCodec) Name() string { + return "wheat-cache-proxy" +} + +func (p *proxyCodec) Marshal(v interface{}) ([]byte, error) { + out, ok := v.(*Frame) + if !ok { + return p.Marshal(v) + } + return out.payload, nil +} + +func (p *proxyCodec) Unmarshal(data []byte, v interface{}) error { + dst, ok := v.(*Frame) + if !ok { + return p.Unmarshal(data, v) + } + + dst.payload = data + return nil +} + +// CodeWithParent 生成基于 proto 的解码器 +func CodeWithParent(parent encoding.Codec) encoding.Codec { + return &proxyCodec{parent} +} + +func Codec() encoding.Codec { + return CodeWithParent(protoCodec{}) +} diff --git a/gateway/main.go b/gateway/main.go new file mode 100644 index 0000000..f77a8ee --- /dev/null +++ b/gateway/main.go @@ -0,0 +1,22 @@ +/* +Copyright © 2021 NAME HERE + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import "gitee.com/timedb/wheatCache/gateway/cmd" + +func main() { + cmd.Execute() +} diff --git a/gateway/proxy/define.go b/gateway/proxy/define.go new file mode 100644 index 0000000..0167d65 --- /dev/null +++ b/gateway/proxy/define.go @@ -0,0 +1,16 @@ +package proxy + +import ( + "context" + + "google.golang.org/grpc" +) + +type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) + +var ( + clientStreamDescForProxying = &grpc.StreamDesc{ + ServerStreams: true, + ClientStreams: true, + } +) diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go new file mode 100644 index 0000000..3379e17 --- /dev/null +++ b/gateway/proxy/director.go @@ -0,0 +1,16 @@ +package proxy + +import ( + "context" + + "gitee.com/timedb/wheatCache/gateway/codec" + "google.golang.org/grpc" +) + +func GetDirectorByServiceHash() StreamDirector { + return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + // TODO hash, mock 直接转发到 storage dev 上 + cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) + return ctx, cli, err + } +} diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go new file mode 100644 index 0000000..df475d5 --- /dev/null +++ b/gateway/proxy/proxy.go @@ -0,0 +1,125 @@ +package proxy + +import ( + "context" + "io" + + wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. +// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// backends. It should be used as a `grpc.UnknownServiceHandler`. +// +// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. +func TransparentHandler(director StreamDirector) grpc.StreamHandler { + streamer := &handler{director} + return streamer.handler +} + +type handler struct { + director StreamDirector +} + +// handler is where the real magic of proxying happens. +// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, +// forwarding it to a ClientStream established against the relevant ClientConn. +func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) + if !ok { + return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + } + + outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) + if err != nil { + return err + } + + clientCtx, clientCancel := context.WithCancel(outgoingCtx) + defer clientCancel() + + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + if err != nil { + return err + } + + s2cErrChan := s.forwardServerToClient(serverStream, clientStream) + c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + + for i := 0; i < 2; i++ { + select { + case s2cErr := <-s2cErrChan: + if s2cErr == io.EOF { + // 客户端流发送完毕正常关闭结束, Proxy 关闭对 Backend 的连接 + clientStream.CloseSend() + break + } + + clientCancel() + return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + case c2sErr := <-c2sErrChan: + // 服务的没用在提供数据触发这个分支 + serverStream.SetTrailer(clientStream.Trailer()) + + if c2sErr != io.EOF { + return c2sErr + } + + return nil + } + } + + return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") +} + +func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { + ret := make(chan error, 1) + go func() { + f := &wheatCodec.Frame{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + if i == 0 { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.Header() + if err != nil { + ret <- err + break + } + if err := dst.SendHeader(md); err != nil { + ret <- err + break + } + } + if err := dst.SendMsg(f); err != nil { + ret <- err + break + } + } + }() + return ret +} + +func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error { + ret := make(chan error, 1) + go func() { + f := &wheatCodec.Frame{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + if err := dst.SendMsg(f); err != nil { + ret <- err + break + } + } + }() + return ret +} diff --git a/makefile b/makefile index 7558082..e790a1f 100644 --- a/makefile +++ b/makefile @@ -7,19 +7,28 @@ dcgen: @python3 ./shell/proto.py @python3 ./shell/make-struct.py -.PHONY : build -build: +.PHONY: build-storage +build-storage: @cd storage && go build -o $(BASE_OUT)/storage +.PHONY: build-gateway +build-gateway: + @cd gateway && go build -o $(BASE_OUT)/gateway + .PHONY: install install: @make gen-middleware - @make build + @make build-storage + @make build-gateway -.PHONY: dev -dev: +.PHONY: storage +storage: @./bin/storage storage +.PHONY: gateway +gateway: + @./bin/gateway gateway + .PHONY: gen-struct gen-struct: @python3 ./shell/make-struct.py diff --git a/pkg/util/server/os.go b/pkg/util/server/os.go new file mode 100644 index 0000000..20f368c --- /dev/null +++ b/pkg/util/server/os.go @@ -0,0 +1,29 @@ +package server + +import ( + "os" + "os/signal" + "syscall" + + "gitee.com/timedb/wheatCache/pkg/logx" + "google.golang.org/grpc" +) + +func ElegantExitServer(s *grpc.Server) { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGHUP, syscall.SIGINT) + go func() { + select { + case <-c: + s.Stop() + + msg := ` + |-------Wheat tools---------| + | see you next time | + |thank you for your efforts | + |---------------------------| + ` + logx.Infoln(msg) + } + }() +} diff --git a/storage/cmd/root.go b/storage/cmd/root.go index e011f95..22424a4 100644 --- a/storage/cmd/root.go +++ b/storage/cmd/root.go @@ -3,15 +3,15 @@ package cmd import ( "fmt" - "gitee.com/timedb/wheatCache/storage/server/single" "log" "net" - "os" - "os/signal" - "syscall" + + "gitee.com/timedb/wheatCache/storage/server/single" _ "gitee.com/timedb/wheatCache/conf" + "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/proto" + "gitee.com/timedb/wheatCache/pkg/util/server" "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/grpc" @@ -25,7 +25,6 @@ var rootCmd = &cobra.Command{ Long: `start storage server`, Run: func(cmd *cobra.Command, args []string) { storageServer := single.NewServer() - // 先写死, 等配置文件 conf := viper.GetStringMap("storage") host := conf["host"].(string) port := conf["port"].(int) @@ -37,31 +36,17 @@ var rootCmd = &cobra.Command{ listen, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - log.Panicln(err) + logx.Panicln(err) } s := grpc.NewServer() proto.RegisterCommServerServer(s, storageServer) reflection.Register(s) - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGHUP, syscall.SIGINT) - go func() { - select { - case <-c: - s.Stop() - - msg := ` - |-------Wheat tools---------| - | see you next time | - |thank you for your efforts | - |---------------------------| - ` - fmt.Println(msg) - } - }() + server.ElegantExitServer(s) + logx.Info("start gateway in addr: %s", tcpAddr.String()) if err := s.Serve(listen); err != nil { - log.Panicln(err) + logx.Errorln(err) } }, } diff --git a/storage/server/single/single.go b/storage/server/single/single.go index 13a6f18..8d1e4de 100644 --- a/storage/server/single/single.go +++ b/storage/server/single/single.go @@ -28,7 +28,7 @@ func NewServer() *serverSingle { ser := &serverSingle{ lruCache: lruCache, lruProduce: event.NewProduce(lruCache.GetDriver()), - timeOut: time.Duration(timeOut), + timeOut: time.Duration(timeOut) * time.Second, dao: dao.NewDao(lruCache), } sysSingleServer = ser diff --git a/storage/server/single_test.go b/storage/server/single_test.go new file mode 100644 index 0000000..80ef1ee --- /dev/null +++ b/storage/server/single_test.go @@ -0,0 +1,21 @@ +package server + +import ( + "context" + "fmt" + "gitee.com/timedb/wheatCache/pkg/proto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "testing" +) + +func TestServerSingle_Get(t *testing.T) { + comm, err := grpc.Dial("127.0.0.1:5890", grpc.WithInsecure()) + require.NoError(t, err) + + ctx := context.Background() + cli := proto.NewCommServerClient(comm) + resp, err := cli.Get(ctx, &proto.GetRequest{}) + require.NoError(t, err) + fmt.Println(resp) +}