diff --git a/pkg/order/etcdraft/txpool/txpool.go b/pkg/order/etcdraft/txpool/txpool.go index d307eec..e5da648 100644 --- a/pkg/order/etcdraft/txpool/txpool.go +++ b/pkg/order/etcdraft/txpool/txpool.go @@ -92,9 +92,27 @@ func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error { } //add pending tx tp.pushBack(hash, tx, isAckTx) + + //immediately pack if it is greater than the total amount of block transactions + if tp.isExecuting { + tp.packFullBlock() + } return nil } +//packFullBlock immediately pack if it is greater than the total amount of block transactions +func (tp *TxPool) packFullBlock() { + tp.Lock() + defer tp.Unlock() + l := tp.pendingTxs.Len() + if l < tp.config.PackSize { + return + } + if r := tp.ready(tp.config.PackSize); r != nil { + tp.readyC <- r + } +} + //Current txpool's size func (tp *TxPool) PoolSize() int { tp.RLock() @@ -124,7 +142,7 @@ func (tp *TxPool) BuildReqLookUp() { } } -//CheckExecute check the txpool status, only leader node can run Execute() +//CheckExecute checks the txpool status, only leader node can run Execute() func (tp *TxPool) CheckExecute(isLeader bool) { if isLeader { if !tp.isExecuting { @@ -144,7 +162,7 @@ func (tp *TxPool) executeInit() { tp.isExecuting = true tp.pendingTxs.Init() tp.presenceTxs = sync.Map{} - tp.logger.Debugln("start txpool execute") + tp.logger.Infoln("start txpool execute") } //execute schedule to collect txs to the ready channel @@ -156,11 +174,7 @@ func (tp *TxPool) execute() { for { select { case <-ticker.C: - ready := tp.ready() - if ready == nil { - continue - } - tp.readyC <- ready + tp.periodPackBlock() case <-tp.ctx.Done(): tp.isExecuting = false tp.logger.Infoln("done txpool execute") @@ -170,13 +184,12 @@ func (tp *TxPool) execute() { } -//ready pack the block -func (tp *TxPool) ready() *raftproto.Ready { +func (tp *TxPool) periodPackBlock() { tp.Lock() defer tp.Unlock() l := tp.pendingTxs.Len() if l == 0 { - return nil + return } var size int @@ -185,6 +198,13 @@ func (tp *TxPool) ready() *raftproto.Ready { } else { size = l } + if r := tp.ready(size); r != nil { + tp.readyC <- r + } +} + +//ready pack the block +func (tp *TxPool) ready(size int) *raftproto.Ready { hashes := make([]types.Hash, 0, size) for i := 0; i < size; i++ { front := tp.pendingTxs.Front() diff --git a/pkg/order/solo/node.go b/pkg/order/solo/node.go index 3630f14..d70ebec 100644 --- a/pkg/order/solo/node.go +++ b/pkg/order/solo/node.go @@ -47,9 +47,21 @@ func (n *Node) Prepare(tx *pb.Transaction) error { } } n.pushBack(tx) + if n.PoolSize() >= n.packSize { + if r := n.ready(); r != nil { + n.commitC <- r + } + } return nil } +//Current txpool's size +func (n *Node) PoolSize() int { + n.RLock() + defer n.RUnlock() + return n.pendingTxs.Len() +} + func (n *Node) Commit() chan *pb.Block { return n.commitC } @@ -114,38 +126,9 @@ func (n *Node) execute() { for { select { case <-ticker.C: - n.Lock() - l := n.pendingTxs.Len() - if l == 0 { - n.Unlock() - continue + if r := n.ready(); r != nil { + n.commitC <- r } - - var size int - if l > n.packSize { - size = n.packSize - } else { - size = l - } - txs := make([]*pb.Transaction, 0, size) - for i := 0; i < size; i++ { - front := n.pendingTxs.Front() - tx := front.Value.(*pb.Transaction) - txs = append(txs, tx) - n.pendingTxs.Remove(front) - } - n.height++ - n.Unlock() - - block := &pb.Block{ - BlockHeader: &pb.BlockHeader{ - Version: []byte("1.0.0"), - Number: n.height, - Timestamp: time.Now().UnixNano(), - }, - Transactions: txs, - } - n.commitC <- block case <-n.ctx.Done(): n.logger.Infoln("Done txpool execute") return @@ -154,6 +137,39 @@ func (n *Node) execute() { } +func (n *Node) ready() *pb.Block { + n.Lock() + defer n.Unlock() + l := n.pendingTxs.Len() + if l == 0 { + return nil + } + var size int + if l > n.packSize { + size = n.packSize + } else { + size = l + } + txs := make([]*pb.Transaction, 0, size) + for i := 0; i < size; i++ { + front := n.pendingTxs.Front() + tx := front.Value.(*pb.Transaction) + txs = append(txs, tx) + n.pendingTxs.Remove(front) + } + n.height++ + + block := &pb.Block{ + BlockHeader: &pb.BlockHeader{ + Version: []byte("1.0.0"), + Number: n.height, + Timestamp: time.Now().UnixNano(), + }, + Transactions: txs, + } + return block +} + func (n *Node) pushBack(value interface{}) *list.Element { n.Lock() defer n.Unlock()