diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go index 755c5c2..fbbb7bb 100644 --- a/pkg/order/mempool/mempool_impl.go +++ b/pkg/order/mempool/mempool_impl.go @@ -146,10 +146,6 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) { if txSeq >= 1 { _, seenPrevious = mpi.txStore.batchedTxs[orderedIndexKey{account: tx.account, nonce: txSeq - 1}] } - if txSeq == 3 { - mpi.logger.Infof("seenPrevious %v and commitNonce is %d", seenPrevious, commitNonce) - mpi.logger.Infof("batched txs is %v", mpi.txStore.batchedTxs) - } // include transaction if it's "next" for given account or // we've already sent its ancestor to Consensus if seenPrevious || (txSeq == commitNonce) { @@ -303,8 +299,7 @@ func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration }) for _, item := range timeoutItems { // update the liveTime of timeout txs - item.timestamp = currentTime - mpi.txStore.ttlIndex.items[makeAccountNonceKey(item.account, item.nonce)] = currentTime + mpi.txStore.ttlIndex.updateByTtlKey(item, currentTime) } // shard txList into fixed size in case txList is too large to broadcast one time return mpi.shardTxList(timeoutItems, mpi.txSliceSize) diff --git a/pkg/order/mempool/mempool_test.go b/pkg/order/mempool/mempool_test.go index b1993fc..ef512bd 100644 --- a/pkg/order/mempool/mempool_test.go +++ b/pkg/order/mempool/mempool_test.go @@ -1,7 +1,7 @@ package mempool import ( - "fmt" + "math" "testing" "time" @@ -19,12 +19,6 @@ func TestGetBlock(t *testing.T) { tx2 := constructTx(uint64(2), &privKey1) tx3 := constructTx(uint64(2), &privKey2) tx4 := constructTx(uint64(4), &privKey2) - from1, err := privKey1.PublicKey().Address() - ast.Nil(err) - fmt.Printf("from 1 is %s\n", from1) - from2, err := privKey2.PublicKey().Address() - ast.Nil(err) - fmt.Printf("from 2 is %s\n", from2) var txList []*pb.Transaction txList = append(txList, tx1, tx2, tx3, tx4) // mock follower @@ -290,6 +284,7 @@ func TestGetTimeoutTransaction(t *testing.T) { ast := assert.New(t) mpi, _ := mockMempoolImpl() mpi.txSliceSize = 3 + allTxHashes := make([]*types.Hash, 0) txList := make([]*pb.Transaction, 0) privKey1 := genPrivKey() @@ -301,6 +296,7 @@ func TestGetTimeoutTransaction(t *testing.T) { tx1 := constructTx(i, &privKey1) tx2 := constructTx(i, &privKey2) txList = append(txList, tx1, tx2) + allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash()) } batch := mpi.ProcessTransactions(txList, true, true) ast.Nil(batch) @@ -312,6 +308,7 @@ func TestGetTimeoutTransaction(t *testing.T) { tx1 := constructTx(i, &privKey1) tx2 := constructTx(i, &privKey2) txList = append(txList, tx1, tx2) + allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash()) } batch = mpi.ProcessTransactions(txList, true, false) ast.NotNil(batch) @@ -331,6 +328,7 @@ func TestGetTimeoutTransaction(t *testing.T) { tx1 := constructTx(3, &privKey1) tx2 := constructTx(3, &privKey2) txList = append(txList, tx1, tx2) + allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash()) batch = mpi.ProcessTransactions(txList, true, true) ast.NotNil(batch) @@ -340,6 +338,7 @@ func TestGetTimeoutTransaction(t *testing.T) { ast.Equal(2, len(timeoutList)) ast.Equal(3, len(timeoutList[0])) ast.Equal(1, len(timeoutList[1])) + ast.Equal(6, mpi.txStore.ttlIndex.index.Len()) // wait another 150 millisecond, tx3 be timeout too. // though tx1,tx2 has wait a long time, they are not local and they won't be broadcast @@ -349,4 +348,22 @@ func TestGetTimeoutTransaction(t *testing.T) { ast.Equal(2, len(timeoutList)) ast.Equal(3, len(timeoutList[0])) ast.Equal(3, len(timeoutList[1])) + ast.Equal(6, mpi.txStore.ttlIndex.index.Len()) + + // check if all indices are normally cleaned after commit + ast.Equal(10, len(allTxHashes)) + state := &ChainState{ + TxHashList: allTxHashes, + Height: uint64(2), + } + mpi.CommitTransactions(state) + time.Sleep(100 * time.Millisecond) + ast.Equal(0, mpi.txStore.ttlIndex.index.Len()) + ast.Equal(0, len(mpi.txStore.ttlIndex.items)) + ast.Equal(int64(math.MaxInt64), mpi.txStore.earliestTimestamp) + ast.Equal(0, mpi.txStore.priorityIndex.size()) + ast.Equal(0, mpi.txStore.parkingLotIndex.size()) + ast.Equal(0, len(mpi.txStore.batchedTxs)) + ast.Equal(0, len(mpi.txStore.txHashMap)) + ast.Equal(uint64(0), mpi.txStore.priorityNonBatchSize) } diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go index 6dde2bd..e9419b7 100644 --- a/pkg/order/mempool/tx_store.go +++ b/pkg/order/mempool/tx_store.go @@ -218,10 +218,16 @@ func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) { for _, tx := range list { liveTime, ok := tlm.items[makeAccountNonceKey(account, tx.Nonce)] if !ok { - return + continue } tlm.index.Delete(&orderedTimeoutKey{account, tx.Nonce, liveTime}) delete(tlm.items, makeAccountNonceKey(account, tx.Nonce)) } } } + +func (tlm *txLiveTimeMap) updateByTtlKey(originalKey *orderedTimeoutKey, newTime int64) { + tlm.index.Delete(originalKey) + delete(tlm.items, makeAccountNonceKey(originalKey.account, originalKey.nonce)) + tlm.insertByTtlKey(originalKey.account, originalKey.nonce, newTime) +}