Merge pull request #180 from meshplus/fix-mempool

fix(mempool): fix two bugs:
This commit is contained in:
Sandy Zhou 2020-09-30 10:44:23 +08:00 committed by GitHub
commit e400af4add
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 12 deletions

View File

@ -457,12 +457,10 @@ func (n *Node) mint(ready *raftproto.Ready) {
// follower node update the block height
expectHeight := n.mempool.GetChainHeight()
if !n.IsLeader() {
if expectHeight != ready.Height-1 {
n.logger.Warningf("Receive batch %d, but not match, expect height: %d", ready.Height, expectHeight+1)
return
}
n.mempool.IncreaseChainHeight()
isLeader := n.IsLeader()
if !isLeader && expectHeight != ready.Height-1 {
n.logger.Warningf("Receive batch %d, but not match, expect height: %d", ready.Height, expectHeight+1)
return
}
missingTxsHash, txList := n.mempool.GetBlock(ready)
@ -496,6 +494,9 @@ func (n *Node) mint(ready *raftproto.Ready) {
return
}
}
if !isLeader {
n.mempool.IncreaseChainHeight()
}
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),

View File

@ -76,6 +76,7 @@ func (mpi *mempoolImpl) RecvForwardTxs(txSlice *TxSlice) {
// UpdateLeader updates the
func (mpi *mempoolImpl) UpdateLeader(newLeader uint64) {
// TODO (YH): should block until mempool finishing updating the leader info.
mpi.subscribe.updateLeaderC <- newLeader
}

View File

@ -384,10 +384,13 @@ func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch
}
res.missingTxnHashList = missingTxList
res.txList = txList
// store the batch to cache
mpi.txStore.batchedCache[ready.Height] = txList
// store the batch to db
mpi.batchStore(txList)
// persist the correct batch
if len(res.missingTxnHashList) == 0 {
// store the batch to cache
mpi.txStore.batchedCache[ready.Height] = txList
// store the batch to db
mpi.batchStore(txList)
}
return res
}
@ -474,7 +477,8 @@ func (mpi *mempoolImpl) processFetchTxnRequest(fetchTxnRequest *FetchTxnRequest)
if txList, err = mpi.loadTxnFromCache(fetchTxnRequest); err != nil {
if txList, err = mpi.loadTxnFromStorage(fetchTxnRequest); err != nil {
if txList, err = mpi.loadTxnFromLedger(fetchTxnRequest); err != nil {
mpi.logger.Error("Process fetch txn request failed.")
mpi.logger.Errorf("Process fetch txn request [peer: %s, block height: %d] failed",
fetchTxnRequest.ReplicaId, fetchTxnRequest.Height)
return err
}
}
@ -489,7 +493,7 @@ func (mpi *mempoolImpl) processFetchTxnRequest(fetchTxnRequest *FetchTxnRequest)
return err
}
pbMsg := mpi.msgToConsensusPbMsg(resBytes, raftproto.RaftMessage_GET_TX_ACK)
mpi.logger.Debugf("Send fetch transactions response to replica %d", fetchTxnRequest.ReplicaId)
mpi.logger.Debugf("Send fetch missing transactions response to replica %d", fetchTxnRequest.ReplicaId)
mpi.unicast(fetchTxnRequest.ReplicaId, pbMsg)
return nil
}