feat(mempool): add nonce cache to persist storage

This commit is contained in:
Alexader 2021-01-25 11:14:38 +08:00
parent 8b062ffbc0
commit ddcddfad80
7 changed files with 154 additions and 16 deletions

View File

@ -96,6 +96,7 @@ func NewNode(opts ...order.Option) (order.Order, error) {
ID: config.ID,
ChainHeight: config.Applied,
Logger: config.Logger,
StoragePath: config.StoragePath,
BatchSize: raftConfig.RAFT.MempoolConfig.BatchSize,
PoolSize: raftConfig.RAFT.MempoolConfig.PoolSize,

View File

@ -2,12 +2,19 @@ package mempool
import (
"math"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/google/btree"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/meshplus/bitxhub-kit/storage/leveldb"
"github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -19,16 +26,22 @@ type mempoolImpl struct {
poolSize uint64
logger logrus.FieldLogger
txStore *transactionStore // store all transactions info
store storage.Storage // persist storage for mem pool
}
func newMempoolImpl(config *Config) *mempoolImpl {
db, err := loadOrCreateStorage(filepath.Join(config.StoragePath, "mempool"))
if err != nil {
config.Logger.Panicf("load or create mem pool storage :%s", err.Error())
}
mpi := &mempoolImpl{
localID: config.ID,
batchSeqNo: config.ChainHeight,
logger: config.Logger,
txSliceSize: config.TxSliceSize,
store: db,
}
mpi.txStore = newTransactionStore()
mpi.txStore = newTransactionStore(db)
if config.BatchSize == 0 {
mpi.batchSize = DefaultBatchSize
} else {
@ -206,6 +219,8 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
// update current cached commit nonce for account
updateAccounts := make(map[string]uint64)
// update current cached commit nonce for account
storageBatch := mpi.store.NewBatch()
defer storageBatch.Commit()
for _, txHash := range state.TxHashList {
strHash := txHash.String()
txPointer := mpi.txStore.txHashMap[strHash]
@ -218,6 +233,7 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
newCommitNonce := txPointer.nonce + 1
if preCommitNonce < newCommitNonce {
mpi.txStore.nonceCache.setCommitNonce(txPointer.account, newCommitNonce)
storageBatch.Put(committedNonceKey(txPointer.account), []byte(strconv.FormatUint(newCommitNonce, 10)))
// Note!!! updating pendingNonce to commitNonce for the restart node
pendingNonce := mpi.txStore.nonceCache.getPendingNonce(txPointer.account)
if pendingNonce < newCommitNonce {
@ -332,3 +348,12 @@ func (mpi *mempoolImpl) shardTxList(timeoutItems []*orderedTimeoutKey, batchLen
}
return shardedLists
}
func loadOrCreateStorage(memPoolDir string) (storage.Storage, error) {
if !fileutil.Exist(memPoolDir) {
if err := os.MkdirAll(memPoolDir, os.ModePerm); err != nil {
return nil, errors.Errorf("failed to mkdir '%s' for mem pool: %s", memPoolDir, err)
}
}
return leveldb.New(memPoolDir)
}

View File

@ -1,7 +1,9 @@
package mempool
import (
"io/ioutil"
"math"
"os"
"testing"
"time"
@ -12,7 +14,10 @@ import (
func TestGetBlock(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
privKey1 := genPrivKey()
privKey2 := genPrivKey()
tx1 := constructTx(uint64(1), &privKey1)
@ -38,7 +43,10 @@ func TestGetBlock(t *testing.T) {
func TestGetPendingNonceByAccount(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
nonce := mpi.GetPendingNonceByAccount(account1.String())
@ -65,7 +73,10 @@ func TestGetPendingNonceByAccount(t *testing.T) {
func TestCommitTransactions(t *testing.T) {
ast := assert.New(t)
mpi, batchC := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, batchC := mockMempoolImpl(storePath)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
nonce := mpi.GetPendingNonceByAccount(account1.String())
@ -108,7 +119,10 @@ func TestCommitTransactions(t *testing.T) {
func TestIncreaseChainHeight(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
ast.Equal(uint64(1), mpi.batchSeqNo)
mpi.batchSeqNo++
mpi.SetBatchSeqNo(mpi.batchSeqNo)
@ -117,7 +131,10 @@ func TestIncreaseChainHeight(t *testing.T) {
func TestProcessTransactions(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
txList := make([]*pb.Transaction, 0)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
@ -161,7 +178,10 @@ func TestProcessTransactions(t *testing.T) {
func TestForward(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
txList := make([]*pb.Transaction, 0)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
@ -187,7 +207,10 @@ func TestForward(t *testing.T) {
func TestUnorderedIncomingTxs(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
mpi.batchSize = 5
txList := make([]*pb.Transaction, 0)
@ -284,7 +307,10 @@ func TestUnorderedIncomingTxs(t *testing.T) {
func TestGetTimeoutTransaction(t *testing.T) {
ast := assert.New(t)
mpi, _ := mockMempoolImpl()
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, _ := mockMempoolImpl(storePath)
mpi.txSliceSize = 3
allTxHashes := make([]*types.Hash, 0)
@ -369,3 +395,58 @@ func TestGetTimeoutTransaction(t *testing.T) {
ast.Equal(0, len(mpi.txStore.txHashMap))
ast.Equal(uint64(0), mpi.txStore.priorityNonBatchSize)
}
func TestRestore(t *testing.T) {
ast := assert.New(t)
storePath, err := ioutil.TempDir("", "mempool")
ast.Nil(err)
defer os.RemoveAll(storePath)
mpi, batchC := mockMempoolImpl(storePath)
privKey1 := genPrivKey()
account1, _ := privKey1.PublicKey().Address()
nonce := mpi.GetPendingNonceByAccount(account1.String())
ast.Equal(uint64(1), nonce)
privKey2 := genPrivKey()
account2, _ := privKey1.PublicKey().Address()
tx1 := constructTx(uint64(1), &privKey1)
tx2 := constructTx(uint64(2), &privKey1)
tx3 := constructTx(uint64(1), &privKey2)
tx4 := constructTx(uint64(4), &privKey2)
var txList []*pb.Transaction
txList = append(txList, tx1, tx2, tx3, tx4)
batch := mpi.ProcessTransactions(txList, true, true)
ast.Nil(batch)
ast.Equal(3, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
go func() {
<-batchC
}()
tx5 := constructTx(uint64(2), &privKey2)
txList = []*pb.Transaction{}
txList = append(txList, tx5)
batch = mpi.ProcessTransactions(txList, true, true)
ast.Equal(4, len(batch.TxList))
ast.Equal(4, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
ast.Equal(uint64(2), mpi.batchSeqNo)
var txHashList []*types.Hash
txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash)
state := &ChainState{
TxHashList: txHashList,
Height: uint64(2),
}
mpi.CommitTransactions(state)
time.Sleep(100 * time.Millisecond)
ast.Equal(0, mpi.txStore.priorityIndex.size())
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
// stop and restore
ast.Nil(mpi.store.Close())
newMpi, _ := mockMempoolImpl(storePath)
ast.Equal(uint64(3), newMpi.txStore.nonceCache.getCommitNonce(account1.String()))
ast.Equal(uint64(3), newMpi.txStore.nonceCache.getCommitNonce(account2.String()))
ast.Equal(uint64(3), newMpi.txStore.nonceCache.getPendingNonce(account1.String()))
ast.Equal(uint64(3), newMpi.txStore.nonceCache.getPendingNonce(account2.String()))
}

View File

@ -21,7 +21,7 @@ const (
DefaultTestTxSetSize = uint64(1)
)
func mockMempoolImpl() (*mempoolImpl, chan *raftproto.Ready) {
func mockMempoolImpl(path string) (*mempoolImpl, chan *raftproto.Ready) {
config := &Config{
ID: 1,
ChainHeight: DefaultTestChainHeight,
@ -30,6 +30,7 @@ func mockMempoolImpl() (*mempoolImpl, chan *raftproto.Ready) {
TxSliceSize: DefaultTestTxSetSize,
TxSliceTimeout: DefaultTxSetTick,
Logger: log.NewWithModule("consensus"),
StoragePath: path,
}
proposalC := make(chan *raftproto.Ready)
mempool := newMempoolImpl(config)

View File

@ -1,10 +1,13 @@
package mempool
import (
"fmt"
"math"
"strconv"
"sync"
"github.com/google/btree"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/meshplus/bitxhub-model/pb"
)
@ -30,7 +33,7 @@ type transactionStore struct {
priorityNonBatchSize uint64
}
func newTransactionStore() *transactionStore {
func newTransactionStore(db storage.Storage) *transactionStore {
return &transactionStore{
txHashMap: make(map[string]*orderedIndexKey, 0),
allTxs: make(map[string]*txSortedMap),
@ -38,7 +41,7 @@ func newTransactionStore() *transactionStore {
parkingLotIndex: newBtreeIndex(),
priorityIndex: newBtreeIndex(),
ttlIndex: newTxLiveTimeMap(),
nonceCache: newNonceCache(),
nonceCache: newNonceCache(db),
}
}
@ -156,20 +159,23 @@ type nonceCache struct {
// pendingNonces records each account's latest nonce which has been included in
// priority queue. Invariant: pendingNonces[account] >= commitNonces[account]
pendingNonces map[string]uint64
pendingMu sync.RWMutex
// falling back to reading from local database if an account is unknown.
fallback storage.Storage
pendingMu sync.RWMutex
}
func newNonceCache() *nonceCache {
func newNonceCache(store storage.Storage) *nonceCache {
return &nonceCache{
commitNonces: make(map[string]uint64),
pendingNonces: make(map[string]uint64),
fallback: store,
}
}
func (nc *nonceCache) getCommitNonce(account string) uint64 {
nonce, ok := nc.commitNonces[account]
if !ok {
return 1
return nc.getNonceFromDB(committedNonceKey(account))
}
return nonce
}
@ -183,7 +189,8 @@ func (nc *nonceCache) getPendingNonce(account string) uint64 {
defer nc.pendingMu.RUnlock()
nonce, ok := nc.pendingNonces[account]
if !ok {
return 1
// if nonce is unknown, check if there is nonce persisted in db
return nc.getNonceFromDB(pendingNonceKey(account))
}
return nonce
}
@ -191,9 +198,30 @@ func (nc *nonceCache) getPendingNonce(account string) uint64 {
func (nc *nonceCache) setPendingNonce(account string, nonce uint64) {
nc.pendingMu.Lock()
defer nc.pendingMu.Unlock()
nc.fallback.Put(pendingNonceKey(account), []byte(strconv.FormatUint(nonce, 10)))
nc.pendingNonces[account] = nonce
}
func (nc *nonceCache) getNonceFromDB(key []byte) uint64 {
var value []byte
if value = nc.fallback.Get(key); value == nil {
return 1
}
nonce, err := strconv.ParseUint(string(value), 10, 64)
if err != nil {
return 1
}
return nonce
}
func pendingNonceKey(account string) []byte {
return []byte(fmt.Sprintf("pending-%s", account))
}
func committedNonceKey(account string) []byte {
return []byte(fmt.Sprintf("committed-%s", account))
}
// since the live time field in sortedTtlKey may vary during process
// we need to track the latest live time since its latest broadcast.
type txLiveTimeMap struct {

View File

@ -29,6 +29,7 @@ type Config struct {
TxSliceTimeout time.Duration
ChainHeight uint64
Logger logrus.FieldLogger
StoragePath string // db for persist mem pool meta data
}
type txItem struct {

View File

@ -101,6 +101,7 @@ func NewNode(opts ...order.Option) (order.Order, error) {
ID: config.ID,
ChainHeight: config.Applied,
Logger: config.Logger,
StoragePath: config.StoragePath,
BatchSize: memConfig.BatchSize,
PoolSize: memConfig.PoolSize,