Merge branch 'master' into feat/fit-did
This commit is contained in:
commit
bd0fac5bb4
|
@ -19,6 +19,7 @@ pid
|
|||
scripts/prepare
|
||||
logs
|
||||
mock_*/
|
||||
!mock_peermgr
|
||||
.fabric-samples
|
||||
coverage.txt
|
||||
docs/.vuepress/dist
|
||||
|
|
4
Makefile
4
Makefile
|
@ -79,4 +79,8 @@ linter:
|
|||
cluster:install${TAGS}
|
||||
@cd scripts && bash cluster.sh
|
||||
|
||||
## make solo: Run one node in solo mode
|
||||
solo:install${TAGS}
|
||||
@cd scripts && bash solo.sh
|
||||
|
||||
.PHONY: tester build
|
||||
|
|
|
@ -2,8 +2,8 @@ package grpc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/hokaccha/go-prettyjson"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
)
|
||||
|
||||
|
@ -19,7 +19,7 @@ func GetChainStatus(cbs *ChainBrokerService) (*pb.Response, error) {
|
|||
|
||||
func GetValidators(cbs *ChainBrokerService) (*pb.Response, error) {
|
||||
addresses := cbs.genesis.Addresses
|
||||
v, err := prettyjson.Marshal(addresses)
|
||||
v, err := json.Marshal(addresses)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -51,8 +51,7 @@ func start(ctx *cli.Context) error {
|
|||
log.WithPersist(true),
|
||||
log.WithFilePath(filepath.Join(repoRoot, repo.Config.Log.Dir)),
|
||||
log.WithFileName(repo.Config.Log.Filename),
|
||||
log.WithMaxSize(2*1024*1024),
|
||||
log.WithMaxAge(24*time.Hour),
|
||||
log.WithMaxAge(90*24*time.Hour),
|
||||
log.WithRotationTime(24*time.Hour),
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -145,7 +144,7 @@ func handleShutdown(node *app.BitXHub, wg *sync.WaitGroup) {
|
|||
// runtimePProf will record the cpu or memory profiles every 5 second.
|
||||
func runtimePProf(repoRoot, mode string, id uint64, duration time.Duration) {
|
||||
tick := time.NewTicker(duration)
|
||||
rootPath := filepath.Join(repoRoot,"/pprof/")
|
||||
rootPath := filepath.Join(repoRoot, "/pprof/")
|
||||
exist := fileExist(rootPath)
|
||||
if !exist {
|
||||
err := os.Mkdir(rootPath, os.ModePerm)
|
||||
|
@ -158,13 +157,13 @@ func runtimePProf(repoRoot, mode string, id uint64, duration time.Duration) {
|
|||
var cpuFile *os.File
|
||||
if mode == "cpu" {
|
||||
subPath := fmt.Sprint("cpu-", time.Now().Format("20060102-15:04:05"))
|
||||
cpuPath := filepath.Join(rootPath,subPath)
|
||||
cpuPath := filepath.Join(rootPath, subPath)
|
||||
cpuFile, _ = os.Create(cpuPath)
|
||||
_ = pprof.StartCPUProfile(cpuFile)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <- tick.C:
|
||||
case <-tick.C:
|
||||
switch mode {
|
||||
case "cpu":
|
||||
pprof.StopCPUProfile()
|
||||
|
|
|
@ -21,8 +21,8 @@ solo = false
|
|||
allowed_origins = ["*"]
|
||||
|
||||
[ping]
|
||||
enable = false
|
||||
duration = "2s"
|
||||
enable = true
|
||||
duration = "15s"
|
||||
|
||||
[security]
|
||||
enable_tls = false
|
||||
|
|
|
@ -232,6 +232,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) {
|
|||
from := types.NewAddress([]byte{0}).String()
|
||||
to := types.NewAddress([]byte{1}).String()
|
||||
mockStub.EXPECT().Set(gomock.Any(), gomock.Any()).AnyTimes()
|
||||
mockStub.EXPECT().SetObject(gomock.Any(), gomock.Any()).AnyTimes()
|
||||
f1 := mockStub.EXPECT().Get(appchainMgr.PREFIX+from).Return(false, nil)
|
||||
|
||||
interchain := pb.Interchain{
|
||||
|
@ -290,7 +291,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) {
|
|||
|
||||
res = im.HandleIBTP(ibtp)
|
||||
assert.False(t, res.Ok)
|
||||
assert.Equal(t, "wrong index, required 2, but 0", string(res.Result))
|
||||
assert.Equal(t, "index already exists, required 2, but 0", string(res.Result))
|
||||
|
||||
ibtp.Index = 2
|
||||
res = im.HandleIBTP(ibtp)
|
||||
|
|
|
@ -186,7 +186,10 @@ func (x *InterchainManager) checkIBTP(ibtp *pb.IBTP, interchain *pb.Interchain)
|
|||
}
|
||||
|
||||
idx := interchain.InterchainCounter[ibtp.To]
|
||||
if idx+1 != ibtp.Index {
|
||||
if ibtp.Index <= idx {
|
||||
return fmt.Errorf(fmt.Sprintf("index already exists, required %d, but %d", idx+1, ibtp.Index))
|
||||
}
|
||||
if ibtp.Index > idx+1 {
|
||||
return fmt.Errorf(fmt.Sprintf("wrong index, required %d, but %d", idx+1, ibtp.Index))
|
||||
}
|
||||
} else {
|
||||
|
@ -195,10 +198,12 @@ func (x *InterchainManager) checkIBTP(ibtp *pb.IBTP, interchain *pb.Interchain)
|
|||
}
|
||||
|
||||
idx := interchain.ReceiptCounter[ibtp.To]
|
||||
if idx+1 != ibtp.Index {
|
||||
if interchain.SourceReceiptCounter[ibtp.To]+1 != ibtp.Index {
|
||||
return fmt.Errorf("wrong receipt index, required %d, but %d", idx+1, ibtp.Index)
|
||||
}
|
||||
if ibtp.Index <= idx {
|
||||
return fmt.Errorf(fmt.Sprintf("receipt index already exists, required %d, but %d", idx+1, ibtp.Index))
|
||||
}
|
||||
|
||||
if ibtp.Index > idx+1 {
|
||||
return fmt.Errorf(fmt.Sprintf("wrong receipt index, required %d, but %d", idx+1, ibtp.Index))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -224,6 +229,8 @@ func (x *InterchainManager) ProcessIBTP(ibtp *pb.IBTP, interchain *pb.Interchain
|
|||
ic, _ := x.getInterchain(ibtp.To)
|
||||
ic.SourceReceiptCounter[ibtp.From] = ibtp.Index
|
||||
x.setInterchain(ibtp.To, ic)
|
||||
x.SetObject(x.indexReceiptMapKey(ibtp.ID()), x.GetTxHash())
|
||||
|
||||
}
|
||||
|
||||
x.PostInterchainEvent(m)
|
||||
|
@ -402,3 +409,7 @@ func AppchainKey(id string) string {
|
|||
func (x *InterchainManager) indexMapKey(id string) string {
|
||||
return fmt.Sprintf("index-tx-%s", id)
|
||||
}
|
||||
|
||||
func (x *InterchainManager) indexReceiptMapKey(id string) string {
|
||||
return fmt.Sprintf("index-receipt-tx-%s", id)
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ func (exec *BlockExecutor) listenExecuteEvent() {
|
|||
"height": block.BlockHeader.Number,
|
||||
"count": len(block.Transactions),
|
||||
"elapse": time.Since(now),
|
||||
}).Infof("Executed block")
|
||||
}).Debug("Executed block")
|
||||
exec.persistC <- blockData
|
||||
case <-exec.ctx.Done():
|
||||
return
|
||||
|
|
|
@ -96,7 +96,7 @@ func (exec *BlockExecutor) listenPreExecuteEvent() {
|
|||
"height": block.BlockHeader.Number,
|
||||
"count": len(block.Transactions),
|
||||
"elapse": time.Since(now),
|
||||
}).Infof("Verified signature")
|
||||
}).Debug("Verified signature")
|
||||
exec.blockC <- block
|
||||
case <-exec.ctx.Done():
|
||||
return
|
||||
|
|
|
@ -28,7 +28,7 @@ func TestReadNewConfig(t *testing.T) {
|
|||
assert.True(t, 4 == cfg.ID)
|
||||
assert.True(t, 4 == cfg.N)
|
||||
assert.True(t, 4 == len(cfg.Nodes))
|
||||
assert.True(t, "/ip4/127.0.0.1/tcp/4001" == cfg.LocalAddr)
|
||||
assert.True(t, "/ip4/0.0.0.0/tcp/4001" == cfg.LocalAddr)
|
||||
assert.True(t, "/ip4/127.0.0.1/tcp/4002/p2p/QmNRgD6djYJERNpDpHqRn3mxjJ9SYiiGWzExNSy4sEmSNL" == cfg.Nodes[0].Addr)
|
||||
assert.True(t, 3 == len(cfg.OtherNodes))
|
||||
}
|
||||
|
|
|
@ -130,6 +130,11 @@ func loadNetworkConfig(repoRoot string) (*NetworkConfig, error) {
|
|||
for _, node := range networkConfig.Nodes {
|
||||
if node.ID == networkConfig.ID {
|
||||
networkConfig.LocalAddr = node.Addr
|
||||
addr, err := ma.NewMultiaddr(networkConfig.LocalAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("new multiaddr: %w", err)
|
||||
}
|
||||
networkConfig.LocalAddr = strings.Replace(networkConfig.LocalAddr, ma.Split(addr)[0].String(), "/ip4/0.0.0.0", -1)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,9 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/gogo/protobuf/sortkeys"
|
||||
"github.com/meshplus/bitxhub-kit/storage"
|
||||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
|
@ -16,10 +19,6 @@ import (
|
|||
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
|
||||
"github.com/meshplus/bitxhub/pkg/order/mempool"
|
||||
"github.com/meshplus/bitxhub/pkg/peermgr"
|
||||
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/gogo/protobuf/sortkeys"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -52,10 +51,14 @@ type Node struct {
|
|||
snapshotIndex uint64 // current snapshot apply index in raft log
|
||||
lastIndex uint64 // last apply index in raft log
|
||||
|
||||
readyPool *sync.Pool // ready pool, avoiding memory growth fast
|
||||
readyCache sync.Map // ready cache
|
||||
ctx context.Context // context
|
||||
haltC chan struct{} // exit signal
|
||||
readyPool *sync.Pool // ready pool, avoiding memory growth fast
|
||||
readyCache sync.Map // ready cache
|
||||
|
||||
justElected bool
|
||||
isRestart bool
|
||||
|
||||
ctx context.Context // context
|
||||
haltC chan struct{} // exit signal
|
||||
|
||||
}
|
||||
|
||||
|
@ -136,6 +139,7 @@ func (n *Node) Start() error {
|
|||
}
|
||||
if restart {
|
||||
n.node = raft.RestartNode(rc)
|
||||
n.isRestart = true
|
||||
} else {
|
||||
n.node = raft.StartNode(rc, n.peers)
|
||||
}
|
||||
|
@ -175,17 +179,17 @@ func (n *Node) ReportState(height uint64, hash types.Hash) {
|
|||
}
|
||||
appliedIndex, ok := n.blockAppliedIndex.Load(height)
|
||||
if !ok {
|
||||
n.logger.Errorf("can not found appliedIndex:", height)
|
||||
n.logger.Debugf("can not found appliedIndex:", height)
|
||||
return
|
||||
}
|
||||
// block already persisted, record the apply index in db
|
||||
n.writeAppliedIndex(appliedIndex.(uint64))
|
||||
n.blockAppliedIndex.Delete(height)
|
||||
n.blockAppliedIndex.Delete(height - 1)
|
||||
|
||||
// TODO: delete readyCache
|
||||
readyBytes, ok := n.readyCache.Load(height)
|
||||
if !ok {
|
||||
n.logger.Errorf("can not found ready:", height)
|
||||
n.logger.Debugf("can not found ready:", height)
|
||||
return
|
||||
}
|
||||
ready := readyBytes.(*raftproto.Ready)
|
||||
|
@ -273,17 +277,13 @@ func (n *Node) run() {
|
|||
if !ok {
|
||||
n.proposeC = nil
|
||||
} else {
|
||||
if !n.IsLeader() {
|
||||
n.logger.Warn("Follower node can't propose a proposal")
|
||||
n.mempool.UpdateLeader(n.leader)
|
||||
continue
|
||||
}
|
||||
data, err := ready.Marshal()
|
||||
if err != nil {
|
||||
n.logger.Panic(err)
|
||||
}
|
||||
n.logger.Debugf("Proposed block %d to raft core consensus", ready.Height)
|
||||
if err := n.node.Propose(n.ctx, data); err != nil {
|
||||
n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err)
|
||||
n.logger.Errorf("Failed to propose block [%d] to raft: %s", ready.Height, err)
|
||||
}
|
||||
}
|
||||
case cc, ok := <-n.confChangeC:
|
||||
|
@ -293,7 +293,7 @@ func (n *Node) run() {
|
|||
confChangeCount++
|
||||
cc.ID = confChangeCount
|
||||
if err := n.node.ProposeConfChange(n.ctx, cc); err != nil {
|
||||
n.logger.Panic("Failed to propose configuration update to Raft node: %s", err)
|
||||
n.logger.Errorf("Failed to propose configuration update to Raft node: %s", err)
|
||||
}
|
||||
}
|
||||
case <-n.ctx.Done():
|
||||
|
@ -314,9 +314,30 @@ func (n *Node) run() {
|
|||
// 1: Write HardState, Entries, and Snapshot to persistent storage if they
|
||||
// are not empty.
|
||||
if err := n.raftStorage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
|
||||
n.logger.Fatalf("failed to persist etcd/raft data: %s", err)
|
||||
n.logger.Errorf("failed to persist etcd/raft data: %s", err)
|
||||
}
|
||||
|
||||
if rd.SoftState != nil {
|
||||
newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
|
||||
if newLeader != n.leader {
|
||||
n.logger.Infof("Raft leader changed: %d -> %d", n.leader, newLeader)
|
||||
oldLeader := n.leader
|
||||
n.leader = newLeader
|
||||
if newLeader == n.id {
|
||||
// If the cluster is started for the first time, the leader node starts listening requests directly.
|
||||
if !n.isRestart && n.getBlockAppliedIndex() == uint64(0) {
|
||||
n.mempool.UpdateLeader(n.leader)
|
||||
} else {
|
||||
// new leader should not serve requests
|
||||
n.justElected = true
|
||||
}
|
||||
}
|
||||
// old leader node stop batch block
|
||||
if oldLeader == n.id {
|
||||
n.mempool.UpdateLeader(n.leader)
|
||||
}
|
||||
}
|
||||
}
|
||||
// 2: Apply Snapshot (if any) and CommittedEntries to the state machine.
|
||||
if len(rd.CommittedEntries) != 0 {
|
||||
if ok := n.publishEntries(n.entriesToApply(rd.CommittedEntries)); !ok {
|
||||
|
@ -324,13 +345,19 @@ func (n *Node) run() {
|
|||
return
|
||||
}
|
||||
}
|
||||
if rd.SoftState != nil {
|
||||
newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
|
||||
n.leader = newLeader
|
||||
n.mempool.UpdateLeader(newLeader)
|
||||
|
||||
if n.justElected {
|
||||
msgInflight := n.ramLastIndex() > n.appliedIndex+1
|
||||
if msgInflight {
|
||||
n.logger.Debugf("There are in flight blocks, new leader should not serve requests")
|
||||
continue
|
||||
}
|
||||
n.justElected = false
|
||||
n.mempool.UpdateLeader(n.leader)
|
||||
}
|
||||
|
||||
// 3: AsyncSend all Messages to the nodes named in the To field.
|
||||
go n.send(rd.Messages)
|
||||
n.send(rd.Messages)
|
||||
|
||||
n.maybeTriggerSnapshot()
|
||||
|
||||
|
@ -342,46 +369,57 @@ func (n *Node) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Node) ramLastIndex() uint64 {
|
||||
i, _ := n.raftStorage.ram.LastIndex()
|
||||
n.logger.Infof("New Leader's last index is %d, appliedIndex is %d", i, n.appliedIndex)
|
||||
return i
|
||||
}
|
||||
|
||||
// send raft consensus message
|
||||
func (n *Node) send(messages []raftpb.Message) {
|
||||
for _, msg := range messages {
|
||||
if msg.To == 0 {
|
||||
continue
|
||||
}
|
||||
status := raft.SnapshotFinish
|
||||
go func(msg raftpb.Message) {
|
||||
if msg.To == 0 {
|
||||
return
|
||||
}
|
||||
status := raft.SnapshotFinish
|
||||
|
||||
data, err := (&msg).Marshal()
|
||||
if err != nil {
|
||||
n.logger.Error(err)
|
||||
continue
|
||||
}
|
||||
data, err := (&msg).Marshal()
|
||||
if err != nil {
|
||||
n.logger.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
rm := &raftproto.RaftMessage{
|
||||
Type: raftproto.RaftMessage_CONSENSUS,
|
||||
Data: data,
|
||||
}
|
||||
rmData, err := rm.Marshal()
|
||||
if err != nil {
|
||||
n.logger.Error(err)
|
||||
continue
|
||||
}
|
||||
p2pMsg := &pb.Message{
|
||||
Type: pb.Message_CONSENSUS,
|
||||
Data: rmData,
|
||||
}
|
||||
rm := &raftproto.RaftMessage{
|
||||
Type: raftproto.RaftMessage_CONSENSUS,
|
||||
Data: data,
|
||||
}
|
||||
rmData, err := rm.Marshal()
|
||||
if err != nil {
|
||||
n.logger.Error(err)
|
||||
return
|
||||
}
|
||||
p2pMsg := &pb.Message{
|
||||
Type: pb.Message_CONSENSUS,
|
||||
Data: rmData,
|
||||
}
|
||||
|
||||
err = n.peerMgr.AsyncSend(msg.To, p2pMsg)
|
||||
if err != nil {
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"from": msg.From,
|
||||
}).Debug(err)
|
||||
n.node.ReportUnreachable(msg.To)
|
||||
status = raft.SnapshotFailure
|
||||
}
|
||||
err = n.peerMgr.AsyncSend(msg.To, p2pMsg)
|
||||
if err != nil {
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"from": n.id,
|
||||
"to": msg.To,
|
||||
"msg_type": msg.Type,
|
||||
"err": err.Error(),
|
||||
}).Debugf("async send msg")
|
||||
n.node.ReportUnreachable(msg.To)
|
||||
status = raft.SnapshotFailure
|
||||
}
|
||||
|
||||
if msg.Type == raftpb.MsgSnap {
|
||||
n.node.ReportSnapshot(msg.To, status)
|
||||
}
|
||||
if msg.Type == raftpb.MsgSnap {
|
||||
n.node.ReportSnapshot(msg.To, status)
|
||||
}
|
||||
}(msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -394,11 +432,6 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool {
|
|||
break
|
||||
}
|
||||
|
||||
ready := n.readyPool.Get().(*raftproto.Ready)
|
||||
if err := ready.Unmarshal(ents[i].Data); err != nil {
|
||||
n.logger.Error(err)
|
||||
continue
|
||||
}
|
||||
// This can happen:
|
||||
//
|
||||
// if (1) we crashed after applying this block to the chain, but
|
||||
|
@ -414,6 +447,12 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool {
|
|||
continue
|
||||
}
|
||||
|
||||
ready := n.readyPool.Get().(*raftproto.Ready)
|
||||
if err := ready.Unmarshal(ents[i].Data); err != nil {
|
||||
n.logger.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
n.mint(ready)
|
||||
n.blockAppliedIndex.Store(ready.Height, ents[i].Index)
|
||||
case raftpb.EntryConfChange:
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package solo
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type RAFTConfig struct {
|
||||
RAFT RAFT
|
||||
}
|
||||
|
||||
type MempoolConfig struct {
|
||||
BatchSize uint64 `mapstructure:"batch_size"`
|
||||
PoolSize uint64 `mapstructure:"pool_size"`
|
||||
TxSliceSize uint64 `mapstructure:"tx_slice_size"`
|
||||
|
||||
BatchTick time.Duration `mapstructure:"batch_tick"`
|
||||
FetchTimeout time.Duration `mapstructure:"fetch_timeout"`
|
||||
TxSliceTimeout time.Duration `mapstructure:"tx_slice_timeout"`
|
||||
}
|
||||
|
||||
type RAFT struct {
|
||||
TickTimeout time.Duration `mapstructure:"tick_timeout"`
|
||||
ElectionTick int `mapstructure:"election_tick"`
|
||||
HeartbeatTick int `mapstructure:"heartbeat_tick"`
|
||||
MaxSizePerMsg uint64 `mapstructure:"max_size_per_msg"`
|
||||
MaxInflightMsgs int `mapstructure:"max_inflight_msgs"`
|
||||
CheckQuorum bool `mapstructure:"check_quorum"`
|
||||
PreVote bool `mapstructure:"pre_vote"`
|
||||
DisableProposalForwarding bool `mapstructure:"disable_proposal_forwarding"`
|
||||
MempoolConfig MempoolConfig `mapstructure:"mempool"`
|
||||
}
|
|
@ -1,27 +1,36 @@
|
|||
package solo
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/meshplus/bitxhub-kit/storage/leveldb"
|
||||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/meshplus/bitxhub/pkg/order"
|
||||
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
|
||||
"github.com/meshplus/bitxhub/pkg/order/mempool"
|
||||
"github.com/meshplus/bitxhub/pkg/peermgr/mock_peermgr"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
ID uint64
|
||||
sync.RWMutex
|
||||
height uint64 // current block height
|
||||
pendingTxs *list.List // pending tx pool
|
||||
commitC chan *pb.Block // block channel
|
||||
logger logrus.FieldLogger // logger
|
||||
reqLookUp *order.ReqLookUp // bloom filter
|
||||
getTransactionFunc func(hash *types.Hash) (*pb.Transaction, error)
|
||||
mempool mempool.MemPool // transaction pool
|
||||
proposeC chan *raftproto.Ready // proposed listenReadyBlock, input channel
|
||||
|
||||
packSize int // maximum number of transaction packages
|
||||
blockTick time.Duration // block packed period
|
||||
|
@ -31,40 +40,28 @@ type Node struct {
|
|||
}
|
||||
|
||||
func (n *Node) Start() error {
|
||||
go n.execute()
|
||||
go n.listenReadyBlock()
|
||||
if err := n.mempool.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
n.mempool.UpdateLeader(n.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) Stop() {
|
||||
n.mempool.Stop()
|
||||
n.cancel()
|
||||
}
|
||||
|
||||
func (n *Node) GetPendingNonceByAccount(account string) uint64 {
|
||||
// TODO: implement me
|
||||
return 1
|
||||
return n.mempool.GetPendingNonceByAccount(account)
|
||||
}
|
||||
|
||||
func (n *Node) Prepare(tx *pb.Transaction) error {
|
||||
hash := tx.TransactionHash
|
||||
if ok := n.reqLookUp.LookUp(hash.Bytes()); ok {
|
||||
if tx, _ := n.getTransactionFunc(hash); tx != nil {
|
||||
return nil
|
||||
}
|
||||
if !n.Ready() {
|
||||
return nil
|
||||
}
|
||||
n.pushBack(tx)
|
||||
if n.PoolSize() >= n.packSize {
|
||||
if r := n.ready(); r != nil {
|
||||
n.commitC <- r
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//Current txpool's size
|
||||
func (n *Node) PoolSize() int {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.pendingTxs.Len()
|
||||
return n.mempool.RecvTransaction(tx)
|
||||
}
|
||||
|
||||
func (n *Node) Commit() chan *pb.Block {
|
||||
|
@ -110,73 +107,100 @@ func NewNode(opts ...order.Option) (order.Order, error) {
|
|||
return nil, fmt.Errorf("new bloom filter: %w", err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
mockCtl := gomock.NewController(&testing.T{})
|
||||
peerMgr := mock_peermgr.NewMockPeerManager(mockCtl)
|
||||
peerMgr.EXPECT().Peers().Return(map[uint64]*peer.AddrInfo{}).AnyTimes()
|
||||
memConfig, err := generateMempoolConfig(config.RepoRoot)
|
||||
mempoolConf := &mempool.Config{
|
||||
ID: config.ID,
|
||||
ChainHeight: config.Applied,
|
||||
GetTransactionFunc: config.GetTransactionFunc,
|
||||
PeerMgr: peerMgr,
|
||||
Logger: config.Logger,
|
||||
|
||||
BatchSize: memConfig.BatchSize,
|
||||
BatchTick: memConfig.BatchTick,
|
||||
PoolSize: memConfig.PoolSize,
|
||||
TxSliceSize: memConfig.TxSliceSize,
|
||||
FetchTimeout: memConfig.FetchTimeout,
|
||||
TxSliceTimeout: memConfig.TxSliceTimeout,
|
||||
}
|
||||
batchC := make(chan *raftproto.Ready, 10)
|
||||
mempoolInst := mempool.NewMempool(mempoolConf, storage, batchC)
|
||||
return &Node{
|
||||
ID: config.ID,
|
||||
height: config.Applied,
|
||||
pendingTxs: list.New(),
|
||||
commitC: make(chan *pb.Block, 1024),
|
||||
packSize: 500,
|
||||
blockTick: 500 * time.Millisecond,
|
||||
reqLookUp: reqLookUp,
|
||||
getTransactionFunc: config.GetTransactionFunc,
|
||||
mempool: mempoolInst,
|
||||
proposeC: batchC,
|
||||
logger: config.Logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Schedule to collect txs to the ready channel
|
||||
func (n *Node) execute() {
|
||||
ticker := time.NewTicker(n.blockTick)
|
||||
defer ticker.Stop()
|
||||
// Schedule to collect txs to the listenReadyBlock channel
|
||||
func (n *Node) listenReadyBlock() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if r := n.ready(); r != nil {
|
||||
n.commitC <- r
|
||||
}
|
||||
case <-n.ctx.Done():
|
||||
n.logger.Infoln("Done txpool execute")
|
||||
n.logger.Info("----- Exit listen ready block loop -----")
|
||||
return
|
||||
case proposal := <-n.proposeC:
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"proposal_height": proposal.Height,
|
||||
"tx_count": len(proposal.TxHashes),
|
||||
}).Debugf("Receive proposal from mempool")
|
||||
// collect txs from proposalC
|
||||
_, txs := n.mempool.GetBlockByHashList(proposal)
|
||||
n.height++
|
||||
|
||||
block := &pb.Block{
|
||||
BlockHeader: &pb.BlockHeader{
|
||||
Version: []byte("1.0.0"),
|
||||
Number: n.height,
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
},
|
||||
Transactions: txs,
|
||||
}
|
||||
n.mempool.CommitTransactions(proposal)
|
||||
n.mempool.IncreaseChainHeight()
|
||||
n.commitC <- block
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (n *Node) ready() *pb.Block {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
l := n.pendingTxs.Len()
|
||||
if l == 0 {
|
||||
return nil
|
||||
func generateMempoolConfig(repoRoot string) (*MempoolConfig, error) {
|
||||
readConfig, err := readConfig(repoRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var size int
|
||||
if l > n.packSize {
|
||||
size = n.packSize
|
||||
} else {
|
||||
size = l
|
||||
}
|
||||
txs := make([]*pb.Transaction, 0, size)
|
||||
for i := 0; i < size; i++ {
|
||||
front := n.pendingTxs.Front()
|
||||
tx := front.Value.(*pb.Transaction)
|
||||
txs = append(txs, tx)
|
||||
n.pendingTxs.Remove(front)
|
||||
}
|
||||
n.height++
|
||||
|
||||
block := &pb.Block{
|
||||
BlockHeader: &pb.BlockHeader{
|
||||
Version: []byte("1.0.0"),
|
||||
Number: n.height,
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
},
|
||||
Transactions: txs,
|
||||
}
|
||||
return block
|
||||
mempoolConf := &MempoolConfig{}
|
||||
mempoolConf.BatchSize = readConfig.RAFT.MempoolConfig.BatchSize
|
||||
mempoolConf.PoolSize = readConfig.RAFT.MempoolConfig.PoolSize
|
||||
mempoolConf.TxSliceSize = readConfig.RAFT.MempoolConfig.TxSliceSize
|
||||
mempoolConf.BatchTick = readConfig.RAFT.MempoolConfig.BatchTick
|
||||
mempoolConf.FetchTimeout = readConfig.RAFT.MempoolConfig.FetchTimeout
|
||||
mempoolConf.TxSliceTimeout = readConfig.RAFT.MempoolConfig.TxSliceTimeout
|
||||
return mempoolConf, nil
|
||||
}
|
||||
|
||||
func (n *Node) pushBack(value interface{}) *list.Element {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
return n.pendingTxs.PushBack(value)
|
||||
func readConfig(repoRoot string) (*RAFTConfig, error) {
|
||||
v := viper.New()
|
||||
v.SetConfigFile(filepath.Join(repoRoot, "order.toml"))
|
||||
v.SetConfigType("toml")
|
||||
if err := v.ReadInConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config := &RAFTConfig{}
|
||||
|
||||
if err := v.Unmarshal(config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
|
|
@ -2,11 +2,13 @@ package solo
|
|||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/meshplus/bitxhub-kit/crypto"
|
||||
"github.com/meshplus/bitxhub-kit/crypto/asym"
|
||||
"github.com/meshplus/bitxhub-kit/log"
|
||||
|
@ -14,6 +16,7 @@ import (
|
|||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
"github.com/meshplus/bitxhub/pkg/order"
|
||||
"github.com/meshplus/bitxhub/pkg/peermgr/mock_peermgr"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -24,11 +27,31 @@ func TestNode_Start(t *testing.T) {
|
|||
repoRoot, err := ioutil.TempDir("", "node")
|
||||
defer os.RemoveAll(repoRoot)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// write config file for order module
|
||||
fileData, err := ioutil.ReadFile("../../../config/order.toml")
|
||||
require.Nil(t, err)
|
||||
err = ioutil.WriteFile(filepath.Join(repoRoot, "order.toml"), fileData, 0644)
|
||||
require.Nil(t, err)
|
||||
|
||||
mockCtl := gomock.NewController(t)
|
||||
mockPeermgr := mock_peermgr.NewMockPeerManager(mockCtl)
|
||||
peers := make(map[uint64]*peer.AddrInfo)
|
||||
mockPeermgr.EXPECT().Peers().Return(peers).AnyTimes()
|
||||
|
||||
nodes := make(map[uint64]types.Address)
|
||||
hash := types.NewAddressByStr("000000000000000000000000000000000000000a")
|
||||
nodes[1] = *hash
|
||||
|
||||
order, err := NewNode(
|
||||
order.WithRepoRoot(repoRoot),
|
||||
order.WithStoragePath(repo.GetStoragePath(repoRoot, "order")),
|
||||
order.WithLogger(log.NewWithModule("consensus")),
|
||||
order.WithApplied(1),
|
||||
order.WithPeerManager(mockPeermgr),
|
||||
order.WithID(1),
|
||||
order.WithNodes(nodes),
|
||||
order.WithApplied(1),
|
||||
)
|
||||
require.Nil(t, err)
|
||||
|
||||
|
@ -42,10 +65,10 @@ func TestNode_Start(t *testing.T) {
|
|||
require.Nil(t, err)
|
||||
|
||||
tx := &pb.Transaction{
|
||||
From: from,
|
||||
To: types.NewAddressByStr(to),
|
||||
From: from,
|
||||
To: types.NewAddressByStr(to),
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
Nonce: uint64(rand.Int63()),
|
||||
Nonce: 1,
|
||||
}
|
||||
tx.TransactionHash = tx.Hash()
|
||||
err = tx.Sign(privKey)
|
||||
|
|
|
@ -0,0 +1,165 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: peermgr.go
|
||||
|
||||
// Package mock_peermgr is a generated GoMock package.
|
||||
package mock_peermgr
|
||||
|
||||
import (
|
||||
event "github.com/ethereum/go-ethereum/event"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
pb "github.com/meshplus/bitxhub-model/pb"
|
||||
events "github.com/meshplus/bitxhub/internal/model/events"
|
||||
network "github.com/meshplus/go-lightp2p"
|
||||
reflect "reflect"
|
||||
)
|
||||
|
||||
// MockPeerManager is a mock of PeerManager interface
|
||||
type MockPeerManager struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockPeerManagerMockRecorder
|
||||
}
|
||||
|
||||
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager
|
||||
type MockPeerManagerMockRecorder struct {
|
||||
mock *MockPeerManager
|
||||
}
|
||||
|
||||
// NewMockPeerManager creates a new mock instance
|
||||
func NewMockPeerManager(ctrl *gomock.Controller) *MockPeerManager {
|
||||
mock := &MockPeerManager{ctrl: ctrl}
|
||||
mock.recorder = &MockPeerManagerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Start mocks base method
|
||||
func (m *MockPeerManager) Start() error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Start")
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Start indicates an expected call of Start
|
||||
func (mr *MockPeerManagerMockRecorder) Start() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockPeerManager)(nil).Start))
|
||||
}
|
||||
|
||||
// Stop mocks base method
|
||||
func (m *MockPeerManager) Stop() error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Stop")
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Stop indicates an expected call of Stop
|
||||
func (mr *MockPeerManagerMockRecorder) Stop() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockPeerManager)(nil).Stop))
|
||||
}
|
||||
|
||||
// AsyncSend mocks base method
|
||||
func (m *MockPeerManager) AsyncSend(arg0 uint64, arg1 *pb.Message) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "AsyncSend", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AsyncSend indicates an expected call of AsyncSend
|
||||
func (mr *MockPeerManagerMockRecorder) AsyncSend(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncSend", reflect.TypeOf((*MockPeerManager)(nil).AsyncSend), arg0, arg1)
|
||||
}
|
||||
|
||||
// SendWithStream mocks base method
|
||||
func (m *MockPeerManager) SendWithStream(arg0 network.Stream, arg1 *pb.Message) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SendWithStream", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SendWithStream indicates an expected call of SendWithStream
|
||||
func (mr *MockPeerManagerMockRecorder) SendWithStream(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithStream", reflect.TypeOf((*MockPeerManager)(nil).SendWithStream), arg0, arg1)
|
||||
}
|
||||
|
||||
// Send mocks base method
|
||||
func (m *MockPeerManager) Send(arg0 uint64, arg1 *pb.Message) (*pb.Message, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Send", arg0, arg1)
|
||||
ret0, _ := ret[0].(*pb.Message)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Send indicates an expected call of Send
|
||||
func (mr *MockPeerManagerMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockPeerManager)(nil).Send), arg0, arg1)
|
||||
}
|
||||
|
||||
// Broadcast mocks base method
|
||||
func (m *MockPeerManager) Broadcast(arg0 *pb.Message) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Broadcast", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Broadcast indicates an expected call of Broadcast
|
||||
func (mr *MockPeerManagerMockRecorder) Broadcast(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0)
|
||||
}
|
||||
|
||||
// Peers mocks base method
|
||||
func (m *MockPeerManager) Peers() map[uint64]*peer.AddrInfo {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Peers")
|
||||
ret0, _ := ret[0].(map[uint64]*peer.AddrInfo)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Peers indicates an expected call of Peers
|
||||
func (mr *MockPeerManagerMockRecorder) Peers() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockPeerManager)(nil).Peers))
|
||||
}
|
||||
|
||||
// OtherPeers mocks base method
|
||||
func (m *MockPeerManager) OtherPeers() map[uint64]*peer.AddrInfo {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "OtherPeers")
|
||||
ret0, _ := ret[0].(map[uint64]*peer.AddrInfo)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// OtherPeers indicates an expected call of OtherPeers
|
||||
func (mr *MockPeerManagerMockRecorder) OtherPeers() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OtherPeers", reflect.TypeOf((*MockPeerManager)(nil).OtherPeers))
|
||||
}
|
||||
|
||||
// SubscribeOrderMessage mocks base method
|
||||
func (m *MockPeerManager) SubscribeOrderMessage(ch chan<- events.OrderMessageEvent) event.Subscription {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SubscribeOrderMessage", ch)
|
||||
ret0, _ := ret[0].(event.Subscription)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SubscribeOrderMessage indicates an expected call of SubscribeOrderMessage
|
||||
func (mr *MockPeerManagerMockRecorder) SubscribeOrderMessage(ch interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeOrderMessage", reflect.TypeOf((*MockPeerManager)(nil).SubscribeOrderMessage), ch)
|
||||
}
|
|
@ -87,7 +87,7 @@ func (swarm *Swarm) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := swarm.verifyCert(id); err != nil {
|
||||
if err := swarm.verifyCertOrDisconnect(id); err != nil {
|
||||
if attempt != 0 && attempt%5 == 0 {
|
||||
swarm.logger.WithFields(logrus.Fields{
|
||||
"node": id,
|
||||
|
@ -126,6 +126,17 @@ func (swarm *Swarm) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (swarm *Swarm) verifyCertOrDisconnect(id uint64) error {
|
||||
if err := swarm.verifyCert(id); err != nil {
|
||||
if err = swarm.p2p.Disconnect(swarm.peers[id].ID.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func (swarm *Swarm) Ping() {
|
||||
ticker := time.NewTicker(swarm.pingTimeout)
|
||||
for {
|
||||
|
@ -222,8 +233,9 @@ func (swarm *Swarm) Peers() map[uint64]*peer.AddrInfo {
|
|||
|
||||
func (swarm *Swarm) OtherPeers() map[uint64]*peer.AddrInfo {
|
||||
m := swarm.Peers()
|
||||
delete(m, swarm.repo.NetworkConfig.ID)
|
||||
|
||||
if swarm.repo != nil {
|
||||
delete(m, swarm.repo.NetworkConfig.ID)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue