feat: add state update

jzhe 2020-12-21 15:08:04 +08:00
parent b1b8230353
commit ba3963e8d0
14 changed files with 726 additions and 10 deletions


@@ -88,3 +88,14 @@ func (cbs *ChainBrokerService) GetBlocks(ctx context.Context, req *pb.GetBlocksR
Blocks: blocks,
}, nil
}
func (cbs *ChainBrokerService) GetBlockHeaders(ctx context.Context, req *pb.GetBlockHeadersRequest) (*pb.GetBlockHeadersResponse, error) {
headers, err := cbs.api.Broker().GetBlockHeaders(req.Start, req.End)
if err != nil {
return nil, err
}
return &pb.GetBlockHeadersResponse{
BlockHeaders: headers,
}, nil
}
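For context, a minimal client-side sketch of calling this new RPC could look like the following. This is a hedged example: it assumes the generated gRPC client for ChainBrokerService is pb.NewChainBrokerClient, and the address and block range are illustrative only.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/meshplus/bitxhub-model/pb"
    "google.golang.org/grpc"
)

func main() {
    // dial a BitXHub node's gRPC endpoint (address is illustrative)
    conn, err := grpc.Dial("localhost:60011", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // assumption: the generated client for ChainBrokerService is pb.ChainBrokerClient
    client := pb.NewChainBrokerClient(conn)
    resp, err := client.GetBlockHeaders(context.Background(), &pb.GetBlockHeadersRequest{
        Start: 1,
        End:   10,
    })
    if err != nil {
        log.Fatal(err)
    }
    for _, header := range resp.BlockHeaders {
        fmt.Println(header.Number)
    }
}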

go.mod

@@ -27,7 +27,7 @@ require (
github.com/magiconair/properties v1.8.4
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201125025329-ac1187099a88
github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d
github.com/meshplus/bitxhub-model v1.1.2-0.20201218090311-9e471bb1654c
github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.2.2

go.sum

@@ -607,6 +607,9 @@ github.com/meshplus/bitxhub-model v1.0.0-rc3/go.mod h1:ZCctQIYTlE3vJ8Lhkrgs9bWwN
github.com/meshplus/bitxhub-model v1.1.2-0.20201021152621-0b3c17c54b23/go.mod h1:4qWBZx5wv7WZzUqiuBsbkQqQ2Ju8aOFpsoNpBBNy8Us=
github.com/meshplus/bitxhub-model v1.1.2-0.20201218090311-9e471bb1654c h1:xVrWp1PlY9kxUQkb8Go/MKbm0PhYGi4yo+YBUDkGx0A=
github.com/meshplus/bitxhub-model v1.1.2-0.20201218090311-9e471bb1654c/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201221062626-1774aad0f842 h1:THnvVag1IMAjfVGJJJSmN53o6YwXGPkbR+8dE53ckjw=
github.com/meshplus/bitxhub-model v1.1.2-0.20201221062626-1774aad0f842/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab h1:JclTakVV0dcXxl/dScmN77htnYe3n19hh7m2eMk9Abs=
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab/go.mod h1:L3pEzDMouz+xcIVwG2fj+mAsM95GAkzoo7cEd2CzmCQ=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=


@@ -45,6 +45,7 @@ type BrokerAPI interface {
FetchSignsFromOtherPeers(content string, typ pb.GetMultiSignsRequest_Type) map[string][]byte
GetSign(content string, typ pb.GetMultiSignsRequest_Type) (string, []byte, error)
GetBlockHeaders(start uint64, end uint64) ([]*pb.BlockHeader, error)
}
type NetworkAPI interface {


@@ -121,6 +121,24 @@ func (b *BrokerAPI) GetBlocks(start uint64, end uint64) ([]*pb.Block, error) {
return blocks, nil
}
func (b *BrokerAPI) GetBlockHeaders(start uint64, end uint64) ([]*pb.BlockHeader, error) {
meta := b.bxh.Ledger.GetChainMeta()
var blockHeaders []*pb.BlockHeader
if meta.Height < end {
end = meta.Height
}
for i := start; i > 0 && i <= end; i++ {
block, err := b.GetBlock("HEIGHT", strconv.Itoa(int(i)))
if err != nil {
continue
}
blockHeaders = append(blockHeaders, block.BlockHeader)
}
return blockHeaders, nil
}
func (b *BrokerAPI) RemovePier(pid string, isUnion bool) {
b.bxh.Router.RemovePier(pid, isUnion)
}


@@ -0,0 +1,322 @@
package syncer
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/sirupsen/logrus"
)
var _ Syncer = (*StateSyncer)(nil)
type StateSyncer struct {
checkpoint uint64 // checkpoint interval used to split sync ranges
peerMgr peermgr.PeerManager // network manager
badPeers *sync.Map // set of peers that have returned bad blocks
quorum uint64 // number of nodes required for a quorum
peerIds []uint64 // peers that hold the latest consensus state
logger logrus.FieldLogger
}
type rangeHeight struct {
begin uint64
end uint64
}
func New(checkpoint uint64, peerMgr peermgr.PeerManager, quorum uint64, peerIds []uint64, logger logrus.FieldLogger) (*StateSyncer, error) {
if checkpoint == 0 {
return nil, fmt.Errorf("checkpoint not be 0")
}
if quorum <= 0 {
return nil, fmt.Errorf("the vp nodes' quorum must be positive")
}
if len(peerIds) < int(quorum) {
return nil, fmt.Errorf("the peers num must be gather than quorum")
}
return &StateSyncer{
checkpoint: checkpoint,
peerMgr: peerMgr,
logger: logger,
quorum: quorum,
peerIds: peerIds,
badPeers: &sync.Map{},
}, nil
}
func (s *StateSyncer) SyncCFTBlocks(begin, end uint64, blockCh chan *pb.Block) error {
rangeHeights, err := s.calcRangeHeight(begin, end)
if err != nil {
return err
}
for _, rangeHeight := range rangeHeights {
rangeTmp := rangeHeight
err := retry.Retry(func(attempt uint) error {
id := s.randPeers()
s.logger.WithFields(logrus.Fields{
"begin": rangeTmp.begin,
"end": rangeTmp.end,
"peer_id": id,
}).Info("syncing range block")
blocks, err := s.fetchBlocks(id, rangeTmp.begin, rangeTmp.end)
if err != nil {
s.logger.Errorf("fetch blocks error:%w", err)
return err
}
for _, block := range blocks {
blockCh <- block
}
return nil
}, strategy.Wait(100*time.Millisecond))
if err != nil {
s.logger.Error(err)
}
}
blockCh <- nil
return nil
}
func (s *StateSyncer) SyncBFTBlocks(begin, end uint64, metaHash *types.Hash, blockCh chan *pb.Block) error {
rangeHeights, err := s.calcRangeHeight(begin, end)
if err != nil {
return err
}
var parentBlockHash *types.Hash
for i, rangeHeight := range rangeHeights {
if i == 0 {
parentBlockHash = metaHash
}
rangeTmp := rangeHeight
headers := s.syncQuorumRangeBlockHeaders(rangeTmp, parentBlockHash)
if headers == nil {
return fmt.Errorf("fetch and verify the quorum peers' block header error: %v", rangeTmp)
}
blocks := s.syncRangeBlocks(headers)
if blocks == nil {
return fmt.Errorf("fetch and verify peers' block error: %v", rangeTmp)
}
for _, block := range blocks {
blockCh <- block
}
}
blockCh <- nil
return nil
}
func (s *StateSyncer) syncQuorumRangeBlockHeaders(rangeHeight *rangeHeight, parentBlockHash *types.Hash) []*pb.BlockHeader {
var isQuorum bool
var quorumHash string
// key the maps by the hex string of the latest header's hash, so that identical
// responses from different peers are counted toward the same quorum entry
latestBlockHeaderCounter := make(map[string]uint64)
blockHeadersM := make(map[string][]*pb.BlockHeader)
fetchAndVerifyBlockHeaders := func(id uint64) {
s.logger.WithFields(logrus.Fields{
"begin": rangeHeight.begin,
"end": rangeHeight.end,
"peer_id": id,
}).Info("syncing range block header")
headers, err := s.fetchBlockHeaders(id, rangeHeight.begin, rangeHeight.end)
if err != nil {
s.logger.Errorf("fetch block headers error: %s", err)
return
}
if err := s.verifyBlockHeaders(parentBlockHash, headers); err != nil {
s.logger.Errorf("verify block headers error: %s", err)
return
}
latestBlock := &pb.Block{BlockHeader: headers[len(headers)-1]}
blockHash := latestBlock.Hash().String()
latestBlockHeaderCounter[blockHash]++
blockHeadersM[blockHash] = headers
}
for _, id := range s.peerIds {
fetchAndVerifyBlockHeaders(id)
// check for a quorum after every response, including the last one
for latestHash, counter := range latestBlockHeaderCounter {
if counter >= s.quorum {
quorumHash = latestHash
isQuorum = true
break
}
}
if isQuorum {
break
}
}
if quorumHash == "" {
return nil
}
return blockHeadersM[quorumHash]
}
func (s *StateSyncer) syncRangeBlocks(headers []*pb.BlockHeader) []*pb.Block {
var blocks []*pb.Block
begin := headers[0].Number
end := headers[len(headers)-1].Number
fetchAndVerifyBlocks := func(id uint64) {
s.logger.WithFields(logrus.Fields{
"begin": begin,
"end": end,
"peer_id": id,
}).Info("syncing range block")
fetchedBlocks, err := s.fetchBlocks(id, begin, end)
if err != nil {
s.logger.Errorf("fetch blocks error: %s", err)
return
}
if len(fetchedBlocks) != len(headers) {
s.logger.Errorf("peer %d returned %d blocks, expected %d", id, len(fetchedBlocks), len(headers))
return
}
for i, block := range fetchedBlocks {
if err := s.verifyBlock(headers[i], block); err != nil {
s.logger.Errorf("verify block error: %s", err)
return
}
}
blocks = fetchedBlocks
}
for _, id := range s.peerIds {
if blocks != nil {
break
}
fetchAndVerifyBlocks(id)
}
return blocks
}
func (s *StateSyncer) randPeers() uint64 {
ids := make([]uint64, 0)
for _, id := range s.peerIds {
_, ok := s.badPeers.Load(id)
if ok {
continue
}
ids = append(ids, id)
}
randIndex := rand.Int63n(int64(len(ids)))
return ids[randIndex]
}
func (s *StateSyncer) calcRangeHeight(begin, end uint64) ([]*rangeHeight, error) {
if begin > end {
return nil, fmt.Errorf("the end height:%d is less than the start height:%d", end, begin)
}
startNo := begin / s.checkpoint
rangeHeights := make([]*rangeHeight, 0)
for begin <= end {
rangeBegin := begin
rangeEnd := (startNo + 1) * s.checkpoint
if rangeEnd > end {
rangeEnd = end
}
rangeHeights = append(rangeHeights, &rangeHeight{
begin: rangeBegin,
end: rangeEnd,
})
begin = rangeEnd + 1
startNo++
}
return rangeHeights, nil
}
func (s *StateSyncer) fetchBlockHeaders(id uint64, begin, end uint64) ([]*pb.BlockHeader, error) {
if begin > end {
return nil, fmt.Errorf("the end height:%d is less than the start height:%d", end, begin)
}
req := &pb.GetBlockHeadersRequest{
Start: begin,
End: end,
}
data, err := req.Marshal()
if err != nil {
return nil, err
}
m := &pb.Message{
Type: pb.Message_GET_BLOCK_HEADERS,
Data: data,
}
res, err := s.peerMgr.Send(id, m)
if err != nil {
return nil, err
}
blockHeaders := &pb.GetBlockHeadersResponse{}
if err := blockHeaders.Unmarshal(res.Data); err != nil {
return nil, err
}
return blockHeaders.BlockHeaders, nil
}
func (s *StateSyncer) fetchBlocks(id uint64, begin, end uint64) ([]*pb.Block, error) {
if begin > end {
return nil, fmt.Errorf("the end height:%d is less than the start height: %d", end, begin)
}
req := &pb.GetBlocksRequest{
Start: begin,
End: end,
}
data, err := req.Marshal()
if err != nil {
return nil, err
}
m := &pb.Message{
Type: pb.Message_GET_BLOCKS,
Data: data,
}
res, err := s.peerMgr.Send(id, m)
if err != nil {
return nil, err
}
blocks := &pb.GetBlocksResponse{}
if err := blocks.Unmarshal(res.Data); err != nil {
return nil, err
}
return blocks.Blocks, nil
}
func (s *StateSyncer) verifyBlockHeaders(parentHash *types.Hash, headers []*pb.BlockHeader) error {
if parentHash == nil || len(headers) == 0 {
return fmt.Errorf("args must not be nil or empty")
}
for _, header := range headers {
block := &pb.Block{BlockHeader: header}
hash := block.Hash()
ok, _ := parentHash.Equals(header.ParentHash)
if !ok {
return fmt.Errorf("block number is %d, hash is %s, but parent hash is %s", header.Number, hash.Hash, header.ParentHash)
}
parentHash = hash
}
return nil
}
func (s *StateSyncer) verifyBlock(header *pb.BlockHeader, block *pb.Block) error {
if header == nil || block == nil {
return fmt.Errorf("args must not be nil or empty")
}
originBlock := &pb.Block{BlockHeader: header}
hash := originBlock.Hash()
//todo(jz): need to calc txs merkle root and compare with block's tx root
ok, _ := hash.Equals(block.BlockHash)
if !ok {
return fmt.Errorf("block hash is not equals, number is %d", block.Height())
}
return nil
}
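As an illustration of how calcRangeHeight cuts a sync request at checkpoint boundaries, the following self-contained sketch mirrors the logic above with a hypothetical checkpoint of 10 (splitRanges is not part of the commit, only a standalone copy of the same arithmetic):

package main

import "fmt"

// splitRanges mirrors calcRangeHeight: it cuts [begin, end] into sub-ranges
// that end on multiples of checkpoint.
func splitRanges(checkpoint, begin, end uint64) [][2]uint64 {
    var ranges [][2]uint64
    startNo := begin / checkpoint
    for begin <= end {
        rangeEnd := (startNo + 1) * checkpoint
        if rangeEnd > end {
            rangeEnd = end
        }
        ranges = append(ranges, [2]uint64{begin, rangeEnd})
        begin = rangeEnd + 1
        startNo++
    }
    return ranges
}

func main() {
    // with checkpoint = 10, syncing blocks 2..35 yields
    // [[2 10] [11 20] [21 30] [31 35]]
    fmt.Println(splitRanges(10, 2, 35))
}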


@@ -0,0 +1,14 @@
package syncer
import (
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
)
type Syncer interface {
// SyncCFTBlocks fetches the blocks in [begin, end] from other nodes without verifying them
SyncCFTBlocks(begin, end uint64, blockCh chan *pb.Block) error
// SyncBFTBlocks fetches the blocks in [begin, end] from a quorum of nodes and verifies every block
SyncBFTBlocks(begin, end uint64, metaHash *types.Hash, blockCh chan *pb.Block) error
}
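As a usage note, a caller is expected to drain blockCh until it receives the nil sentinel the syncer sends at the end of the range, as the test in the next file does. A hypothetical helper sketching that pattern (consumeBlocks is not part of the commit):

package syncer

import (
    "github.com/meshplus/bitxhub-model/pb"
    "github.com/sirupsen/logrus"
)

// consumeBlocks drains the channel filled by SyncCFTBlocks: a nil block is
// the end-of-stream sentinel sent by the syncer.
func consumeBlocks(s Syncer, begin, end uint64, logger logrus.FieldLogger) []*pb.Block {
    blockCh := make(chan *pb.Block, 1024)
    go func() {
        if err := s.SyncCFTBlocks(begin, end, blockCh); err != nil {
            logger.Errorf("sync blocks error: %s", err)
        }
    }()
    blocks := make([]*pb.Block, 0, end-begin+1)
    for block := range blockCh {
        if block == nil {
            break // nil marks the end of the requested range
        }
        blocks = append(blocks, block)
    }
    return blocks
}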


@@ -0,0 +1,177 @@
package syncer
import (
"fmt"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/golang/mock/gomock"
crypto2 "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/ledger/mock_ledger"
"github.com/meshplus/bitxhub/internal/repo"
"github.com/meshplus/bitxhub/pkg/cert"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/stretchr/testify/require"
)
func TestStateSyncer_SyncCFTBlocks(t *testing.T) {
peerCnt := 3
swarms := NewSwarms(t, peerCnt)
time.Sleep(2 * time.Second)
otherPeers := swarms[0].OtherPeers()
peerIds := make([]uint64, 0)
for id := range otherPeers {
peerIds = append(peerIds, id)
}
logger := log.NewWithModule("syncer")
syncer, err := New(10, swarms[0], 2, peerIds, logger)
require.Nil(t, err)
begin := 2
end := 100
blockCh := make(chan *pb.Block, 1024)
go syncer.SyncCFTBlocks(uint64(begin), uint64(end), blockCh)
blocks := make([]*pb.Block, 0)
for block := range blockCh {
if block == nil {
break
}
blocks = append(blocks, block)
}
require.Equal(t, len(blocks), end-begin+1)
}
//
//func TestStateSyncer_SyncBFTBlocks(t *testing.T) {
//
//}
func genKeysAndConfig(t *testing.T, peerCnt int) ([]crypto2.PrivKey, []crypto.PrivateKey, []string, []string) {
var nodeKeys []crypto2.PrivKey
var privKeys []crypto.PrivateKey
var peers []string
var ids []string
port := 5001
for i := 0; i < peerCnt; i++ {
key, err := asym.GenerateKeyPair(crypto.ECDSA_P256)
require.Nil(t, err)
libp2pKey, err := convertToLibp2pPrivKey(key)
require.Nil(t, err)
nodeKeys = append(nodeKeys, libp2pKey)
id, err := peer.IDFromPublicKey(libp2pKey.GetPublic())
require.Nil(t, err)
addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/", port)
peers = append(peers, addr)
ids = append(ids, id.String())
port++
privKey, err := asym.GenerateKeyPair(crypto.Secp256k1)
require.Nil(t, err)
privKeys = append(privKeys, privKey)
}
return nodeKeys, privKeys, peers, ids
}
func convertToLibp2pPrivKey(privateKey crypto.PrivateKey) (crypto2.PrivKey, error) {
ecdsaPrivKey, ok := privateKey.(*ecdsa.PrivateKey)
if !ok {
return nil, fmt.Errorf("convert to libp2p private key: not ecdsa private key")
}
libp2pPrivKey, _, err := crypto2.ECDSAKeyPairFromKey(ecdsaPrivKey.K)
if err != nil {
return nil, err
}
return libp2pPrivKey, nil
}
func peers(id uint64, addrs []string, ids []string) []*repo.NetworkNodes {
m := make([]*repo.NetworkNodes, 0, len(addrs))
for i, addr := range addrs {
m = append(m, &repo.NetworkNodes{
ID: uint64(i + 1),
Account: "",
Pid: ids[i],
Hosts: []string{addr},
})
}
return m
}
func NewSwarms(t *testing.T, peerCnt int) []*peermgr.Swarm {
var swarms []*peermgr.Swarm
nodeKeys, privKeys, addrs, ids := genKeysAndConfig(t, peerCnt)
mockCtl := gomock.NewController(t)
mockLedger := mock_ledger.NewMockLedger(mockCtl)
mockLedger.EXPECT().GetBlock(gomock.Any()).Return(&pb.Block{
BlockHeader: &pb.BlockHeader{
Number: 1,
},
}, nil).AnyTimes()
agencyData, err := ioutil.ReadFile("testdata/agency.cert")
require.Nil(t, err)
nodeData, err := ioutil.ReadFile("testdata/node.cert")
require.Nil(t, err)
caData, err := ioutil.ReadFile("testdata/ca.cert")
require.Nil(t, err)
cert, err := cert.ParseCert(caData)
require.Nil(t, err)
for i := 0; i < peerCnt; i++ {
repo := &repo.Repo{
Key: &repo.Key{},
NetworkConfig: &repo.NetworkConfig{
N: uint64(peerCnt),
ID: uint64(i + 1),
},
Certs: &repo.Certs{
NodeCertData: nodeData,
AgencyCertData: agencyData,
CACert: cert,
},
Config: &repo.Config{
Ping: repo.Ping{
Enable: true,
Duration: 2 * time.Second,
},
},
}
idx := strings.LastIndex(addrs[i], "/p2p/")
local := addrs[i][:idx]
repo.NetworkConfig.LocalAddr = local
repo.Key.Libp2pPrivKey = nodeKeys[i]
repo.Key.PrivKey = privKeys[i]
repo.NetworkConfig.Nodes = peers(uint64(i), addrs, ids)
swarm, err := peermgr.New(repo, log.NewWithModule(fmt.Sprintf("swarm%d", i)), mockLedger)
require.Nil(t, err)
err = swarm.Start()
require.Nil(t, err)
swarms = append(swarms, swarm)
}
return swarms
}

pkg/order/syncer/testdata/agency.cert

@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICkjCCAjegAwIBAgIDAoB1MAoGCCqGSM49BAMCMIGhMQswCQYDVQQGEwJDTjER
MA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYDVQQJEwZz
dHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxEzARBgNVBAoT
Ckh5cGVyY2hhaW4xEDAOBgNVBAsTB0JpdFhIdWIxEzARBgNVBAMTCmJpdHhodWIu
Y24wIBcNMjAwODExMDUwNzEzWhgPMjA3MDA3MzAwNTA3MTNaMIGaMQswCQYDVQQG
EwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYD
VQQJEwZzdHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxDzAN
BgNVBAoTBkFnZW5jeTEQMA4GA1UECxMHQml0WEh1YjEQMA4GA1UEAxMHQml0WEh1
YjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABETCj+MAoveSsjSLGr3y8F5IEccL
Afn8d9rmw3ON24Y6NzoqV72T+kvXP7lJlj+7bhYJRtXO3FiPDBp1fFQb1tyjYTBf
MA4GA1UdDwEB/wQEAwIBpjAPBgNVHSUECDAGBgRVHSUAMA8GA1UdEwEB/wQFMAMB
Af8wKwYDVR0jBCQwIoAgFjEjLDiDWrdUwpIcMxUaxWQSVMS7agCqo44QcOAMlYMw
CgYIKoZIzj0EAwIDSQAwRgIhAJeZBApwY2BtYXK28Wa0gzJ8Nc9pjhtX2UFoc7cl
YilwAiEA3m60eyYQj/BiSDZ8+kZbUJO2C7y0HaT1xwNdjSZBFhI=
-----END CERTIFICATE-----

pkg/order/syncer/testdata/ca.cert

@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIIClTCCAjygAwIBAgIDAKWTMAoGCCqGSM49BAMCMIGhMQswCQYDVQQGEwJDTjER
MA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYDVQQJEwZz
dHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxEzARBgNVBAoT
Ckh5cGVyY2hhaW4xEDAOBgNVBAsTB0JpdFhIdWIxEzARBgNVBAMTCmJpdHhodWIu
Y24wIBcNMjAwODExMDUwNzEyWhgPMjA3MDA3MzAwNTA3MTJaMIGhMQswCQYDVQQG
EwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYD
VQQJEwZzdHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxEzAR
BgNVBAoTCkh5cGVyY2hhaW4xEDAOBgNVBAsTB0JpdFhIdWIxEzARBgNVBAMTCmJp
dHhodWIuY24wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASznYuB78fMNqgooAnp
YPUfWoQCDAM0nV5YwSj1Rz6RcWKuRAPQooPFb8K2WO6hfRUGtCba+l48FD34R/RP
aajqo18wXTAOBgNVHQ8BAf8EBAMCAaYwDwYDVR0lBAgwBgYEVR0lADAPBgNVHRMB
Af8EBTADAQH/MCkGA1UdDgQiBCAWMSMsOINat1TCkhwzFRrFZBJUxLtqAKqjjhBw
4AyVgzAKBggqhkjOPQQDAgNHADBEAiAW5pb+b2aLuEBJCkaEXfukb/HwFsNm7zmx
Ha6oyk0HYAIgXhsMYlIXKf5RmlBFiEWZZRJTyiubI7NDqN5+JlVBAog=
-----END CERTIFICATE-----

pkg/order/syncer/testdata/node.cert

@@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICWjCCAf+gAwIBAgIDDhFLMAoGCCqGSM49BAMCMIGaMQswCQYDVQQGEwJDTjER
MA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MR8wDQYDVQQJEwZz
dHJlZXQwDgYDVQQJEwdhZGRyZXNzMQ8wDQYDVQQREwYzMjQwMDAxDzANBgNVBAoT
BkFnZW5jeTEQMA4GA1UECxMHQml0WEh1YjEQMA4GA1UEAxMHQml0WEh1YjAgFw0y
MDA4MTEwNTA3MTNaGA8yMDcwMDczMDA1MDcxM1owgZkxCzAJBgNVBAYTAkNOMREw
DwYDVQQIEwhaaGVKaWFuZzERMA8GA1UEBxMISGFuZ1pob3UxHzANBgNVBAkTBnN0
cmVldDAOBgNVBAkTB2FkZHJlc3MxDzANBgNVBBETBjMyNDAwMDEOMAwGA1UEChMF
Tm9kZTExEDAOBgNVBAsTB0JpdFhIdWIxEDAOBgNVBAMTB0JpdFhIdWIwWTATBgcq
hkjOPQIBBggqhkjOPQMBBwNCAATgjTYEnavxerFuEKJ8C39QUY12xh/TC2E5V7ni
nmQcOgDDRv5HW4sskTSm/WX2D0BMzwb7XE5ATyoDeM9qcurDozEwLzAOBgNVHQ8B
Af8EBAMCAaYwDwYDVR0lBAgwBgYEVR0lADAMBgNVHRMBAf8EAjAAMAoGCCqGSM49
BAMCA0kAMEYCIQD5Oz1xJvFgzYm/lTzoaO/i0ayPVRgSdBwvK6hEICo5lAIhAMtG
aswjd2wVA4zB5GPEmJ/tvPUnxrlOAU67AQMYR4zf
-----END CERTIFICATE-----


@@ -28,6 +28,10 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
switch m.Type {
case pb.Message_GET_BLOCK:
return swarm.handleGetBlockPack(s, m)
case pb.Message_GET_BLOCK_HEADERS:
return swarm.handleGetBlockHeadersPack(s, m)
case pb.Message_GET_BLOCKS:
return swarm.handleGetBlocksPack(s, m)
case pb.Message_FETCH_CERT:
return swarm.handleFetchCertMessage(s)
case pb.Message_CONSENSUS:
@@ -82,6 +86,38 @@ func (swarm *Swarm) handleGetBlockPack(s network.Stream, msg *pb.Message) error
return nil
}
func (swarm *Swarm) handleGetBlockHeadersPack(s network.Stream, msg *pb.Message) error {
req := &pb.GetBlockHeadersRequest{}
if err := req.Unmarshal(msg.Data); err != nil {
return err
}
res := &pb.GetBlockHeadersResponse{}
blockHeaders := make([]*pb.BlockHeader, 0)
for i := req.Start; i <= req.End; i++ {
block, err := swarm.ledger.GetBlock(i)
if err != nil {
return err
}
blockHeaders = append(blockHeaders, block.BlockHeader)
}
res.BlockHeaders = blockHeaders
v, err := res.Marshal()
if err != nil {
return err
}
m := &pb.Message{
Type: pb.Message_GET_BLOCK_HEADERS_ACK,
Data: v,
}
if err := swarm.SendWithStream(s, m); err != nil {
return err
}
return nil
}
func (swarm *Swarm) handleFetchCertMessage(s network.Stream) error {
certs := &model.CertsMessage{
AgencyCert: swarm.repo.Certs.AgencyCertData,
@@ -251,3 +287,36 @@ func (swarm *Swarm) handleFetchIBTPSignMessage(s network.Stream, data []byte) {
swarm.logger.Errorf("send asset exchange sign back: %s", err)
}
}
func (swarm *Swarm) handleGetBlocksPack(s network.Stream, msg *pb.Message) error {
req := &pb.GetBlocksRequest{}
if err := req.Unmarshal(msg.Data); err != nil {
return err
}
res := &pb.GetBlocksResponse{}
blocks := make([]*pb.Block, 0)
for i := req.Start; i <= req.End; i++ {
block, err := swarm.ledger.GetBlock(i)
if err != nil {
return err
}
blocks = append(blocks, block)
}
res.Blocks = blocks
v, err := res.Marshal()
if err != nil {
return err
}
m := &pb.Message{
Type: pb.Message_GET_BLOCKS_ACK,
Data: v,
}
if err := swarm.SendWithStream(s, m); err != nil {
return err
}
return nil
}


@@ -186,22 +186,77 @@ func TestSwarm_Send(t *testing.T) {
require.Nil(t, err)
require.Equal(t, uint64(1), block.BlockHeader.Number)
req := pb.GetBlocksRequest{
Start: 1,
End: 1,
}
data, err := req.Marshal()
require.Nil(t, err)
fetchBlocksMsg := &pb.Message{
Type: pb.Message_GET_BLOCKS,
Data: data,
}
err = retry.Retry(func(attempt uint) error {
res, err = swarms[2].Send(1, fetchBlocksMsg)
if err != nil {
swarms[2].logger.Errorf(err.Error())
return err
}
return nil
}, strategy.Wait(50*time.Millisecond))
require.Nil(t, err)
require.Equal(t, pb.Message_GET_BLOCKS_ACK, res.Type)
var getBlocksRes pb.GetBlocksResponse
err = getBlocksRes.Unmarshal(res.Data)
require.Nil(t, err)
require.Equal(t, 1, len(getBlocksRes.Blocks))
getBlockHeadersReq := pb.GetBlockHeadersRequest{
Start: 1,
End: 1,
}
data, err = getBlockHeadersReq.Marshal()
require.Nil(t, err)
fetchBlockHeadersMsg := &pb.Message{
Type: pb.Message_GET_BLOCK_HEADERS,
Data: data,
}
err = retry.Retry(func(attempt uint) error {
res, err = swarms[2].Send(4, fetchBlockHeadersMsg)
if err != nil {
swarms[2].logger.Errorf(err.Error())
return err
}
return nil
}, strategy.Wait(50*time.Millisecond))
require.Nil(t, err)
require.Equal(t, pb.Message_GET_BLOCK_HEADERS_ACK, res.Type)
var getBlockHeadersRes pb.GetBlockHeadersResponse
err = getBlockHeadersRes.Unmarshal(res.Data)
require.Nil(t, err)
require.Equal(t, 1, len(getBlockHeadersRes.BlockHeaders))
fetchBlockSignMsg := &pb.Message{
Type: pb.Message_FETCH_BLOCK_SIGN,
Data: []byte("1"),
}
err = retry.Retry(func(attempt uint) error {
res, err = swarms[0].Send(3, fetchBlockSignMsg)
res, err = swarms[1].Send(3, fetchBlockSignMsg)
if err != nil {
swarms[0].logger.Errorf(err.Error())
swarms[1].logger.Errorf(err.Error())
return err
}
return nil
}, strategy.Wait(50*time.Millisecond))
require.Nil(t, err)
require.Equal(t, pb.Message_FETCH_BLOCK_SIGN_ACK, res.Type)
require.Nil(t, err)
require.NotNil(t, res.Data)
fetchAESMsg := &pb.Message{
@@ -219,7 +274,6 @@ }, strategy.Wait(50*time.Millisecond))
}, strategy.Wait(50*time.Millisecond))
require.Nil(t, err)
require.Equal(t, pb.Message_FETCH_ASSET_EXCHANGE_SIGN_ACK, res.Type)
require.Nil(t, err)
require.NotNil(t, res.Data)
fetchIBTPSignMsg := &pb.Message{
@@ -237,7 +291,6 @@ }, strategy.Wait(50*time.Millisecond))
}, strategy.Wait(50*time.Millisecond))
require.Nil(t, err)
require.Equal(t, pb.Message_FETCH_IBTP_SIGN_ACK, res.Type)
require.Nil(t, err)
require.NotNil(t, res.Data)
}


@@ -327,13 +327,13 @@ func (swarm *Swarm) verifyCert(id uint64) error {
}
if err := verifyCerts(nodeCert, agencyCert, swarm.repo.Certs.CACert); err != nil {
err = swarm.p2p.Disconnect(swarm.routers[id].Pid)
if err != nil {
return fmt.Errorf("disconnect peer: %w", err)
}
return fmt.Errorf("verify certs: %w", err)
}
err = swarm.p2p.Disconnect(swarm.routers[id].Pid)
if err != nil {
return fmt.Errorf("disconnect peer: %w", err)
}
return nil
}
@@ -490,3 +490,4 @@ func constructMultiaddr(vpInfo *pb.VpInfo) (*peer.AddrInfo, error) {
}
return addrInfo, nil
}