Merge pull request #118 from meshplus/fix/fix-pack-when-the-number-of-block-txs-is-full

fix(order):fix pack when the number of block txs is full
This commit is contained in:
jzhe 2020-07-17 19:16:20 +08:00 committed by GitHub
commit 1a46725b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 41 deletions

View File

@ -92,9 +92,27 @@ func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
}
//add pending tx
tp.pushBack(hash, tx, isAckTx)
//immediately pack if it is greater than the total amount of block transactions
if tp.isExecuting {
tp.packFullBlock()
}
return nil
}
//packFullBlock immediately pack if it is greater than the total amount of block transactions
func (tp *TxPool) packFullBlock() {
tp.Lock()
defer tp.Unlock()
l := tp.pendingTxs.Len()
if l < tp.config.PackSize {
return
}
if r := tp.ready(tp.config.PackSize); r != nil {
tp.readyC <- r
}
}
//Current txpool's size
func (tp *TxPool) PoolSize() int {
tp.RLock()
@ -124,7 +142,7 @@ func (tp *TxPool) BuildReqLookUp() {
}
}
//CheckExecute check the txpool status, only leader node can run Execute()
//CheckExecute checks the txpool status, only leader node can run Execute()
func (tp *TxPool) CheckExecute(isLeader bool) {
if isLeader {
if !tp.isExecuting {
@ -144,7 +162,7 @@ func (tp *TxPool) executeInit() {
tp.isExecuting = true
tp.pendingTxs.Init()
tp.presenceTxs = sync.Map{}
tp.logger.Debugln("start txpool execute")
tp.logger.Infoln("start txpool execute")
}
//execute schedule to collect txs to the ready channel
@ -156,11 +174,7 @@ func (tp *TxPool) execute() {
for {
select {
case <-ticker.C:
ready := tp.ready()
if ready == nil {
continue
}
tp.readyC <- ready
tp.periodPackBlock()
case <-tp.ctx.Done():
tp.isExecuting = false
tp.logger.Infoln("done txpool execute")
@ -170,13 +184,12 @@ func (tp *TxPool) execute() {
}
//ready pack the block
func (tp *TxPool) ready() *raftproto.Ready {
func (tp *TxPool) periodPackBlock() {
tp.Lock()
defer tp.Unlock()
l := tp.pendingTxs.Len()
if l == 0 {
return nil
return
}
var size int
@ -185,6 +198,13 @@ func (tp *TxPool) ready() *raftproto.Ready {
} else {
size = l
}
if r := tp.ready(size); r != nil {
tp.readyC <- r
}
}
//ready pack the block
func (tp *TxPool) ready(size int) *raftproto.Ready {
hashes := make([]types.Hash, 0, size)
for i := 0; i < size; i++ {
front := tp.pendingTxs.Front()

View File

@ -47,9 +47,21 @@ func (n *Node) Prepare(tx *pb.Transaction) error {
}
}
n.pushBack(tx)
if n.PoolSize() >= n.packSize {
if r := n.ready(); r != nil {
n.commitC <- r
}
}
return nil
}
//Current txpool's size
func (n *Node) PoolSize() int {
n.RLock()
defer n.RUnlock()
return n.pendingTxs.Len()
}
func (n *Node) Commit() chan *pb.Block {
return n.commitC
}
@ -114,38 +126,9 @@ func (n *Node) execute() {
for {
select {
case <-ticker.C:
n.Lock()
l := n.pendingTxs.Len()
if l == 0 {
n.Unlock()
continue
if r := n.ready(); r != nil {
n.commitC <- r
}
var size int
if l > n.packSize {
size = n.packSize
} else {
size = l
}
txs := make([]*pb.Transaction, 0, size)
for i := 0; i < size; i++ {
front := n.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
txs = append(txs, tx)
n.pendingTxs.Remove(front)
}
n.height++
n.Unlock()
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: n.height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.commitC <- block
case <-n.ctx.Done():
n.logger.Infoln("Done txpool execute")
return
@ -154,6 +137,39 @@ func (n *Node) execute() {
}
func (n *Node) ready() *pb.Block {
n.Lock()
defer n.Unlock()
l := n.pendingTxs.Len()
if l == 0 {
return nil
}
var size int
if l > n.packSize {
size = n.packSize
} else {
size = l
}
txs := make([]*pb.Transaction, 0, size)
for i := 0; i < size; i++ {
front := n.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
txs = append(txs, tx)
n.pendingTxs.Remove(front)
}
n.height++
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: n.height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
return block
}
func (n *Node) pushBack(value interface{}) *list.Element {
n.Lock()
defer n.Unlock()