bitxhub/pkg/peermgr/swarm.go

398 lines
9.5 KiB
Go
Raw Normal View History

2020-03-29 21:32:01 +08:00
package peermgr
import (
"context"
"fmt"
2020-12-07 21:48:47 +08:00
"github.com/meshplus/bitxhub-kit/types"
"sync"
2020-03-29 21:32:01 +08:00
"time"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/ethereum/go-ethereum/event"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/ledger"
"github.com/meshplus/bitxhub/internal/model"
"github.com/meshplus/bitxhub/internal/model/events"
"github.com/meshplus/bitxhub/internal/repo"
"github.com/meshplus/bitxhub/pkg/cert"
2020-08-24 14:17:46 +08:00
network "github.com/meshplus/go-lightp2p"
2020-03-29 21:32:01 +08:00
"github.com/sirupsen/logrus"
)
// Protocol identifiers used by the peer manager.
const (
// protocolID is the protocol ID that New hands to the p2p network via
// network.WithProtocolIDs.
protocolID protocol.ID = "/B1txHu6/1.0.0" // magic protocol
)
type Swarm struct {
2020-12-07 21:48:47 +08:00
repo *repo.Repo
localID uint64
p2p network.Network
logger logrus.FieldLogger
routers map[uint64]*VPInfo // trace the vp nodes
multiAddrs map[uint64]*peer.AddrInfo
2020-11-05 11:04:53 +08:00
connectedPeers sync.Map
2020-12-07 21:48:47 +08:00
notifiee *notifiee
2020-11-05 11:04:53 +08:00
ledger ledger.Ledger
2020-03-29 21:32:01 +08:00
orderMessageFeed event.Feed
2020-11-05 11:04:53 +08:00
enablePing bool
pingTimeout time.Duration
2020-03-29 21:32:01 +08:00
ctx context.Context
cancel context.CancelFunc
}
func New(repoConfig *repo.Repo, logger logrus.FieldLogger, ledger ledger.Ledger) (*Swarm, error) {
2020-08-24 14:17:46 +08:00
var protocolIDs = []string{string(protocolID)}
2020-12-07 21:48:47 +08:00
// init peers with ips and hosts
peers := make([]string, 0)
for _, addr := range repo.NetworkConfig.Nodes {
peers = append(peers, addr.Addr)
}
validators := make(map[uint64]*VPInfo)
for i, node := range repo.NetworkConfig.Nodes {
keyAddr := *types.NewAddressByStr(repo.AllAddresses.Addresses[i])
IpInfo := repo.NetworkConfig.VpNodes[node.ID]
vpInfo := &VPInfo{
KeyAddr: keyAddr.String(),
IPAddr: IpInfo.ID.String(),
}
validators[node.ID] = vpInfo
}
multiAddrs := make(map[uint64]*peer.AddrInfo)
for _, node := range repo.NetworkConfig.Nodes {
if node.ID == repo.NetworkConfig.ID {
continue
}
IpInfo := repo.NetworkConfig.VpNodes[node.ID]
addInfo := &peer.AddrInfo{
ID:IpInfo.ID,
Addrs:IpInfo.Addrs,
}
multiAddrs[node.ID] = addInfo
}
notifiee := newNotifiee(validators, logger)
2020-04-21 22:56:19 +08:00
p2p, err := network.New(
network.WithLocalAddr(repoConfig.NetworkConfig.LocalAddr),
network.WithPrivateKey(repoConfig.Key.Libp2pPrivKey),
2020-08-24 14:17:46 +08:00
network.WithProtocolIDs(protocolIDs),
2020-04-21 22:56:19 +08:00
network.WithLogger(logger),
2020-12-07 21:48:47 +08:00
// enable discovery
network.WithBootstrap(peers),
network.WithNotify(notifiee),
2020-03-29 21:32:01 +08:00
)
if err != nil {
return nil, fmt.Errorf("create p2p: %w", err)
}
peers, err := repoConfig.NetworkConfig.GetPeers()
if err != nil {
return nil, fmt.Errorf("get peers:%w", err)
}
2020-03-29 21:32:01 +08:00
ctx, cancel := context.WithCancel(context.Background())
return &Swarm{
repo: repoConfig,
2020-12-07 21:48:47 +08:00
localID: repoConfig.NetworkConfig.ID,
2020-03-29 21:32:01 +08:00
p2p: p2p,
logger: logger,
ledger: ledger,
enablePing: repoConfig.Config.Ping.Enable,
pingTimeout: repoConfig.Config.Ping.Duration,
2020-12-07 21:48:47 +08:00
routers: peers,
multiAddrs: multiAddrs,
connectedPeers: sync.Map{},
2020-12-07 21:48:47 +08:00
notifiee: notifiee,
2020-03-29 21:32:01 +08:00
ctx: ctx,
cancel: cancel,
}, nil
}
func (swarm *Swarm) Start() error {
2020-04-21 22:56:19 +08:00
swarm.p2p.SetMessageHandler(swarm.handleMessage)
2020-03-29 21:32:01 +08:00
if err := swarm.p2p.Start(); err != nil {
return err
}
2020-12-07 21:48:47 +08:00
for id, addr := range swarm.multiAddrs {
2020-03-29 21:32:01 +08:00
go func(id uint64, addr *peer.AddrInfo) {
if err := retry.Retry(func(attempt uint) error {
2020-08-24 14:17:46 +08:00
if err := swarm.p2p.Connect(*addr); err != nil {
2020-03-29 21:32:01 +08:00
swarm.logger.WithFields(logrus.Fields{
"node": id,
"error": err,
}).Error("Connect failed")
return err
}
if err := swarm.verifyCertOrDisconnect(id); err != nil {
2020-03-29 21:32:01 +08:00
if attempt != 0 && attempt%5 == 0 {
swarm.logger.WithFields(logrus.Fields{
"node": id,
"error": err,
}).Error("Verify cert")
}
return err
}
swarm.logger.WithFields(logrus.Fields{
"node": id,
}).Info("Connect successfully")
swarm.connectedPeers.Store(id, addr)
2020-03-29 21:32:01 +08:00
return nil
},
strategy.Wait(1*time.Second),
); err != nil {
swarm.logger.Error(err)
}
}(id, addr)
}
2020-11-05 11:04:53 +08:00
if swarm.enablePing {
go swarm.Ping()
}
2020-03-29 21:32:01 +08:00
return nil
}
// Stop cancels the context that New created for this swarm. It makes no call
// into the p2p network itself and always returns nil.
func (swarm *Swarm) Stop() error {
swarm.cancel()
return nil
}
func (swarm *Swarm) verifyCertOrDisconnect(id uint64) error {
if err := swarm.verifyCert(id); err != nil {
2020-12-07 21:48:47 +08:00
if err = swarm.p2p.Disconnect(swarm.routers[id].IPAddr); err != nil {
return err
}
return err
}
return nil
}
2020-11-05 11:04:53 +08:00
func (swarm *Swarm) Ping() {
ticker := time.NewTicker(swarm.pingTimeout)
for {
select {
case <-ticker.C:
fields := logrus.Fields{}
swarm.connectedPeers.Range(func(key, value interface{}) bool {
info := value.(*peer.AddrInfo)
pingCh, err := swarm.p2p.Ping(info.ID.String())
if err != nil {
return true
}
select {
case res := <-pingCh:
fields[fmt.Sprintf("%d", key.(uint64))] = res.RTT
case <-time.After(time.Second * 5):
swarm.logger.Errorf("ping to node %d timeout", key.(uint64))
}
return true
})
2020-12-07 21:48:47 +08:00
swarm.logger.WithFields(fields).Info("ping time")
2020-11-05 11:04:53 +08:00
}
}
}
func (swarm *Swarm) AsyncSend(id uint64, msg *pb.Message) error {
2020-12-07 21:48:47 +08:00
var (
addr string
err error
)
if addr, err = swarm.findPeer(id); err != nil {
2020-03-29 21:32:01 +08:00
return fmt.Errorf("p2p send: %w", err)
}
data, err := msg.Marshal()
if err != nil {
return err
}
2020-08-24 14:17:46 +08:00
return swarm.p2p.AsyncSend(addr, data)
2020-03-29 21:32:01 +08:00
}
2020-08-24 14:17:46 +08:00
func (swarm *Swarm) SendWithStream(s network.Stream, msg *pb.Message) error {
2020-03-29 21:32:01 +08:00
data, err := msg.Marshal()
if err != nil {
return err
}
2020-08-24 14:17:46 +08:00
return s.AsyncSend(data)
2020-03-29 21:32:01 +08:00
}
func (swarm *Swarm) Send(id uint64, msg *pb.Message) (*pb.Message, error) {
2020-12-07 21:48:47 +08:00
var (
addr string
err error
)
if addr, err = swarm.findPeer(id); err != nil {
2020-03-29 21:32:01 +08:00
return nil, fmt.Errorf("check id: %w", err)
}
data, err := msg.Marshal()
if err != nil {
return nil, err
}
2020-08-24 14:17:46 +08:00
ret, err := swarm.p2p.Send(addr, data)
2020-03-29 21:32:01 +08:00
if err != nil {
return nil, fmt.Errorf("sync send: %w", err)
}
m := &pb.Message{}
2020-08-24 14:17:46 +08:00
if err := m.Unmarshal(ret); err != nil {
2020-03-29 21:32:01 +08:00
return nil, err
}
return m, nil
}
func (swarm *Swarm) Broadcast(msg *pb.Message) error {
2020-12-07 21:48:47 +08:00
addrs := make([]string, 0, len(swarm.routers))
for id, addr := range swarm.routers {
if id == swarm.localID {
continue
}
addrs = append(addrs, addr.IPAddr)
}
// if we are in adding node but hasn't finished updateN, new node hash will be temporarily recorded
// in swarm.notifiee.newPeer.
// TODO (Peer): keep access goroutine safety
if swarm.notifiee.newPeer != "" {
swarm.logger.Debugf("Broadcast to new peer %s", swarm.notifiee.newPeer)
addrs = append(addrs, swarm.notifiee.newPeer)
2020-03-29 21:32:01 +08:00
}
data, err := msg.Marshal()
if err != nil {
return err
}
2020-08-24 14:17:46 +08:00
return swarm.p2p.Broadcast(addrs, data)
2020-03-29 21:32:01 +08:00
}
2020-12-07 21:48:47 +08:00
func (swarm *Swarm) Peers() map[uint64]*VPInfo {
return swarm.routers
2020-03-29 21:32:01 +08:00
}
func (swarm *Swarm) OtherPeers() map[uint64]*peer.AddrInfo {
2020-12-07 21:48:47 +08:00
addrInfos := make(map[uint64]*peer.AddrInfo)
for id, addr := range swarm.Peers() {
if id == swarm.localID {
continue
}
addrInfo := &peer.AddrInfo{
ID: peer.ID(addr.IPAddr),
}
addrInfos[id] = addrInfo
}
2020-12-07 21:48:47 +08:00
return addrInfos
2020-03-29 21:32:01 +08:00
}
// SubscribeOrderMessage subscribes ch to the swarm's order message feed and
// returns the resulting subscription.
func (swarm *Swarm) SubscribeOrderMessage(ch chan<- events.OrderMessageEvent) event.Subscription {
	sub := swarm.orderMessageFeed.Subscribe(ch)
	return sub
}
func (swarm *Swarm) verifyCert(id uint64) error {
2020-12-07 21:48:47 +08:00
if _, err := swarm.findPeer(id); err != nil {
return fmt.Errorf("check id: %w", err)
}
2020-03-29 21:32:01 +08:00
msg := &pb.Message{
Type: pb.Message_FETCH_CERT,
}
ret, err := swarm.Send(id, msg)
2020-03-29 21:32:01 +08:00
if err != nil {
return fmt.Errorf("sync send: %w", err)
}
certs := &model.CertsMessage{}
if err := certs.Unmarshal(ret.Data); err != nil {
return fmt.Errorf("unmarshal certs: %w", err)
}
nodeCert, err := cert.ParseCert(certs.NodeCert)
if err != nil {
return fmt.Errorf("parse node cert: %w", err)
}
agencyCert, err := cert.ParseCert(certs.AgencyCert)
if err != nil {
return fmt.Errorf("parse agency cert: %w", err)
}
if err := verifyCerts(nodeCert, agencyCert, swarm.repo.Certs.CACert); err != nil {
return fmt.Errorf("verify certs: %w", err)
}
2020-12-07 21:48:47 +08:00
err = swarm.p2p.Disconnect(swarm.routers[id].IPAddr)
2020-03-29 21:32:01 +08:00
if err != nil {
return fmt.Errorf("disconnect peer: %w", err)
}
return nil
}
2020-12-07 21:48:47 +08:00
func (swarm *Swarm) findPeer(id uint64) (string, error) {
if swarm.routers[id] != nil {
return swarm.routers[id].IPAddr, nil
2020-03-29 21:32:01 +08:00
}
2020-12-07 21:48:47 +08:00
swarm.notifiee.mu.Lock()
newPeerAddr := swarm.notifiee.newPeer
swarm.notifiee.mu.Unlock()
// new node id should be len(swarm.peers)+1,
if uint64(len(swarm.routers)+1) == id && newPeerAddr != "" {
return newPeerAddr, nil
}
return "", fmt.Errorf("wrong id: %d", id)
}
2020-03-29 21:32:01 +08:00
2020-12-07 21:48:47 +08:00
// AddNode inserts vpInfo into the routing table under newNodeID and mirrors
// the entry into the notifiee's peer map.
//
// A nil vpInfo or an ID that is already routed is ignored with a warning. If
// the notifiee's pending newPeer equals vpInfo.IPAddr it is cleared; otherwise
// a warning is logged and newPeer is left as it is.
func (swarm *Swarm) AddNode(newNodeID uint64, vpInfo *VPInfo) {
	if vpInfo == nil {
		swarm.logger.Warningf("Ignore adding VP[ID: %d] without vp info", newNodeID)
		return
	}

	if _, ok := swarm.routers[newNodeID]; ok {
		swarm.logger.Warningf("VP[ID: %d, IpAddr: %s] has already exist in routing table", newNodeID, vpInfo.IPAddr)
		return
	}

	swarm.logger.Infof("Add vp[ID: %d, IpAddr: %s] into routing table", newNodeID, vpInfo.IPAddr)
	swarm.routers[newNodeID] = vpInfo

	// The map key already is the node ID; the previous code logged key+1,
	// which mislabelled every entry.
	for id, p := range swarm.routers {
		swarm.logger.Debugf("=====ID: %d, Addr: %v=====", id, p)
	}

	// update notifiee info
	swarm.notifiee.mu.Lock()
	defer swarm.notifiee.mu.Unlock()

	swarm.notifiee.peers[newNodeID] = vpInfo
	if swarm.notifiee.newPeer != vpInfo.IPAddr {
		swarm.logger.Warningf("Received vpInfo %v, but it doesn't equal to notifiee newPeer %s", vpInfo, swarm.notifiee.newPeer)
		return
	}

	swarm.logger.Info("Clear notifiee newPeer info")
	swarm.notifiee.newPeer = ""
}
// DelNode is currently a no-op: its body is empty, so delID is ignored and the
// routing table is left untouched.
// TODO: implement removal of a node from the routing table.
func (swarm *Swarm) DelNode(delID uint64) {
}
func (swarm *Swarm) UpdateRouter(vpInfos map[uint64]*VPInfo) {
swarm.logger.Infof("Update router: %+v", vpInfos)
swarm.routers = vpInfos
// update notifiee info
swarm.notifiee.mu.Lock()
swarm.notifiee.peers = vpInfos
swarm.notifiee.mu.Unlock()
2020-03-29 21:32:01 +08:00
}