2020-09-24 15:11:33 +08:00
package mempool
import (
2021-01-12 14:40:48 +08:00
"sync"
2020-12-16 16:25:29 +08:00
"github.com/google/btree"
2020-09-24 15:11:33 +08:00
"github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/sirupsen/logrus"
)
type mempoolImpl struct {
localID uint64
batchSize uint64
batchSeqNo uint64 // track the sequence number of block
2020-12-16 16:25:29 +08:00
poolSize uint64
2020-09-24 15:11:33 +08:00
logger logrus . FieldLogger
2020-12-16 16:25:29 +08:00
txStore * transactionStore // store all transactions info
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
func newMempoolImpl ( config * Config ) * mempoolImpl {
2020-09-24 15:11:33 +08:00
mpi := & mempoolImpl {
2020-12-16 16:25:29 +08:00
localID : config . ID ,
batchSeqNo : config . ChainHeight ,
logger : config . Logger ,
2020-09-24 15:11:33 +08:00
}
mpi . txStore = newTransactionStore ( )
if config . BatchSize == 0 {
mpi . batchSize = DefaultBatchSize
} else {
mpi . batchSize = config . BatchSize
}
2020-12-16 16:25:29 +08:00
if config . PoolSize == 0 {
mpi . poolSize = DefaultPoolSize
2020-09-24 15:11:33 +08:00
} else {
2020-12-16 16:25:29 +08:00
mpi . poolSize = config . PoolSize
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
mpi . logger . Infof ( "MemPool batch size = %d" , mpi . batchSize )
mpi . logger . Infof ( "MemPool batch seqNo = %d" , mpi . batchSeqNo )
mpi . logger . Infof ( "MemPool pool size = %d" , mpi . poolSize )
2020-09-24 15:11:33 +08:00
return mpi
}
2020-12-16 16:25:29 +08:00
func ( mpi * mempoolImpl ) ProcessTransactions ( txs [ ] * pb . Transaction , isLeader bool ) * raftproto . RequestBatch {
2020-09-24 15:11:33 +08:00
validTxs := make ( map [ string ] [ ] * pb . Transaction )
for _ , tx := range txs {
// check the sequence number of tx
2020-10-22 16:43:43 +08:00
txAccount := tx . Account ( )
2020-09-24 15:11:33 +08:00
currentSeqNo := mpi . txStore . nonceCache . getPendingNonce ( txAccount )
if tx . Nonce < currentSeqNo {
2020-09-27 17:03:17 +08:00
mpi . logger . Warningf ( "Account %s, current sequence number is %d, required %d" , txAccount , tx . Nonce , currentSeqNo + 1 )
2020-09-25 16:18:28 +08:00
continue
2020-09-24 15:11:33 +08:00
}
// check the existence of hash of this tx
2020-10-21 22:18:18 +08:00
txHash := tx . TransactionHash . String ( )
2020-09-24 15:11:33 +08:00
if txPointer := mpi . txStore . txHashMap [ txHash ] ; txPointer != nil {
2020-10-21 11:06:17 +08:00
mpi . logger . Warningf ( "Tx [account: %s, nonce: %d, hash: %s] already received" , txAccount , tx . Nonce , txHash )
2020-09-24 15:11:33 +08:00
continue
}
2020-09-30 19:19:38 +08:00
_ , ok := validTxs [ txAccount ]
2020-09-24 15:11:33 +08:00
if ! ok {
validTxs [ txAccount ] = make ( [ ] * pb . Transaction , 0 )
}
validTxs [ txAccount ] = append ( validTxs [ txAccount ] , tx )
}
// Process all the new transaction and merge any errors into the original slice
2020-10-21 11:06:17 +08:00
dirtyAccounts := mpi . txStore . insertTxs ( validTxs )
2020-09-24 15:11:33 +08:00
// send tx to mempool store
mpi . processDirtyAccount ( dirtyAccounts )
2020-12-16 16:25:29 +08:00
// generator batch by block size
if isLeader && mpi . txStore . priorityNonBatchSize >= mpi . batchSize {
batch , err := mpi . generateBlock ( )
if err != nil {
mpi . logger . Errorf ( "Generator batch failed" )
return nil
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
return batch
2020-09-24 15:11:33 +08:00
}
return nil
}
// processDirtyAccount re-indexes the transactions of every account in
// dirtyAccounts.
//
// For each account that has an entry in allTxs, the account's list is split by
// filterReady (starting from the current pending nonce) into ready and
// non-ready transactions. The pending nonce is advanced to the next demanded
// nonce, ready transactions go into priorityIndex (and are counted in
// priorityNonBatchSize), and non-ready ones go into parkingLotIndex.
// Accounts without an entry in allTxs are ignored.
func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
	store := mpi.txStore
	for account := range dirtyAccounts {
		accountTxs, found := store.allTxs[account]
		if !found {
			continue
		}

		// Look for sequential txs starting at the nonce demanded next.
		demanded := store.nonceCache.getPendingNonce(account)
		ready, parked, nextDemanded := accountTxs.filterReady(demanded)
		store.nonceCache.setPendingNonce(account, nextDemanded)

		// Ready txs are eligible for batching.
		for _, tx := range ready {
			store.priorityIndex.insertByOrderedQueueKey(account, tx)
		}
		store.priorityNonBatchSize += uint64(len(ready))

		// Non-ready txs wait in the parking lot.
		for _, tx := range parked {
			store.parkingLotIndex.insertByOrderedQueueKey(account, tx)
		}
	}
}
// getBlock fetches next block of transactions for consensus,
// batchedTx are all txs sent to consensus but were not committed yet, mempool should filter out such txs.
2020-12-16 16:25:29 +08:00
func ( mpi * mempoolImpl ) generateBlock ( ) ( * raftproto . RequestBatch , error ) {
2020-09-24 15:11:33 +08:00
// txs has lower nonce will be observed first in priority index iterator.
2020-12-16 16:25:29 +08:00
mpi . logger . Debugf ( "Length of non-batched transactions: %d" , mpi . txStore . priorityNonBatchSize )
var batchSize uint64
if poolLen := mpi . txStore . priorityNonBatchSize ; poolLen > mpi . batchSize {
batchSize = mpi . batchSize
} else {
batchSize = mpi . txStore . priorityNonBatchSize
}
skippedTxs := make ( map [ orderedIndexKey ] bool )
result := make ( [ ] orderedIndexKey , 0 , mpi . batchSize )
2020-09-24 15:11:33 +08:00
mpi . txStore . priorityIndex . data . Ascend ( func ( a btree . Item ) bool {
tx := a . ( * orderedIndexKey )
// if tx has existed in bathedTxs,
if _ , ok := mpi . txStore . batchedTxs [ orderedIndexKey { tx . account , tx . nonce } ] ; ok {
return true
}
txSeq := tx . nonce
commitNonce := mpi . txStore . nonceCache . getCommitNonce ( tx . account )
var seenPrevious bool
if txSeq >= 1 {
_ , seenPrevious = mpi . txStore . batchedTxs [ orderedIndexKey { account : tx . account , nonce : txSeq - 1 } ]
}
// include transaction if it's "next" for given account or
// we've already sent its ancestor to Consensus
if seenPrevious || ( txSeq == commitNonce ) {
ptr := orderedIndexKey { account : tx . account , nonce : tx . nonce }
mpi . txStore . batchedTxs [ ptr ] = true
result = append ( result , ptr )
2020-12-16 16:25:29 +08:00
if uint64 ( len ( result ) ) == batchSize {
2020-09-24 15:11:33 +08:00
return false
}
2020-12-16 16:25:29 +08:00
// check if we can now include some txs that were skipped before for given account
skippedTxn := orderedIndexKey { account : tx . account , nonce : tx . nonce + 1 }
for {
if _ , ok := skippedTxs [ skippedTxn ] ; ! ok {
break
}
2021-01-12 14:40:48 +08:00
mpi . txStore . batchedTxs [ skippedTxn ] = true
2020-12-16 16:25:29 +08:00
result = append ( result , skippedTxn )
if uint64 ( len ( result ) ) == batchSize {
return false
}
skippedTxn . nonce ++
}
} else {
skippedTxs [ orderedIndexKey { tx . account , tx . nonce } ] = true
2020-09-24 15:11:33 +08:00
}
return true
} )
2020-12-16 16:25:29 +08:00
if len ( result ) == 0 && mpi . txStore . priorityNonBatchSize > 0 {
mpi . logger . Error ( "===== NOTE!!! Leader generate a batch with 0 txs" )
mpi . txStore . priorityNonBatchSize = 0
return nil , nil
}
2020-09-24 15:11:33 +08:00
// convert transaction pointers to real values
txList := make ( [ ] * pb . Transaction , len ( result ) )
for i , v := range result {
rawTransaction := mpi . txStore . getTxByOrderKey ( v . account , v . nonce )
txList [ i ] = rawTransaction
}
2020-12-16 16:25:29 +08:00
mpi . batchSeqNo ++
batchSeqNo := mpi . batchSeqNo
batch := & raftproto . RequestBatch {
TxList : txList ,
Height : batchSeqNo ,
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
if mpi . txStore . priorityNonBatchSize >= uint64 ( len ( txList ) ) {
mpi . txStore . priorityNonBatchSize = mpi . txStore . priorityNonBatchSize - uint64 ( len ( txList ) )
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
mpi . logger . Debugf ( "Leader generate a batch with %d txs, which height is %d, and now there are %d pending txs." , len ( txList ) , batchSeqNo , mpi . txStore . priorityNonBatchSize )
return batch , nil
2020-09-24 15:11:33 +08:00
}
2020-09-30 19:19:38 +08:00
// processCommitTransactions removes the transactions in ready.
2020-12-16 16:25:29 +08:00
func ( mpi * mempoolImpl ) processCommitTransactions ( state * ChainState ) {
2020-09-24 15:11:33 +08:00
dirtyAccounts := make ( map [ string ] bool )
// update current cached commit nonce for account
2020-12-16 16:25:29 +08:00
updateAccounts := make ( map [ string ] uint64 )
// update current cached commit nonce for account
for _ , txHash := range state . TxHashList {
2020-10-21 22:18:18 +08:00
strHash := txHash . String ( )
txPointer := mpi . txStore . txHashMap [ strHash ]
txPointer , ok := mpi . txStore . txHashMap [ strHash ]
2020-09-27 17:00:47 +08:00
if ! ok {
2020-10-21 22:18:18 +08:00
mpi . logger . Warningf ( "Remove transaction %s failed, Can't find it from txHashMap" , strHash )
2020-09-27 17:00:47 +08:00
continue
}
2020-09-24 15:11:33 +08:00
preCommitNonce := mpi . txStore . nonceCache . getCommitNonce ( txPointer . account )
newCommitNonce := txPointer . nonce + 1
if preCommitNonce < newCommitNonce {
mpi . txStore . nonceCache . setCommitNonce ( txPointer . account , newCommitNonce )
2020-12-16 16:25:29 +08:00
// Note!!! updating pendingNonce to commitNonce for the restart node
pendingNonce := mpi . txStore . nonceCache . getPendingNonce ( txPointer . account )
if pendingNonce < newCommitNonce {
updateAccounts [ txPointer . account ] = newCommitNonce
mpi . txStore . nonceCache . setPendingNonce ( txPointer . account , newCommitNonce )
}
2020-09-24 15:11:33 +08:00
}
2020-10-21 22:18:18 +08:00
delete ( mpi . txStore . txHashMap , strHash )
2020-09-24 15:11:33 +08:00
delete ( mpi . txStore . batchedTxs , * txPointer )
dirtyAccounts [ txPointer . account ] = true
}
// clean related txs info in cache
for account := range dirtyAccounts {
commitNonce := mpi . txStore . nonceCache . getCommitNonce ( account )
if list , ok := mpi . txStore . allTxs [ account ] ; ok {
2020-10-13 20:02:59 +08:00
// remove all previous seq number txs for this account.
2020-09-24 15:11:33 +08:00
removedTxs := list . forward ( commitNonce )
2020-10-13 20:02:59 +08:00
// remove index smaller than commitNonce delete index.
2020-09-24 15:11:33 +08:00
var wg sync . WaitGroup
wg . Add ( 3 )
go func ( ready map [ string ] [ ] * pb . Transaction ) {
defer wg . Done ( )
2020-09-30 19:19:38 +08:00
list . index . removeBySortedNonceKey ( removedTxs )
2020-09-24 15:11:33 +08:00
} ( removedTxs )
go func ( ready map [ string ] [ ] * pb . Transaction ) {
defer wg . Done ( )
mpi . txStore . priorityIndex . removeByOrderedQueueKey ( removedTxs )
} ( removedTxs )
go func ( ready map [ string ] [ ] * pb . Transaction ) {
defer wg . Done ( )
mpi . txStore . parkingLotIndex . removeByOrderedQueueKey ( removedTxs )
} ( removedTxs )
wg . Wait ( )
}
}
2020-12-16 16:25:29 +08:00
readyNum := uint64 ( mpi . txStore . priorityIndex . size ( ) )
// set priorityNonBatchSize to min(nonBatchedTxs, readyNum),
if mpi . txStore . priorityNonBatchSize > readyNum {
mpi . txStore . priorityNonBatchSize = readyNum
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
for account , pendingNonce := range updateAccounts {
mpi . logger . Debugf ( "Account %s update its pendingNonce to %d by commitNonce" , account , pendingNonce )
2020-09-24 15:11:33 +08:00
}
2020-12-16 16:25:29 +08:00
mpi . logger . Debugf ( "Replica %d removes batches in mempool, and now there are %d non-batched txs," +
"priority len: %d, parkingLot len: %d, batchedTx len: %d, txHashMap len: %d" , mpi . localID , mpi . txStore . priorityNonBatchSize ,
mpi . txStore . priorityIndex . size ( ) , mpi . txStore . parkingLotIndex . size ( ) , len ( mpi . txStore . batchedTxs ) , len ( mpi . txStore . txHashMap ) )
2020-09-24 15:11:33 +08:00
}