fix(mempool): fix two mempool bugs:

1. If some transactions are missing when a follower calls constructSameBatch
   to rebuild the same block, avoid persisting that block;
2. Increase the follower's chain height only after it has finished getting
   the block from the mempool (see the sketch below the commit metadata).
Lizen 2020-09-29 21:27:16 +08:00
parent 8b4b168ce1
commit 5a53cf1435
3 changed files with 18 additions and 12 deletions
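
Before the diffs, a condensed sketch of the corrected follower path in (*Node).mint (names are taken from the diffs below; error handling and the leader-only path are elided):

	// Sketch only: verify and fetch first, advance height last.
	expectHeight := n.mempool.GetChainHeight()
	isLeader := n.IsLeader()
	if !isLeader && expectHeight != ready.Height-1 {
		return // unexpected height, drop the batch
	}
	missingTxsHash, txList := n.mempool.GetBlock(ready)
	// ... fetch any missing txs from the leader; bail out on failure ...
	if !isLeader {
		// The block is now known to be complete, so it is safe for the
		// follower to advance its local chain height.
		n.mempool.IncreaseChainHeight()
	}
	// build the pb.Block from txList as before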


@@ -457,13 +457,11 @@ func (n *Node) mint(ready *raftproto.Ready) {
 	// follower node update the block height
 	expectHeight := n.mempool.GetChainHeight()
-	if !n.IsLeader() {
-		if expectHeight != ready.Height-1 {
-			n.logger.Warningf("Receive batch %d, but not match, expect height: %d", ready.Height, expectHeight+1)
-			return
-		}
-		n.mempool.IncreaseChainHeight()
-	}
+	isLeader := n.IsLeader()
+	if !isLeader && expectHeight != ready.Height-1 {
+		n.logger.Warningf("Receive batch %d, but not match, expect height: %d", ready.Height, expectHeight+1)
+		return
+	}
 
 	missingTxsHash, txList := n.mempool.GetBlock(ready)
 	// handle missing txs
@@ -496,6 +494,9 @@ func (n *Node) mint(ready *raftproto.Ready) {
 			return
 		}
 	}
+	if !isLeader {
+		n.mempool.IncreaseChainHeight()
+	}
 	block := &pb.Block{
 		BlockHeader: &pb.BlockHeader{
 			Version: []byte("1.0.0"),
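
Taken together, the two hunks above move IncreaseChainHeight from the height-check branch to after the missing-transaction handling: a follower that returns early because transactions still have to be fetched no longer advances its height for a block it never finished assembling, so subsequent batches keep passing the expectHeight check.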


@@ -76,6 +76,7 @@ func (mpi *mempoolImpl) RecvForwardTxs(txSlice *TxSlice) {
 
 // UpdateLeader updates the
 func (mpi *mempoolImpl) UpdateLeader(newLeader uint64) {
+	// TODO (YH): should block until mempool finishes updating the leader info.
 	mpi.subscribe.updateLeaderC <- newLeader
 }
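
The new TODO points at an asynchrony hazard: UpdateLeader returns as soon as the value is queued on updateLeaderC, so a caller may keep acting on stale leader info while the mempool event loop catches up. One hypothetical way to make it block (the leaderUpdate struct and its done channel are invented here, and would require changing the channel's element type):

	// Sketch of a blocking variant, assuming a single event loop owns the
	// leader state and closes done once the update has been applied.
	type leaderUpdate struct {
		newLeader uint64
		done      chan struct{}
	}

	func (mpi *mempoolImpl) updateLeaderSync(newLeader uint64) {
		u := leaderUpdate{newLeader: newLeader, done: make(chan struct{})}
		mpi.subscribe.updateLeaderC <- u // hypothetical chan leaderUpdate
		<-u.done                         // wait for the event loop's ack
	}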


@@ -384,10 +384,13 @@ func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch
 	}
 	res.missingTxnHashList = missingTxList
 	res.txList = txList
-	// store the batch to cache
-	mpi.txStore.batchedCache[ready.Height] = txList
-	// store the batch to db
-	mpi.batchStore(txList)
+	// persist the correct batch
+	if len(res.missingTxnHashList) == 0 {
+		// store the batch to cache
+		mpi.txStore.batchedCache[ready.Height] = txList
+		// store the batch to db
+		mpi.batchStore(txList)
+	}
 	return res
 }
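
This is the first fix from the commit message: an incomplete batch is still returned (so the caller can see which hashes are missing) but is no longer cached or written to the db, where a truncated txList stored under ready.Height could later disagree with the batch rebuilt after the missing transactions arrive. A sketch of the caller-side contract this enables (hypothetical shape; the real fetch path is the missing-tx handling in mint above):

	batch := mpi.constructSameBatch(ready)
	if len(batch.missingTxnHashList) != 0 {
		// Nothing was persisted, so the caller can fetch the missing
		// txs and retry without cleaning up stale cache/db entries.
	}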
@@ -474,7 +477,8 @@ func (mpi *mempoolImpl) processFetchTxnRequest(fetchTxnRequest *FetchTxnRequest)
 	if txList, err = mpi.loadTxnFromCache(fetchTxnRequest); err != nil {
 		if txList, err = mpi.loadTxnFromStorage(fetchTxnRequest); err != nil {
 			if txList, err = mpi.loadTxnFromLedger(fetchTxnRequest); err != nil {
-				mpi.logger.Error("Process fetch txn request failed.")
+				mpi.logger.Errorf("Process fetch txn request [peer: %d, block height: %d] failed",
+					fetchTxnRequest.ReplicaId, fetchTxnRequest.Height)
 				return err
 			}
 		}
@@ -489,7 +493,7 @@ func (mpi *mempoolImpl) processFetchTxnRequest(fetchTxnRequest *FetchTxnRequest)
 		return err
 	}
 	pbMsg := mpi.msgToConsensusPbMsg(resBytes, raftproto.RaftMessage_GET_TX_ACK)
-	mpi.logger.Debugf("Send fetch transactions response to replica %d", fetchTxnRequest.ReplicaId)
+	mpi.logger.Debugf("Send fetch missing transactions response to replica %d", fetchTxnRequest.ReplicaId)
 	mpi.unicast(fetchTxnRequest.ReplicaId, pbMsg)
 	return nil
 }
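
The lookup cascade here (cache, then local storage, then the ledger) reflects the three places a batched transaction can legitimately live by the time a lagging replica asks for it; the error path is only reached when all three miss, which is why the enriched Errorf now records which peer asked and for which block height.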