Merge pull request #297 from meshplus/fix-swarm-connect

fix:fix swarm connect with less peers
This commit is contained in:
Sandy Zhou 2020-12-22 15:56:29 +08:00 committed by GitHub
commit 625fca5795
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 10 deletions

View File

@ -2,7 +2,6 @@ package syncer
import ( import (
"fmt" "fmt"
"github.com/meshplus/bitxhub-kit/types"
"io/ioutil" "io/ioutil"
"strings" "strings"
"testing" "testing"
@ -15,6 +14,7 @@ import (
"github.com/meshplus/bitxhub-kit/crypto/asym" "github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa" "github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa"
"github.com/meshplus/bitxhub-kit/log" "github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/ledger/mock_ledger" "github.com/meshplus/bitxhub/internal/ledger/mock_ledger"
"github.com/meshplus/bitxhub/internal/repo" "github.com/meshplus/bitxhub/internal/repo"
@ -27,6 +27,9 @@ func TestStateSyncer_SyncCFTBlocks(t *testing.T) {
peerCnt := 3 peerCnt := 3
swarms := NewSwarms(t, peerCnt) swarms := NewSwarms(t, peerCnt)
for swarms[0].CountConnectedPeers() != 2 {
time.Sleep(100*time.Millisecond)
}
otherPeers := swarms[0].OtherPeers() otherPeers := swarms[0].OtherPeers()
peerIds := make([]uint64, 0) peerIds := make([]uint64, 0)
for id, _ := range otherPeers { for id, _ := range otherPeers {
@ -57,7 +60,9 @@ func TestStateSyncer_SyncBFTBlocks(t *testing.T) {
peerCnt := 4 peerCnt := 4
swarms := NewSwarms(t, peerCnt) swarms := NewSwarms(t, peerCnt)
//time.Sleep(100 * time.Millisecond) for swarms[0].CountConnectedPeers() != 3 {
time.Sleep(100*time.Millisecond)
}
otherPeers := swarms[0].OtherPeers() otherPeers := swarms[0].OtherPeers()
peerIds := make([]uint64, 0) peerIds := make([]uint64, 0)
for id, _ := range otherPeers { for id, _ := range otherPeers {
@ -180,10 +185,6 @@ func NewSwarms(t *testing.T, peerCnt int) []*peermgr.Swarm {
CACert: cert, CACert: cert,
}, },
Config: &repo.Config{ Config: &repo.Config{
Ping: repo.Ping{
Enable: true,
Duration: 2 * time.Second,
},
}, },
} }

View File

@ -163,6 +163,9 @@ func TestSwarm_Send(t *testing.T) {
peerCnt := 4 peerCnt := 4
swarms := NewSwarms(t, peerCnt) swarms := NewSwarms(t, peerCnt)
for swarms[0].CountConnectedPeers() != 3 {
time.Sleep(100*time.Millisecond)
}
msg := &pb.Message{ msg := &pb.Message{
Type: pb.Message_GET_BLOCK, Type: pb.Message_GET_BLOCK,
@ -297,6 +300,10 @@ func TestSwarm_AsyncSend(t *testing.T) {
peerCnt := 4 peerCnt := 4
swarms := NewSwarms(t, peerCnt) swarms := NewSwarms(t, peerCnt)
for swarms[0].CountConnectedPeers() != 3 {
time.Sleep(100*time.Millisecond)
}
orderMsgCh := make(chan events.OrderMessageEvent) orderMsgCh := make(chan events.OrderMessageEvent)
orderMsgSub := swarms[2].SubscribeOrderMessage(orderMsgCh) orderMsgSub := swarms[2].SubscribeOrderMessage(orderMsgCh)

View File

@ -28,6 +28,9 @@ type PeerManager interface {
// Broadcast message to all node // Broadcast message to all node
Broadcast(*pb.Message) error Broadcast(*pb.Message) error
// CountConnectedPeers counts connected peer numbers
CountConnectedPeers() uint64
// Peers // Peers
Peers() map[uint64]*pb.VpInfo Peers() map[uint64]*pb.VpInfo
@ -44,7 +47,7 @@ type PeerManager interface {
DelNode(delID uint64) DelNode(delID uint64)
// UpdateRouter update the local router to quorum router. // UpdateRouter update the local router to quorum router.
UpdateRouter (vpInfos map[uint64]*pb.VpInfo, isNew bool) bool UpdateRouter(vpInfos map[uint64]*pb.VpInfo, isNew bool) bool
// Disconnect disconnect with all vp peers. // Disconnect disconnect with all vp peers.
Disconnect(vpInfos map[uint64]*pb.VpInfo) Disconnect(vpInfos map[uint64]*pb.VpInfo)

View File

@ -18,8 +18,8 @@ import (
"github.com/meshplus/bitxhub/internal/repo" "github.com/meshplus/bitxhub/internal/repo"
"github.com/meshplus/bitxhub/pkg/cert" "github.com/meshplus/bitxhub/pkg/cert"
network "github.com/meshplus/go-lightp2p" network "github.com/meshplus/go-lightp2p"
"github.com/sirupsen/logrus"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
) )
const ( const (
@ -467,6 +467,15 @@ func (swarm *Swarm) Disconnect(vpInfos map[uint64]*pb.VpInfo) {
} }
} }
func (swarm *Swarm) CountConnectedPeers() uint64 {
var counter uint64
swarm.connectedPeers.Range(func(k, v interface{}) bool {
counter++
return true
})
return counter
}
func (swarm *Swarm) reset() { func (swarm *Swarm) reset() {
swarm.routers = nil swarm.routers = nil
swarm.multiAddrs = nil swarm.multiAddrs = nil
@ -475,7 +484,7 @@ func (swarm *Swarm) reset() {
} }
func constructMultiaddr(vpInfo *pb.VpInfo) (*peer.AddrInfo, error) { func constructMultiaddr(vpInfo *pb.VpInfo) (*peer.AddrInfo, error) {
addrs := make([]ma.Multiaddr,0) addrs := make([]ma.Multiaddr, 0)
for _, host := range vpInfo.Hosts { for _, host := range vpInfo.Hosts {
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s%s", host, vpInfo.Pid)) addr, err := ma.NewMultiaddr(fmt.Sprintf("%s%s", host, vpInfo.Pid))
if err != nil { if err != nil {
@ -490,4 +499,3 @@ func constructMultiaddr(vpInfo *pb.VpInfo) (*peer.AddrInfo, error) {
} }
return addrInfo, nil return addrInfo, nil
} }