Merge pull request #309 from meshplus/feat/mempool-timeout

Feat/mempool tx timeout and rebroadcast for raft
This commit is contained in:
jzhe 2021-01-19 18:36:53 +08:00 committed by GitHub
commit cb59bacb0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 549 additions and 79 deletions

View File

@ -1,5 +1,6 @@
[raft]
batch_timeout = "0.3s" # Block packaging time period.
check_interval = "3m" # Node check interval to check if there exist txs to be rebroadcast
tick_timeout = "0.1s" # TickTimeout is the internal logical clock for the Node by a single tick, Election timeouts and heartbeat timeouts are in units of ticks.
election_tick = 10 # ElectionTick is the number of Node.Tick invocations that must pass between elections.
heartbeat_tick = 1 # HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.

4
go.sum
View File

@ -40,6 +40,7 @@ github.com/Shopify/sarama v1.27.0/go.mod h1:aCdj6ymI8uyPEux1JJ9gcaDT6cinjGhNCAhs
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw=
github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@ -782,6 +783,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v2.20.5+incompatible h1:tYH07UPoQt0OCQdgWWMgYHy3/a9bcxNpBIysykNIP7I=
github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
@ -820,7 +822,9 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -27,6 +27,7 @@ type SyncerConfig struct {
type RAFT struct {
BatchTimeout time.Duration `mapstructure:"batch_timeout"`
CheckInterval time.Duration `mapstructure:"check_interval"`
TickTimeout time.Duration `mapstructure:"tick_timeout"`
ElectionTick int `mapstructure:"election_tick"`
HeartbeatTick int `mapstructure:"heartbeat_tick"`

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/meshplus/bitxhub-kit/log"
"path/filepath"
"sync"
"sync/atomic"
@ -12,6 +11,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
@ -39,26 +39,28 @@ type Node struct {
txCache *mempool.TxCache // cache the transactions received from api
batchTimerMgr *BatchTimer
proposeC chan *raftproto.RequestBatch // proposed ready, input channel
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan *pb.CommitEvent // the hash commit channel
errorC chan<- error // errors from raft session
tickTimeout time.Duration // tick timeout
msgC chan []byte // receive messages from remote peer
stateC chan *mempool.ChainState // receive the executed block state
proposeC chan *raftproto.RequestBatch // proposed ready, input channel
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan *pb.CommitEvent // the hash commit channel
errorC chan<- error // errors from raft session
tickTimeout time.Duration // tick timeout
checkInterval time.Duration // interval for rebroadcast
msgC chan []byte // receive messages from remote peer
stateC chan *mempool.ChainState // receive the executed block state
rebroadcastTicker chan *raftproto.TxSlice // receive the executed block state
confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot
blockAppliedIndex sync.Map // mapping of block height and apply index in raft log
appliedIndex uint64 // current apply index in raft log
snapCount uint64 // snapshot count
snapshotIndex uint64 // current snapshot apply index in raft log
lastIndex uint64 // last apply index in raft log
lastExec uint64 // the index of the last-applied block
readyPool *sync.Pool // ready pool, avoiding memory growth fast
justElected bool // track new leader status
confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot
blockAppliedIndex sync.Map // mapping of block height and apply index in raft log
appliedIndex uint64 // current apply index in raft log
snapCount uint64 // snapshot count
snapshotIndex uint64 // current snapshot apply index in raft log
lastIndex uint64 // last apply index in raft log
lastExec uint64 // the index of the last-applied block
readyPool *sync.Pool // ready pool, avoiding memory growth fast
justElected bool // track new leader status
getChainMetaFunc func() *pb.ChainMeta // current chain meta
ctx context.Context // context
haltC chan struct{} // exit signal
ctx context.Context // context
haltC chan struct{} // exit signal
}
// NewNode new raft node
@ -120,6 +122,13 @@ func NewNode(opts ...order.Option) (order.Order, error) {
snapCount = DefaultSnapshotCount
}
var checkInterval time.Duration
if raftConfig.RAFT.CheckInterval == 0 {
checkInterval = DefaultCheckInterval
} else {
checkInterval = raftConfig.RAFT.CheckInterval
}
node := &Node{
id: config.ID,
lastExec: config.Applied,
@ -142,6 +151,7 @@ func NewNode(opts ...order.Option) (order.Order, error) {
readyPool: readyPool,
ctx: context.Background(),
mempool: mempoolInst,
checkInterval: checkInterval,
}
node.raftStorage.SnapshotCatchUpEntries = node.snapCount
@ -175,6 +185,7 @@ func (n *Node) Start() error {
n.node = raft.StartNode(rc, n.peers)
}
n.tickTimeout = tickTimeout
go n.run()
go n.txCache.ListenEvent()
n.logger.Info("Consensus module started")
@ -249,7 +260,9 @@ func (n *Node) run() {
n.snapshotIndex = snap.Metadata.Index
n.appliedIndex = snap.Metadata.Index
ticker := time.NewTicker(n.tickTimeout)
rebroadcastTicker := time.NewTicker(n.checkInterval)
defer ticker.Stop()
defer rebroadcastTicker.Stop()
// handle input request
go func() {
@ -308,11 +321,24 @@ func (n *Node) run() {
_ = n.peerMgr.Broadcast(pbMsg)
// 2. process transactions
n.processTransactions(txSet.TxList)
n.processTransactions(txSet.TxList, true)
case state := <-n.stateC:
n.reportState(state)
case <-rebroadcastTicker.C:
// check periodically if there are long-pending txs in mempool
rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.checkInterval)
for _, txSlice := range rebroadcastTxs {
txSet := &raftproto.TxSlice{TxList: txSlice}
data, err := txSet.Marshal()
if err != nil {
n.logger.Errorf("Marshal failed, err: %s", err.Error())
return
}
pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id)
_ = n.peerMgr.Broadcast(pbMsg)
}
case <-n.batchTimerMgr.BatchTimeoutEvent():
n.batchTimerMgr.StopBatchTimer()
// call txPool module to generate a tx batch
@ -386,7 +412,7 @@ func (n *Node) run() {
}
}
func (n *Node) processTransactions(txList []*pb.Transaction) {
func (n *Node) processTransactions(txList []*pb.Transaction, isLocal bool) {
// leader node would check if this transaction triggered generating a batch or not
if n.isLeader() {
// start batch timer when this node receives the first transaction
@ -394,12 +420,12 @@ func (n *Node) processTransactions(txList []*pb.Transaction) {
n.batchTimerMgr.StartBatchTimer()
}
// If this transaction triggers generating a batch, stop batch timer
if batch := n.mempool.ProcessTransactions(txList, true); batch != nil {
if batch := n.mempool.ProcessTransactions(txList, true, isLocal); batch != nil {
n.batchTimerMgr.StopBatchTimer()
n.postProposal(batch)
}
} else {
n.mempool.ProcessTransactions(txList, false)
n.mempool.ProcessTransactions(txList, false, isLocal)
}
}
@ -560,7 +586,7 @@ func (n *Node) processRaftCoreMsg(msg []byte) error {
if err := txSlice.Unmarshal(rm.Data); err != nil {
return err
}
n.processTransactions(txSlice.TxList)
n.processTransactions(txSlice.TxList, false)
default:
return fmt.Errorf("unexpected raft message received")

View File

@ -14,8 +14,9 @@ import (
)
const (
DefaultBatchTick = 500 * time.Millisecond
DefaultBatchTick = 500 * time.Millisecond
DefaultSnapshotCount = 1000
DefaultCheckInterval = 3 * time.Minute
)
func generateRaftPeers(config *order.Config) ([]raft.Peer, error) {

View File

@ -1,6 +1,8 @@
package mempool
import (
"fmt"
"github.com/meshplus/bitxhub-model/pb"
"github.com/google/btree"
@ -32,6 +34,23 @@ func (snk *sortedNonceKey) Less(item btree.Item) bool {
return snk.nonce < dst.nonce
}
type orderedTimeoutKey struct {
account string
nonce uint64
timestamp int64 // the timestamp of index key created
}
func (otk *orderedTimeoutKey) Less(than btree.Item) bool {
other := than.(*orderedTimeoutKey)
if otk.timestamp != other.timestamp {
return otk.timestamp < other.timestamp
}
if otk.account != other.account {
return otk.account < other.account
}
return otk.nonce < other.nonce
}
func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
return &orderedIndexKey{
account: account,
@ -39,12 +58,24 @@ func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
}
}
func makeTimeoutKey(account string, tx *pb.Transaction) *orderedTimeoutKey {
return &orderedTimeoutKey{
account: account,
nonce: tx.Nonce,
timestamp: tx.Timestamp,
}
}
func makeSortedNonceKey(nonce uint64) *sortedNonceKey {
return &sortedNonceKey{
nonce: nonce,
}
}
func makeAccountNonceKey(account string, nonce uint64) string {
return fmt.Sprintf("%s-%d", account, nonce)
}
type btreeIndex struct {
data *btree.BTree
}
@ -79,6 +110,18 @@ func (idx *btreeIndex) removeByOrderedQueueKey(txs map[string][]*pb.Transaction)
}
}
func (idx *btreeIndex) insertByTimeoutKey(account string, tx *pb.Transaction) {
idx.data.ReplaceOrInsert(makeTimeoutKey(account, tx))
}
func (idx *btreeIndex) removeByTimeoutKey(txs map[string][]*pb.Transaction) {
for account, list := range txs {
for _, tx := range list {
idx.data.Delete(makeTimeoutKey(account, tx))
}
}
}
// Size returns the size of the index
func (idx *btreeIndex) size() int {
return idx.data.Len()

View File

@ -11,7 +11,7 @@ var _ MemPool = (*mempoolImpl)(nil)
type MemPool interface {
// ProcessTransactions process transaction from api and other vp nodes.
ProcessTransactions(txs []*pb.Transaction, isLeader bool) *raftproto.RequestBatch
ProcessTransactions(txs []*pb.Transaction, isLeader, isLocal bool) *raftproto.RequestBatch
// GenerateBlock generate a block
GenerateBlock() *raftproto.RequestBatch
@ -24,6 +24,8 @@ type MemPool interface {
SetBatchSeqNo(batchSeq uint64)
GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction
External
}

View File

@ -1,7 +1,9 @@
package mempool
import (
"math"
"sync"
"time"
"github.com/google/btree"
"github.com/meshplus/bitxhub-model/pb"
@ -10,19 +12,21 @@ import (
)
type mempoolImpl struct {
localID uint64
batchSize uint64
batchSeqNo uint64 // track the sequence number of block
poolSize uint64
logger logrus.FieldLogger
txStore *transactionStore // store all transactions info
localID uint64
batchSize uint64
txSliceSize uint64
batchSeqNo uint64 // track the sequence number of block
poolSize uint64
logger logrus.FieldLogger
txStore *transactionStore // store all transactions info
}
func newMempoolImpl(config *Config) *mempoolImpl {
mpi := &mempoolImpl{
localID: config.ID,
batchSeqNo: config.ChainHeight,
logger: config.Logger,
localID: config.ID,
batchSeqNo: config.ChainHeight,
logger: config.Logger,
txSliceSize: config.TxSliceSize,
}
mpi.txStore = newTransactionStore()
if config.BatchSize == 0 {
@ -35,13 +39,19 @@ func newMempoolImpl(config *Config) *mempoolImpl {
} else {
mpi.poolSize = config.PoolSize
}
if config.TxSliceSize == 0 {
mpi.txSliceSize = DefaultTxSetSize
} else {
mpi.txSliceSize = config.TxSliceSize
}
mpi.logger.Infof("MemPool batch size = %d", mpi.batchSize)
mpi.logger.Infof("MemPool tx slice size = %d", mpi.batchSize)
mpi.logger.Infof("MemPool batch seqNo = %d", mpi.batchSeqNo)
mpi.logger.Infof("MemPool pool size = %d", mpi.poolSize)
return mpi
}
func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader bool) *raftproto.RequestBatch {
func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader, isLocal bool) *raftproto.RequestBatch {
validTxs := make(map[string][]*pb.Transaction)
for _, tx := range txs {
// check the sequence number of tx
@ -65,7 +75,7 @@ func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader bool
}
// Process all the new transaction and merge any errors into the original slice
dirtyAccounts := mpi.txStore.insertTxs(validTxs)
dirtyAccounts := mpi.txStore.insertTxs(validTxs, isLocal)
// send tx to mempool store
mpi.processDirtyAccount(dirtyAccounts)
@ -91,10 +101,13 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce)
mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce)
// inset ready txs into priorityIndex.
// insert ready txs into priorityIndex.
for _, tx := range readyTxs {
mpi.txStore.priorityIndex.insertByOrderedQueueKey(account, tx)
if !mpi.txStore.priorityIndex.data.Has(makeTimeoutKey(account, tx)) {
mpi.txStore.priorityIndex.insertByTimeoutKey(account, tx)
}
}
mpi.txStore.updateEarliestTimestamp()
mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize + uint64(len(readyTxs))
// inset non-ready txs into parkingLotIndex.
@ -108,7 +121,9 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
// getBlock fetches next block of transactions for consensus,
// batchedTx are all txs sent to consensus but were not committed yet, mempool should filter out such txs.
func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
// txs has lower nonce will be observed first in priority index iterator.
// tx which has lower timestamp will be observed first in priority index iterator.
// and if first seen tx's nonce isn't the required nonce for the account,
// it will be stored in skip DS first.
mpi.logger.Debugf("Length of non-batched transactions: %d", mpi.txStore.priorityNonBatchSize)
var batchSize uint64
if poolLen := mpi.txStore.priorityNonBatchSize; poolLen > mpi.batchSize {
@ -120,8 +135,8 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
skippedTxs := make(map[orderedIndexKey]bool)
result := make([]orderedIndexKey, 0, mpi.batchSize)
mpi.txStore.priorityIndex.data.Ascend(func(a btree.Item) bool {
tx := a.(*orderedIndexKey)
// if tx has existed in bathedTxs,
tx := a.(*orderedTimeoutKey)
// if tx has existed in bathedTxs, ignore this tx
if _, ok := mpi.txStore.batchedTxs[orderedIndexKey{tx.account, tx.nonce}]; ok {
return true
}
@ -134,7 +149,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
// include transaction if it's "next" for given account or
// we've already sent its ancestor to Consensus
if seenPrevious || (txSeq == commitNonce) {
ptr := orderedIndexKey{account: tx.account, nonce: tx.nonce}
ptr := orderedIndexKey{account: tx.account, nonce: txSeq}
mpi.txStore.batchedTxs[ptr] = true
result = append(result, ptr)
if uint64(len(result)) == batchSize {
@ -142,7 +157,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
}
// check if we can now include some txs that were skipped before for given account
skippedTxn := orderedIndexKey{account: tx.account, nonce: tx.nonce + 1}
skippedTxn := orderedIndexKey{account: tx.account, nonce: txSeq + 1}
for {
if _, ok := skippedTxs[skippedTxn]; !ok {
break
@ -155,7 +170,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
skippedTxn.nonce++
}
} else {
skippedTxs[orderedIndexKey{tx.account, tx.nonce}] = true
skippedTxs[orderedIndexKey{tx.account, txSeq}] = true
}
return true
})
@ -222,14 +237,19 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
removedTxs := list.forward(commitNonce)
// remove index smaller than commitNonce delete index.
var wg sync.WaitGroup
wg.Add(3)
wg.Add(4)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
list.index.removeBySortedNonceKey(removedTxs)
}(removedTxs)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
mpi.txStore.priorityIndex.removeByOrderedQueueKey(removedTxs)
mpi.txStore.priorityIndex.removeByTimeoutKey(removedTxs)
}(removedTxs)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
mpi.txStore.ttlIndex.removeByTtlKey(removedTxs)
mpi.txStore.updateEarliestTimestamp()
}(removedTxs)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
@ -248,5 +268,67 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
}
mpi.logger.Debugf("Replica %d removes batches in mempool, and now there are %d non-batched txs,"+
"priority len: %d, parkingLot len: %d, batchedTx len: %d, txHashMap len: %d", mpi.localID, mpi.txStore.priorityNonBatchSize,
mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap))
mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap))
}
func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction {
// all the tx whose live time is less than lowBoundTime should be rebroadcast
mpi.logger.Debugf("Start gathering timeout txs, ttl index len is %d", mpi.txStore.ttlIndex.index.Len())
currentTime := time.Now().UnixNano()
if currentTime < mpi.txStore.earliestTimestamp+rebroadcastDuration.Nanoseconds() {
// if the latest incoming tx has not exceeded the timeout limit, then none will be timeout
return [][]*pb.Transaction{}
}
timeoutItems := make([]*orderedTimeoutKey, 0)
mpi.txStore.ttlIndex.index.Ascend(func(i btree.Item) bool {
item := i.(*orderedTimeoutKey)
if item.timestamp > math.MaxInt64 {
// TODO(tyx): if this tx has rebroadcast many times and exceeded a final limit,
// it is expired and will be removed from mempool
return true
}
// if this tx has not exceeded the rebroadcast duration, break iteration
timeoutTime := item.timestamp + rebroadcastDuration.Nanoseconds()
_, ok := mpi.txStore.allTxs[item.account]
if !ok || currentTime < timeoutTime {
return false
}
timeoutItems = append(timeoutItems, item)
return true
})
for _, item := range timeoutItems {
// update the liveTime of timeout txs
mpi.txStore.ttlIndex.updateByTtlKey(item, currentTime)
}
// shard txList into fixed size in case txList is too large to broadcast one time
return mpi.shardTxList(timeoutItems, mpi.txSliceSize)
}
func (mpi *mempoolImpl) shardTxList(timeoutItems []*orderedTimeoutKey, batchLen uint64) [][]*pb.Transaction {
begin := uint64(0)
end := uint64(len(timeoutItems)) - 1
totalLen := uint64(len(timeoutItems))
// shape timeout txs to batch size in case totalLen is too large
batchNums := totalLen / batchLen
if totalLen%batchLen != 0 {
batchNums++
}
shardedLists := make([][]*pb.Transaction, 0, batchNums)
for i := uint64(0); i < batchNums; i++ {
actualLen := batchLen
if end-begin+1 < batchLen {
actualLen = end - begin + 1
}
shardedList := make([]*pb.Transaction, actualLen)
for j := uint64(0); j < batchLen && begin <= end; j++ {
txMap, _ := mpi.txStore.allTxs[timeoutItems[begin].account]
shardedList[j] = txMap.items[timeoutItems[begin].nonce].tx
begin++
}
shardedLists = append(shardedLists, shardedList)
}
return shardedLists
}

View File

@ -1,6 +1,7 @@
package mempool
import (
"math"
"testing"
"time"
@ -21,17 +22,17 @@ func TestGetBlock(t *testing.T) {
var txList []*pb.Transaction
txList = append(txList, tx1, tx2, tx3, tx4)
// mock follower
batch := mpi.ProcessTransactions(txList, false)
batch := mpi.ProcessTransactions(txList, false, true)
ast.Nil(batch)
// mock leader
batch = mpi.ProcessTransactions(txList, true)
batch = mpi.ProcessTransactions(txList, true, true)
ast.Nil(batch)
// mock leader to getBlock
txList = make([]*pb.Transaction,0)
txList = make([]*pb.Transaction, 0)
tx5 := constructTx(uint64(1), &privKey2)
txList = append(txList, tx5)
batch = mpi.ProcessTransactions(txList, true)
batch = mpi.ProcessTransactions(txList, true, true)
ast.Equal(4, len(batch.TxList))
}
@ -52,7 +53,7 @@ func TestGetPendingNonceByAccount(t *testing.T) {
tx5 := constructTx(uint64(4), &privKey2)
var txList []*pb.Transaction
txList = append(txList, tx1, tx2, tx3, tx4, tx5)
batch := mpi.ProcessTransactions(txList, false)
batch := mpi.ProcessTransactions(txList, false, true)
ast.Nil(batch)
nonce = mpi.GetPendingNonceByAccount(account1.String())
ast.Equal(uint64(3), nonce)
@ -74,7 +75,7 @@ func TestCommitTransactions(t *testing.T) {
tx4 := constructTx(uint64(4), &privKey2)
var txList []*pb.Transaction
txList = append(txList, tx1, tx2, tx3, tx4)
batch := mpi.ProcessTransactions(txList, true)
batch := mpi.ProcessTransactions(txList, true, true)
ast.Nil(batch)
ast.Equal(3, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
@ -85,17 +86,17 @@ func TestCommitTransactions(t *testing.T) {
tx5 := constructTx(uint64(2), &privKey2)
txList = []*pb.Transaction{}
txList = append(txList, tx5)
batch = mpi.ProcessTransactions(txList, true)
batch = mpi.ProcessTransactions(txList, true, true)
ast.Equal(4, len(batch.TxList))
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
ast.Equal(uint64(2), mpi.batchSeqNo)
ast.Equal(uint64(2), mpi.batchSeqNo)
var txHashList []*types.Hash
txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash)
state := &ChainState{
TxHashList: txHashList,
Height: uint64(2),
Height: uint64(2),
}
mpi.CommitTransactions(state)
time.Sleep(100 * time.Millisecond)
@ -126,7 +127,7 @@ func TestProcessTransactions(t *testing.T) {
tx4 := constructTx(uint64(2), &privKey2)
tx5 := constructTx(uint64(4), &privKey2)
txList = append(txList, tx1, tx2, tx3, tx4, tx5)
batch := mpi.ProcessTransactions(txList, false)
batch := mpi.ProcessTransactions(txList, false, true)
ast.Nil(batch)
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
@ -143,7 +144,7 @@ func TestProcessTransactions(t *testing.T) {
tx7 := constructTx(uint64(5), &privKey2)
txList = make([]*pb.Transaction, 0)
txList = append(txList, tx6, tx7)
batch = mpi.ProcessTransactions(txList, true)
batch = mpi.ProcessTransactions(txList, true, true)
ast.Equal(4, len(batch.TxList))
ast.Equal(uint64(2), batch.Height)
ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
@ -156,7 +157,6 @@ func TestProcessTransactions(t *testing.T) {
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
}
func TestForward(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
@ -169,7 +169,7 @@ func TestForward(t *testing.T) {
tx4 := constructTx(uint64(4), &privKey1)
tx5 := constructTx(uint64(6), &privKey1)
txList = append(txList, tx1, tx2, tx3, tx4, tx5)
batch := mpi.ProcessTransactions(txList, false)
batch := mpi.ProcessTransactions(txList, false, true)
ast.Nil(batch)
list := mpi.txStore.allTxs[account1.String()]
ast.Equal(5, list.index.size())
@ -181,4 +181,189 @@ func TestForward(t *testing.T) {
ast.Equal(2, len(removeList[account1.String()]))
ast.Equal(uint64(1), removeList[account1.String()][0].Nonce)
ast.Equal(uint64(2), removeList[account1.String()][1].Nonce)
}
}
func TestUnorderedIncomingTxs(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
mpi.batchSize = 5
txList := make([]*pb.Transaction, 0)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
privKey2 := genPrivKey()
account2, _ := privKey2.PublicKey().Address()
nonceArr := []uint64{4, 2, 1}
for _, i := range nonceArr {
tx1 := constructTx(i, &privKey1)
tx2 := constructTx(i, &privKey2)
txList = append(txList, tx1, tx2)
}
batch := mpi.ProcessTransactions(txList, true, true)
ast.Nil(batch) // not enough for 5 txs to generate batch
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(2, mpi.txStore.parkingLotIndex.size())
ast.Equal(6, len(mpi.txStore.txHashMap))
ast.Equal(3, mpi.txStore.allTxs[account1.String()].index.size())
ast.Equal(3, mpi.txStore.allTxs[account2.String()].index.size())
ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
tx7 := constructTx(uint64(3), &privKey1)
tx8 := constructTx(uint64(5), &privKey2)
txList = make([]*pb.Transaction, 0)
txList = append(txList, tx7, tx8)
batch = mpi.ProcessTransactions(txList, true, true)
ast.NotNil(batch)
var hashes []types.Hash
// this batch will contain tx{1,2,3} for privKey1 and tx{1,2} for privKey2
ast.Equal(5, len(batch.TxList))
ast.Equal(uint64(2), batch.Height)
ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
ast.Equal(6, mpi.txStore.priorityIndex.size())
ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor")
ast.Equal(8, len(mpi.txStore.txHashMap))
ast.Equal(4, mpi.txStore.allTxs[account1.String()].index.size())
ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size())
ast.Equal(uint64(5), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
// process committed txs
hashList := make([]*types.Hash, 0, len(hashes))
for _, tx := range batch.TxList {
hashList = append(hashList, tx.Hash())
}
ready := &ChainState{
TxHashList: hashList,
Height: 2,
}
mpi.processCommitTransactions(ready)
time.Sleep(100 * time.Millisecond)
ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
ast.Equal(1, mpi.txStore.priorityIndex.size())
ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor")
ast.Equal(3, len(mpi.txStore.txHashMap))
ast.Equal(1, mpi.txStore.allTxs[account1.String()].index.size())
ast.Equal(2, mpi.txStore.allTxs[account2.String()].index.size())
ast.Equal(uint64(5), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
// generate block3
txList = make([]*pb.Transaction, 0)
account1NonceArr2 := []uint64{5, 6}
for _, i := range account1NonceArr2 {
tx1 := constructTx(i, &privKey1)
txList = append(txList, tx1)
}
account2NonceArr2 := []uint64{3, 6}
for _, i := range account2NonceArr2 {
tx1 := constructTx(i, &privKey2)
txList = append(txList, tx1)
}
batch = mpi.ProcessTransactions(txList, true, true)
ast.NotNil(batch)
ast.Equal(uint64(2), mpi.txStore.priorityNonBatchSize)
ast.Equal(7, mpi.txStore.priorityIndex.size())
ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor")
ast.Equal(7, len(mpi.txStore.txHashMap))
ast.Equal(3, mpi.txStore.allTxs[account1.String()].index.size())
ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size())
ast.Equal(uint64(4), mpi.txStore.nonceCache.getCommitNonce(account1.String()))
ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
}
func TestGetTimeoutTransaction(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
mpi.txSliceSize = 3
allTxHashes := make([]*types.Hash, 0)
txList := make([]*pb.Transaction, 0)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
privKey2 := genPrivKey()
account2, _ := privKey2.PublicKey().Address()
nonceArr := []uint64{4, 5}
for _, i := range nonceArr {
tx1 := constructTx(i, &privKey1)
tx2 := constructTx(i, &privKey2)
txList = append(txList, tx1, tx2)
allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash())
}
batch := mpi.ProcessTransactions(txList, true, true)
ast.Nil(batch)
// set another incoming list for timeout
time.Sleep(150 * time.Millisecond)
nonceArr = []uint64{1, 2}
for _, i := range nonceArr {
tx1 := constructTx(i, &privKey1)
tx2 := constructTx(i, &privKey2)
txList = append(txList, tx1, tx2)
allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash())
}
batch = mpi.ProcessTransactions(txList, true, false)
ast.NotNil(batch)
// tx1,tx2 for account1 and account2 will be batched.
// all txs for account1 and account2 will be in priorityIndex.
// And tx4,tx5 for account1 and account2 will be timeout while tx3 will not
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(4, mpi.txStore.parkingLotIndex.size())
ast.Equal(8, len(mpi.txStore.txHashMap))
ast.Equal(4, mpi.txStore.allTxs[account1.String()].index.size())
ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size())
ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
tx1 := constructTx(3, &privKey1)
tx2 := constructTx(3, &privKey2)
txList = append(txList, tx1, tx2)
allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash())
batch = mpi.ProcessTransactions(txList, true, true)
ast.NotNil(batch)
// tx4,tx5 should be timeout
timeoutList := mpi.GetTimeoutTransactions(100 * time.Millisecond)
ast.NotNil(timeoutList)
ast.Equal(2, len(timeoutList))
ast.Equal(3, len(timeoutList[0]))
ast.Equal(1, len(timeoutList[1]))
ast.Equal(6, mpi.txStore.ttlIndex.index.Len())
// wait another 150 millisecond, tx3 be timeout too.
// though tx1,tx2 has wait a long time, they are not local and they won't be broadcast
time.Sleep(150 * time.Millisecond)
timeoutList = mpi.GetTimeoutTransactions(100 * time.Millisecond)
ast.NotNil(timeoutList)
ast.Equal(2, len(timeoutList))
ast.Equal(3, len(timeoutList[0]))
ast.Equal(3, len(timeoutList[1]))
ast.Equal(6, mpi.txStore.ttlIndex.index.Len())
// check if all indices are normally cleaned after commit
ast.Equal(10, len(allTxHashes))
state := &ChainState{
TxHashList: allTxHashes,
Height: uint64(2),
}
mpi.CommitTransactions(state)
time.Sleep(100 * time.Millisecond)
ast.Equal(0, mpi.txStore.ttlIndex.index.Len())
ast.Equal(0, len(mpi.txStore.ttlIndex.items))
ast.Equal(int64(math.MaxInt64), mpi.txStore.earliestTimestamp)
ast.Equal(0, mpi.txStore.priorityIndex.size())
ast.Equal(0, mpi.txStore.parkingLotIndex.size())
ast.Equal(0, len(mpi.txStore.batchedTxs))
ast.Equal(0, len(mpi.txStore.txHashMap))
ast.Equal(uint64(0), mpi.txStore.priorityNonBatchSize)
}

View File

@ -1,9 +1,11 @@
package mempool
import (
"math"
"sync"
"github.com/google/btree"
"github.com/meshplus/bitxhub-model/pb"
"sync"
)
type transactionStore struct {
@ -13,6 +15,10 @@ type transactionStore struct {
allTxs map[string]*txSortedMap
// track the commit nonce and pending nonce of each account.
nonceCache *nonceCache
// keep track of the latest timestamp of ready txs in ttlIndex
earliestTimestamp int64
// keep track of the livetime of ready txs in priorityIndex
ttlIndex *txLiveTimeMap
// keeps track of "non-ready" txs (txs that can't be included in next block)
// only used to help remove some txs if pool is full.
parkingLotIndex *btreeIndex
@ -31,11 +37,12 @@ func newTransactionStore() *transactionStore {
batchedTxs: make(map[orderedIndexKey]bool),
parkingLotIndex: newBtreeIndex(),
priorityIndex: newBtreeIndex(),
ttlIndex: newTxLiveTimeMap(),
nonceCache: newNonceCache(),
}
}
func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction) map[string]bool {
func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction, isLocal bool) map[string]bool {
dirtyAccounts := make(map[string]bool)
for account, list := range txs {
for _, tx := range list {
@ -54,9 +61,14 @@ func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction) map
txItem := &txItem{
account: account,
tx: tx,
local: isLocal,
}
txList.items[tx.Nonce] = txItem
txList.index.insertBySortedNonceKey(tx)
if isLocal {
// no need to rebroadcast tx from other nodes to reduce network overhead
txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp)
}
}
dirtyAccounts[account] = true
}
@ -75,6 +87,16 @@ func (txStore *transactionStore) getTxByOrderKey(account string, seqNo uint64) *
return nil
}
func (txStore *transactionStore) updateEarliestTimestamp() {
// find the earliest tx in ttlIndex
earliestTime := int64(math.MaxInt64)
latestItem := txStore.ttlIndex.index.Min()
if latestItem != nil {
earliestTime = latestItem.(*orderedTimeoutKey).timestamp
}
txStore.earliestTimestamp = earliestTime
}
type txSortedMap struct {
items map[uint64]*txItem // map nonce to transaction
index *btreeIndex // index for items
@ -134,7 +156,7 @@ type nonceCache struct {
// pendingNonces records each account's latest nonce which has been included in
// priority queue. Invariant: pendingNonces[account] >= commitNonces[account]
pendingNonces map[string]uint64
pendingMu sync.RWMutex
pendingMu sync.RWMutex
}
func newNonceCache() *nonceCache {
@ -171,3 +193,41 @@ func (nc *nonceCache) setPendingNonce(account string, nonce uint64) {
defer nc.pendingMu.Unlock()
nc.pendingNonces[account] = nonce
}
// since the live time field in sortedTtlKey may vary during process
// we need to track the latest live time since its latest broadcast.
type txLiveTimeMap struct {
items map[string]int64 // map account to its latest live time
index *btree.BTree // index for txs
}
func newTxLiveTimeMap() *txLiveTimeMap {
return &txLiveTimeMap{
index: btree.New(btreeDegree),
items: make(map[string]int64),
}
}
func (tlm *txLiveTimeMap) insertByTtlKey(account string, nonce uint64, liveTime int64) {
tlm.index.ReplaceOrInsert(&orderedTimeoutKey{account, nonce, liveTime})
tlm.items[makeAccountNonceKey(account, nonce)] = liveTime
}
func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) {
for account, list := range txs {
for _, tx := range list {
liveTime, ok := tlm.items[makeAccountNonceKey(account, tx.Nonce)]
if !ok {
continue
}
tlm.index.Delete(&orderedTimeoutKey{account, tx.Nonce, liveTime})
delete(tlm.items, makeAccountNonceKey(account, tx.Nonce))
}
}
}
func (tlm *txLiveTimeMap) updateByTtlKey(originalKey *orderedTimeoutKey, newTime int64) {
tlm.index.Delete(originalKey)
delete(tlm.items, makeAccountNonceKey(originalKey.account, originalKey.nonce))
tlm.insertByTtlKey(originalKey.account, originalKey.nonce, newTime)
}

View File

@ -17,15 +17,16 @@ const (
DefaultTxCacheSize = 10000
DefaultBatchSize = 500
DefaultTxSetSize = 10
DefaultTxSetTick = 100 * time.Millisecond
DefaultTxSetTick = 100 * time.Millisecond
)
type Config struct {
ID uint64
BatchSize uint64
PoolSize uint64
TxSliceSize uint64
TxSliceTimeout time.Duration
ID uint64
BatchSize uint64
PoolSize uint64
RebroadcastTimeout time.Duration
TxSliceSize uint64
TxSliceTimeout time.Duration
ChainHeight uint64
Logger logrus.FieldLogger
}
@ -33,10 +34,11 @@ type Config struct {
type txItem struct {
account string
tx *pb.Transaction
local bool
}
type ChainState struct {
Height uint64
BlockHash *types.Hash
Height uint64
BlockHash *types.Hash
TxHashList []*types.Hash
}
}

View File

@ -179,7 +179,7 @@ func (n *Node) listenReadyBlock() {
if !n.batchMgr.IsBatchTimerActive() {
n.batchMgr.StartBatchTimer()
}
if batch := n.mempool.ProcessTransactions(txSet.TxList, true); batch != nil {
if batch := n.mempool.ProcessTransactions(txSet.TxList, true, true); batch != nil {
n.batchMgr.StopBatchTimer()
n.proposeC <- batch
}

View File

@ -10,6 +10,7 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
pb "github.com/meshplus/bitxhub-model/pb"
events "github.com/meshplus/bitxhub/internal/model/events"
peermgr "github.com/meshplus/bitxhub/pkg/peermgr"
network "github.com/meshplus/go-lightp2p"
reflect "reflect"
)
@ -20,10 +21,6 @@ type MockPeerManager struct {
recorder *MockPeerManagerMockRecorder
}
func (m *MockPeerManager) CountConnectedPeers() uint64 {
return 0
}
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager
type MockPeerManagerMockRecorder struct {
mock *MockPeerManager
@ -231,3 +228,69 @@ func (mr *MockPeerManagerMockRecorder) Disconnect(vpInfos interface{}) *gomock.C
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockPeerManager)(nil).Disconnect), vpInfos)
}
// PierManager mocks base method
func (m *MockPeerManager) PierManager() peermgr.PierManager {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PierManager")
ret0, _ := ret[0].(peermgr.PierManager)
return ret0
}
// PierManager indicates an expected call of PierManager
func (mr *MockPeerManagerMockRecorder) PierManager() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PierManager", reflect.TypeOf((*MockPeerManager)(nil).PierManager))
}
// MockPierManager is a mock of PierManager interface
type MockPierManager struct {
ctrl *gomock.Controller
recorder *MockPierManagerMockRecorder
}
// MockPierManagerMockRecorder is the mock recorder for MockPierManager
type MockPierManagerMockRecorder struct {
mock *MockPierManager
}
// NewMockPierManager creates a new mock instance
func NewMockPierManager(ctrl *gomock.Controller) *MockPierManager {
mock := &MockPierManager{ctrl: ctrl}
mock.recorder = &MockPierManagerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockPierManager) EXPECT() *MockPierManagerMockRecorder {
return m.recorder
}
// Piers mocks base method
func (m *MockPierManager) Piers() *peermgr.Piers {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Piers")
ret0, _ := ret[0].(*peermgr.Piers)
return ret0
}
// Piers indicates an expected call of Piers
func (mr *MockPierManagerMockRecorder) Piers() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Piers", reflect.TypeOf((*MockPierManager)(nil).Piers))
}
// AskPierMaster mocks base method
func (m *MockPierManager) AskPierMaster(arg0 string) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AskPierMaster", arg0)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// AskPierMaster indicates an expected call of AskPierMaster
func (mr *MockPierManagerMockRecorder) AskPierMaster(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AskPierMaster", reflect.TypeOf((*MockPierManager)(nil).AskPierMaster), arg0)
}