From f7d4cee1028261d78c594028e69b9390ab930bba Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Wed, 29 Sep 2021 21:43:59 +0800 Subject: [PATCH 01/18] feat(getway): init getway --- getway/cmd/root.go | 19 +++++++++++++++++++ getway/main.go | 22 ++++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 getway/cmd/root.go create mode 100644 getway/main.go diff --git a/getway/cmd/root.go b/getway/cmd/root.go new file mode 100644 index 0000000..f81b95b --- /dev/null +++ b/getway/cmd/root.go @@ -0,0 +1,19 @@ +package cmd + +import ( + "github.com/spf13/cobra" +) + +// rootCmd represents the base command when called without any subcommands +var rootCmd = &cobra.Command{ + Use: "getway", + Short: "getway", + Long: `start getway server`, + Run: func(cmd *cobra.Command, args []string) {}, +} + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute() { + cobra.CheckErr(rootCmd.Execute()) +} diff --git a/getway/main.go b/getway/main.go new file mode 100644 index 0000000..016a89a --- /dev/null +++ b/getway/main.go @@ -0,0 +1,22 @@ +/* +Copyright © 2021 NAME HERE + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import "gitee.com/timedb/wheatCache/getway/cmd" + +func main() { + cmd.Execute() +} From 5934e88e03c0ee8071ab6caaa4915f37c3dfb59a Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 2 Oct 2021 22:13:23 +0800 Subject: [PATCH 02/18] test(storage): add single server test --- storage/server/single_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 storage/server/single_test.go diff --git a/storage/server/single_test.go b/storage/server/single_test.go new file mode 100644 index 0000000..7c098fb --- /dev/null +++ b/storage/server/single_test.go @@ -0,0 +1,22 @@ +package server + +import ( + "context" + "fmt" + "gitee.com/timedb/wheatCache/pkg/proto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "testing" +) + +func TestServerSingle_Get(t *testing.T) { + comm, err := grpc.Dial("127.0.0.1:5890", grpc.WithInsecure()) + require.NoError(t, err) + + ctx := context.Background() + cli := proto.NewCommServerClient(comm) + resp, err := cli.Get(ctx, &proto.GetRequest{ + }) + require.NoError(t, err) + fmt.Println(resp) +} From ccf71797131960be2adf77a4efa8db913d0c3f55 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 2 Oct 2021 22:13:42 +0800 Subject: [PATCH 03/18] doc(getway): add getway doc --- doc/getway/Cache 分布式方案-Getway.md | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 doc/getway/Cache 分布式方案-Getway.md diff --git a/doc/getway/Cache 分布式方案-Getway.md b/doc/getway/Cache 分布式方案-Getway.md new file mode 100644 index 0000000..cbee738 --- /dev/null +++ b/doc/getway/Cache 分布式方案-Getway.md @@ -0,0 +1,11 @@ +### Cache 分布式方案-Getway + + + +![getway方案](https://gitee.com/timedb/img/raw/master/images/getway方案.svg) + + + +1. single 集群分布式方案中,使用 getway 方向代理客户端的 grpc 请求, 通过 hash 环实现 分布式。 +2. 集群模式中, 通过主从来 实现 cache 的备份问题,提高容灾性。 + From 101059126c7fb7864fa6e1f66fd22c625cb42d3a Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Wed, 6 Oct 2021 16:51:06 +0800 Subject: [PATCH 04/18] fix(single-server): fix timeOut --- storage/server/single/single.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/server/single/single.go b/storage/server/single/single.go index 13a6f18..8d1e4de 100644 --- a/storage/server/single/single.go +++ b/storage/server/single/single.go @@ -28,7 +28,7 @@ func NewServer() *serverSingle { ser := &serverSingle{ lruCache: lruCache, lruProduce: event.NewProduce(lruCache.GetDriver()), - timeOut: time.Duration(timeOut), + timeOut: time.Duration(timeOut) * time.Second, dao: dao.NewDao(lruCache), } sysSingleServer = ser From 554f7feef9b5a5949bcf55dc09fed8b61c2b8bea Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Wed, 6 Oct 2021 20:34:47 +0800 Subject: [PATCH 05/18] chore(proto): update proto --- storage/server/single_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storage/server/single_test.go b/storage/server/single_test.go index 7c098fb..80ef1ee 100644 --- a/storage/server/single_test.go +++ b/storage/server/single_test.go @@ -15,8 +15,7 @@ func TestServerSingle_Get(t *testing.T) { ctx := context.Background() cli := proto.NewCommServerClient(comm) - resp, err := cli.Get(ctx, &proto.GetRequest{ - }) + resp, err := cli.Get(ctx, &proto.GetRequest{}) require.NoError(t, err) fmt.Println(resp) } From 57a655a7085d41af4a8ea3b77d5bce2ec2d5d4cc Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Fri, 15 Oct 2021 22:35:16 +0800 Subject: [PATCH 06/18] feat(storage): update conf --- storage/cmd/root.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/storage/cmd/root.go b/storage/cmd/root.go index e011f95..b4c00f4 100644 --- a/storage/cmd/root.go +++ b/storage/cmd/root.go @@ -3,14 +3,16 @@ package cmd import ( "fmt" - "gitee.com/timedb/wheatCache/storage/server/single" "log" "net" "os" "os/signal" "syscall" + "gitee.com/timedb/wheatCache/storage/server/single" + _ "gitee.com/timedb/wheatCache/conf" + "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/proto" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -25,7 +27,6 @@ var rootCmd = &cobra.Command{ Long: `start storage server`, Run: func(cmd *cobra.Command, args []string) { storageServer := single.NewServer() - // 先写死, 等配置文件 conf := viper.GetStringMap("storage") host := conf["host"].(string) port := conf["port"].(int) @@ -37,7 +38,7 @@ var rootCmd = &cobra.Command{ listen, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - log.Panicln(err) + logx.Panicln(err) } s := grpc.NewServer() proto.RegisterCommServerServer(s, storageServer) From 32936576d3d2d6a099a20e38709002081067fe81 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Fri, 15 Oct 2021 22:36:29 +0800 Subject: [PATCH 07/18] feat(gateway): update gateway cmd --- {getway => gateway}/cmd/root.go | 0 {getway => gateway}/main.go | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename {getway => gateway}/cmd/root.go (100%) rename {getway => gateway}/main.go (92%) diff --git a/getway/cmd/root.go b/gateway/cmd/root.go similarity index 100% rename from getway/cmd/root.go rename to gateway/cmd/root.go diff --git a/getway/main.go b/gateway/main.go similarity index 92% rename from getway/main.go rename to gateway/main.go index 016a89a..f77a8ee 100644 --- a/getway/main.go +++ b/gateway/main.go @@ -15,7 +15,7 @@ limitations under the License. */ package main -import "gitee.com/timedb/wheatCache/getway/cmd" +import "gitee.com/timedb/wheatCache/gateway/cmd" func main() { cmd.Execute() From cbf7ba19339a485fc44be8e9b5614125004daa23 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Fri, 15 Oct 2021 22:36:49 +0800 Subject: [PATCH 08/18] feat(gateway): add codec mode --- gateway/codec/codce.go | 60 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 gateway/codec/codce.go diff --git a/gateway/codec/codce.go b/gateway/codec/codce.go new file mode 100644 index 0000000..f4c535d --- /dev/null +++ b/gateway/codec/codce.go @@ -0,0 +1,60 @@ +package codec + +import ( + "google.golang.org/grpc/encoding" + "google.golang.org/protobuf/proto" +) + +// protoCodec 用于 gateway 解析全部的 grpc 类型的消息 +type protoCodec struct{} + +func (protoCodec) Name() string { + return "wheat-cache-proto" +} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +type frame struct { + payload []byte +} + +type proxyCodec struct { + baseCodec encoding.Codec +} + +func (p *proxyCodec) Name() string { + return "wheat-cache-proxy" +} + +func (p *proxyCodec) Marshal(v interface{}) ([]byte, error) { + out, ok := v.(*frame) + if !ok { + return p.Marshal(v) + } + return out.payload, nil +} + +func (p *proxyCodec) Unmarshal(data []byte, v interface{}) error { + dst, ok := v.(*frame) + if !ok { + return p.Unmarshal(data, v) + } + + dst.payload = data + return nil +} + +// CodeWithParent 生成基于 proto 的解码器 +func CodeWithParent(parent encoding.Codec) encoding.Codec { + return &proxyCodec{parent} +} + +func Codec() encoding.Codec { + return CodeWithParent(protoCodec{}) +} From 62eaa8edd093c2fa82f55f91b9ec64433c4e09aa Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 16:53:21 +0800 Subject: [PATCH 09/18] feat(gateway-codce): update codce option --- gateway/codec/codce.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gateway/codec/codce.go b/gateway/codec/codce.go index f4c535d..f8ce557 100644 --- a/gateway/codec/codce.go +++ b/gateway/codec/codce.go @@ -20,7 +20,7 @@ func (protoCodec) Unmarshal(data []byte, v interface{}) error { return proto.Unmarshal(data, v.(proto.Message)) } -type frame struct { +type Frame struct { payload []byte } @@ -33,7 +33,7 @@ func (p *proxyCodec) Name() string { } func (p *proxyCodec) Marshal(v interface{}) ([]byte, error) { - out, ok := v.(*frame) + out, ok := v.(*Frame) if !ok { return p.Marshal(v) } @@ -41,7 +41,7 @@ func (p *proxyCodec) Marshal(v interface{}) ([]byte, error) { } func (p *proxyCodec) Unmarshal(data []byte, v interface{}) error { - dst, ok := v.(*frame) + dst, ok := v.(*Frame) if !ok { return p.Unmarshal(data, v) } From 2c854deb5273f4af2221dc96fb36f90f4dec2647 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 16:54:35 +0800 Subject: [PATCH 10/18] feat(gateway): add a forwarding rules to gateway --- gateway/proxy/define.go | 16 +++++ gateway/proxy/director.go | 16 +++++ gateway/proxy/proxy.go | 125 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 gateway/proxy/define.go create mode 100644 gateway/proxy/director.go create mode 100644 gateway/proxy/proxy.go diff --git a/gateway/proxy/define.go b/gateway/proxy/define.go new file mode 100644 index 0000000..0167d65 --- /dev/null +++ b/gateway/proxy/define.go @@ -0,0 +1,16 @@ +package proxy + +import ( + "context" + + "google.golang.org/grpc" +) + +type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) + +var ( + clientStreamDescForProxying = &grpc.StreamDesc{ + ServerStreams: true, + ClientStreams: true, + } +) diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go new file mode 100644 index 0000000..9a99dab --- /dev/null +++ b/gateway/proxy/director.go @@ -0,0 +1,16 @@ +package proxy + +import ( + "context" + + "gitee.com/timedb/wheatCache/gateway/codec" + "google.golang.org/grpc" +) + +func GetDirectorByServiceHash() StreamDirector { + return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + // TODO hash, mock 直接转发到 storage dev 上 + cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.ForceCodec(codec.Codec())) + return ctx, cli, err + } +} diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go new file mode 100644 index 0000000..df475d5 --- /dev/null +++ b/gateway/proxy/proxy.go @@ -0,0 +1,125 @@ +package proxy + +import ( + "context" + "io" + + wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. +// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// backends. It should be used as a `grpc.UnknownServiceHandler`. +// +// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. +func TransparentHandler(director StreamDirector) grpc.StreamHandler { + streamer := &handler{director} + return streamer.handler +} + +type handler struct { + director StreamDirector +} + +// handler is where the real magic of proxying happens. +// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, +// forwarding it to a ClientStream established against the relevant ClientConn. +func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) + if !ok { + return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + } + + outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) + if err != nil { + return err + } + + clientCtx, clientCancel := context.WithCancel(outgoingCtx) + defer clientCancel() + + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + if err != nil { + return err + } + + s2cErrChan := s.forwardServerToClient(serverStream, clientStream) + c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + + for i := 0; i < 2; i++ { + select { + case s2cErr := <-s2cErrChan: + if s2cErr == io.EOF { + // 客户端流发送完毕正常关闭结束, Proxy 关闭对 Backend 的连接 + clientStream.CloseSend() + break + } + + clientCancel() + return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + case c2sErr := <-c2sErrChan: + // 服务的没用在提供数据触发这个分支 + serverStream.SetTrailer(clientStream.Trailer()) + + if c2sErr != io.EOF { + return c2sErr + } + + return nil + } + } + + return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") +} + +func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { + ret := make(chan error, 1) + go func() { + f := &wheatCodec.Frame{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + if i == 0 { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.Header() + if err != nil { + ret <- err + break + } + if err := dst.SendHeader(md); err != nil { + ret <- err + break + } + } + if err := dst.SendMsg(f); err != nil { + ret <- err + break + } + } + }() + return ret +} + +func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error { + ret := make(chan error, 1) + go func() { + f := &wheatCodec.Frame{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + if err := dst.SendMsg(f); err != nil { + ret <- err + break + } + } + }() + return ret +} From 4e78d08e2ccdca9b3604f4ff83b82ba63d4271f3 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 18:29:31 +0800 Subject: [PATCH 11/18] feat(conf): addgateway conf --- conf/wheat-cache.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index f81e5bf..08f2c02 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -26,3 +26,9 @@ middleware-driver: plugins-control: logcontext: [ "logMiddle" ] + + +gateway: + host: '127.0.0.1' + port: 5891 + \ No newline at end of file From ec8a1265814c6a6ae323950cf934bb69b3ae5db8 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 20:44:27 +0800 Subject: [PATCH 12/18] doc(feat-gateway-doc): add gateway doc --- doc/{getway => gateway}/Cache 分布式方案-Getway.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename doc/{getway => gateway}/Cache 分布式方案-Getway.md (100%) diff --git a/doc/getway/Cache 分布式方案-Getway.md b/doc/gateway/Cache 分布式方案-Getway.md similarity index 100% rename from doc/getway/Cache 分布式方案-Getway.md rename to doc/gateway/Cache 分布式方案-Getway.md From 3adeac539b3e2dccb079499a3650ac7de2cc0fea Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 22:50:38 +0800 Subject: [PATCH 13/18] chore(conf): update conf and makefule --- .gitignore | 3 ++- makefile | 19 ++++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 6c5c7d6..d66f960 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ # build file -/bin/storage \ No newline at end of file +/bin/storage +/bin/gateway \ No newline at end of file diff --git a/makefile b/makefile index 7558082..e790a1f 100644 --- a/makefile +++ b/makefile @@ -7,19 +7,28 @@ dcgen: @python3 ./shell/proto.py @python3 ./shell/make-struct.py -.PHONY : build -build: +.PHONY: build-storage +build-storage: @cd storage && go build -o $(BASE_OUT)/storage +.PHONY: build-gateway +build-gateway: + @cd gateway && go build -o $(BASE_OUT)/gateway + .PHONY: install install: @make gen-middleware - @make build + @make build-storage + @make build-gateway -.PHONY: dev -dev: +.PHONY: storage +storage: @./bin/storage storage +.PHONY: gateway +gateway: + @./bin/gateway gateway + .PHONY: gen-struct gen-struct: @python3 ./shell/make-struct.py From 40374333fa73a0c054eba86fea176a29d59927a3 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 22:51:28 +0800 Subject: [PATCH 14/18] feat(uitl): grace exit --- pkg/util/server/os.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 pkg/util/server/os.go diff --git a/pkg/util/server/os.go b/pkg/util/server/os.go new file mode 100644 index 0000000..20f368c --- /dev/null +++ b/pkg/util/server/os.go @@ -0,0 +1,29 @@ +package server + +import ( + "os" + "os/signal" + "syscall" + + "gitee.com/timedb/wheatCache/pkg/logx" + "google.golang.org/grpc" +) + +func ElegantExitServer(s *grpc.Server) { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGHUP, syscall.SIGINT) + go func() { + select { + case <-c: + s.Stop() + + msg := ` + |-------Wheat tools---------| + | see you next time | + |thank you for your efforts | + |---------------------------| + ` + logx.Infoln(msg) + } + }() +} From 52c9b166f3c5689870bbfaed96dd2b2258e2361f Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 22:51:51 +0800 Subject: [PATCH 15/18] feat(storage): update storage --- storage/cmd/root.go | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/storage/cmd/root.go b/storage/cmd/root.go index b4c00f4..22424a4 100644 --- a/storage/cmd/root.go +++ b/storage/cmd/root.go @@ -5,15 +5,13 @@ import ( "fmt" "log" "net" - "os" - "os/signal" - "syscall" "gitee.com/timedb/wheatCache/storage/server/single" _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/logx" "gitee.com/timedb/wheatCache/pkg/proto" + "gitee.com/timedb/wheatCache/pkg/util/server" "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/grpc" @@ -44,25 +42,11 @@ var rootCmd = &cobra.Command{ proto.RegisterCommServerServer(s, storageServer) reflection.Register(s) - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGHUP, syscall.SIGINT) - go func() { - select { - case <-c: - s.Stop() - - msg := ` - |-------Wheat tools---------| - | see you next time | - |thank you for your efforts | - |---------------------------| - ` - fmt.Println(msg) - } - }() + server.ElegantExitServer(s) + logx.Info("start gateway in addr: %s", tcpAddr.String()) if err := s.Serve(listen); err != nil { - log.Panicln(err) + logx.Errorln(err) } }, } From 43392236b6391207452fe4a2fbbde9c761742cd7 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 22:52:37 +0800 Subject: [PATCH 16/18] feat(gateway): add gateway root --- gateway/cmd/root.go | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/gateway/cmd/root.go b/gateway/cmd/root.go index f81b95b..d98e590 100644 --- a/gateway/cmd/root.go +++ b/gateway/cmd/root.go @@ -1,7 +1,17 @@ package cmd import ( + "fmt" + "net" + + _ "gitee.com/timedb/wheatCache/conf" + wheatCodec "gitee.com/timedb/wheatCache/gateway/codec" + "gitee.com/timedb/wheatCache/gateway/proxy" + "gitee.com/timedb/wheatCache/pkg/logx" + "gitee.com/timedb/wheatCache/pkg/util/server" "github.com/spf13/cobra" + "github.com/spf13/viper" + "google.golang.org/grpc" ) // rootCmd represents the base command when called without any subcommands @@ -9,7 +19,27 @@ var rootCmd = &cobra.Command{ Use: "getway", Short: "getway", Long: `start getway server`, - Run: func(cmd *cobra.Command, args []string) {}, + Run: func(cmd *cobra.Command, args []string) { + host := viper.GetString("gateway.host") + port := viper.GetInt("gateway.port") + + tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + logx.Panic("get gateway addr err:%v", err) + } + listen, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + logx.Panic("get gateway tcp conn err:%v", err) + } + + gatewayServer := GetGatewayServer() + server.ElegantExitServer(gatewayServer) + + logx.Info("start gateway in addr: %s", tcpAddr.String()) + if err := gatewayServer.Serve(listen); err != nil { + logx.Errorln(err) + } + }, } // Execute adds all child commands to the root command and sets flags appropriately. @@ -17,3 +47,14 @@ var rootCmd = &cobra.Command{ func Execute() { cobra.CheckErr(rootCmd.Execute()) } + +func GetGatewayServer() *grpc.Server { + opts := make([]grpc.ServerOption, 0) + opts = append( + opts, + grpc.ForceServerCodec(wheatCodec.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(proxy.GetDirectorByServiceHash())), + ) + + return grpc.NewServer(opts...) +} From ebdc5bd5b05063b8d16114ecbb812a5d7a5e84c2 Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 22:53:11 +0800 Subject: [PATCH 17/18] feat(gateway-reset): add director mock --- gateway/proxy/director.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/proxy/director.go b/gateway/proxy/director.go index 9a99dab..3379e17 100644 --- a/gateway/proxy/director.go +++ b/gateway/proxy/director.go @@ -10,7 +10,7 @@ import ( func GetDirectorByServiceHash() StreamDirector { return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { // TODO hash, mock 直接转发到 storage dev 上 - cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.ForceCodec(codec.Codec())) + cli, err := grpc.DialContext(ctx, "127.0.0.1:5890", grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.Codec()))) return ctx, cli, err } } From 91d93bf5be5766128bc3ce3be82cb35be457b25c Mon Sep 17 00:00:00 2001 From: bandl <1658002533@qq.com> Date: Sat, 16 Oct 2021 22:53:27 +0800 Subject: [PATCH 18/18] test(gateway): add gateway test --- gateway/cmd/gateway_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 gateway/cmd/gateway_test.go diff --git a/gateway/cmd/gateway_test.go b/gateway/cmd/gateway_test.go new file mode 100644 index 0000000..fe25ac4 --- /dev/null +++ b/gateway/cmd/gateway_test.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "context" + "fmt" + "testing" + + "gitee.com/timedb/wheatCache/pkg/proto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestContextChannel(t *testing.T) { + cxt := context.Background() + _, ctxChannel := context.WithCancel(cxt) + ctxChannel() + ctxChannel() +} + +func TestGateway_Master(t *testing.T) { + comm, err := grpc.Dial("127.0.0.1:5891", grpc.WithInsecure()) + require.NoError(t, err) + ctx := context.Background() + cli := proto.NewCommServerClient(comm) + resp, err := cli.Set(ctx, &proto.SetRequest{ + Key: &proto.BaseKey{ + Key: "aa", + }, + Val: "awd", + }) + require.NoError(t, err) + fmt.Printf(resp.Result) + +}