bitxhub/api/grpc/broker.go

72 lines
1.5 KiB
Go

package grpc
import (
"context"
"fmt"
"net"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/coreapi/api"
"github.com/meshplus/bitxhub/internal/loggers"
"github.com/meshplus/bitxhub/internal/repo"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
type ChainBrokerService struct {
config *repo.Config
api api.CoreAPI
server *grpc.Server
logger logrus.FieldLogger
ctx context.Context
cancel context.CancelFunc
}
func NewChainBrokerService(api api.CoreAPI, config *repo.Config) (*ChainBrokerService, error) {
ctx, cancel := context.WithCancel(context.Background())
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.MaxConcurrentStreams(1000))
return &ChainBrokerService{
logger: loggers.Logger(loggers.API),
config: config,
api: api,
server: server,
ctx: ctx,
cancel: cancel,
}, nil
}
func (cbs *ChainBrokerService) Start() error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cbs.config.Port.Grpc))
if err != nil {
return err
}
pb.RegisterChainBrokerServer(cbs.server, cbs)
cbs.logger.WithFields(logrus.Fields{
"port": cbs.config.Port.Grpc,
}).Info("GRPC service started")
go func() {
err := cbs.server.Serve(lis)
if err != nil {
cbs.logger.Error(err)
}
}()
return nil
}
func (cbs *ChainBrokerService) Stop() error {
cbs.cancel()
cbs.logger.Info("GRPC service stopped")
return nil
}