diff --git a/api/grpc/block.go b/api/grpc/block.go index 92df199..47d8c0f 100644 --- a/api/grpc/block.go +++ b/api/grpc/block.go @@ -88,3 +88,14 @@ func (cbs *ChainBrokerService) GetBlocks(ctx context.Context, req *pb.GetBlocksR Blocks: blocks, }, nil } + +func (cbs *ChainBrokerService) GetBlockHeaders(ctx context.Context, req *pb.GetBlockHeadersRequest) (*pb.GetBlockHeadersResponse, error) { + headers, err := cbs.api.Broker().GetBlockHeaders(req.Start, req.End) + if err != nil { + return nil, err + } + + return &pb.GetBlockHeadersResponse{ + BlockHeaders: headers, + }, nil +} diff --git a/go.mod b/go.mod index 929bdff..1d8473d 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/magiconair/properties v1.8.4 github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201125025329-ac1187099a88 github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d - github.com/meshplus/bitxhub-model v1.1.2-0.20201218090311-9e471bb1654c + github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353 github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.2.2 diff --git a/go.sum b/go.sum index be17ab5..1deb7d8 100644 --- a/go.sum +++ b/go.sum @@ -611,6 +611,10 @@ github.com/meshplus/bitxhub-model v1.1.2-0.20201209072914-6846fa78ff35 h1:fuY1Vl github.com/meshplus/bitxhub-model v1.1.2-0.20201209072914-6846fa78ff35/go.mod h1:sk7glP/0M9G9On4SN6rMPaLGqet8Uu35wA65Mxc3Cms= github.com/meshplus/bitxhub-model v1.1.2-0.20201218090311-9e471bb1654c h1:xVrWp1PlY9kxUQkb8Go/MKbm0PhYGi4yo+YBUDkGx0A= github.com/meshplus/bitxhub-model v1.1.2-0.20201218090311-9e471bb1654c/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA= +github.com/meshplus/bitxhub-model v1.1.2-0.20201221062626-1774aad0f842 h1:THnvVag1IMAjfVGJJJSmN53o6YwXGPkbR+8dE53ckjw= +github.com/meshplus/bitxhub-model v1.1.2-0.20201221062626-1774aad0f842/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA= +github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353 h1:7wkdviM8ssujL6pQHjHRGo9k+IOioBviaRKySjd0lzQ= +github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA= github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab h1:JclTakVV0dcXxl/dScmN77htnYe3n19hh7m2eMk9Abs= github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab/go.mod h1:L3pEzDMouz+xcIVwG2fj+mAsM95GAkzoo7cEd2CzmCQ= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= diff --git a/internal/coreapi/api/api.go b/internal/coreapi/api/api.go index d4ef8b2..bc67060 100644 --- a/internal/coreapi/api/api.go +++ b/internal/coreapi/api/api.go @@ -45,6 +45,7 @@ type BrokerAPI interface { FetchSignsFromOtherPeers(content string, typ pb.GetMultiSignsRequest_Type) map[string][]byte GetSign(content string, typ pb.GetMultiSignsRequest_Type) (string, []byte, error) + GetBlockHeaders(start uint64, end uint64) ([]*pb.BlockHeader, error) } type NetworkAPI interface { diff --git a/internal/coreapi/broker.go b/internal/coreapi/broker.go index 113a040..a81c494 100644 --- a/internal/coreapi/broker.go +++ b/internal/coreapi/broker.go @@ -121,6 +121,24 @@ func (b *BrokerAPI) GetBlocks(start uint64, end uint64) ([]*pb.Block, error) { return blocks, nil } +func (b *BrokerAPI) GetBlockHeaders(start uint64, end uint64) ([]*pb.BlockHeader, error) { + meta := b.bxh.Ledger.GetChainMeta() + + var blockHeaders []*pb.BlockHeader + if meta.Height < end { + end = meta.Height + } + for i := start; i > 0 && i <= end; i++ { + b, err := b.GetBlock("HEIGHT", strconv.Itoa(int(i))) + if err != nil { + continue + } + blockHeaders = append(blockHeaders, b.BlockHeader) + } + + return blockHeaders, nil +} + func (b *BrokerAPI) RemovePier(pid string, isUnion bool) { b.bxh.Router.RemovePier(pid, isUnion) } diff --git a/pkg/order/syncer/state_syncer.go b/pkg/order/syncer/state_syncer.go new file mode 100644 index 0000000..c6816c6 --- /dev/null +++ b/pkg/order/syncer/state_syncer.go @@ -0,0 +1,339 @@ +package syncer + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/Rican7/retry" + "github.com/Rican7/retry/strategy" + "github.com/meshplus/bitxhub-kit/types" + "github.com/meshplus/bitxhub-model/pb" + "github.com/meshplus/bitxhub/pkg/peermgr" + "github.com/sirupsen/logrus" +) + +var _ Syncer = (*StateSyncer)(nil) + +type StateSyncer struct { + checkpoint uint64 // check point + peerMgr peermgr.PeerManager // network manager + badPeers *sync.Map // peer node set who return bad block + quorum uint64 // quorum node numbers + peerIds []uint64 // peers who have current newly consensus state + logger logrus.FieldLogger +} + +type rangeHeight struct { + begin uint64 + end uint64 +} + +func New(checkpoint uint64, peerMgr peermgr.PeerManager, quorum uint64, peerIds []uint64, logger logrus.FieldLogger) (*StateSyncer, error) { + if checkpoint == 0 { + return nil, fmt.Errorf("checkpoint not be 0") + } + if quorum <= 0 { + return nil, fmt.Errorf("the vp nodes' quorum must be positive") + } + if len(peerIds) < int(quorum) { + return nil, fmt.Errorf("the peers num must be gather than quorum") + } + return &StateSyncer{ + checkpoint: checkpoint, + peerMgr: peerMgr, + logger: logger, + quorum: quorum, + peerIds: peerIds, + badPeers: &sync.Map{}, + }, nil +} + +func (s *StateSyncer) SyncCFTBlocks(begin, end uint64, blockCh chan *pb.Block) error { + rangeHeights, err := s.calcRangeHeight(begin, end) + if err != nil { + return err + } + + for _, rangeHeight := range rangeHeights { + rangeTmp := rangeHeight + err := retry.Retry(func(attempt uint) error { + id, err := s.randPeers() + if err != nil { + s.logger.Errorf(err.Error()) + return err + } + + + s.logger.WithFields(logrus.Fields{ + "begin": rangeTmp.begin, + "end": rangeTmp.end, + "peer_id": id, + }).Info("syncing range block") + + blocks, err := s.fetchBlocks(id, rangeTmp.begin, rangeTmp.end) + if err != nil { + s.badPeers.Store(id, nil) + s.logger.Errorf("fetch blocks error:%w", err) + return err + } + for _, block := range blocks { + blockCh <- block + } + return nil + }, strategy.Wait(100*time.Millisecond)) + if err != nil { + s.logger.Error(err) + } + } + blockCh <- nil + + return nil +} + +func (s *StateSyncer) SyncBFTBlocks(begin, end uint64, metaHash *types.Hash, blockCh chan *pb.Block) error { + rangeHeights, err := s.calcRangeHeight(begin, end) + if err != nil { + return err + } + + var parentBlockHash *types.Hash + for i, rangeHeight := range rangeHeights { + if i == 0 { + parentBlockHash = metaHash + } + rangeTmp := rangeHeight + headers := s.syncQuorumRangeBlockHeaders(rangeTmp, parentBlockHash) + if headers == nil { + return fmt.Errorf("fetch and verify the quorum peers' block header error: %v", rangeTmp) + } + blocks := s.syncRangeBlocks(headers) + if blocks == nil { + return fmt.Errorf("fetch and verify peers' block error: %v", rangeTmp) + } + for _, block := range blocks { + blockCh <- block + } + parentBlockHash = blocks[len(blocks)-1].Hash() + } + blockCh <- nil + return nil +} + +func (s *StateSyncer) syncQuorumRangeBlockHeaders(rangeHeight *rangeHeight, parentBlockHash *types.Hash) []*pb.BlockHeader { + var isQuorum bool + var hash string + latestBlockHeaderCounter := make(map[string]uint64) + blockHeadersM := make(map[string][]*pb.BlockHeader) + + fetchAndVerifyBlockHeaders := func(id uint64) { + s.logger.WithFields(logrus.Fields{ + "begin": rangeHeight.begin, + "end": rangeHeight.end, + "peer_id": id, + }).Info("syncing range block header") + headers, err := s.fetchBlockHeaders(id, rangeHeight.begin, rangeHeight.end) + if err != nil { + s.logger.Errorf("fetch block headers error:%w", err) + return + } + err = s.verifyBlockHeaders(parentBlockHash, headers) + if err != nil { + s.badPeers.Store(id, nil) + s.logger.Errorf("check block headers error:%w", err) + return + } + latestBlock := &pb.Block{BlockHeader: headers[len(headers)-1]} + blockHash := latestBlock.Hash() + latestBlockHeaderCounter[blockHash.String()]++ + blockHeadersM[blockHash.String()] = headers + } + + + for _, id := range s.peerIds { + + fetchAndVerifyBlockHeaders(id) + for latestHash, counter := range latestBlockHeaderCounter { + if counter >= s.quorum { + hash = latestHash + isQuorum = true + break + } + } + if isQuorum { + break + } + } + if !isQuorum { + return nil + } + + return blockHeadersM[hash] + +} + +func (s *StateSyncer) syncRangeBlocks(headers []*pb.BlockHeader) []*pb.Block { + var blocks []*pb.Block + begin := headers[0].Number + end := headers[len(headers)-1].Number + + fetchAndVerifyBlocks := func(id uint64) { + s.logger.WithFields(logrus.Fields{ + "begin": begin, + "end": end, + "peer_id": id, + }).Info("syncing range block") + fetchBlocks, err := s.fetchBlocks(id, begin, end) + if err != nil { + s.badPeers.Store(id, nil) + s.logger.Errorf("fetch block headers error:%w", err) + return + } + for i, block := range fetchBlocks { + err := s.verifyBlock(headers[i], block) + if err != nil { + s.badPeers.Store(id, nil) + s.logger.Errorf("check block headers error:%w", err) + return + } + } + blocks = fetchBlocks + } + for _, id := range s.peerIds { + if blocks != nil { + break + } + fetchAndVerifyBlocks(id) + } + return blocks +} + +func (s *StateSyncer) randPeers() (uint64, error) { + ids := make([]uint64, 0) + for _, id := range s.peerIds { + _, ok := s.badPeers.Load(id) + if ok { + continue + } + ids = append(ids, id) + } + if len(ids) == 0 { + return 0, fmt.Errorf("peers nums is 0") + } + randIndex := rand.Int63n(int64(len(ids))) + return ids[randIndex], nil +} + +func (s *StateSyncer) calcRangeHeight(begin, end uint64) ([]*rangeHeight, error) { + if begin > end { + return nil, fmt.Errorf("the end height:%d is less than the start height:%d", end, begin) + } + startNo := begin / s.checkpoint + rangeHeights := make([]*rangeHeight, 0) + for ; begin <= end; { + rangeBegin := begin + rangeEnd := (startNo + 1) * s.checkpoint + if rangeEnd > end { + rangeEnd = end + } + + rangeHeights = append(rangeHeights, &rangeHeight{ + begin: rangeBegin, + end: rangeEnd, + }) + begin = rangeEnd + 1 + startNo++ + } + return rangeHeights, nil +} + +func (s *StateSyncer) fetchBlockHeaders(id uint64, begin, end uint64) ([]*pb.BlockHeader, error) { + if begin > end { + return nil, fmt.Errorf("the end height:%d is less than the start height:%d", end, begin) + } + + req := &pb.GetBlockHeadersRequest{ + Start: begin, + End: end, + } + data, err := req.Marshal() + if err != nil { + return nil, err + } + m := &pb.Message{ + Type: pb.Message_GET_BLOCK_HEADERS, + Data: data, + } + + res, err := s.peerMgr.Send(id, m) + if err != nil { + return nil, err + } + + blockHeaders := &pb.GetBlockHeadersResponse{} + if err := blockHeaders.Unmarshal(res.Data); err != nil { + return nil, err + } + return blockHeaders.BlockHeaders, nil +} + +func (s *StateSyncer) fetchBlocks(id uint64, begin, end uint64) ([]*pb.Block, error) { + if begin > end { + return nil, fmt.Errorf("the end height:%d is less than the start height: %d", end, begin) + } + + req := &pb.GetBlocksRequest{ + Start: begin, + End: end, + } + data, err := req.Marshal() + if err != nil { + return nil, err + } + m := &pb.Message{ + Type: pb.Message_GET_BLOCKS, + Data: data, + } + + res, err := s.peerMgr.Send(id, m) + if err != nil { + return nil, err + } + + blocks := &pb.GetBlocksResponse{} + if err := blocks.Unmarshal(res.Data); err != nil { + return nil, err + } + return blocks.Blocks, nil +} + +func (s *StateSyncer) verifyBlockHeaders(parentHash *types.Hash, headers []*pb.BlockHeader) error { + if parentHash == nil || len(headers) == 0 { + return fmt.Errorf("args must not be nil or empty") + } + for _, header := range headers { + block := &pb.Block{BlockHeader: header} + hash := block.Hash() + ok, _ := parentHash.Equals(header.ParentHash) + if !ok { + return fmt.Errorf("block number is %d, hash is %s, but parent hash is %s", header.Number, hash.String(), header.ParentHash) + } + parentHash = hash + } + return nil +} + +func (s *StateSyncer) verifyBlock(header *pb.BlockHeader, block *pb.Block) error { + if header == nil || block == nil { + return fmt.Errorf("args must not be nil or empty") + } + originBlock := &pb.Block{BlockHeader: header} + hash := originBlock.Hash() + //todo(jz): need to calc txs merkle root and compare with block's tx root + ok, _ := hash.Equals(block.BlockHash) + if !ok { + return fmt.Errorf("block hash is not equals, number is %d", block.Height()) + } + return nil +} diff --git a/pkg/order/syncer/syncer.go b/pkg/order/syncer/syncer.go new file mode 100644 index 0000000..591fc2c --- /dev/null +++ b/pkg/order/syncer/syncer.go @@ -0,0 +1,14 @@ +package syncer + +import ( + "github.com/meshplus/bitxhub-kit/types" + "github.com/meshplus/bitxhub-model/pb" +) + +type Syncer interface { + // SyncCFTBlocks fetches the block list from other node, and just fetches but not verifies the block + SyncCFTBlocks(begin, end uint64, blockCh chan *pb.Block) error + + // SyncBFTBlocks fetches the block list from quorum nodes, and verifies all the block + SyncBFTBlocks(begin, end uint64, metaHash *types.Hash, blockCh chan *pb.Block) error +} diff --git a/pkg/order/syncer/syncer_test.go b/pkg/order/syncer/syncer_test.go new file mode 100644 index 0000000..2945c7d --- /dev/null +++ b/pkg/order/syncer/syncer_test.go @@ -0,0 +1,236 @@ +package syncer + +import ( + "fmt" + "github.com/meshplus/bitxhub-kit/types" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/golang/mock/gomock" + crypto2 "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/meshplus/bitxhub-kit/crypto" + "github.com/meshplus/bitxhub-kit/crypto/asym" + "github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa" + "github.com/meshplus/bitxhub-kit/log" + "github.com/meshplus/bitxhub-model/pb" + "github.com/meshplus/bitxhub/internal/ledger/mock_ledger" + "github.com/meshplus/bitxhub/internal/repo" + "github.com/meshplus/bitxhub/pkg/cert" + "github.com/meshplus/bitxhub/pkg/peermgr" + "github.com/stretchr/testify/require" +) + +func TestStateSyncer_SyncCFTBlocks(t *testing.T) { + peerCnt := 3 + swarms := NewSwarms(t, peerCnt) + + otherPeers := swarms[0].OtherPeers() + peerIds := make([]uint64, 0) + for id, _ := range otherPeers { + peerIds = append(peerIds, id) + } + logger := log.NewWithModule("syncer") + syncer, err := New(10, swarms[0], 2, peerIds, logger) + require.Nil(t, err) + + begin := 2 + end := 100 + blockCh := make(chan *pb.Block, 1024) + go syncer.SyncCFTBlocks(uint64(begin), uint64(end), blockCh) + + blocks := make([]*pb.Block, 0) + for block := range blockCh { + if block == nil { + break + } + blocks = append(blocks, block) + } + + require.Equal(t, len(blocks), end-begin+1) + +} + +func TestStateSyncer_SyncBFTBlocks(t *testing.T) { + peerCnt := 4 + swarms := NewSwarms(t, peerCnt) + + //time.Sleep(100 * time.Millisecond) + otherPeers := swarms[0].OtherPeers() + peerIds := make([]uint64, 0) + for id, _ := range otherPeers { + peerIds = append(peerIds, id) + } + logger := log.NewWithModule("syncer") + syncer, err := New(10, swarms[0], 3, peerIds, logger) + require.Nil(t, err) + + begin := 2 + end := 100 + blockCh := make(chan *pb.Block, 1024) + + metaHash := types.NewHashByStr("0xbC1C6897f97782F3161492d5CcfBE0691502f15894A0b2f2f40069C995E33cCB") + go syncer.SyncBFTBlocks(uint64(begin), uint64(end), metaHash, blockCh) + + blocks := make([]*pb.Block, 0) + for block := range blockCh { + if block == nil { + break + } + blocks = append(blocks, block) + } + + require.Equal(t, len(blocks), end-begin+1) +} + +func genKeysAndConfig(t *testing.T, peerCnt int) ([]crypto2.PrivKey, []crypto.PrivateKey, []string, []string) { + var nodeKeys []crypto2.PrivKey + var privKeys []crypto.PrivateKey + var peers []string + var ids []string + + port := 5001 + + for i := 0; i < peerCnt; i++ { + key, err := asym.GenerateKeyPair(crypto.ECDSA_P256) + require.Nil(t, err) + + libp2pKey, err := convertToLibp2pPrivKey(key) + require.Nil(t, err) + nodeKeys = append(nodeKeys, libp2pKey) + id, err := peer.IDFromPublicKey(libp2pKey.GetPublic()) + require.Nil(t, err) + + peer := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/", port) + peers = append(peers, peer) + ids = append(ids, id.String()) + port++ + + privKey, err := asym.GenerateKeyPair(crypto.Secp256k1) + require.Nil(t, err) + + privKeys = append(privKeys, privKey) + } + + return nodeKeys, privKeys, peers, ids +} + +func convertToLibp2pPrivKey(privateKey crypto.PrivateKey) (crypto2.PrivKey, error) { + ecdsaPrivKey, ok := privateKey.(*ecdsa.PrivateKey) + if !ok { + return nil, fmt.Errorf("convert to libp2p private key: not ecdsa private key") + } + + libp2pPrivKey, _, err := crypto2.ECDSAKeyPairFromKey(ecdsaPrivKey.K) + if err != nil { + return nil, err + } + + return libp2pPrivKey, nil +} + +func peers(id uint64, addrs []string, ids []string) []*repo.NetworkNodes { + m := make([]*repo.NetworkNodes, 0, len(addrs)) + for i, addr := range addrs { + m = append(m, &repo.NetworkNodes{ + ID: uint64(i + 1), + Account: "", + Pid: ids[i], + Hosts: []string{addr}, + }) + } + return m +} + +func NewSwarms(t *testing.T, peerCnt int) []*peermgr.Swarm { + var swarms []*peermgr.Swarm + + blocks := genBlocks(1024) + nodeKeys, privKeys, addrs, ids := genKeysAndConfig(t, peerCnt) + mockCtl := gomock.NewController(t) + mockLedger := mock_ledger.NewMockLedger(mockCtl) + mockLedger.EXPECT().GetBlock(gomock.Any()).DoAndReturn(func(height uint64) (*pb.Block, error) { + return blocks[height-1], nil + }).AnyTimes() + + agencyData, err := ioutil.ReadFile("testdata/agency.cert") + require.Nil(t, err) + + nodeData, err := ioutil.ReadFile("testdata/node.cert") + require.Nil(t, err) + + caData, err := ioutil.ReadFile("testdata/ca.cert") + require.Nil(t, err) + + cert, err := cert.ParseCert(caData) + require.Nil(t, err) + + for i := 0; i < peerCnt; i++ { + repo := &repo.Repo{ + Key: &repo.Key{}, + NetworkConfig: &repo.NetworkConfig{ + N: uint64(peerCnt), + ID: uint64(i + 1), + }, + Certs: &repo.Certs{ + NodeCertData: nodeData, + AgencyCertData: agencyData, + CACert: cert, + }, + Config: &repo.Config{ + Ping: repo.Ping{ + Enable: true, + Duration: 2 * time.Second, + }, + }, + } + + idx := strings.LastIndex(addrs[i], "/p2p/") + local := addrs[i][:idx] + repo.NetworkConfig.LocalAddr = local + repo.Key.Libp2pPrivKey = nodeKeys[i] + repo.Key.PrivKey = privKeys[i] + repo.NetworkConfig.Nodes = peers(uint64(i), addrs, ids) + + swarm, err := peermgr.New(repo, log.NewWithModule(fmt.Sprintf("swarm%d", i)), mockLedger) + require.Nil(t, err) + err = swarm.Start() + require.Nil(t, err) + swarms = append(swarms, swarm) + } + return swarms +} + +func genBlocks(count int) []*pb.Block { + blocks := make([]*pb.Block, 0, count) + for height := 1; height <= count; height++ { + block := &pb.Block{} + if height == 1 { + block.BlockHeader = &pb.BlockHeader{ + Number: 1, + StateRoot: types.NewHashByStr("0xc30B6E0ad5327fc8548f4BaFab3271cA6a5bD92f084095958c84970165bfA6E7"), + TxRoot: nil, + ReceiptRoot: nil, + ParentHash: nil, + Timestamp: 0, + Version: nil, + } + block.BlockHash = types.NewHashByStr("0xbC1C6897f97782F3161492d5CcfBE0691502f15894A0b2f2f40069C995E33cCB") + } else { + block.BlockHeader = &pb.BlockHeader{ + Number: uint64(height), + StateRoot: blocks[len(blocks)-1].BlockHeader.StateRoot, + TxRoot: nil, + ReceiptRoot: nil, + ParentHash: blocks[len(blocks)-1].BlockHash, + Timestamp: 0, + Version: nil, + } + block.BlockHash = block.Hash() + } + blocks = append(blocks, block) + } + return blocks +} diff --git a/pkg/order/syncer/testdata/agency.cert b/pkg/order/syncer/testdata/agency.cert new file mode 100644 index 0000000..cd3dcd1 --- /dev/null +++ b/pkg/order/syncer/testdata/agency.cert @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICkjCCAjegAwIBAgIDAoB1MAoGCCqGSM49BAMCMIGhMQswCQYDVQQGEwJDTjER +MA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYDVQQJEwZz +dHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxEzARBgNVBAoT +Ckh5cGVyY2hhaW4xEDAOBgNVBAsTB0JpdFhIdWIxEzARBgNVBAMTCmJpdHhodWIu +Y24wIBcNMjAwODExMDUwNzEzWhgPMjA3MDA3MzAwNTA3MTNaMIGaMQswCQYDVQQG +EwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYD +VQQJEwZzdHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxDzAN +BgNVBAoTBkFnZW5jeTEQMA4GA1UECxMHQml0WEh1YjEQMA4GA1UEAxMHQml0WEh1 +YjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABETCj+MAoveSsjSLGr3y8F5IEccL +Afn8d9rmw3ON24Y6NzoqV72T+kvXP7lJlj+7bhYJRtXO3FiPDBp1fFQb1tyjYTBf +MA4GA1UdDwEB/wQEAwIBpjAPBgNVHSUECDAGBgRVHSUAMA8GA1UdEwEB/wQFMAMB +Af8wKwYDVR0jBCQwIoAgFjEjLDiDWrdUwpIcMxUaxWQSVMS7agCqo44QcOAMlYMw +CgYIKoZIzj0EAwIDSQAwRgIhAJeZBApwY2BtYXK28Wa0gzJ8Nc9pjhtX2UFoc7cl +YilwAiEA3m60eyYQj/BiSDZ8+kZbUJO2C7y0HaT1xwNdjSZBFhI= +-----END CERTIFICATE----- diff --git a/pkg/order/syncer/testdata/ca.cert b/pkg/order/syncer/testdata/ca.cert new file mode 100644 index 0000000..148c5a7 --- /dev/null +++ b/pkg/order/syncer/testdata/ca.cert @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIIClTCCAjygAwIBAgIDAKWTMAoGCCqGSM49BAMCMIGhMQswCQYDVQQGEwJDTjER +MA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYDVQQJEwZz +dHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxEzARBgNVBAoT +Ckh5cGVyY2hhaW4xEDAOBgNVBAsTB0JpdFhIdWIxEzARBgNVBAMTCmJpdHhodWIu +Y24wIBcNMjAwODExMDUwNzEyWhgPMjA3MDA3MzAwNTA3MTJaMIGhMQswCQYDVQQG +EwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYD +VQQJEwZzdHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxEzAR +BgNVBAoTCkh5cGVyY2hhaW4xEDAOBgNVBAsTB0JpdFhIdWIxEzARBgNVBAMTCmJp +dHhodWIuY24wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASznYuB78fMNqgooAnp +YPUfWoQCDAM0nV5YwSj1Rz6RcWKuRAPQooPFb8K2WO6hfRUGtCba+l48FD34R/RP +aajqo18wXTAOBgNVHQ8BAf8EBAMCAaYwDwYDVR0lBAgwBgYEVR0lADAPBgNVHRMB +Af8EBTADAQH/MCkGA1UdDgQiBCAWMSMsOINat1TCkhwzFRrFZBJUxLtqAKqjjhBw +4AyVgzAKBggqhkjOPQQDAgNHADBEAiAW5pb+b2aLuEBJCkaEXfukb/HwFsNm7zmx +Ha6oyk0HYAIgXhsMYlIXKf5RmlBFiEWZZRJTyiubI7NDqN5+JlVBAog= +-----END CERTIFICATE----- diff --git a/pkg/order/syncer/testdata/node.cert b/pkg/order/syncer/testdata/node.cert new file mode 100644 index 0000000..fec0bb4 --- /dev/null +++ b/pkg/order/syncer/testdata/node.cert @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICWjCCAf+gAwIBAgIDDhFLMAoGCCqGSM49BAMCMIGaMQswCQYDVQQGEwJDTjER +MA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYDVQQJEwZz +dHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxDzANBgNVBAoT +BkFnZW5jeTEQMA4GA1UECxMHQml0WEh1YjEQMA4GA1UEAxMHQml0WEh1YjAgFw0y +MDA4MTEwNTA3MTNaGA8yMDcwMDczMDA1MDcxM1owgZkxCzAJBgNVBAYTAkNOMREw +DwYDVQQIEwhaaGVKaWFuZzERMA8GA1UEBxMISGFuZ1pob3UxHzANBgNVBAkTBnN0 +cmVldDAOBgNVBAkTB2FkZHJlc3MxDzANBgNVBBETBjMyNDAwMDEOMAwGA1UEChMF +Tm9kZTExEDAOBgNVBAsTB0JpdFhIdWIxEDAOBgNVBAMTB0JpdFhIdWIwWTATBgcq +hkjOPQIBBggqhkjOPQMBBwNCAATgjTYEnavxerFuEKJ8C39QUY12xh/TC2E5V7ni +nmQcOgDDRv5HW4sskTSm/WX2D0BMzwb7XE5ATyoDeM9qcurDozEwLzAOBgNVHQ8B +Af8EBAMCAaYwDwYDVR0lBAgwBgYEVR0lADAMBgNVHRMBAf8EAjAAMAoGCCqGSM49 +BAMCA0kAMEYCIQD5Oz1xJvFgzYm/lTzoaO/i0ayPVRgSdBwvK6hEICo5lAIhAMtG +aswjd2wVA4zB5GPEmJ/tvPUnxrlOAU67AQMYR4zf +-----END CERTIFICATE----- diff --git a/pkg/peermgr/handle.go b/pkg/peermgr/handle.go index 45445a4..31ea424 100644 --- a/pkg/peermgr/handle.go +++ b/pkg/peermgr/handle.go @@ -28,6 +28,10 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) { switch m.Type { case pb.Message_GET_BLOCK: return swarm.handleGetBlockPack(s, m) + case pb.Message_GET_BLOCK_HEADERS: + return swarm.handleGetBlockHeadersPack(s, m) + case pb.Message_GET_BLOCKS: + return swarm.handleGetBlocksPack(s, m) case pb.Message_FETCH_CERT: return swarm.handleFetchCertMessage(s) case pb.Message_CONSENSUS: @@ -82,6 +86,38 @@ func (swarm *Swarm) handleGetBlockPack(s network.Stream, msg *pb.Message) error return nil } +func (swarm *Swarm) handleGetBlockHeadersPack(s network.Stream, msg *pb.Message) error { + req := &pb.GetBlockHeadersRequest{} + if err := req.Unmarshal(msg.Data); err != nil { + return err + } + + res := &pb.GetBlockHeadersResponse{} + blockHeaders := make([]*pb.BlockHeader, 0) + for i := req.Start; i <= req.End; i++ { + block, err := swarm.ledger.GetBlock(i) + if err != nil { + return err + } + blockHeaders = append(blockHeaders, block.BlockHeader) + } + res.BlockHeaders = blockHeaders + v, err := res.Marshal() + if err != nil { + return err + } + m := &pb.Message{ + Type: pb.Message_GET_BLOCK_HEADERS_ACK, + Data: v, + } + + if err := swarm.SendWithStream(s, m); err != nil { + return err + } + + return nil +} + func (swarm *Swarm) handleFetchCertMessage(s network.Stream) error { certs := &model.CertsMessage{ AgencyCert: swarm.repo.Certs.AgencyCertData, @@ -251,3 +287,36 @@ func (swarm *Swarm) handleFetchIBTPSignMessage(s network.Stream, data []byte) { swarm.logger.Errorf("send asset exchange sign back: %s", err) } } + + +func (swarm *Swarm) handleGetBlocksPack(s network.Stream, msg *pb.Message) error { + req := &pb.GetBlocksRequest{} + if err := req.Unmarshal(msg.Data); err != nil { + return err + } + + res := &pb.GetBlocksResponse{} + blocks := make([]*pb.Block, 0) + for i := req.Start; i <= req.End; i++ { + block, err := swarm.ledger.GetBlock(i) + if err != nil { + return err + } + blocks = append(blocks, block) + } + res.Blocks = blocks + v, err := res.Marshal() + if err != nil { + return err + } + m := &pb.Message{ + Type: pb.Message_GET_BLOCKS_ACK, + Data: v, + } + + if err := swarm.SendWithStream(s, m); err != nil { + return err + } + + return nil +} diff --git a/pkg/peermgr/handle_test.go b/pkg/peermgr/handle_test.go index a82af5a..ef3a6a5 100644 --- a/pkg/peermgr/handle_test.go +++ b/pkg/peermgr/handle_test.go @@ -163,7 +163,6 @@ func TestSwarm_Send(t *testing.T) { peerCnt := 4 swarms := NewSwarms(t, peerCnt) - time.Sleep(2 * time.Second) msg := &pb.Message{ Type: pb.Message_GET_BLOCK, @@ -186,22 +185,77 @@ func TestSwarm_Send(t *testing.T) { require.Nil(t, err) require.Equal(t, uint64(1), block.BlockHeader.Number) + req := pb.GetBlocksRequest{ + Start: 1, + End: 1, + } + data, err := req.Marshal() + require.Nil(t, err) + + fetchBlocksMsg := &pb.Message{ + Type: pb.Message_GET_BLOCKS, + Data: data, + } + err = retry.Retry(func(attempt uint) error { + res, err = swarms[2].Send(1, fetchBlocksMsg) + if err != nil { + swarms[2].logger.Errorf(err.Error()) + return err + } + return nil + }, strategy.Wait(50*time.Millisecond)) + require.Nil(t, err) + require.Equal(t, pb.Message_GET_BLOCKS_ACK, res.Type) + var getBlocksRes pb.GetBlocksResponse + err = getBlocksRes.Unmarshal(res.Data) + require.Nil(t, err) + require.Equal(t, 1, len(getBlocksRes.Blocks)) + + + + getBlockHeadersReq := pb.GetBlockHeadersRequest{ + Start: 1, + End: 1, + } + data, err = getBlockHeadersReq.Marshal() + require.Nil(t, err) + + fetchBlockHeadersMsg := &pb.Message{ + Type: pb.Message_GET_BLOCK_HEADERS, + Data: data, + } + err = retry.Retry(func(attempt uint) error { + res, err = swarms[2].Send(4, fetchBlockHeadersMsg) + if err != nil { + swarms[2].logger.Errorf(err.Error()) + return err + } + return nil + }, strategy.Wait(50*time.Millisecond)) + require.Nil(t, err) + require.Equal(t, pb.Message_GET_BLOCK_HEADERS_ACK, res.Type) + + var getBlockHeaderssRes pb.GetBlockHeadersResponse + err = getBlockHeaderssRes.Unmarshal(res.Data) + require.Nil(t, err) + require.Equal(t, 1, len(getBlockHeaderssRes.BlockHeaders)) + + fetchBlockSignMsg := &pb.Message{ Type: pb.Message_FETCH_BLOCK_SIGN, Data: []byte("1"), } err = retry.Retry(func(attempt uint) error { - res, err = swarms[0].Send(3, fetchBlockSignMsg) + res, err = swarms[1].Send(3, fetchBlockSignMsg) if err != nil { - swarms[0].logger.Errorf(err.Error()) + swarms[1].logger.Errorf(err.Error()) return err } return nil }, strategy.Wait(50*time.Millisecond)) require.Nil(t, err) require.Equal(t, pb.Message_FETCH_BLOCK_SIGN_ACK, res.Type) - require.Nil(t, err) require.NotNil(t, res.Data) fetchAESMsg := &pb.Message{ @@ -219,7 +273,6 @@ func TestSwarm_Send(t *testing.T) { }, strategy.Wait(50*time.Millisecond)) require.Nil(t, err) require.Equal(t, pb.Message_FETCH_ASSET_EXCHANGE_SIGN_ACK, res.Type) - require.Nil(t, err) require.NotNil(t, res.Data) fetchIBTPSignMsg := &pb.Message{ @@ -237,7 +290,6 @@ func TestSwarm_Send(t *testing.T) { }, strategy.Wait(50*time.Millisecond)) require.Nil(t, err) require.Equal(t, pb.Message_FETCH_IBTP_SIGN_ACK, res.Type) - require.Nil(t, err) require.NotNil(t, res.Data) } @@ -245,8 +297,6 @@ func TestSwarm_AsyncSend(t *testing.T) { peerCnt := 4 swarms := NewSwarms(t, peerCnt) - time.Sleep(2 * time.Second) - orderMsgCh := make(chan events.OrderMessageEvent) orderMsgSub := swarms[2].SubscribeOrderMessage(orderMsgCh) diff --git a/pkg/peermgr/swarm.go b/pkg/peermgr/swarm.go index eded458..e256225 100644 --- a/pkg/peermgr/swarm.go +++ b/pkg/peermgr/swarm.go @@ -327,13 +327,13 @@ func (swarm *Swarm) verifyCert(id uint64) error { } if err := verifyCerts(nodeCert, agencyCert, swarm.repo.Certs.CACert); err != nil { + err = swarm.p2p.Disconnect(swarm.routers[id].Pid) + if err != nil { + return fmt.Errorf("disconnect peer: %w", err) + } return fmt.Errorf("verify certs: %w", err) } - err = swarm.p2p.Disconnect(swarm.routers[id].Pid) - if err != nil { - return fmt.Errorf("disconnect peer: %w", err) - } return nil } @@ -490,3 +490,4 @@ func constructMultiaddr(vpInfo *pb.VpInfo) (*peer.AddrInfo, error) { } return addrInfo, nil } +