perf(*): add localList to mark the txs received from api to reduce the number of signature verifications

This commit is contained in:
Lizen0512 2020-12-29 20:16:25 +08:00
parent 2985baf07c
commit 2d3e749dff
13 changed files with 89 additions and 45 deletions

2
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/magiconair/properties v1.8.4
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201125025329-ac1187099a88
github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d
github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353
github.com/meshplus/bitxhub-model v1.1.2-0.20201229110212-37dd343b4c76
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.2.2

5
go.sum
View File

@ -615,6 +615,11 @@ github.com/meshplus/bitxhub-model v1.1.2-0.20201221062626-1774aad0f842 h1:THnvVa
github.com/meshplus/bitxhub-model v1.1.2-0.20201221062626-1774aad0f842/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353 h1:7wkdviM8ssujL6pQHjHRGo9k+IOioBviaRKySjd0lzQ=
github.com/meshplus/bitxhub-model v1.1.2-0.20201221070800-ca8184215353/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229073026-1b346f4d70af/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229075949-5d556a5252e4 h1:C/NNvJOfnMZdoZT8ijgrMD+XW0P5i//MLocPlOpl/Xo=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229075949-5d556a5252e4/go.mod h1:HpL39K8fuh3nDu+HuprYuDcS/C3eFOQBvFkd1bPViwA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229110212-37dd343b4c76 h1:3EndfTR7Zb1RbqjLNavrQ9BzjNz2WFWak4l+muq3klA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229110212-37dd343b4c76/go.mod h1:x3H+TL24wcByzHegenLfs+5PQkQGNsk8eCm31QJMa+Q=
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab h1:JclTakVV0dcXxl/dScmN77htnYe3n19hh7m2eMk9Abs=
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab/go.mod h1:L3pEzDMouz+xcIVwG2fj+mAsM95GAkzoo7cEd2CzmCQ=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=

View File

@ -11,12 +11,12 @@ func (bxh *BitXHub) start() {
go func() {
for {
select {
case block := <-bxh.Order.Commit():
case commitEvent := <-bxh.Order.Commit():
bxh.logger.WithFields(logrus.Fields{
"height": block.BlockHeader.Number,
"count": len(block.Transactions),
"height": commitEvent.Block.BlockHeader.Number,
"count": len(commitEvent.Block.Transactions),
}).Info("Generated block")
bxh.BlockExecutor.ExecuteBlock(block)
bxh.BlockExecutor.ExecuteBlock(commitEvent)
case <-bxh.Ctx.Done():
return
}

View File

@ -33,7 +33,7 @@ type BlockExecutor struct {
ledger ledger.Ledger
logger logrus.FieldLogger
blockC chan *pb.Block
preBlockC chan *pb.Block
preBlockC chan *pb.CommitEvent
persistC chan *ledger.BlockData
ibtpVerify proof.Verify
validationEngine validator.Engine
@ -63,7 +63,7 @@ func New(chainLedger ledger.Ledger, logger logrus.FieldLogger, typ string) (*Blo
ctx: ctx,
cancel: cancel,
blockC: make(chan *pb.Block, blockChanNumber),
preBlockC: make(chan *pb.Block, blockChanNumber),
preBlockC: make(chan *pb.CommitEvent, blockChanNumber),
persistC: make(chan *ledger.BlockData, persistChanNumber),
ibtpVerify: ibtpVerify,
validationEngine: ibtpVerify.ValidationEngine(),
@ -102,7 +102,7 @@ func (exec *BlockExecutor) Stop() error {
}
// ExecuteBlock executes block from order
func (exec *BlockExecutor) ExecuteBlock(block *pb.Block) {
func (exec *BlockExecutor) ExecuteBlock(block *pb.CommitEvent) {
exec.preBlockC <- block
}

View File

@ -159,10 +159,10 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
go listenBlock(&wg, done, ch)
// send blocks to executor
block1 := mockBlock(uint64(1), nil)
block2 := mockBlock(uint64(2), txs)
exec.ExecuteBlock(block1)
exec.ExecuteBlock(block2)
commitEvent1 := mockCommitEvent(uint64(1), nil)
commitEvent2 := mockCommitEvent(uint64(2), txs)
exec.ExecuteBlock(commitEvent1)
exec.ExecuteBlock(commitEvent2)
wg.Wait()
done <- true
@ -229,6 +229,18 @@ func listenBlock(wg *sync.WaitGroup, done chan bool, blockCh chan events.Execute
}
}
func mockCommitEvent(blockNumber uint64, txs []*pb.Transaction) *pb.CommitEvent {
block := mockBlock(blockNumber, txs)
localList := make([]bool, len(block.Transactions))
for i := 0; i < len(block.Transactions); i++ {
localList[i] = false
}
return &pb.CommitEvent{
Block: block,
LocalList: localList,
}
}
func mockBlock(blockNumber uint64, txs []*pb.Transaction) *pb.Block {
header := &pb.BlockHeader{
Number: blockNumber,
@ -295,7 +307,8 @@ func TestBlockExecutor_ExecuteBlock_Transfer(t *testing.T) {
txs = append(txs, mockTransferTx(t))
txs = append(txs, mockTransferTx(t))
txs = append(txs, mockTransferTx(t))
executor.ExecuteBlock(mockBlock(2, txs))
commitEvent := mockCommitEvent(2, txs)
executor.ExecuteBlock(commitEvent)
require.Nil(t, err)
block := <-ch

View File

@ -96,15 +96,15 @@ func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockDat
func (exec *BlockExecutor) listenPreExecuteEvent() {
for {
select {
case block := <-exec.preBlockC:
case commitEvent := <-exec.preBlockC:
now := time.Now()
block = exec.verifySign(block)
commitEvent.Block = exec.verifySign(commitEvent)
exec.logger.WithFields(logrus.Fields{
"height": block.BlockHeader.Number,
"count": len(block.Transactions),
"height": commitEvent.Block.BlockHeader.Number,
"count": len(commitEvent.Block.Transactions),
"elapse": time.Since(now),
}).Debug("Verified signature")
exec.blockC <- block
exec.blockC <- commitEvent.Block
case <-exec.ctx.Done():
return
}
@ -178,9 +178,9 @@ func (exec *BlockExecutor) buildTxMerkleTree(txs []*pb.Transaction) (*types.Hash
return root, l2Roots, nil
}
func (exec *BlockExecutor) verifySign(block *pb.Block) *pb.Block {
if block.BlockHeader.Number == 1 {
return block
func (exec *BlockExecutor) verifySign(commitEvent *pb.CommitEvent) *pb.Block {
if commitEvent.Block.BlockHeader.Number == 1 {
return commitEvent.Block
}
var (
@ -188,10 +188,15 @@ func (exec *BlockExecutor) verifySign(block *pb.Block) *pb.Block {
mutex sync.Mutex
index []int
)
txs := block.Transactions
txs := commitEvent.Block.Transactions
txsLen := len(commitEvent.LocalList)
wg.Add(len(txs))
for i, tx := range txs {
// if the tx is received from api, we will pass the verify.
if txsLen > i && commitEvent.LocalList[i] {
wg.Done()
continue
}
go func(i int, tx *pb.Transaction) {
defer wg.Done()
ok, _ := asym.Verify(crypto.Secp256k1, tx.Signature, tx.SignHash().Bytes(), *tx.From)
@ -209,10 +214,10 @@ func (exec *BlockExecutor) verifySign(block *pb.Block) *pb.Block {
for _, idx := range index {
txs = append(txs[:idx], txs[idx+1:]...)
}
block.Transactions = txs
commitEvent.Block.Transactions = txs
}
return block
return commitEvent.Block
}
func (exec *BlockExecutor) applyTx(index int, tx *pb.Transaction, opt *agency.TxOpt) *pb.Receipt {

View File

@ -14,7 +14,7 @@ type Executor interface {
Stop() error
// ExecutorBlock
ExecuteBlock(*pb.Block)
ExecuteBlock(commitEvent *pb.CommitEvent)
// ApplyReadonlyTransactions execute readonly tx
ApplyReadonlyTransactions(txs []*pb.Transaction) []*pb.Receipt

View File

@ -40,7 +40,7 @@ type Node struct {
proposeC chan *raftproto.RequestBatch // proposed ready, input channel
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan *pb.Block // the hash commit channel
commitC chan *pb.CommitEvent // the hash commit channel
errorC chan<- error // errors from raft session
tickTimeout time.Duration // tick timeout
msgC chan []byte // receive messages from remote peer
@ -116,7 +116,7 @@ func NewNode(opts ...order.Option) (order.Order, error) {
id: config.ID,
lastExec: config.Applied,
confChangeC: make(chan raftpb.ConfChange),
commitC: make(chan *pb.Block, 1024),
commitC: make(chan *pb.CommitEvent, 1024),
errorC: make(chan<- error),
msgC: make(chan []byte),
stateC: make(chan *mempool.ChainState),
@ -179,7 +179,7 @@ func (n *Node) Prepare(tx *pb.Transaction) error {
return nil
}
func (n *Node) Commit() chan *pb.Block {
func (n *Node) Commit() chan *pb.CommitEvent {
return n.commitC
}
@ -470,7 +470,16 @@ func (n *Node) mint(requestBatch *raftproto.RequestBatch) {
},
Transactions: requestBatch.TxList,
}
n.commitC <- block
// TODO (YH): refactor localLost
localList := make([]bool, len(requestBatch.TxList))
for i := 0; i < len(requestBatch.TxList); i++ {
localList[i] = false
}
executeEvent := &pb.CommitEvent{
Block: block,
LocalList: localList,
}
n.commitC <- executeEvent
}
//Determines whether the current apply index triggers a snapshot

View File

@ -80,9 +80,9 @@ func TestNode_Start(t *testing.T) {
err = order.Prepare(tx)
require.Nil(t, err)
block := <-order.Commit()
require.Equal(t, uint64(2), block.BlockHeader.Number)
require.Equal(t, 1, len(block.Transactions))
commitEvent := <-order.Commit()
require.Equal(t, uint64(2), commitEvent.Block.BlockHeader.Number)
require.Equal(t, 1, len(commitEvent.Block.Transactions))
order.Stop()
}
@ -137,9 +137,9 @@ func TestMulti_Node_Start(t *testing.T) {
err = orders[0].Prepare(tx)
require.Nil(t, err)
for i := 0; i < len(orders); i++ {
block := <-orders[i].Commit()
require.Equal(t, uint64(2), block.BlockHeader.Number)
require.Equal(t, 1, len(block.Transactions))
commitEvent := <-orders[i].Commit()
require.Equal(t, uint64(2), commitEvent.Block.BlockHeader.Number)
require.Equal(t, 1, len(commitEvent.Block.Transactions))
}
}

View File

@ -17,7 +17,7 @@ type Order interface {
Prepare(tx *pb.Transaction) error
// Commit recv blocks form Order and commit it by order
Commit() chan *pb.Block
Commit() chan *pb.CommitEvent
// Step send msg to the consensus engine
Step(msg []byte) error

View File

@ -18,7 +18,7 @@ import (
type Node struct {
ID uint64
commitC chan *pb.Block // block channel
commitC chan *pb.CommitEvent // block channel
logger logrus.FieldLogger // logger
mempool mempool.MemPool // transaction pool
proposeC chan *raftproto.RequestBatch // proposed listenReadyBlock, input channel
@ -61,7 +61,7 @@ func (n *Node) Prepare(tx *pb.Transaction) error {
return nil
}
func (n *Node) Commit() chan *pb.Block {
func (n *Node) Commit() chan *pb.CommitEvent {
return n.commitC
}
@ -113,7 +113,7 @@ func NewNode(opts ...order.Option) (order.Order, error) {
batchTimerMgr := etcdraft.NewTimer(batchTimeout, config.Logger)
soloNode := &Node{
ID: config.ID,
commitC: make(chan *pb.Block, 1024),
commitC: make(chan *pb.CommitEvent, 1024),
stateC: make(chan *mempool.ChainState),
lastExec: config.Applied,
mempool: mempoolInst,
@ -154,7 +154,15 @@ func (n *Node) listenReadyBlock() {
},
Transactions: proposal.TxList,
}
n.commitC <- block
localList := make([]bool, len(proposal.TxList))
for i := 0; i < len(proposal.TxList); i++ {
localList[i] = true
}
executeEvent := &pb.CommitEvent{
Block: block,
LocalList: localList,
}
n.commitC <- executeEvent
n.lastExec++
}
}

View File

@ -86,12 +86,12 @@ func TestNode_Start(t *testing.T) {
err = order.Prepare(tx)
require.Nil(t, err)
block := <-order.Commit()
require.Equal(t, uint64(2), block.BlockHeader.Number)
require.Equal(t, 1, len(block.Transactions))
commitEvent := <-order.Commit()
require.Equal(t, uint64(2), commitEvent.Block.BlockHeader.Number)
require.Equal(t, 1, len(commitEvent.Block.Transactions))
txHashList := make([]*types.Hash, 0)
txHashList = append(txHashList, tx.TransactionHash)
order.ReportState(block.Height(), block.BlockHash, txHashList)
order.ReportState(commitEvent.Block.Height(), commitEvent.Block.BlockHash, txHashList)
order.Stop()
}

View File

@ -20,6 +20,10 @@ type MockPeerManager struct {
recorder *MockPeerManagerMockRecorder
}
func (m *MockPeerManager) CountConnectedPeers() uint64 {
return 0
}
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager
type MockPeerManagerMockRecorder struct {
mock *MockPeerManager