fix GetPendingNonceByAccount api which may be concurrent reading/writing
This commit is contained in:
parent
f43bb1c09a
commit
76e3690bf0
|
@ -9,7 +9,7 @@ solo = false
|
|||
monitor = 40011
|
||||
|
||||
[pprof]
|
||||
enable = true
|
||||
enable = false
|
||||
|
||||
[monitor]
|
||||
enable = true
|
||||
|
|
|
@ -602,5 +602,5 @@ func (n *Node) writeAppliedIndex(index uint64) {
|
|||
}
|
||||
|
||||
func (n *Node) GetPendingNonceByAccount(account string) uint64 {
|
||||
return n.mempool.PendingNonceAt(account)
|
||||
return n.mempool.GetPendingNonceByAccount(account)
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ func (snk *sortedNonceKey) Less(item btree.Item) bool {
|
|||
func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
|
||||
return &orderedIndexKey{
|
||||
account: account,
|
||||
nonce: uint64(tx.Nonce),
|
||||
nonce: tx.Nonce,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,13 +56,13 @@ func newBtreeIndex() *btreeIndex {
|
|||
}
|
||||
|
||||
func (idx *btreeIndex) insert(tx *pb.Transaction) {
|
||||
idx.data.ReplaceOrInsert(makeSortedNonceKeyKey(uint64(tx.Nonce)))
|
||||
idx.data.ReplaceOrInsert(makeSortedNonceKeyKey(tx.Nonce))
|
||||
}
|
||||
|
||||
func (idx *btreeIndex) remove(txs map[string][]*pb.Transaction) {
|
||||
for _, list := range txs {
|
||||
for _, tx := range list {
|
||||
idx.data.Delete(makeSortedNonceKeyKey(uint64(tx.Nonce)))
|
||||
idx.data.Delete(makeSortedNonceKeyKey(tx.Nonce))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -80,6 +80,6 @@ func (idx *btreeIndex) removeByOrderedQueueKey(txs map[string][]*pb.Transaction)
|
|||
}
|
||||
|
||||
// Size returns the size of the index
|
||||
func (idx *btreeIndex) size() uint64 {
|
||||
return uint64(idx.data.Len())
|
||||
func (idx *btreeIndex) size() int {
|
||||
return idx.data.Len()
|
||||
}
|
||||
|
|
|
@ -41,8 +41,8 @@ type MemPool interface {
|
|||
// Remove committed transactions from mempool
|
||||
CommitTransactions(ready *raftproto.Ready)
|
||||
|
||||
// GetPendingNonce will return the latest pending nonce of a given account
|
||||
PendingNonceAt(account string) uint64
|
||||
// GetPendingNonceByAccount will return the latest pending nonce of a given account
|
||||
GetPendingNonceByAccount(account string) uint64
|
||||
}
|
||||
|
||||
// NewMempool return the mempool instance.
|
||||
|
@ -123,6 +123,12 @@ func (mpi *mempoolImpl) CommitTransactions(ready *raftproto.Ready) {
|
|||
mpi.subscribe.commitTxnC <- ready
|
||||
}
|
||||
|
||||
func (mpi *mempoolImpl) PendingNonceAt(account string) uint64 {
|
||||
return mpi.txStore.nonceCache.getPendingNonce(account)
|
||||
func (mpi *mempoolImpl) GetPendingNonceByAccount(account string) uint64 {
|
||||
waitC := make(chan uint64)
|
||||
getNonceRequest := &getNonceRequest{
|
||||
account: account,
|
||||
waitC: waitC,
|
||||
}
|
||||
mpi.subscribe.pendingNonceC <- getNonceRequest
|
||||
return <- waitC
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ func (mpi *mempoolImpl) listenEvent() {
|
|||
}
|
||||
mpi.batchC <- ready
|
||||
} else {
|
||||
mpi.logger.Debug("The length of priorityIndex is 0, ")
|
||||
mpi.logger.Debug("The length of priorityIndex is 0, skip the batch timer")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,6 +146,10 @@ func (mpi *mempoolImpl) listenEvent() {
|
|||
continue
|
||||
}
|
||||
waitC <- true
|
||||
|
||||
case getNonceRequest := <-mpi.subscribe.pendingNonceC:
|
||||
pendingNonce := mpi.txStore.nonceCache.getPendingNonce(getNonceRequest.account)
|
||||
getNonceRequest.waitC <- pendingNonce
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -168,7 +172,7 @@ func (mpi *mempoolImpl) processTransactions(txs []*pb.Transaction) error {
|
|||
currentSeqNo := mpi.txStore.nonceCache.getPendingNonce(txAccount)
|
||||
if tx.Nonce < currentSeqNo {
|
||||
mpi.logger.Warningf("account %s current sequence number is %d, required %d", txAccount, tx.Nonce, currentSeqNo+1)
|
||||
return nil
|
||||
continue
|
||||
}
|
||||
// check the existence of hash of this tx
|
||||
txHash := tx.TransactionHash.Hex()
|
||||
|
|
|
@ -56,6 +56,7 @@ type subscribeEvent struct {
|
|||
getBlockC chan *constructBatchEvent
|
||||
commitTxnC chan *raftproto.Ready
|
||||
updateLeaderC chan uint64
|
||||
pendingNonceC chan *getNonceRequest
|
||||
}
|
||||
|
||||
type mempoolBatch struct {
|
||||
|
@ -93,3 +94,8 @@ type txItem struct {
|
|||
account string
|
||||
tx *pb.Transaction
|
||||
}
|
||||
|
||||
type getNonceRequest struct {
|
||||
account string
|
||||
waitC chan uint64
|
||||
}
|
||||
|
|
|
@ -17,14 +17,14 @@ import (
|
|||
type Node struct {
|
||||
sync.RWMutex
|
||||
height uint64 // current block height
|
||||
pendingTxs *list.List //pending tx pool
|
||||
commitC chan *pb.Block //block channel
|
||||
logger logrus.FieldLogger //logger
|
||||
reqLookUp *order.ReqLookUp //bloom filter
|
||||
pendingTxs *list.List // pending tx pool
|
||||
commitC chan *pb.Block // block channel
|
||||
logger logrus.FieldLogger // logger
|
||||
reqLookUp *order.ReqLookUp // bloom filter
|
||||
getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
|
||||
|
||||
packSize int //maximum number of transaction packages
|
||||
blockTick time.Duration //block packed period
|
||||
packSize int // maximum number of transaction packages
|
||||
blockTick time.Duration // block packed period
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
@ -39,6 +39,11 @@ func (n *Node) Stop() {
|
|||
n.cancel()
|
||||
}
|
||||
|
||||
func (n *Node) GetPendingNonceByAccount(account string) uint64 {
|
||||
// TODO: implement me
|
||||
return 0
|
||||
}
|
||||
|
||||
func (n *Node) Prepare(tx *pb.Transaction) error {
|
||||
hash := tx.TransactionHash
|
||||
if ok := n.reqLookUp.LookUp(hash.Bytes()); ok {
|
||||
|
|
Loading…
Reference in New Issue