refactor(executor): rename NewBlockEvent to ExecutedEvent and add TxHashList

This commit is contained in:
zhourong 2020-12-17 18:50:06 +08:00
parent 876a5af0dc
commit af93ad5a74
10 changed files with 29 additions and 16 deletions

View File

@ -35,7 +35,7 @@ func (cbs *ChainBrokerService) Subscribe(req *pb.SubscriptionRequest, server pb.
}
func (cbs *ChainBrokerService) handleNewBlockSubscription(server pb.ChainBroker_SubscribeServer) error {
blockCh := make(chan events.NewBlockEvent)
blockCh := make(chan events.ExecutedEvent)
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
defer sub.Unsubscribe()
@ -57,7 +57,7 @@ func (cbs *ChainBrokerService) handleNewBlockSubscription(server pb.ChainBroker_
}
func (cbs *ChainBrokerService) handleBlockHeaderSubscription(server pb.ChainBroker_SubscribeServer) error {
blockCh := make(chan events.NewBlockEvent)
blockCh := make(chan events.ExecutedEvent)
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
defer sub.Unsubscribe()
@ -79,7 +79,7 @@ func (cbs *ChainBrokerService) handleBlockHeaderSubscription(server pb.ChainBrok
}
func (cbs *ChainBrokerService) handleInterchainTxSubscription(server pb.ChainBroker_SubscribeServer) error {
blockCh := make(chan events.NewBlockEvent)
blockCh := make(chan events.ExecutedEvent)
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
defer sub.Unsubscribe()

View File

@ -27,7 +27,7 @@ func (bxh *BitXHub) start() {
}
func (bxh *BitXHub) listenEvent() {
blockCh := make(chan events.NewBlockEvent)
blockCh := make(chan events.ExecutedEvent)
orderMsgCh := make(chan events.OrderMessageEvent)
blockSub := bxh.BlockExecutor.SubscribeBlockEvent(blockCh)
orderMsgSub := bxh.PeerMgr.SubscribeOrderMessage(orderMsgCh)

View File

@ -56,7 +56,7 @@ type ChainAPI interface {
}
type FeedAPI interface {
SubscribeNewBlockEvent(chan<- events.NewBlockEvent) event.Subscription
SubscribeNewBlockEvent(chan<- events.ExecutedEvent) event.Subscription
}
type AccountAPI interface {

View File

@ -10,6 +10,6 @@ type FeedAPI CoreAPI
var _ api.FeedAPI = (*FeedAPI)(nil)
func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.NewBlockEvent) event.Subscription {
func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription {
return api.bxh.BlockExecutor.SubscribeBlockEvent(ch)
}

View File

@ -106,8 +106,8 @@ func (exec *BlockExecutor) ExecuteBlock(block *pb.Block) {
exec.preBlockC <- block
}
// SubscribeBlockEvent registers a subscription of NewBlockEvent.
func (exec *BlockExecutor) SubscribeBlockEvent(ch chan<- events.NewBlockEvent) event.Subscription {
// SubscribeBlockEvent registers a subscription of ExecutedEvent.
func (exec *BlockExecutor) SubscribeBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription {
return exec.blockFeed.Subscribe(ch)
}
@ -208,7 +208,7 @@ func (exec *BlockExecutor) persistData() {
for data := range exec.persistC {
now := time.Now()
exec.ledger.PersistBlockData(data)
exec.postBlockEvent(data.Block, data.InterchainMeta)
exec.postBlockEvent(data.Block, data.InterchainMeta, data.TxHashList)
exec.logger.WithFields(logrus.Fields{
"height": data.Block.BlockHeader.Number,
"hash": data.Block.BlockHash.String(),

View File

@ -149,7 +149,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
assert.Nil(t, exec.Start())
done := make(chan bool)
ch := make(chan events.NewBlockEvent)
ch := make(chan events.ExecutedEvent)
blockSub := exec.SubscribeBlockEvent(ch)
defer blockSub.Unsubscribe()
@ -218,7 +218,7 @@ func TestBlockExecutor_ApplyReadonlyTransactions(t *testing.T) {
assert.Equal(t, pb.Receipt_SUCCESS, receipts[0].Status)
}
func listenBlock(wg *sync.WaitGroup, done chan bool, blockCh chan events.NewBlockEvent) {
func listenBlock(wg *sync.WaitGroup, done chan bool, blockCh chan events.ExecutedEvent) {
for {
select {
case <-blockCh:
@ -287,7 +287,7 @@ func TestBlockExecutor_ExecuteBlock_Transfer(t *testing.T) {
err = executor.Start()
require.Nil(t, err)
ch := make(chan events.NewBlockEvent)
ch := make(chan events.ExecutedEvent)
sub := executor.SubscribeBlockEvent(ch)
defer sub.Unsubscribe()

View File

@ -25,6 +25,12 @@ import (
func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockData {
current := time.Now()
var txHashList []*types.Hash
for _, tx := range block.Transactions {
txHashList = append(txHashList, tx.TransactionHash)
}
block = exec.verifyProofs(block)
receipts := exec.txsExecutor.ApplyTransactions(block.Transactions)
@ -83,6 +89,7 @@ func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockDat
Accounts: accounts,
Journal: journal,
InterchainMeta: interchainMeta,
TxHashList: txHashList,
}
}
@ -250,8 +257,12 @@ func (exec *BlockExecutor) applyTx(index int, tx *pb.Transaction, opt *agency.Tx
return receipt
}
func (exec *BlockExecutor) postBlockEvent(block *pb.Block, interchainMeta *pb.InterchainMeta) {
go exec.blockFeed.Send(events.NewBlockEvent{Block: block, InterchainMeta: interchainMeta})
func (exec *BlockExecutor) postBlockEvent(block *pb.Block, interchainMeta *pb.InterchainMeta, txHashList []*types.Hash) {
go exec.blockFeed.Send(events.ExecutedEvent{
Block: block,
InterchainMeta: interchainMeta,
TxHashList: txHashList,
})
}
func (exec *BlockExecutor) applyTransaction(i int, tx *pb.Transaction, opt *agency.TxOpt) ([]byte, error) {

View File

@ -20,5 +20,5 @@ type Executor interface {
ApplyReadonlyTransactions(txs []*pb.Transaction) []*pb.Receipt
// SubscribeBlockEvent
SubscribeBlockEvent(chan<- events.NewBlockEvent) event.Subscription
SubscribeBlockEvent(chan<- events.ExecutedEvent) event.Subscription
}

View File

@ -48,6 +48,7 @@ type BlockData struct {
Accounts map[string]*Account
Journal *BlockJournal
InterchainMeta *pb.InterchainMeta
TxHashList []*types.Hash
}
// New create a new ledger instance

View File

@ -5,9 +5,10 @@ import (
"github.com/meshplus/bitxhub-model/pb"
)
type NewBlockEvent struct {
type ExecutedEvent struct {
Block *pb.Block
InterchainMeta *pb.InterchainMeta
TxHashList []*types.Hash
}
type CheckpointEvent struct {