diff --git a/config/order.toml b/config/order.toml index b99ff9d..7bf048e 100644 --- a/config/order.toml +++ b/config/order.toml @@ -1,5 +1,6 @@ [raft] batch_timeout = "0.3s" # Block packaging time period. +check_interval = "3m" # Node check interval to check if there exist txs to be rebroadcast tick_timeout = "0.1s" # TickTimeout is the internal logical clock for the Node by a single tick, Election timeouts and heartbeat timeouts are in units of ticks. election_tick = 10 # ElectionTick is the number of Node.Tick invocations that must pass between elections. heartbeat_tick = 1 # HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats. diff --git a/go.sum b/go.sum index f07c41b..a3d2334 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,7 @@ github.com/Shopify/sarama v1.27.0/go.mod h1:aCdj6ymI8uyPEux1JJ9gcaDT6cinjGhNCAhs github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw= github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -782,6 +783,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/shirou/gopsutil v2.20.5+incompatible h1:tYH07UPoQt0OCQdgWWMgYHy3/a9bcxNpBIysykNIP7I= github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -820,7 +822,9 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= +github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw= +github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM= github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/order/etcdraft/config.go b/pkg/order/etcdraft/config.go index 87f0d12..3ed5f20 100644 --- a/pkg/order/etcdraft/config.go +++ b/pkg/order/etcdraft/config.go @@ -27,6 +27,7 @@ type SyncerConfig struct { type RAFT struct { BatchTimeout time.Duration `mapstructure:"batch_timeout"` + CheckInterval time.Duration `mapstructure:"check_interval"` TickTimeout time.Duration `mapstructure:"tick_timeout"` ElectionTick int `mapstructure:"election_tick"` HeartbeatTick int `mapstructure:"heartbeat_tick"` diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go index daacb60..ab50c67 100644 --- a/pkg/order/etcdraft/node.go +++ b/pkg/order/etcdraft/node.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/meshplus/bitxhub-kit/log" "path/filepath" "sync" "sync/atomic" @@ -12,6 +11,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/meshplus/bitxhub-kit/log" "github.com/meshplus/bitxhub-kit/storage" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" @@ -39,26 +39,28 @@ type Node struct { txCache *mempool.TxCache // cache the transactions received from api batchTimerMgr *BatchTimer - proposeC chan *raftproto.RequestBatch // proposed ready, input channel - confChangeC <-chan raftpb.ConfChange // proposed cluster config changes - commitC chan *pb.CommitEvent // the hash commit channel - errorC chan<- error // errors from raft session - tickTimeout time.Duration // tick timeout - msgC chan []byte // receive messages from remote peer - stateC chan *mempool.ChainState // receive the executed block state + proposeC chan *raftproto.RequestBatch // proposed ready, input channel + confChangeC <-chan raftpb.ConfChange // proposed cluster config changes + commitC chan *pb.CommitEvent // the hash commit channel + errorC chan<- error // errors from raft session + tickTimeout time.Duration // tick timeout + checkInterval time.Duration // interval for rebroadcast + msgC chan []byte // receive messages from remote peer + stateC chan *mempool.ChainState // receive the executed block state + rebroadcastTicker chan *raftproto.TxSlice // receive the executed block state - confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot - blockAppliedIndex sync.Map // mapping of block height and apply index in raft log - appliedIndex uint64 // current apply index in raft log - snapCount uint64 // snapshot count - snapshotIndex uint64 // current snapshot apply index in raft log - lastIndex uint64 // last apply index in raft log - lastExec uint64 // the index of the last-applied block - readyPool *sync.Pool // ready pool, avoiding memory growth fast - justElected bool // track new leader status + confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot + blockAppliedIndex sync.Map // mapping of block height and apply index in raft log + appliedIndex uint64 // current apply index in raft log + snapCount uint64 // snapshot count + snapshotIndex uint64 // current snapshot apply index in raft log + lastIndex uint64 // last apply index in raft log + lastExec uint64 // the index of the last-applied block + readyPool *sync.Pool // ready pool, avoiding memory growth fast + justElected bool // track new leader status getChainMetaFunc func() *pb.ChainMeta // current chain meta - ctx context.Context // context - haltC chan struct{} // exit signal + ctx context.Context // context + haltC chan struct{} // exit signal } // NewNode new raft node @@ -120,6 +122,13 @@ func NewNode(opts ...order.Option) (order.Order, error) { snapCount = DefaultSnapshotCount } + var checkInterval time.Duration + if raftConfig.RAFT.CheckInterval == 0 { + checkInterval = DefaultCheckInterval + } else { + checkInterval = raftConfig.RAFT.CheckInterval + } + node := &Node{ id: config.ID, lastExec: config.Applied, @@ -142,6 +151,7 @@ func NewNode(opts ...order.Option) (order.Order, error) { readyPool: readyPool, ctx: context.Background(), mempool: mempoolInst, + checkInterval: checkInterval, } node.raftStorage.SnapshotCatchUpEntries = node.snapCount @@ -175,6 +185,7 @@ func (n *Node) Start() error { n.node = raft.StartNode(rc, n.peers) } n.tickTimeout = tickTimeout + go n.run() go n.txCache.ListenEvent() n.logger.Info("Consensus module started") @@ -249,7 +260,9 @@ func (n *Node) run() { n.snapshotIndex = snap.Metadata.Index n.appliedIndex = snap.Metadata.Index ticker := time.NewTicker(n.tickTimeout) + rebroadcastTicker := time.NewTicker(n.checkInterval) defer ticker.Stop() + defer rebroadcastTicker.Stop() // handle input request go func() { @@ -308,11 +321,24 @@ func (n *Node) run() { _ = n.peerMgr.Broadcast(pbMsg) // 2. process transactions - n.processTransactions(txSet.TxList) + n.processTransactions(txSet.TxList, true) case state := <-n.stateC: n.reportState(state) + case <-rebroadcastTicker.C: + // check periodically if there are long-pending txs in mempool + rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.checkInterval) + for _, txSlice := range rebroadcastTxs { + txSet := &raftproto.TxSlice{TxList: txSlice} + data, err := txSet.Marshal() + if err != nil { + n.logger.Errorf("Marshal failed, err: %s", err.Error()) + return + } + pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id) + _ = n.peerMgr.Broadcast(pbMsg) + } case <-n.batchTimerMgr.BatchTimeoutEvent(): n.batchTimerMgr.StopBatchTimer() // call txPool module to generate a tx batch @@ -386,7 +412,7 @@ func (n *Node) run() { } } -func (n *Node) processTransactions(txList []*pb.Transaction) { +func (n *Node) processTransactions(txList []*pb.Transaction, isLocal bool) { // leader node would check if this transaction triggered generating a batch or not if n.isLeader() { // start batch timer when this node receives the first transaction @@ -394,12 +420,12 @@ func (n *Node) processTransactions(txList []*pb.Transaction) { n.batchTimerMgr.StartBatchTimer() } // If this transaction triggers generating a batch, stop batch timer - if batch := n.mempool.ProcessTransactions(txList, true); batch != nil { + if batch := n.mempool.ProcessTransactions(txList, true, isLocal); batch != nil { n.batchTimerMgr.StopBatchTimer() n.postProposal(batch) } } else { - n.mempool.ProcessTransactions(txList, false) + n.mempool.ProcessTransactions(txList, false, isLocal) } } @@ -560,7 +586,7 @@ func (n *Node) processRaftCoreMsg(msg []byte) error { if err := txSlice.Unmarshal(rm.Data); err != nil { return err } - n.processTransactions(txSlice.TxList) + n.processTransactions(txSlice.TxList, false) default: return fmt.Errorf("unexpected raft message received") diff --git a/pkg/order/etcdraft/util.go b/pkg/order/etcdraft/util.go index da8ed8b..de6f26a 100644 --- a/pkg/order/etcdraft/util.go +++ b/pkg/order/etcdraft/util.go @@ -14,8 +14,9 @@ import ( ) const ( - DefaultBatchTick = 500 * time.Millisecond + DefaultBatchTick = 500 * time.Millisecond DefaultSnapshotCount = 1000 + DefaultCheckInterval = 3 * time.Minute ) func generateRaftPeers(config *order.Config) ([]raft.Peer, error) { diff --git a/pkg/order/mempool/btree_index.go b/pkg/order/mempool/btree_index.go index 964e938..79e3059 100644 --- a/pkg/order/mempool/btree_index.go +++ b/pkg/order/mempool/btree_index.go @@ -1,6 +1,8 @@ package mempool import ( + "fmt" + "github.com/meshplus/bitxhub-model/pb" "github.com/google/btree" @@ -32,6 +34,23 @@ func (snk *sortedNonceKey) Less(item btree.Item) bool { return snk.nonce < dst.nonce } +type orderedTimeoutKey struct { + account string + nonce uint64 + timestamp int64 // the timestamp of index key created +} + +func (otk *orderedTimeoutKey) Less(than btree.Item) bool { + other := than.(*orderedTimeoutKey) + if otk.timestamp != other.timestamp { + return otk.timestamp < other.timestamp + } + if otk.account != other.account { + return otk.account < other.account + } + return otk.nonce < other.nonce +} + func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey { return &orderedIndexKey{ account: account, @@ -39,12 +58,24 @@ func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey { } } +func makeTimeoutKey(account string, tx *pb.Transaction) *orderedTimeoutKey { + return &orderedTimeoutKey{ + account: account, + nonce: tx.Nonce, + timestamp: tx.Timestamp, + } +} + func makeSortedNonceKey(nonce uint64) *sortedNonceKey { return &sortedNonceKey{ nonce: nonce, } } +func makeAccountNonceKey(account string, nonce uint64) string { + return fmt.Sprintf("%s-%d", account, nonce) +} + type btreeIndex struct { data *btree.BTree } @@ -79,6 +110,18 @@ func (idx *btreeIndex) removeByOrderedQueueKey(txs map[string][]*pb.Transaction) } } +func (idx *btreeIndex) insertByTimeoutKey(account string, tx *pb.Transaction) { + idx.data.ReplaceOrInsert(makeTimeoutKey(account, tx)) +} + +func (idx *btreeIndex) removeByTimeoutKey(txs map[string][]*pb.Transaction) { + for account, list := range txs { + for _, tx := range list { + idx.data.Delete(makeTimeoutKey(account, tx)) + } + } +} + // Size returns the size of the index func (idx *btreeIndex) size() int { return idx.data.Len() diff --git a/pkg/order/mempool/mempool.go b/pkg/order/mempool/mempool.go index 71fb233..52da959 100644 --- a/pkg/order/mempool/mempool.go +++ b/pkg/order/mempool/mempool.go @@ -11,7 +11,7 @@ var _ MemPool = (*mempoolImpl)(nil) type MemPool interface { // ProcessTransactions process transaction from api and other vp nodes. - ProcessTransactions(txs []*pb.Transaction, isLeader bool) *raftproto.RequestBatch + ProcessTransactions(txs []*pb.Transaction, isLeader, isLocal bool) *raftproto.RequestBatch // GenerateBlock generate a block GenerateBlock() *raftproto.RequestBatch @@ -24,6 +24,8 @@ type MemPool interface { SetBatchSeqNo(batchSeq uint64) + GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction + External } diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go index 98879f4..fbbb7bb 100644 --- a/pkg/order/mempool/mempool_impl.go +++ b/pkg/order/mempool/mempool_impl.go @@ -1,7 +1,9 @@ package mempool import ( + "math" "sync" + "time" "github.com/google/btree" "github.com/meshplus/bitxhub-model/pb" @@ -10,19 +12,21 @@ import ( ) type mempoolImpl struct { - localID uint64 - batchSize uint64 - batchSeqNo uint64 // track the sequence number of block - poolSize uint64 - logger logrus.FieldLogger - txStore *transactionStore // store all transactions info + localID uint64 + batchSize uint64 + txSliceSize uint64 + batchSeqNo uint64 // track the sequence number of block + poolSize uint64 + logger logrus.FieldLogger + txStore *transactionStore // store all transactions info } func newMempoolImpl(config *Config) *mempoolImpl { mpi := &mempoolImpl{ - localID: config.ID, - batchSeqNo: config.ChainHeight, - logger: config.Logger, + localID: config.ID, + batchSeqNo: config.ChainHeight, + logger: config.Logger, + txSliceSize: config.TxSliceSize, } mpi.txStore = newTransactionStore() if config.BatchSize == 0 { @@ -35,13 +39,19 @@ func newMempoolImpl(config *Config) *mempoolImpl { } else { mpi.poolSize = config.PoolSize } + if config.TxSliceSize == 0 { + mpi.txSliceSize = DefaultTxSetSize + } else { + mpi.txSliceSize = config.TxSliceSize + } mpi.logger.Infof("MemPool batch size = %d", mpi.batchSize) + mpi.logger.Infof("MemPool tx slice size = %d", mpi.batchSize) mpi.logger.Infof("MemPool batch seqNo = %d", mpi.batchSeqNo) mpi.logger.Infof("MemPool pool size = %d", mpi.poolSize) return mpi } -func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader bool) *raftproto.RequestBatch { +func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader, isLocal bool) *raftproto.RequestBatch { validTxs := make(map[string][]*pb.Transaction) for _, tx := range txs { // check the sequence number of tx @@ -65,7 +75,7 @@ func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader bool } // Process all the new transaction and merge any errors into the original slice - dirtyAccounts := mpi.txStore.insertTxs(validTxs) + dirtyAccounts := mpi.txStore.insertTxs(validTxs, isLocal) // send tx to mempool store mpi.processDirtyAccount(dirtyAccounts) @@ -91,10 +101,13 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) { readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce) mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce) - // inset ready txs into priorityIndex. + // insert ready txs into priorityIndex. for _, tx := range readyTxs { - mpi.txStore.priorityIndex.insertByOrderedQueueKey(account, tx) + if !mpi.txStore.priorityIndex.data.Has(makeTimeoutKey(account, tx)) { + mpi.txStore.priorityIndex.insertByTimeoutKey(account, tx) + } } + mpi.txStore.updateEarliestTimestamp() mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize + uint64(len(readyTxs)) // inset non-ready txs into parkingLotIndex. @@ -108,7 +121,9 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) { // getBlock fetches next block of transactions for consensus, // batchedTx are all txs sent to consensus but were not committed yet, mempool should filter out such txs. func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) { - // txs has lower nonce will be observed first in priority index iterator. + // tx which has lower timestamp will be observed first in priority index iterator. + // and if first seen tx's nonce isn't the required nonce for the account, + // it will be stored in skip DS first. mpi.logger.Debugf("Length of non-batched transactions: %d", mpi.txStore.priorityNonBatchSize) var batchSize uint64 if poolLen := mpi.txStore.priorityNonBatchSize; poolLen > mpi.batchSize { @@ -120,8 +135,8 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) { skippedTxs := make(map[orderedIndexKey]bool) result := make([]orderedIndexKey, 0, mpi.batchSize) mpi.txStore.priorityIndex.data.Ascend(func(a btree.Item) bool { - tx := a.(*orderedIndexKey) - // if tx has existed in bathedTxs, + tx := a.(*orderedTimeoutKey) + // if tx has existed in bathedTxs, ignore this tx if _, ok := mpi.txStore.batchedTxs[orderedIndexKey{tx.account, tx.nonce}]; ok { return true } @@ -134,7 +149,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) { // include transaction if it's "next" for given account or // we've already sent its ancestor to Consensus if seenPrevious || (txSeq == commitNonce) { - ptr := orderedIndexKey{account: tx.account, nonce: tx.nonce} + ptr := orderedIndexKey{account: tx.account, nonce: txSeq} mpi.txStore.batchedTxs[ptr] = true result = append(result, ptr) if uint64(len(result)) == batchSize { @@ -142,7 +157,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) { } // check if we can now include some txs that were skipped before for given account - skippedTxn := orderedIndexKey{account: tx.account, nonce: tx.nonce + 1} + skippedTxn := orderedIndexKey{account: tx.account, nonce: txSeq + 1} for { if _, ok := skippedTxs[skippedTxn]; !ok { break @@ -155,7 +170,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) { skippedTxn.nonce++ } } else { - skippedTxs[orderedIndexKey{tx.account, tx.nonce}] = true + skippedTxs[orderedIndexKey{tx.account, txSeq}] = true } return true }) @@ -222,14 +237,19 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) { removedTxs := list.forward(commitNonce) // remove index smaller than commitNonce delete index. var wg sync.WaitGroup - wg.Add(3) + wg.Add(4) go func(ready map[string][]*pb.Transaction) { defer wg.Done() list.index.removeBySortedNonceKey(removedTxs) }(removedTxs) go func(ready map[string][]*pb.Transaction) { defer wg.Done() - mpi.txStore.priorityIndex.removeByOrderedQueueKey(removedTxs) + mpi.txStore.priorityIndex.removeByTimeoutKey(removedTxs) + }(removedTxs) + go func(ready map[string][]*pb.Transaction) { + defer wg.Done() + mpi.txStore.ttlIndex.removeByTtlKey(removedTxs) + mpi.txStore.updateEarliestTimestamp() }(removedTxs) go func(ready map[string][]*pb.Transaction) { defer wg.Done() @@ -248,5 +268,67 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) { } mpi.logger.Debugf("Replica %d removes batches in mempool, and now there are %d non-batched txs,"+ "priority len: %d, parkingLot len: %d, batchedTx len: %d, txHashMap len: %d", mpi.localID, mpi.txStore.priorityNonBatchSize, - mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap)) + mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap)) +} + +func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction { + // all the tx whose live time is less than lowBoundTime should be rebroadcast + mpi.logger.Debugf("Start gathering timeout txs, ttl index len is %d", mpi.txStore.ttlIndex.index.Len()) + currentTime := time.Now().UnixNano() + if currentTime < mpi.txStore.earliestTimestamp+rebroadcastDuration.Nanoseconds() { + // if the latest incoming tx has not exceeded the timeout limit, then none will be timeout + return [][]*pb.Transaction{} + } + + timeoutItems := make([]*orderedTimeoutKey, 0) + mpi.txStore.ttlIndex.index.Ascend(func(i btree.Item) bool { + item := i.(*orderedTimeoutKey) + if item.timestamp > math.MaxInt64 { + // TODO(tyx): if this tx has rebroadcast many times and exceeded a final limit, + // it is expired and will be removed from mempool + return true + } + // if this tx has not exceeded the rebroadcast duration, break iteration + timeoutTime := item.timestamp + rebroadcastDuration.Nanoseconds() + _, ok := mpi.txStore.allTxs[item.account] + if !ok || currentTime < timeoutTime { + return false + } + timeoutItems = append(timeoutItems, item) + return true + }) + for _, item := range timeoutItems { + // update the liveTime of timeout txs + mpi.txStore.ttlIndex.updateByTtlKey(item, currentTime) + } + // shard txList into fixed size in case txList is too large to broadcast one time + return mpi.shardTxList(timeoutItems, mpi.txSliceSize) +} + +func (mpi *mempoolImpl) shardTxList(timeoutItems []*orderedTimeoutKey, batchLen uint64) [][]*pb.Transaction { + begin := uint64(0) + end := uint64(len(timeoutItems)) - 1 + totalLen := uint64(len(timeoutItems)) + + // shape timeout txs to batch size in case totalLen is too large + batchNums := totalLen / batchLen + if totalLen%batchLen != 0 { + batchNums++ + } + shardedLists := make([][]*pb.Transaction, 0, batchNums) + for i := uint64(0); i < batchNums; i++ { + actualLen := batchLen + if end-begin+1 < batchLen { + actualLen = end - begin + 1 + } + + shardedList := make([]*pb.Transaction, actualLen) + for j := uint64(0); j < batchLen && begin <= end; j++ { + txMap, _ := mpi.txStore.allTxs[timeoutItems[begin].account] + shardedList[j] = txMap.items[timeoutItems[begin].nonce].tx + begin++ + } + shardedLists = append(shardedLists, shardedList) + } + return shardedLists } diff --git a/pkg/order/mempool/mempool_test.go b/pkg/order/mempool/mempool_test.go index fb993cc..ef512bd 100644 --- a/pkg/order/mempool/mempool_test.go +++ b/pkg/order/mempool/mempool_test.go @@ -1,6 +1,7 @@ package mempool import ( + "math" "testing" "time" @@ -21,17 +22,17 @@ func TestGetBlock(t *testing.T) { var txList []*pb.Transaction txList = append(txList, tx1, tx2, tx3, tx4) // mock follower - batch := mpi.ProcessTransactions(txList, false) + batch := mpi.ProcessTransactions(txList, false, true) ast.Nil(batch) // mock leader - batch = mpi.ProcessTransactions(txList, true) + batch = mpi.ProcessTransactions(txList, true, true) ast.Nil(batch) // mock leader to getBlock - txList = make([]*pb.Transaction,0) + txList = make([]*pb.Transaction, 0) tx5 := constructTx(uint64(1), &privKey2) txList = append(txList, tx5) - batch = mpi.ProcessTransactions(txList, true) + batch = mpi.ProcessTransactions(txList, true, true) ast.Equal(4, len(batch.TxList)) } @@ -52,7 +53,7 @@ func TestGetPendingNonceByAccount(t *testing.T) { tx5 := constructTx(uint64(4), &privKey2) var txList []*pb.Transaction txList = append(txList, tx1, tx2, tx3, tx4, tx5) - batch := mpi.ProcessTransactions(txList, false) + batch := mpi.ProcessTransactions(txList, false, true) ast.Nil(batch) nonce = mpi.GetPendingNonceByAccount(account1.String()) ast.Equal(uint64(3), nonce) @@ -74,7 +75,7 @@ func TestCommitTransactions(t *testing.T) { tx4 := constructTx(uint64(4), &privKey2) var txList []*pb.Transaction txList = append(txList, tx1, tx2, tx3, tx4) - batch := mpi.ProcessTransactions(txList, true) + batch := mpi.ProcessTransactions(txList, true, true) ast.Nil(batch) ast.Equal(3, mpi.txStore.priorityIndex.size()) ast.Equal(1, mpi.txStore.parkingLotIndex.size()) @@ -85,17 +86,17 @@ func TestCommitTransactions(t *testing.T) { tx5 := constructTx(uint64(2), &privKey2) txList = []*pb.Transaction{} txList = append(txList, tx5) - batch = mpi.ProcessTransactions(txList, true) + batch = mpi.ProcessTransactions(txList, true, true) ast.Equal(4, len(batch.TxList)) ast.Equal(4, mpi.txStore.priorityIndex.size()) ast.Equal(1, mpi.txStore.parkingLotIndex.size()) - ast.Equal(uint64(2), mpi.batchSeqNo) + ast.Equal(uint64(2), mpi.batchSeqNo) var txHashList []*types.Hash txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash) state := &ChainState{ TxHashList: txHashList, - Height: uint64(2), + Height: uint64(2), } mpi.CommitTransactions(state) time.Sleep(100 * time.Millisecond) @@ -126,7 +127,7 @@ func TestProcessTransactions(t *testing.T) { tx4 := constructTx(uint64(2), &privKey2) tx5 := constructTx(uint64(4), &privKey2) txList = append(txList, tx1, tx2, tx3, tx4, tx5) - batch := mpi.ProcessTransactions(txList, false) + batch := mpi.ProcessTransactions(txList, false, true) ast.Nil(batch) ast.Equal(4, mpi.txStore.priorityIndex.size()) ast.Equal(1, mpi.txStore.parkingLotIndex.size()) @@ -143,7 +144,7 @@ func TestProcessTransactions(t *testing.T) { tx7 := constructTx(uint64(5), &privKey2) txList = make([]*pb.Transaction, 0) txList = append(txList, tx6, tx7) - batch = mpi.ProcessTransactions(txList, true) + batch = mpi.ProcessTransactions(txList, true, true) ast.Equal(4, len(batch.TxList)) ast.Equal(uint64(2), batch.Height) ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize) @@ -156,7 +157,6 @@ func TestProcessTransactions(t *testing.T) { ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String())) } - func TestForward(t *testing.T) { ast := assert.New(t) mpi, _ := mockMempoolImpl() @@ -169,7 +169,7 @@ func TestForward(t *testing.T) { tx4 := constructTx(uint64(4), &privKey1) tx5 := constructTx(uint64(6), &privKey1) txList = append(txList, tx1, tx2, tx3, tx4, tx5) - batch := mpi.ProcessTransactions(txList, false) + batch := mpi.ProcessTransactions(txList, false, true) ast.Nil(batch) list := mpi.txStore.allTxs[account1.String()] ast.Equal(5, list.index.size()) @@ -181,4 +181,189 @@ func TestForward(t *testing.T) { ast.Equal(2, len(removeList[account1.String()])) ast.Equal(uint64(1), removeList[account1.String()][0].Nonce) ast.Equal(uint64(2), removeList[account1.String()][1].Nonce) -} \ No newline at end of file +} + +func TestUnorderedIncomingTxs(t *testing.T) { + ast := assert.New(t) + mpi, _ := mockMempoolImpl() + mpi.batchSize = 5 + + txList := make([]*pb.Transaction, 0) + privKey1 := genPrivKey() + account1, _ := privKey1.PublicKey().Address() + privKey2 := genPrivKey() + account2, _ := privKey2.PublicKey().Address() + nonceArr := []uint64{4, 2, 1} + for _, i := range nonceArr { + tx1 := constructTx(i, &privKey1) + tx2 := constructTx(i, &privKey2) + txList = append(txList, tx1, tx2) + } + batch := mpi.ProcessTransactions(txList, true, true) + ast.Nil(batch) // not enough for 5 txs to generate batch + ast.Equal(4, mpi.txStore.priorityIndex.size()) + ast.Equal(2, mpi.txStore.parkingLotIndex.size()) + ast.Equal(6, len(mpi.txStore.txHashMap)) + ast.Equal(3, mpi.txStore.allTxs[account1.String()].index.size()) + ast.Equal(3, mpi.txStore.allTxs[account2.String()].index.size()) + ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.String())) + ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String())) + + tx7 := constructTx(uint64(3), &privKey1) + tx8 := constructTx(uint64(5), &privKey2) + txList = make([]*pb.Transaction, 0) + txList = append(txList, tx7, tx8) + batch = mpi.ProcessTransactions(txList, true, true) + ast.NotNil(batch) + + var hashes []types.Hash + // this batch will contain tx{1,2,3} for privKey1 and tx{1,2} for privKey2 + ast.Equal(5, len(batch.TxList)) + ast.Equal(uint64(2), batch.Height) + ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize) + ast.Equal(6, mpi.txStore.priorityIndex.size()) + ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor") + ast.Equal(8, len(mpi.txStore.txHashMap)) + ast.Equal(4, mpi.txStore.allTxs[account1.String()].index.size()) + ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size()) + ast.Equal(uint64(5), mpi.txStore.nonceCache.getPendingNonce(account1.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String())) + + // process committed txs + hashList := make([]*types.Hash, 0, len(hashes)) + for _, tx := range batch.TxList { + hashList = append(hashList, tx.Hash()) + } + ready := &ChainState{ + TxHashList: hashList, + Height: 2, + } + mpi.processCommitTransactions(ready) + time.Sleep(100 * time.Millisecond) + + ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize) + ast.Equal(1, mpi.txStore.priorityIndex.size()) + ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor") + ast.Equal(3, len(mpi.txStore.txHashMap)) + ast.Equal(1, mpi.txStore.allTxs[account1.String()].index.size()) + ast.Equal(2, mpi.txStore.allTxs[account2.String()].index.size()) + ast.Equal(uint64(5), mpi.txStore.nonceCache.getPendingNonce(account1.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String())) + + // generate block3 + txList = make([]*pb.Transaction, 0) + account1NonceArr2 := []uint64{5, 6} + for _, i := range account1NonceArr2 { + tx1 := constructTx(i, &privKey1) + txList = append(txList, tx1) + } + account2NonceArr2 := []uint64{3, 6} + for _, i := range account2NonceArr2 { + tx1 := constructTx(i, &privKey2) + txList = append(txList, tx1) + } + batch = mpi.ProcessTransactions(txList, true, true) + ast.NotNil(batch) + ast.Equal(uint64(2), mpi.txStore.priorityNonBatchSize) + ast.Equal(7, mpi.txStore.priorityIndex.size()) + ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor") + ast.Equal(7, len(mpi.txStore.txHashMap)) + ast.Equal(3, mpi.txStore.allTxs[account1.String()].index.size()) + ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size()) + ast.Equal(uint64(4), mpi.txStore.nonceCache.getCommitNonce(account1.String())) + ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account1.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String())) + ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account2.String())) + +} + +func TestGetTimeoutTransaction(t *testing.T) { + ast := assert.New(t) + mpi, _ := mockMempoolImpl() + mpi.txSliceSize = 3 + allTxHashes := make([]*types.Hash, 0) + + txList := make([]*pb.Transaction, 0) + privKey1 := genPrivKey() + account1, _ := privKey1.PublicKey().Address() + privKey2 := genPrivKey() + account2, _ := privKey2.PublicKey().Address() + nonceArr := []uint64{4, 5} + for _, i := range nonceArr { + tx1 := constructTx(i, &privKey1) + tx2 := constructTx(i, &privKey2) + txList = append(txList, tx1, tx2) + allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash()) + } + batch := mpi.ProcessTransactions(txList, true, true) + ast.Nil(batch) + + // set another incoming list for timeout + time.Sleep(150 * time.Millisecond) + nonceArr = []uint64{1, 2} + for _, i := range nonceArr { + tx1 := constructTx(i, &privKey1) + tx2 := constructTx(i, &privKey2) + txList = append(txList, tx1, tx2) + allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash()) + } + batch = mpi.ProcessTransactions(txList, true, false) + ast.NotNil(batch) + // tx1,tx2 for account1 and account2 will be batched. + // all txs for account1 and account2 will be in priorityIndex. + // And tx4,tx5 for account1 and account2 will be timeout while tx3 will not + ast.Equal(4, mpi.txStore.priorityIndex.size()) + ast.Equal(4, mpi.txStore.parkingLotIndex.size()) + ast.Equal(8, len(mpi.txStore.txHashMap)) + ast.Equal(4, mpi.txStore.allTxs[account1.String()].index.size()) + ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size()) + ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.String())) + ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.String())) + ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String())) + + tx1 := constructTx(3, &privKey1) + tx2 := constructTx(3, &privKey2) + txList = append(txList, tx1, tx2) + allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash()) + batch = mpi.ProcessTransactions(txList, true, true) + ast.NotNil(batch) + + // tx4,tx5 should be timeout + timeoutList := mpi.GetTimeoutTransactions(100 * time.Millisecond) + ast.NotNil(timeoutList) + ast.Equal(2, len(timeoutList)) + ast.Equal(3, len(timeoutList[0])) + ast.Equal(1, len(timeoutList[1])) + ast.Equal(6, mpi.txStore.ttlIndex.index.Len()) + + // wait another 150 millisecond, tx3 be timeout too. + // though tx1,tx2 has wait a long time, they are not local and they won't be broadcast + time.Sleep(150 * time.Millisecond) + timeoutList = mpi.GetTimeoutTransactions(100 * time.Millisecond) + ast.NotNil(timeoutList) + ast.Equal(2, len(timeoutList)) + ast.Equal(3, len(timeoutList[0])) + ast.Equal(3, len(timeoutList[1])) + ast.Equal(6, mpi.txStore.ttlIndex.index.Len()) + + // check if all indices are normally cleaned after commit + ast.Equal(10, len(allTxHashes)) + state := &ChainState{ + TxHashList: allTxHashes, + Height: uint64(2), + } + mpi.CommitTransactions(state) + time.Sleep(100 * time.Millisecond) + ast.Equal(0, mpi.txStore.ttlIndex.index.Len()) + ast.Equal(0, len(mpi.txStore.ttlIndex.items)) + ast.Equal(int64(math.MaxInt64), mpi.txStore.earliestTimestamp) + ast.Equal(0, mpi.txStore.priorityIndex.size()) + ast.Equal(0, mpi.txStore.parkingLotIndex.size()) + ast.Equal(0, len(mpi.txStore.batchedTxs)) + ast.Equal(0, len(mpi.txStore.txHashMap)) + ast.Equal(uint64(0), mpi.txStore.priorityNonBatchSize) +} diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go index 8b05c0e..e9419b7 100644 --- a/pkg/order/mempool/tx_store.go +++ b/pkg/order/mempool/tx_store.go @@ -1,9 +1,11 @@ package mempool import ( + "math" + "sync" + "github.com/google/btree" "github.com/meshplus/bitxhub-model/pb" - "sync" ) type transactionStore struct { @@ -13,6 +15,10 @@ type transactionStore struct { allTxs map[string]*txSortedMap // track the commit nonce and pending nonce of each account. nonceCache *nonceCache + // keep track of the latest timestamp of ready txs in ttlIndex + earliestTimestamp int64 + // keep track of the livetime of ready txs in priorityIndex + ttlIndex *txLiveTimeMap // keeps track of "non-ready" txs (txs that can't be included in next block) // only used to help remove some txs if pool is full. parkingLotIndex *btreeIndex @@ -31,11 +37,12 @@ func newTransactionStore() *transactionStore { batchedTxs: make(map[orderedIndexKey]bool), parkingLotIndex: newBtreeIndex(), priorityIndex: newBtreeIndex(), + ttlIndex: newTxLiveTimeMap(), nonceCache: newNonceCache(), } } -func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction) map[string]bool { +func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction, isLocal bool) map[string]bool { dirtyAccounts := make(map[string]bool) for account, list := range txs { for _, tx := range list { @@ -54,9 +61,14 @@ func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction) map txItem := &txItem{ account: account, tx: tx, + local: isLocal, } txList.items[tx.Nonce] = txItem txList.index.insertBySortedNonceKey(tx) + if isLocal { + // no need to rebroadcast tx from other nodes to reduce network overhead + txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp) + } } dirtyAccounts[account] = true } @@ -75,6 +87,16 @@ func (txStore *transactionStore) getTxByOrderKey(account string, seqNo uint64) * return nil } +func (txStore *transactionStore) updateEarliestTimestamp() { + // find the earliest tx in ttlIndex + earliestTime := int64(math.MaxInt64) + latestItem := txStore.ttlIndex.index.Min() + if latestItem != nil { + earliestTime = latestItem.(*orderedTimeoutKey).timestamp + } + txStore.earliestTimestamp = earliestTime +} + type txSortedMap struct { items map[uint64]*txItem // map nonce to transaction index *btreeIndex // index for items @@ -134,7 +156,7 @@ type nonceCache struct { // pendingNonces records each account's latest nonce which has been included in // priority queue. Invariant: pendingNonces[account] >= commitNonces[account] pendingNonces map[string]uint64 - pendingMu sync.RWMutex + pendingMu sync.RWMutex } func newNonceCache() *nonceCache { @@ -171,3 +193,41 @@ func (nc *nonceCache) setPendingNonce(account string, nonce uint64) { defer nc.pendingMu.Unlock() nc.pendingNonces[account] = nonce } + +// since the live time field in sortedTtlKey may vary during process +// we need to track the latest live time since its latest broadcast. +type txLiveTimeMap struct { + items map[string]int64 // map account to its latest live time + index *btree.BTree // index for txs +} + +func newTxLiveTimeMap() *txLiveTimeMap { + return &txLiveTimeMap{ + index: btree.New(btreeDegree), + items: make(map[string]int64), + } +} + +func (tlm *txLiveTimeMap) insertByTtlKey(account string, nonce uint64, liveTime int64) { + tlm.index.ReplaceOrInsert(&orderedTimeoutKey{account, nonce, liveTime}) + tlm.items[makeAccountNonceKey(account, nonce)] = liveTime +} + +func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) { + for account, list := range txs { + for _, tx := range list { + liveTime, ok := tlm.items[makeAccountNonceKey(account, tx.Nonce)] + if !ok { + continue + } + tlm.index.Delete(&orderedTimeoutKey{account, tx.Nonce, liveTime}) + delete(tlm.items, makeAccountNonceKey(account, tx.Nonce)) + } + } +} + +func (tlm *txLiveTimeMap) updateByTtlKey(originalKey *orderedTimeoutKey, newTime int64) { + tlm.index.Delete(originalKey) + delete(tlm.items, makeAccountNonceKey(originalKey.account, originalKey.nonce)) + tlm.insertByTtlKey(originalKey.account, originalKey.nonce, newTime) +} diff --git a/pkg/order/mempool/types.go b/pkg/order/mempool/types.go index 5fd8223..43d44ab 100644 --- a/pkg/order/mempool/types.go +++ b/pkg/order/mempool/types.go @@ -17,15 +17,16 @@ const ( DefaultTxCacheSize = 10000 DefaultBatchSize = 500 DefaultTxSetSize = 10 - DefaultTxSetTick = 100 * time.Millisecond + DefaultTxSetTick = 100 * time.Millisecond ) type Config struct { - ID uint64 - BatchSize uint64 - PoolSize uint64 - TxSliceSize uint64 - TxSliceTimeout time.Duration + ID uint64 + BatchSize uint64 + PoolSize uint64 + RebroadcastTimeout time.Duration + TxSliceSize uint64 + TxSliceTimeout time.Duration ChainHeight uint64 Logger logrus.FieldLogger } @@ -33,10 +34,11 @@ type Config struct { type txItem struct { account string tx *pb.Transaction + local bool } type ChainState struct { - Height uint64 - BlockHash *types.Hash + Height uint64 + BlockHash *types.Hash TxHashList []*types.Hash -} \ No newline at end of file +} diff --git a/pkg/order/solo/node.go b/pkg/order/solo/node.go index 7f025b4..d09cc3d 100644 --- a/pkg/order/solo/node.go +++ b/pkg/order/solo/node.go @@ -179,7 +179,7 @@ func (n *Node) listenReadyBlock() { if !n.batchMgr.IsBatchTimerActive() { n.batchMgr.StartBatchTimer() } - if batch := n.mempool.ProcessTransactions(txSet.TxList, true); batch != nil { + if batch := n.mempool.ProcessTransactions(txSet.TxList, true, true); batch != nil { n.batchMgr.StopBatchTimer() n.proposeC <- batch } diff --git a/pkg/peermgr/mock_peermgr/mock_peermgr.go b/pkg/peermgr/mock_peermgr/mock_peermgr.go index 9ec924e..c08f90b 100644 --- a/pkg/peermgr/mock_peermgr/mock_peermgr.go +++ b/pkg/peermgr/mock_peermgr/mock_peermgr.go @@ -10,6 +10,7 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" pb "github.com/meshplus/bitxhub-model/pb" events "github.com/meshplus/bitxhub/internal/model/events" + peermgr "github.com/meshplus/bitxhub/pkg/peermgr" network "github.com/meshplus/go-lightp2p" reflect "reflect" ) @@ -20,10 +21,6 @@ type MockPeerManager struct { recorder *MockPeerManagerMockRecorder } -func (m *MockPeerManager) CountConnectedPeers() uint64 { - return 0 -} - // MockPeerManagerMockRecorder is the mock recorder for MockPeerManager type MockPeerManagerMockRecorder struct { mock *MockPeerManager @@ -231,3 +228,69 @@ func (mr *MockPeerManagerMockRecorder) Disconnect(vpInfos interface{}) *gomock.C mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockPeerManager)(nil).Disconnect), vpInfos) } + +// PierManager mocks base method +func (m *MockPeerManager) PierManager() peermgr.PierManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PierManager") + ret0, _ := ret[0].(peermgr.PierManager) + return ret0 +} + +// PierManager indicates an expected call of PierManager +func (mr *MockPeerManagerMockRecorder) PierManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PierManager", reflect.TypeOf((*MockPeerManager)(nil).PierManager)) +} + +// MockPierManager is a mock of PierManager interface +type MockPierManager struct { + ctrl *gomock.Controller + recorder *MockPierManagerMockRecorder +} + +// MockPierManagerMockRecorder is the mock recorder for MockPierManager +type MockPierManagerMockRecorder struct { + mock *MockPierManager +} + +// NewMockPierManager creates a new mock instance +func NewMockPierManager(ctrl *gomock.Controller) *MockPierManager { + mock := &MockPierManager{ctrl: ctrl} + mock.recorder = &MockPierManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockPierManager) EXPECT() *MockPierManagerMockRecorder { + return m.recorder +} + +// Piers mocks base method +func (m *MockPierManager) Piers() *peermgr.Piers { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Piers") + ret0, _ := ret[0].(*peermgr.Piers) + return ret0 +} + +// Piers indicates an expected call of Piers +func (mr *MockPierManagerMockRecorder) Piers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Piers", reflect.TypeOf((*MockPierManager)(nil).Piers)) +} + +// AskPierMaster mocks base method +func (m *MockPierManager) AskPierMaster(arg0 string) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AskPierMaster", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AskPierMaster indicates an expected call of AskPierMaster +func (mr *MockPierManagerMockRecorder) AskPierMaster(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AskPierMaster", reflect.TypeOf((*MockPierManager)(nil).AskPierMaster), arg0) +}