feat(mempool): add comments and unit tests for mempool, and verify the signature in api module instead of mempool.

This commit is contained in:
Lizen0512 2020-09-30 19:19:38 +08:00
parent eb8baa149a
commit 41141593db
18 changed files with 786 additions and 317 deletions

View File

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub-model/pb"
) )
@ -86,12 +88,15 @@ func (cbs *ChainBrokerService) sendTransaction(req *pb.SendTransactionRequest) (
To: req.To, To: req.To,
Timestamp: req.Timestamp, Timestamp: req.Timestamp,
Data: req.Data, Data: req.Data,
Nonce: uint64(req.Nonce), Nonce: req.Nonce,
Signature: req.Signature, Signature: req.Signature,
Extra: req.Extra, Extra: req.Extra,
} }
tx.TransactionHash = tx.Hash() tx.TransactionHash = tx.Hash()
ok, _ := asym.Verify(crypto.Secp256k1, tx.Signature, tx.SignHash().Bytes(), tx.From)
if !ok {
return "", fmt.Errorf("invalid signature")
}
err := cbs.api.Broker().HandleTransaction(tx) err := cbs.api.Broker().HandleTransaction(tx)
if err != nil { if err != nil {
return "", err return "", err
@ -107,7 +112,7 @@ func (cbs *ChainBrokerService) sendView(req *pb.SendTransactionRequest) (*pb.Rec
To: req.To, To: req.To,
Timestamp: req.Timestamp, Timestamp: req.Timestamp,
Data: req.Data, Data: req.Data,
Nonce: uint64(req.Nonce), Nonce: req.Nonce,
Signature: req.Signature, Signature: req.Signature,
Extra: req.Extra, Extra: req.Extra,
} }

View File

@ -9,7 +9,7 @@ solo = false
monitor = 40011 monitor = 40011
[pprof] [pprof]
enable = false enable = true
[monitor] [monitor]
enable = true enable = true

View File

@ -9,7 +9,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/cbergoon/merkletree"
"github.com/meshplus/bitxhub-kit/crypto" "github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym" "github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-kit/types"
@ -19,6 +18,8 @@ import (
"github.com/meshplus/bitxhub/pkg/vm" "github.com/meshplus/bitxhub/pkg/vm"
"github.com/meshplus/bitxhub/pkg/vm/boltvm" "github.com/meshplus/bitxhub/pkg/vm/boltvm"
"github.com/meshplus/bitxhub/pkg/vm/wasm" "github.com/meshplus/bitxhub/pkg/vm/wasm"
"github.com/cbergoon/merkletree"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )

View File

@ -39,7 +39,7 @@ func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
} }
} }
func makeSortedNonceKeyKey(nonce uint64) *sortedNonceKey { func makeSortedNonceKey(nonce uint64) *sortedNonceKey {
return &sortedNonceKey{ return &sortedNonceKey{
nonce: nonce, nonce: nonce,
} }
@ -55,14 +55,14 @@ func newBtreeIndex() *btreeIndex {
} }
} }
func (idx *btreeIndex) insert(tx *pb.Transaction) { func (idx *btreeIndex) insertBySortedNonceKey(tx *pb.Transaction) {
idx.data.ReplaceOrInsert(makeSortedNonceKeyKey(tx.Nonce)) idx.data.ReplaceOrInsert(makeSortedNonceKey(tx.Nonce))
} }
func (idx *btreeIndex) remove(txs map[string][]*pb.Transaction) { func (idx *btreeIndex) removeBySortedNonceKey(txs map[string][]*pb.Transaction) {
for _, list := range txs { for _, list := range txs {
for _, tx := range list { for _, tx := range list {
idx.data.Delete(makeSortedNonceKeyKey(tx.Nonce)) idx.data.Delete(makeSortedNonceKey(tx.Nonce))
} }
} }
} }

View File

@ -17,9 +17,51 @@ func TestLess(t *testing.T) {
tx.Nonce = 2 tx.Nonce = 2
orderedIndexKey1 := makeOrderedIndexKey("bitxhub", tx) orderedIndexKey1 := makeOrderedIndexKey("bitxhub", tx)
isLess := orderedIndexKey.Less(orderedIndexKey1) isLess := orderedIndexKey.Less(orderedIndexKey1)
ast.Equal(true, isLess) ast.Equal(true, isLess, "orderedIndexKey's account is less than orderedIndexKey1")
tx.Nonce = 2
orderedIndexKey2 := makeOrderedIndexKey("account", tx)
isLess = orderedIndexKey.Less(orderedIndexKey2)
ast.Equal(true, isLess, "orderedIndexKey's nonce is less than orderedIndexKey2")
} }
func TestSortedNonceKeyLess(t *testing.T) {
ast := assert.New(t)
sortedNonceKey := makeSortedNonceKey(uint64(1))
sortedNonceKey1 := makeSortedNonceKey(uint64(2))
isLess := sortedNonceKey.Less(sortedNonceKey1)
ast.Equal(true, isLess, "sortedNonceKey's nonce is less than sortedNonceKey1")
}
func TestSortedNonceIndex(t *testing.T) {
ast := assert.New(t)
tx := &pb.Transaction{
Nonce: uint64(1),
}
btreeIndex := newBtreeIndex()
btreeIndex.insertBySortedNonceKey(tx)
ast.Equal(1, btreeIndex.data.Len())
txn := make(map[string][]*pb.Transaction)
list := make([]*pb.Transaction, 1)
list[0] = tx
txn["account"] = list
btreeIndex.removeBySortedNonceKey(txn)
ast.Equal(0, btreeIndex.data.Len())
}
func TestOrderedQueueIndex(t *testing.T) {
ast := assert.New(t)
tx := &pb.Transaction{
Nonce: uint64(1),
}
btreeIndex := newBtreeIndex()
btreeIndex.insertByOrderedQueueKey("account", tx)
ast.Equal(1, btreeIndex.data.Len())
txn := make(map[string][]*pb.Transaction)
list := make([]*pb.Transaction, 1)
list[0] = tx
txn["account"] = list
btreeIndex.removeByOrderedQueueKey(txn)
ast.Equal(0, btreeIndex.data.Len())
}

View File

@ -65,6 +65,7 @@ func (mpi *mempoolImpl) RecvTransaction(tx *pb.Transaction) error {
if mpi.txCache.IsFull() && mpi.poolIsFull() { if mpi.txCache.IsFull() && mpi.poolIsFull() {
return errors.New("transaction cache and pool are full, we will drop this transaction") return errors.New("transaction cache and pool are full, we will drop this transaction")
} }
// TODOYH: how to inform the client that the nonce of is wrong, need to sync to correct nonce.
mpi.txCache.recvTxC <- tx mpi.txCache.recvTxC <- tx
return nil return nil
} }

View File

@ -7,8 +7,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
@ -48,7 +46,7 @@ func newMempoolImpl(config *Config, storage storage.Storage, batchC chan *raftpr
storage: storage, storage: storage,
} }
mpi.txStore = newTransactionStore() mpi.txStore = newTransactionStore()
mpi.txCache = newTxCache(config.TxSliceTimeout, config.Logger) mpi.txCache = newTxCache(config.TxSliceTimeout, config.TxSliceSize, config.Logger)
mpi.subscribe = newSubscribe() mpi.subscribe = newSubscribe()
if config.BatchSize == 0 { if config.BatchSize == 0 {
mpi.batchSize = DefaultBatchSize mpi.batchSize = DefaultBatchSize
@ -156,17 +154,11 @@ func (mpi *mempoolImpl) listenEvent() {
func (mpi *mempoolImpl) processTransactions(txs []*pb.Transaction) error { func (mpi *mempoolImpl) processTransactions(txs []*pb.Transaction) error {
validTxs := make(map[string][]*pb.Transaction) validTxs := make(map[string][]*pb.Transaction)
for _, tx := range txs { for _, tx := range txs {
// check if this tx signature is valid first
ok, _ := asym.Verify(crypto.Secp256k1, tx.Signature, tx.SignHash().Bytes(), tx.From)
if !ok {
return fmt.Errorf("invalid signature")
}
// check the sequence number of tx // check the sequence number of tx
// TODO refactor Transaction
txAccount, err := getAccount(tx) txAccount, err := getAccount(tx)
if err != nil { if err != nil {
return fmt.Errorf("get tx account failed, err: %s", err.Error()) mpi.logger.Warningf("get tx account failed, err: %s", err.Error())
continue
} }
currentSeqNo := mpi.txStore.nonceCache.getPendingNonce(txAccount) currentSeqNo := mpi.txStore.nonceCache.getPendingNonce(txAccount)
if tx.Nonce < currentSeqNo { if tx.Nonce < currentSeqNo {
@ -179,7 +171,7 @@ func (mpi *mempoolImpl) processTransactions(txs []*pb.Transaction) error {
mpi.logger.Warningf("Tx %s already received", txHash) mpi.logger.Warningf("Tx %s already received", txHash)
continue continue
} }
_, ok = validTxs[txAccount] _, ok := validTxs[txAccount]
if !ok { if !ok {
validTxs[txAccount] = make([]*pb.Transaction, 0) validTxs[txAccount] = make([]*pb.Transaction, 0)
} }
@ -233,7 +225,7 @@ func (txStore *transactionStore) InsertTxs(txs map[string][]*pb.Transaction) map
tx: tx, tx: tx,
} }
list.items[tx.Nonce] = txItem list.items[tx.Nonce] = txItem
list.index.insert(tx) list.index.insertBySortedNonceKey(tx)
atomic.AddInt32(&txStore.poolSize, 1) atomic.AddInt32(&txStore.poolSize, 1)
} }
dirtyAccounts[account] = true dirtyAccounts[account] = true
@ -348,6 +340,7 @@ func (mpi *mempoolImpl) getBlock(ready *raftproto.Ready) *mempoolBatch {
return mpi.constructSameBatch(ready) return mpi.constructSameBatch(ready)
} }
// constructSameBatch only be called by follower, constructs a batch by given ready info.
func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch { func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch {
res := &mempoolBatch{} res := &mempoolBatch{}
if txList, ok := mpi.txStore.batchedCache[ready.Height]; ok { if txList, ok := mpi.txStore.batchedCache[ready.Height]; ok {
@ -388,12 +381,11 @@ func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch
if len(res.missingTxnHashList) == 0 { if len(res.missingTxnHashList) == 0 {
// store the batch to cache // store the batch to cache
mpi.txStore.batchedCache[ready.Height] = txList mpi.txStore.batchedCache[ready.Height] = txList
// store the batch to db
mpi.batchStore(txList)
} }
return res return res
} }
// processCommitTransactions removes the transactions in ready.
func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) { func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) {
dirtyAccounts := make(map[string]bool) dirtyAccounts := make(map[string]bool)
// update current cached commit nonce for account // update current cached commit nonce for account
@ -418,14 +410,14 @@ func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) {
for account := range dirtyAccounts { for account := range dirtyAccounts {
commitNonce := mpi.txStore.nonceCache.getCommitNonce(account) commitNonce := mpi.txStore.nonceCache.getCommitNonce(account)
if list, ok := mpi.txStore.allTxs[account]; ok { if list, ok := mpi.txStore.allTxs[account]; ok {
// remove all previous seq number txs for this account. // removeBySortedNonceKey all previous seq number txs for this account.
removedTxs := list.forward(commitNonce) removedTxs := list.forward(commitNonce)
// remove index smaller than commitNonce delete index. // removeBySortedNonceKey index smaller than commitNonce delete index.
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(3) wg.Add(3)
go func(ready map[string][]*pb.Transaction) { go func(ready map[string][]*pb.Transaction) {
defer wg.Done() defer wg.Done()
list.index.remove(removedTxs) list.index.removeBySortedNonceKey(removedTxs)
}(removedTxs) }(removedTxs)
go func(ready map[string][]*pb.Transaction) { go func(ready map[string][]*pb.Transaction) {
defer wg.Done() defer wg.Done()
@ -440,7 +432,9 @@ func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) {
atomic.AddInt32(&mpi.txStore.poolSize, -delta) atomic.AddInt32(&mpi.txStore.poolSize, -delta)
} }
} }
mpi.batchDelete(ready.TxHashes) if mpi.isLeader() {
mpi.batchDelete(ready.TxHashes)
}
delete(mpi.txStore.batchedCache, ready.Height) delete(mpi.txStore.batchedCache, ready.Height)
// restart batch timer for remain txs. // restart batch timer for remain txs.
if mpi.isLeader() { if mpi.isLeader() {
@ -528,21 +522,23 @@ func (mpi *mempoolImpl) loadTxnFromStorage(fetchTxnRequest *FetchTxnRequest) (ma
txList := make(map[uint64]*pb.Transaction) txList := make(map[uint64]*pb.Transaction)
for index, txHash := range missingHashList { for index, txHash := range missingHashList {
var ( var (
tx *pb.Transaction
rawHash types.Hash rawHash types.Hash
err error err error
ok bool
) )
if rawHash, err = hex2Hash(txHash); err != nil { if rawHash, err = hex2Hash(txHash); err != nil {
return nil, err return nil, err
} }
if tx, ok := mpi.load(rawHash); !ok { if tx, ok = mpi.load(rawHash); !ok {
return nil, errors.New("can't load tx from storage") return nil, errors.New("can't load tx from storage")
} else {
txList[index] = tx
} }
txList[index] = tx
} }
return txList, nil return txList, nil
} }
// loadTxnFromLedger find missing transactions from ledger.
func (mpi *mempoolImpl) loadTxnFromLedger(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) { func (mpi *mempoolImpl) loadTxnFromLedger(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) {
missingHashList := fetchTxnRequest.MissingTxHashes missingHashList := fetchTxnRequest.MissingTxHashes
txList := make(map[uint64]*pb.Transaction) txList := make(map[uint64]*pb.Transaction)

View File

@ -0,0 +1,121 @@
package mempool
import (
"testing"
"github.com/meshplus/bitxhub-model/pb"
"github.com/stretchr/testify/assert"
)
func TestProcessTransactions(t *testing.T) {
ast := assert.New(t)
mpi, batchC := mockMempoolImpl()
defer cleanTestData()
txList := make([]*pb.Transaction, 0)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
privKey2 := genPrivKey()
account2, _ := privKey2.PublicKey().Address()
tx1 := constructTx(uint64(1), &privKey1)
tx2 := constructTx(uint64(2), &privKey1)
tx3 := constructTx(uint64(1), &privKey2)
tx4 := constructTx(uint64(2), &privKey2)
tx5 := constructTx(uint64(4), &privKey2)
txList = append(txList, tx1, tx2, tx3, tx4, tx5)
err := mpi.processTransactions(txList)
ast.Nil(err)
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
ast.Equal(5, len(mpi.txStore.txHashMap))
ast.Equal(0, len(mpi.txStore.batchedCache))
ast.Equal(2, mpi.txStore.allTxs[account1.Hex()].index.size())
ast.Equal(3, mpi.txStore.allTxs[account2.Hex()].index.size())
ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.Hex()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.Hex()))
ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.Hex()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.Hex()))
go func() {
mpi.batchSize = 4
mpi.leader = mpi.localID
tx6 := constructTx(uint64(3), &privKey1)
tx7 := constructTx(uint64(5), &privKey2)
txList = make([]*pb.Transaction, 0)
txList = append(txList, tx6, tx7)
err = mpi.processTransactions(txList)
ast.Nil(err)
}()
select {
case batch := <-batchC:
ast.Equal(4, len(batch.TxHashes))
ast.Equal(uint64(2), batch.Height)
ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
ast.Equal(5, mpi.txStore.priorityIndex.size())
ast.Equal(2, mpi.txStore.parkingLotIndex.size())
ast.Equal(7, len(mpi.txStore.txHashMap))
ast.Equal(1, len(mpi.txStore.batchedCache))
ast.Equal(4, len(mpi.txStore.batchedCache[uint64(2)]))
ast.Equal(3, mpi.txStore.allTxs[account1.Hex()].index.size())
ast.Equal(4, mpi.txStore.allTxs[account2.Hex()].index.size())
ast.Equal(uint64(4), mpi.txStore.nonceCache.getPendingNonce(account1.Hex()))
ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.Hex()))
}
}
func TestProcessFetchTxnRequest(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
defer cleanTestData()
privKey1 := genPrivKey()
tx1 := constructTx(uint64(1), &privKey1)
var txList []*pb.Transaction
txList = append(txList, tx1)
missingList := make(map[uint64]string)
missingList[0] = tx1.TransactionHash.Hex()
fetchTxnRequest := &FetchTxnRequest{
Height: uint64(2),
MissingTxHashes: missingList,
}
err := mpi.processFetchTxnRequest(fetchTxnRequest)
ast.NotNil(err, " can't find the missing tx from local")
// load tx from cache
err = mpi.processTransactions(txList)
mpi.txStore.batchedCache[uint64(2)] = txList
err = mpi.processFetchTxnRequest(fetchTxnRequest)
ast.Nil(err)
}
func TestProcessFetchTxnResponse(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
defer cleanTestData()
fetchTxnResponse := &FetchTxnResponse{
Height: uint64(2),
}
err := mpi.processFetchTxnResponse(fetchTxnResponse)
ast.NotNil(err, "can't find batch 2 from missingBatch")
privKey1 := genPrivKey()
tx1 := constructTx(uint64(1), &privKey1)
missingList := make(map[uint64]string)
missingList[0] = tx1.TransactionHash.Hex()
mpi.txStore.missingBatch[uint64(2)] = missingList
fetchTxnResponse.MissingTxnList = make(map[uint64]*pb.Transaction)
fetchTxnResponse.MissingTxnList[0] = tx1
fetchTxnResponse.MissingTxnList[1] = tx1
err = mpi.processFetchTxnResponse(fetchTxnResponse)
ast.NotNil(err, "length mismatch")
delete(fetchTxnResponse.MissingTxnList, 1)
err = mpi.processFetchTxnResponse(fetchTxnResponse)
ast.Nil(err)
ast.Equal(0, len(mpi.txStore.missingBatch))
}

View File

@ -1,264 +1,233 @@
package mempool package mempool
//
//import ( import (
// "encoding/json" "testing"
// "fmt" "time"
// "math/rand"
// "sort" "github.com/meshplus/bitxhub-kit/types"
// "testing" "github.com/meshplus/bitxhub-model/pb"
// "time" raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
//
// "github.com/google/btree" "github.com/stretchr/testify/assert"
// "github.com/meshplus/bitxhub-kit/crypto" )
// "github.com/meshplus/bitxhub-kit/crypto/asym"
// "github.com/meshplus/bitxhub-kit/types" func TestRecvTransaction(t *testing.T) {
// "github.com/meshplus/bitxhub-model/pb" ast := assert.New(t)
// "github.com/stretchr/testify/require" mempool, _ := mockMempoolImpl()
//) defer cleanTestData()
//
//var ( privKey1 := genPrivKey()
// InterchainContractAddr = types.String2Address("000000000000000000000000000000000000000a") tx1 := constructTx(uint64(1), &privKey1)
// appchains = []string{ go mempool.txCache.listenEvent()
// "0x3f9d18f7c3a6e5e4c0b877fe3e688ab08840b997", go func() {
// "0xa8ae1bbc1105944a84a71b89056930d951d420fe", _ = mempool.RecvTransaction(tx1)
// "0x929545f44692178edb7fa468b44c5351596184ba", }()
// "0x7368022e6659236983eb959b8a1fa22577d48294", select {
// } case txSet := <-mempool.txCache.txSetC:
//) ast.Equal(1, len(txSet.TxList))
// }
//func TestCoreMemPool_RecvTransactions(t *testing.T) {
// readyTxs := make([]*pb.Transaction, 0) err := mempool.Start()
// nonreadyTxs := make([]*pb.Transaction, 0) ast.Nil(err)
// txIndex := make(map[string]*pb.Transaction) privKey2 := genPrivKey()
// privKey, err := asym.GenerateKeyPair(crypto.Secp256k1) go func() {
// require.Nil(t, err) _ = mempool.RecvTransaction(tx1)
// pubKey := privKey.PublicKey() }()
// addr, err := pubKey.Address() time.Sleep(1 * time.Millisecond)
// require.Nil(t, err) ast.Equal(1, mempool.txStore.priorityIndex.size())
// ast.Equal(0, mempool.txStore.parkingLotIndex.size())
// sort.Strings(appchains)
// readyTxsLen := 2 tx2 := constructTx(uint64(2), &privKey1)
// for _, appchain := range appchains { tx3 := constructTx(uint64(1), &privKey2)
// for i := 1; i <= readyTxsLen; i++ { tx4 := constructTx(uint64(2), &privKey2)
// readyTxs = append(readyTxs, mockTxhelper(t, txIndex, appchain, uint64(i))) go func() {
// // add unready txs _ = mempool.RecvTransaction(tx4)
// nonreadyTxs = append(nonreadyTxs, mockTxhelper(t, txIndex, appchain, uint64(i+readyTxsLen+1))) }()
// } time.Sleep(1 * time.Millisecond)
// } ast.Equal(1, mempool.txStore.priorityIndex.size())
// ast.Equal(1, mempool.txStore.parkingLotIndex.size())
// // set timestamp and signature for txs go func() {
// for _, tx := range readyTxs { _ = mempool.RecvTransaction(tx2)
// addSigAndTime(t, tx, addr, privKey) }()
// } time.Sleep(1 * time.Millisecond)
// for _, tx := range nonreadyTxs { ast.Equal(2, mempool.txStore.priorityIndex.size())
// addSigAndTime(t, tx, addr, privKey) ast.Equal(1, mempool.txStore.parkingLotIndex.size())
// } go func() {
// _ = mempool.RecvTransaction(tx3)
// // shuffle tx order }()
// rand.Seed(time.Now().UnixNano()) time.Sleep(1 * time.Millisecond)
// rand.Shuffle(len(readyTxs), func(i, j int) { ast.Equal(4, mempool.txStore.priorityIndex.size())
// readyTxs[i], readyTxs[j] = readyTxs[j], readyTxs[i] ast.Equal(1, mempool.txStore.parkingLotIndex.size(), "delete tx4 until finishing executor")
// }) mempool.Stop()
// memPool := newMempoolImpl(nil, nil, nil) }
// require.Nil(t, memPool.recvTransactions(readyTxs))
// require.Nil(t, memPool.RecvTransactions(nonreadyTxs)) func TestRecvForwardTxs(t *testing.T) {
// ast := assert.New(t)
// // check if all txs are indexed in memPool.allTxs mempool, _ := mockMempoolImpl()
// // and if all txs are indexed by its account and nonce defer cleanTestData()
// require.Equal(t, len(appchains), len(memPool.transactionStore.allTxs))
// checkAllTxs(t, memPool, txIndex, readyTxsLen, readyTxsLen) privKey1 := genPrivKey()
// checkHashMap(t, memPool, true, readyTxs, nonreadyTxs) tx := constructTx(uint64(1), &privKey1)
// txList := []*pb.Transaction{tx}
// // check if priorityIndex is correctly recorded txSlice := &TxSlice{TxList: txList}
// require.Equal(t, len(readyTxs), memPool.transactionStore.priorityIndex.data.Len()) go mempool.RecvForwardTxs(txSlice)
// for _, tx := range readyTxs { select {
// ok := memPool.priorityIndex.data.Has(makeKey(tx)) case txSet := <-mempool.subscribe.txForwardC:
// require.True(t, ok) ast.Equal(1, len(txSet.TxList))
// ok = memPool.transactionStore.parkingLotIndex.data.Has(makeKey(tx)) }
// require.True(t, !ok) }
// }
// func TestUpdateLeader(t *testing.T) {
// // check if parkingLotIndex is correctly recorded ast := assert.New(t)
// require.Equal(t, len(nonreadyTxs), memPool.transactionStore.parkingLotIndex.data.Len()) mempool, _ := mockMempoolImpl()
// for _, tx := range nonreadyTxs { mempool.Start()
// ok := memPool.transactionStore.parkingLotIndex.data.Has(makeKey(tx)) defer cleanTestData()
// require.True(t, ok) go mempool.UpdateLeader(uint64(2))
// ok = memPool.transactionStore.priorityIndex.data.Has(makeKey(tx)) time.Sleep(1 * time.Millisecond)
// require.True(t, !ok) ast.Equal(uint64(2), mempool.leader)
// } }
//
// // add the missing tx for each appchain func TestGetBlock(t *testing.T) {
// missingTxs := make([]*pb.Transaction, 0, len(appchains)) ast := assert.New(t)
// for _, appchain := range appchains { mempool, _ := mockMempoolImpl()
// missingTxs = append(missingTxs, mockTxhelper(t, txIndex, appchain, uint64(readyTxsLen+1))) err := mempool.Start()
// } ast.Nil(err)
// for _, tx := range missingTxs { defer cleanTestData()
// addSigAndTime(t, tx, addr, privKey)
// } privKey1 := genPrivKey()
// privKey2 := genPrivKey()
// require.Nil(t, memPool.RecvTransactions(missingTxs)) tx1 := constructTx(uint64(1), &privKey1)
// tx2 := constructTx(uint64(2), &privKey1)
// // check if parkingLotIndex is empty now tx3 := constructTx(uint64(2), &privKey2)
// require.Equal(t, 0, memPool.transactionStore.parkingLotIndex.data.Len()) tx4 := constructTx(uint64(4), &privKey2)
// // check if priorityIndex has received missingTxs and txs from original parkingLotIndex tx5 := constructTx(uint64(1), &privKey2)
// for _, tx := range missingTxs { var txList []*pb.Transaction
// ok := memPool.transactionStore.priorityIndex.data.Has(makeKey(tx)) var txHashList []types.Hash
// require.True(t, ok) txList = append(txList, tx1, tx2, tx3, tx4)
// } txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash)
// for _, tx := range nonreadyTxs { err = mempool.processTransactions(txList)
// ok := memPool.transactionStore.priorityIndex.data.Has(makeKey(tx)) ast.Nil(err)
// require.True(t, ok) ready := &raftproto.Ready{
// } Height: uint64(2),
// checkHashMap(t, memPool, true, readyTxs, nonreadyTxs, missingTxs) TxHashes: txHashList,
//} }
// missingTxnHashList, txList := mempool.GetBlock(ready)
//func TestCoreMemPool_RecvTransactions_Margin(t *testing.T) { ast.Equal(1, len(missingTxnHashList), "missing tx5")
// readyTxs := make([]*pb.Transaction, 0) ast.Equal(3, len(txList))
// identicalNonceTxs := make([]*pb.Transaction, 0)
// replayedTxs := make([]*pb.Transaction, 0) txList = []*pb.Transaction{}
// readyTxIndex := make(map[string]*pb.Transaction) txList = append(txList, tx5)
// identicalNonceTxIndex := make(map[string]*pb.Transaction) err = mempool.processTransactions(txList)
// privKey, err := asym.GenerateKeyPair(crypto.Secp256k1) missingTxnHashList, txList = mempool.GetBlock(ready)
// require.Nil(t, err) ast.Equal(0, len(missingTxnHashList))
// pubKey := privKey.PublicKey() ast.Equal(4, len(txList))
// addr, err := pubKey.Address() }
// require.Nil(t, err)
// func TestGetPendingNonceByAccount(t *testing.T) {
// sort.Strings(appchains) ast := assert.New(t)
// readyTxsLen := 2 mpi, _ := mockMempoolImpl()
// for _, appchain := range appchains { err := mpi.Start()
// for i := 1; i <= readyTxsLen; i++ { ast.Nil(err)
// tx := mockTxhelper(t, readyTxIndex, appchain, uint64(i)) defer cleanTestData()
// readyTxs = append(readyTxs, tx)
// // add tx with same index but different content privKey1 := genPrivKey()
// identicalNonceTx := mockTxhelper(t, identicalNonceTxIndex, appchain, uint64(i)) account1, _ := privKey1.PublicKey().Address()
// identicalNonceTxs = append(identicalNonceTxs, identicalNonceTx) nonce := mpi.GetPendingNonceByAccount(account1.Hex())
// } ast.Equal(uint64(1), nonce)
// }
// privKey2 := genPrivKey()
// // set timestamp and signature for txs account2, _ := privKey2.PublicKey().Address()
// for _, tx := range readyTxs { tx1 := constructTx(uint64(1), &privKey1)
// addSigAndTime(t, tx, addr, privKey) tx2 := constructTx(uint64(2), &privKey1)
// // add repeated txs tx3 := constructTx(uint64(1), &privKey2)
// replayedTxs = append(replayedTxs, tx) tx4 := constructTx(uint64(2), &privKey2)
// } tx5 := constructTx(uint64(4), &privKey2)
// for _, tx := range identicalNonceTxs { var txList []*pb.Transaction
// addSigAndTime(t, tx, addr, privKey) txList = append(txList, tx1, tx2, tx3, tx4, tx5)
// } err = mpi.processTransactions(txList)
// ast.Nil(err)
// memPool := New() nonce = mpi.GetPendingNonceByAccount(account1.Hex())
// require.Nil(t, memPool.RecvTransactions(readyTxs)) ast.Equal(uint64(3), nonce)
// require.NotNil(t, memPool.RecvTransactions(replayedTxs)) nonce = mpi.GetPendingNonceByAccount(account2.Hex())
// err = memPool.RecvTransactions(identicalNonceTxs) ast.Equal(uint64(3), nonce, "not 4")
// require.NotNil(t, err) }
//
// require.Equal(t, len(appchains), len(memPool.transactionStore.allTxs)) func TestCommitTransactions(t *testing.T) {
// checkAllTxs(t, memPool, readyTxIndex, readyTxsLen, 0) ast := assert.New(t)
// checkHashMap(t, memPool, true, readyTxs) mpi, _ := mockMempoolImpl()
// checkHashMap(t, memPool, false, identicalNonceTxs) err := mpi.Start()
//} ast.Nil(err)
// defer cleanTestData()
//func checkAllTxs(t *testing.T, memPool *CoreMemPool,
// txIndex map[string]*pb.Transaction, readyTxsLen, nonReadyTxLen int) { privKey1 := genPrivKey()
// for _, appchain := range appchains { account1, _ := privKey1.PublicKey().Address()
// idx := uint64(1) nonce := mpi.GetPendingNonceByAccount(account1.Hex())
// accountAddr := fmt.Sprintf("%s-%s", appchain, appchain) ast.Equal(uint64(1), nonce)
//
// txMap, ok := memPool.transactionStore.allTxs[accountAddr] privKey2 := genPrivKey()
// require.True(t, ok) tx1 := constructTx(uint64(1), &privKey1)
// require.NotNil(t, txMap.index) tx2 := constructTx(uint64(2), &privKey1)
// require.Equal(t, readyTxsLen+nonReadyTxLen, txMap.index.data.Len()) tx3 := constructTx(uint64(1), &privKey2)
// require.Equal(t, readyTxsLen+nonReadyTxLen, len(txMap.items)) tx4 := constructTx(uint64(4), &privKey2)
// txMap.index.data.Ascend(func(i btree.Item) bool { var txList []*pb.Transaction
// orderedKey := i.(*orderedIndexKey) txList = append(txList, tx1, tx2, tx3, tx4)
// if idx <= uint64(readyTxsLen) { mpi.leader = uint64(1)
// require.Equal(t, orderedKey.nonce, idx) err = mpi.processTransactions(txList)
// } else { ast.Equal(3, mpi.txStore.priorityIndex.size())
// require.Equal(t, orderedKey.nonce, idx+1) ast.Equal(1, mpi.txStore.parkingLotIndex.size())
// } ast.Equal(0, len(mpi.txStore.batchedCache))
// require.Equal(t, orderedKey.accountAddress, accountAddr)
// go func() {
// ibtpID := fmt.Sprintf("%s-%s-%d", appchain, appchain, orderedKey.nonce) <-mpi.batchC
// require.Equal(t, txIndex[ibtpID], txMap.items[orderedKey.nonce]) }()
// idx++ tx5 := constructTx(uint64(2), &privKey2)
// return true txList = []*pb.Transaction{}
// }) txList = append(txList, tx5)
// } err = mpi.processTransactions(txList)
//} ast.Equal(4, mpi.txStore.priorityIndex.size())
// ast.Equal(1, mpi.txStore.parkingLotIndex.size())
//func checkHashMap(t *testing.T, memPool *CoreMemPool, expectedStatus bool, txsSlice ...[]*pb.Transaction) { ast.Equal(1, len(mpi.txStore.batchedCache))
// for _, txs := range txsSlice { height := mpi.GetChainHeight()
// for _, tx := range txs { ast.Equal(uint64(2), height)
// _, ok := memPool.transactionStore.txHashMap[tx.TransactionHash.Hex()]
// require.Equal(t, expectedStatus, ok) var txHashList []types.Hash
// } txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash)
// } ready := &raftproto.Ready{
//} Height: uint64(2),
// TxHashes: txHashList,
//func mockTxhelper(t *testing.T, txIndex map[string]*pb.Transaction, appchainAddr string, index uint64) *pb.Transaction { }
// ibtp := mockIBTP(t, appchainAddr, appchainAddr, index) mpi.CommitTransactions(ready)
// tx := mockInterchainTx(t, ibtp) time.Sleep(100 * time.Millisecond)
// txIndex[ibtp.ID()] = tx ast.Equal(0, mpi.txStore.priorityIndex.size())
// return tx ast.Equal(1, mpi.txStore.parkingLotIndex.size())
//} ast.Equal(0, len(mpi.txStore.batchedCache))
// }
//func addSigAndTime(t *testing.T, tx *pb.Transaction, addr types.Address, privKey crypto.PrivateKey) {
// tx.Timestamp = time.Now().UnixNano() func TestFetchTxn(t *testing.T) {
// tx.From = addr ast := assert.New(t)
// sig, err := privKey.Sign(tx.SignHash().Bytes()) mpi, _ := mockMempoolImpl()
// tx.Signature = sig err := mpi.Start()
// require.Nil(t, err) ast.Nil(err)
// tx.TransactionHash = tx.Hash() defer cleanTestData()
//}
// missingList := make(map[uint64]string)
//func mockInterchainTx(t *testing.T, ibtp *pb.IBTP) *pb.Transaction { missingList[0] = "tx1"
// ib, err := ibtp.Marshal() lostTxnEvent := &LocalMissingTxnEvent{
// require.Nil(t, err) Height: uint64(2),
// MissingTxnHashList: missingList,
// ipd := &pb.InvokePayload{ WaitC: make(chan bool),
// Method: "HandleIBTP", }
// Args: []*pb.Arg{{Value: ib}}, mpi.FetchTxn(lostTxnEvent)
// } time.Sleep(10 * time.Millisecond)
// pd, err := ipd.Marshal() ast.Equal(1, len(mpi.txStore.missingBatch))
// require.Nil(t, err) }
//
// data := &pb.TransactionData{ func TestIncreaseChainHeight(t *testing.T) {
// VmType: pb.TransactionData_BVM, ast := assert.New(t)
// Type: pb.TransactionData_INVOKE, mpi, _ := mockMempoolImpl()
// Payload: pd, defer cleanTestData()
// }
// ast.Equal(uint64(1), mpi.GetChainHeight())
// return &pb.Transaction{ mpi.increaseBatchSeqNo()
// To: InterchainContractAddr, ast.Equal(uint64(2), mpi.GetChainHeight())
// Nonce: int64(ibtp.Index), }
// Data: data,
// Extra: []byte(fmt.Sprintf("%s-%s", ibtp.From, ibtp.To)),
// }
//}
//
//func mockIBTP(t *testing.T, from, to string, nonce uint64) *pb.IBTP {
// content := pb.Content{
// SrcContractId: from,
// DstContractId: from,
// Func: "interchainget",
// Args: [][]byte{[]byte("Alice"), []byte("10")},
// }
//
// bytes, err := content.Marshal()
// require.Nil(t, err)
//
// ibtppd, err := json.Marshal(pb.Payload{
// Encrypted: false,
// Content: bytes,
// })
// require.Nil(t, err)
//
// return &pb.IBTP{
// From: from,
// To: to,
// Payload: ibtppd,
// Index: nonce,
// Type: pb.IBTP_INTERCHAIN,
// Timestamp: time.Now().UnixNano(),
// }
//}

View File

@ -0,0 +1,197 @@
package mempool
import (
"encoding/json"
"fmt"
"os"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/model/events"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/storage/leveldb"
network "github.com/meshplus/go-lightp2p"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
)
var (
InterchainContractAddr = types.String2Address("000000000000000000000000000000000000000a")
)
const (
DefaultTestChainHeight = uint64(1)
DefaultTestBatchSize = uint64(4)
DefaultTestTxSetSize = uint64(1)
LevelDBDir = "test-db"
)
func mockMempoolImpl() (*mempoolImpl, chan *raftproto.Ready) {
config := &Config{
ID: 1,
ChainHeight: DefaultTestChainHeight,
BatchSize: DefaultTestBatchSize,
PoolSize: DefaultPoolSize,
TxSliceSize: DefaultTestTxSetSize,
BatchTick: DefaultBatchTick,
FetchTimeout: DefaultFetchTxnTimeout,
TxSliceTimeout: DefaultTxSetTick,
Logger: log.NewWithModule("consensus"),
}
config.PeerMgr = newMockPeerMgr()
db, _ := leveldb.New(LevelDBDir)
proposalC := make(chan *raftproto.Ready)
mempool := newMempoolImpl(config, db, proposalC)
return mempool, proposalC
}
func genPrivKey() crypto.PrivateKey {
privKey, _ := asym.GenerateKeyPair(crypto.Secp256k1)
return privKey
}
func constructTx(nonce uint64, privKey *crypto.PrivateKey) *pb.Transaction {
var privK crypto.PrivateKey
if privKey == nil {
privK = genPrivKey()
}
privK = *privKey
pubKey := privK.PublicKey()
addr, _ := pubKey.Address()
tx := &pb.Transaction{Nonce: nonce}
tx.Timestamp = time.Now().UnixNano()
tx.From = addr
sig, _ := privK.Sign(tx.SignHash().Bytes())
tx.Signature = sig
tx.TransactionHash = tx.Hash()
return tx
}
func constructIBTPTx(nonce uint64, privKey *crypto.PrivateKey) *pb.Transaction {
var privK crypto.PrivateKey
if privKey == nil {
privK = genPrivKey()
}
privK = *privKey
pubKey := privK.PublicKey()
from, _ := pubKey.Address()
to := from.Hex()
ibtp := mockIBTP(from.Hex(), to, nonce)
tx := mockInterChainTx(ibtp)
tx.Timestamp = time.Now().UnixNano()
sig, _ := privK.Sign(tx.SignHash().Bytes())
tx.Signature = sig
tx.TransactionHash = tx.Hash()
return tx
}
func cleanTestData() bool {
err := os.RemoveAll(LevelDBDir)
if err != nil {
return false
}
return true
}
func mockInterChainTx(ibtp *pb.IBTP) *pb.Transaction {
ib, _ := ibtp.Marshal()
ipd := &pb.InvokePayload{
Method: "HandleIBTP",
Args: []*pb.Arg{{Value: ib}},
}
pd, _ := ipd.Marshal()
data := &pb.TransactionData{
VmType: pb.TransactionData_BVM,
Type: pb.TransactionData_INVOKE,
Payload: pd,
}
return &pb.Transaction{
To: InterchainContractAddr,
Nonce: ibtp.Index,
Data: data,
Extra: []byte(fmt.Sprintf("%s-%s-%d", ibtp.From, ibtp.To, ibtp.Type)),
}
}
func mockIBTP(from, to string, nonce uint64) *pb.IBTP {
content := pb.Content{
SrcContractId: from,
DstContractId: from,
Func: "interchainget",
Args: [][]byte{[]byte("Alice"), []byte("10")},
}
bytes, _ := content.Marshal()
ibtppd, _ := json.Marshal(pb.Payload{
Encrypted: false,
Content: bytes,
})
return &pb.IBTP{
From: from,
To: to,
Payload: ibtppd,
Index: nonce,
Type: pb.IBTP_INTERCHAIN,
Timestamp: time.Now().UnixNano(),
}
}
type mockPeerMgr struct {
mock.Mock
EventChan chan *pb.Message
}
func newMockPeerMgr() *mockPeerMgr {
return &mockPeerMgr{}
}
func (mpm *mockPeerMgr) Broadcast(msg *pb.Message) error {
mpm.EventChan <- msg
return nil
}
func (mpm *mockPeerMgr) AsyncSend(id uint64, msg *pb.Message) error {
mpm.EventChan <- msg
return nil
}
func (mpm *mockPeerMgr) Start() error {
return nil
}
func (mpm *mockPeerMgr) Stop() error {
return nil
}
func (mpm *mockPeerMgr) SendWithStream(network.Stream, *pb.Message) error {
return nil
}
func (mpm *mockPeerMgr) Send(uint64, *pb.Message) (*pb.Message, error) {
return nil, nil
}
func (mpm *mockPeerMgr) Peers() map[uint64]*peer.AddrInfo {
peers := make(map[uint64]*peer.AddrInfo, 3)
var id1 peer.ID
id1 = "peer1"
peers[0] = &peer.AddrInfo{ID: id1}
id1 = "peer2"
peers[1] = &peer.AddrInfo{ID: id1}
id1 = "peer3"
peers[2] = &peer.AddrInfo{ID: id1}
return peers
}
func (mpm *mockPeerMgr) OtherPeers() map[uint64]*peer.AddrInfo {
return nil
}
func (mpm *mockPeerMgr) SubscribeOrderMessage(ch chan<- events.OrderMessageEvent) event.Subscription {
return nil
}

View File

@ -7,7 +7,7 @@ import (
"github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub-model/pb"
) )
// batchStore persists batch into DB, which // batchStore persists batch into DB, only be called by leader.
func (mpi *mempoolImpl) batchStore(txList []*pb.Transaction) { func (mpi *mempoolImpl) batchStore(txList []*pb.Transaction) {
batch := mpi.storage.NewBatch() batch := mpi.storage.NewBatch()
for _, tx := range txList { for _, tx := range txList {
@ -18,7 +18,7 @@ func (mpi *mempoolImpl) batchStore(txList []*pb.Transaction) {
batch.Commit() batch.Commit()
} }
// batchDelete batch delete txs // batchDelete delete txs from DB by given hash list.
func (mpi *mempoolImpl) batchDelete(hashes []types.Hash) { func (mpi *mempoolImpl) batchDelete(hashes []types.Hash) {
batch := mpi.storage.NewBatch() batch := mpi.storage.NewBatch()
for _, hash := range hashes { for _, hash := range hashes {
@ -28,12 +28,6 @@ func (mpi *mempoolImpl) batchDelete(hashes []types.Hash) {
batch.Commit() batch.Commit()
} }
func (mpi *mempoolImpl) store(tx *pb.Transaction) {
txKey := compositeKey(tx.TransactionHash.Bytes())
txData, _ := tx.Marshal()
mpi.storage.Put(txKey, txData)
}
func (mpi *mempoolImpl) load(hash types.Hash) (*pb.Transaction, bool) { func (mpi *mempoolImpl) load(hash types.Hash) (*pb.Transaction, bool) {
txKey := compositeKey(hash.Bytes()) txKey := compositeKey(hash.Bytes())
txData := mpi.storage.Get(txKey) txData := mpi.storage.Get(txKey)

View File

@ -0,0 +1,33 @@
package mempool
import (
"testing"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/stretchr/testify/assert"
)
func TestStorage(t *testing.T) {
ast := assert.New(t)
mempool, _ := mockMempoolImpl()
defer cleanTestData()
txList := make([]*pb.Transaction, 0)
txHashList := make([]types.Hash, 0)
txHash1, _ := hex2Hash("txHash1")
tx1 := &pb.Transaction{Nonce: uint64(1), TransactionHash: txHash1}
txHash2, _ := hex2Hash("txHash2")
tx2 := &pb.Transaction{Nonce: uint64(1), TransactionHash: txHash2}
txList = append(txList, tx1, tx2)
txHashList = append(txHashList, txHash1, txHash2)
mempool.batchStore(txList)
tx, ok := mempool.load(txHash1)
ast.Equal(true, ok)
ast.Equal(uint64(1), tx.Nonce)
mempool.batchDelete(txHashList)
tx, ok = mempool.load(txHash1)
ast.Equal(false, ok)
ast.Nil(tx)
}

View File

@ -17,9 +17,10 @@ type TxCache struct {
stopTimerC chan bool stopTimerC chan bool
close chan bool close chan bool
txSetTick time.Duration txSetTick time.Duration
txSetSize uint64
} }
func newTxCache(txSliceTimeout time.Duration, logger logrus.FieldLogger) *TxCache { func newTxCache(txSliceTimeout time.Duration, txSetSize uint64, logger logrus.FieldLogger) *TxCache {
txCache := &TxCache{} txCache := &TxCache{}
txCache.recvTxC = make(chan *pb.Transaction, DefaultTxCacheSize) txCache.recvTxC = make(chan *pb.Transaction, DefaultTxCacheSize)
txCache.close = make(chan bool) txCache.close = make(chan bool)
@ -33,6 +34,11 @@ func newTxCache(txSliceTimeout time.Duration, logger logrus.FieldLogger) *TxCach
} else { } else {
txCache.txSetTick = txSliceTimeout txCache.txSetTick = txSliceTimeout
} }
if txSetSize == 0 {
txCache.txSetSize = DefaultTxSetSize
} else {
txCache.txSetSize = txSetSize
}
return txCache return txCache
} }
@ -62,7 +68,7 @@ func (tc *TxCache) appendTx(tx *pb.Transaction) {
tc.startTxSetTimer() tc.startTxSetTimer()
} }
tc.txSet = append(tc.txSet, tx) tc.txSet = append(tc.txSet, tx)
if len(tc.txSet) >= DefaultTxSetSize { if uint64(len(tc.txSet)) >= tc.txSetSize {
tc.stopTxSetTimer() tc.stopTxSetTimer()
tc.postTxSet() tc.postTxSet()
} }

View File

@ -0,0 +1,44 @@
package mempool
import (
"testing"
"time"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-model/pb"
"github.com/stretchr/testify/assert"
)
func TestAppendTx(t *testing.T) {
ast := assert.New(t)
logger := log.NewWithModule("consensus")
sliceTimeout := 1 * time.Millisecond
txCache := newTxCache(sliceTimeout, 2, logger)
go txCache.listenEvent()
tx := &pb.Transaction{}
txCache.appendTx(nil)
ast.Equal(0, len(txCache.txSet), "nil transaction")
tx = &pb.Transaction{Nonce: 1}
txCache.appendTx(tx)
select {
case txSet := <-txCache.txSetC:
ast.Equal(1, len(txSet.TxList), "post tx set by timeout")
ast.Equal(0, len(txCache.txSet))
}
txCache.stopTxSetTimer()
txCache.txSetTick = 1 * time.Second
tx1 := &pb.Transaction{Nonce: 2}
tx2 := &pb.Transaction{Nonce: 3}
go txCache.appendTx(tx1)
go txCache.appendTx(tx2)
select {
case txSet := <-txCache.txSetC:
ast.Equal(2, len(txSet.TxList), "post tx set by size")
ast.Equal(0, len(txCache.txSet))
}
// test exit txCache
close(txCache.close)
}

View File

@ -72,7 +72,7 @@ func (m *txSortedMap) filterReady(demandNonce uint64) ([]*pb.Transaction, []*pb.
if m.index.data.Len() == 0 { if m.index.data.Len() == 0 {
return nil, nil, demandNonce return nil, nil, demandNonce
} }
demandKey := makeSortedNonceKeyKey(demandNonce) demandKey := makeSortedNonceKey(demandNonce)
m.index.data.AscendGreaterOrEqual(demandKey, func(i btree.Item) bool { m.index.data.AscendGreaterOrEqual(demandKey, func(i btree.Item) bool {
nonce := i.(*sortedNonceKey).nonce nonce := i.(*sortedNonceKey).nonce
if nonce == demandNonce { if nonce == demandNonce {
@ -91,7 +91,7 @@ func (m *txSortedMap) filterReady(demandNonce uint64) ([]*pb.Transaction, []*pb.
// provided commitNonce. // provided commitNonce.
func (m *txSortedMap) forward(commitNonce uint64) map[string][]*pb.Transaction { func (m *txSortedMap) forward(commitNonce uint64) map[string][]*pb.Transaction {
removedTxs := make(map[string][]*pb.Transaction) removedTxs := make(map[string][]*pb.Transaction)
commitNonceKey := makeSortedNonceKeyKey(commitNonce) commitNonceKey := makeSortedNonceKey(commitNonce)
m.index.data.AscendLessThan(commitNonceKey, func(i btree.Item) bool { m.index.data.AscendLessThan(commitNonceKey, func(i btree.Item) bool {
// delete tx from map. // delete tx from map.
nonce := i.(*sortedNonceKey).nonce nonce := i.(*sortedNonceKey).nonce

View File

@ -0,0 +1,37 @@
package mempool
import (
"testing"
"github.com/meshplus/bitxhub-model/pb"
"github.com/stretchr/testify/assert"
)
func TestForward(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
defer cleanTestData()
txList := make([]*pb.Transaction, 0)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
tx1 := constructTx(uint64(1), &privKey1)
tx2 := constructTx(uint64(2), &privKey1)
tx3 := constructTx(uint64(3), &privKey1)
tx4 := constructTx(uint64(4), &privKey1)
tx5 := constructTx(uint64(6), &privKey1)
txList = append(txList, tx1, tx2, tx3, tx4, tx5)
err := mpi.processTransactions(txList)
ast.Nil(err)
list := mpi.txStore.allTxs[account1.Hex()]
ast.Equal(5, list.index.size())
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
removeList := list.forward(uint64(3))
ast.Equal(1, len(removeList))
ast.Equal(2, len(removeList[account1.Hex()]))
ast.Equal(uint64(1), removeList[account1.Hex()][0].Nonce)
ast.Equal(uint64(2), removeList[account1.Hex()][1].Nonce)
}

View File

@ -0,0 +1,29 @@
package mempool
import (
"fmt"
"github.com/meshplus/bitxhub-model/pb"
"github.com/stretchr/testify/assert"
"testing"
)
func TestGetAccount(t *testing.T) {
ast := assert.New(t)
privKey := genPrivKey()
address, _ := privKey.PublicKey().Address()
tx := constructIBTPTx(uint64(1), &privKey)
addr, err := getAccount(tx)
ast.Nil(err)
expectedAddr := fmt.Sprintf("%s-%s-%d", address, address, pb.IBTP_INTERCHAIN)
ast.Equal(expectedAddr, addr)
data := &pb.TransactionData{
Payload: []byte("test"),
}
tx = &pb.Transaction{
To: InterchainContractAddr,
Data: data,
}
_, err = getAccount(tx)
ast.NotNil(err.Error(), "unmarshal invoke payload faile")
}

View File

@ -12,6 +12,7 @@ import (
"github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/constant" "github.com/meshplus/bitxhub/internal/constant"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
cmap "github.com/orcaman/concurrent-map" cmap "github.com/orcaman/concurrent-map"
) )
@ -23,14 +24,6 @@ func (mpi *mempoolImpl) increaseBatchSeqNo() {
atomic.AddUint64(&mpi.batchSeqNo, 1) atomic.AddUint64(&mpi.batchSeqNo, 1)
} }
// getTxByTxPointer returns the tx stored in allTxs by given TxPointer.
func (mpi *mempoolImpl) getTxByTxPointer(txPointer orderedIndexKey) *pb.Transaction {
if txnMap, ok := mpi.txStore.allTxs[txPointer.account]; ok {
return txnMap.items[txPointer.nonce].tx
}
return nil
}
func (mpi *mempoolImpl) msgToConsensusPbMsg(data []byte, tyr raftproto.RaftMessage_Type) *pb.Message { func (mpi *mempoolImpl) msgToConsensusPbMsg(data []byte, tyr raftproto.RaftMessage_Type) *pb.Message {
rm := &raftproto.RaftMessage{ rm := &raftproto.RaftMessage{
Type: tyr, Type: tyr,
@ -69,6 +62,7 @@ func newNonceCache() *nonceCache {
} }
} }
// TODO (YH): refactor the tx struct
func hex2Hash(hash string) (types.Hash, error) { func hex2Hash(hash string) (types.Hash, error) {
var ( var (
hubHash types.Hash hubHash types.Hash
@ -137,7 +131,7 @@ func getAccount(tx *pb.Transaction) (string, error) {
} }
payload := &pb.InvokePayload{} payload := &pb.InvokePayload{}
if err := payload.Unmarshal(tx.Data.Payload); err != nil { if err := payload.Unmarshal(tx.Data.Payload); err != nil {
return "", fmt.Errorf("unmarshal invoke payload: %s", err.Error()) return "", fmt.Errorf("unmarshal invoke payload failed: %s", err.Error())
} }
if payload.Method == IBTPMethod1 || payload.Method == IBTPMethod2 { if payload.Method == IBTPMethod1 || payload.Method == IBTPMethod2 {
ibtp := &pb.IBTP{} ibtp := &pb.IBTP{}