Merge pull request #289 from meshplus/refactor/add-executed-txs
refactor(executor): rename NewBlockEvent to ExecutedEvent and add TxHashList
This commit is contained in:
commit
d9ad9bf0d1
|
@ -35,7 +35,7 @@ func (cbs *ChainBrokerService) Subscribe(req *pb.SubscriptionRequest, server pb.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cbs *ChainBrokerService) handleNewBlockSubscription(server pb.ChainBroker_SubscribeServer) error {
|
func (cbs *ChainBrokerService) handleNewBlockSubscription(server pb.ChainBroker_SubscribeServer) error {
|
||||||
blockCh := make(chan events.NewBlockEvent)
|
blockCh := make(chan events.ExecutedEvent)
|
||||||
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
|
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ func (cbs *ChainBrokerService) handleNewBlockSubscription(server pb.ChainBroker_
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cbs *ChainBrokerService) handleBlockHeaderSubscription(server pb.ChainBroker_SubscribeServer) error {
|
func (cbs *ChainBrokerService) handleBlockHeaderSubscription(server pb.ChainBroker_SubscribeServer) error {
|
||||||
blockCh := make(chan events.NewBlockEvent)
|
blockCh := make(chan events.ExecutedEvent)
|
||||||
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
|
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ func (cbs *ChainBrokerService) handleBlockHeaderSubscription(server pb.ChainBrok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cbs *ChainBrokerService) handleInterchainTxSubscription(server pb.ChainBroker_SubscribeServer) error {
|
func (cbs *ChainBrokerService) handleInterchainTxSubscription(server pb.ChainBroker_SubscribeServer) error {
|
||||||
blockCh := make(chan events.NewBlockEvent)
|
blockCh := make(chan events.ExecutedEvent)
|
||||||
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
|
sub := cbs.api.Feed().SubscribeNewBlockEvent(blockCh)
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ func (bxh *BitXHub) start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bxh *BitXHub) listenEvent() {
|
func (bxh *BitXHub) listenEvent() {
|
||||||
blockCh := make(chan events.NewBlockEvent)
|
blockCh := make(chan events.ExecutedEvent)
|
||||||
orderMsgCh := make(chan events.OrderMessageEvent)
|
orderMsgCh := make(chan events.OrderMessageEvent)
|
||||||
blockSub := bxh.BlockExecutor.SubscribeBlockEvent(blockCh)
|
blockSub := bxh.BlockExecutor.SubscribeBlockEvent(blockCh)
|
||||||
orderMsgSub := bxh.PeerMgr.SubscribeOrderMessage(orderMsgCh)
|
orderMsgSub := bxh.PeerMgr.SubscribeOrderMessage(orderMsgCh)
|
||||||
|
|
|
@ -56,7 +56,7 @@ type ChainAPI interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type FeedAPI interface {
|
type FeedAPI interface {
|
||||||
SubscribeNewBlockEvent(chan<- events.NewBlockEvent) event.Subscription
|
SubscribeNewBlockEvent(chan<- events.ExecutedEvent) event.Subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
type AccountAPI interface {
|
type AccountAPI interface {
|
||||||
|
|
|
@ -10,6 +10,6 @@ type FeedAPI CoreAPI
|
||||||
|
|
||||||
var _ api.FeedAPI = (*FeedAPI)(nil)
|
var _ api.FeedAPI = (*FeedAPI)(nil)
|
||||||
|
|
||||||
func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.NewBlockEvent) event.Subscription {
|
func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription {
|
||||||
return api.bxh.BlockExecutor.SubscribeBlockEvent(ch)
|
return api.bxh.BlockExecutor.SubscribeBlockEvent(ch)
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,8 +106,8 @@ func (exec *BlockExecutor) ExecuteBlock(block *pb.Block) {
|
||||||
exec.preBlockC <- block
|
exec.preBlockC <- block
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeBlockEvent registers a subscription of NewBlockEvent.
|
// SubscribeBlockEvent registers a subscription of ExecutedEvent.
|
||||||
func (exec *BlockExecutor) SubscribeBlockEvent(ch chan<- events.NewBlockEvent) event.Subscription {
|
func (exec *BlockExecutor) SubscribeBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription {
|
||||||
return exec.blockFeed.Subscribe(ch)
|
return exec.blockFeed.Subscribe(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +208,7 @@ func (exec *BlockExecutor) persistData() {
|
||||||
for data := range exec.persistC {
|
for data := range exec.persistC {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
exec.ledger.PersistBlockData(data)
|
exec.ledger.PersistBlockData(data)
|
||||||
exec.postBlockEvent(data.Block, data.InterchainMeta)
|
exec.postBlockEvent(data.Block, data.InterchainMeta, data.TxHashList)
|
||||||
exec.logger.WithFields(logrus.Fields{
|
exec.logger.WithFields(logrus.Fields{
|
||||||
"height": data.Block.BlockHeader.Number,
|
"height": data.Block.BlockHeader.Number,
|
||||||
"hash": data.Block.BlockHash.String(),
|
"hash": data.Block.BlockHash.String(),
|
||||||
|
|
|
@ -149,7 +149,7 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
|
||||||
assert.Nil(t, exec.Start())
|
assert.Nil(t, exec.Start())
|
||||||
|
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
ch := make(chan events.NewBlockEvent)
|
ch := make(chan events.ExecutedEvent)
|
||||||
blockSub := exec.SubscribeBlockEvent(ch)
|
blockSub := exec.SubscribeBlockEvent(ch)
|
||||||
defer blockSub.Unsubscribe()
|
defer blockSub.Unsubscribe()
|
||||||
|
|
||||||
|
@ -218,7 +218,7 @@ func TestBlockExecutor_ApplyReadonlyTransactions(t *testing.T) {
|
||||||
assert.Equal(t, pb.Receipt_SUCCESS, receipts[0].Status)
|
assert.Equal(t, pb.Receipt_SUCCESS, receipts[0].Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func listenBlock(wg *sync.WaitGroup, done chan bool, blockCh chan events.NewBlockEvent) {
|
func listenBlock(wg *sync.WaitGroup, done chan bool, blockCh chan events.ExecutedEvent) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-blockCh:
|
case <-blockCh:
|
||||||
|
@ -287,7 +287,7 @@ func TestBlockExecutor_ExecuteBlock_Transfer(t *testing.T) {
|
||||||
err = executor.Start()
|
err = executor.Start()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
ch := make(chan events.NewBlockEvent)
|
ch := make(chan events.ExecutedEvent)
|
||||||
sub := executor.SubscribeBlockEvent(ch)
|
sub := executor.SubscribeBlockEvent(ch)
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,12 @@ import (
|
||||||
|
|
||||||
func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockData {
|
func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockData {
|
||||||
current := time.Now()
|
current := time.Now()
|
||||||
|
var txHashList []*types.Hash
|
||||||
|
|
||||||
|
for _, tx := range block.Transactions {
|
||||||
|
txHashList = append(txHashList, tx.TransactionHash)
|
||||||
|
}
|
||||||
|
|
||||||
block = exec.verifyProofs(block)
|
block = exec.verifyProofs(block)
|
||||||
receipts := exec.txsExecutor.ApplyTransactions(block.Transactions)
|
receipts := exec.txsExecutor.ApplyTransactions(block.Transactions)
|
||||||
|
|
||||||
|
@ -83,6 +89,7 @@ func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockDat
|
||||||
Accounts: accounts,
|
Accounts: accounts,
|
||||||
Journal: journal,
|
Journal: journal,
|
||||||
InterchainMeta: interchainMeta,
|
InterchainMeta: interchainMeta,
|
||||||
|
TxHashList: txHashList,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,8 +257,12 @@ func (exec *BlockExecutor) applyTx(index int, tx *pb.Transaction, opt *agency.Tx
|
||||||
return receipt
|
return receipt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *BlockExecutor) postBlockEvent(block *pb.Block, interchainMeta *pb.InterchainMeta) {
|
func (exec *BlockExecutor) postBlockEvent(block *pb.Block, interchainMeta *pb.InterchainMeta, txHashList []*types.Hash) {
|
||||||
go exec.blockFeed.Send(events.NewBlockEvent{Block: block, InterchainMeta: interchainMeta})
|
go exec.blockFeed.Send(events.ExecutedEvent{
|
||||||
|
Block: block,
|
||||||
|
InterchainMeta: interchainMeta,
|
||||||
|
TxHashList: txHashList,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *BlockExecutor) applyTransaction(i int, tx *pb.Transaction, opt *agency.TxOpt) ([]byte, error) {
|
func (exec *BlockExecutor) applyTransaction(i int, tx *pb.Transaction, opt *agency.TxOpt) ([]byte, error) {
|
||||||
|
|
|
@ -20,5 +20,5 @@ type Executor interface {
|
||||||
ApplyReadonlyTransactions(txs []*pb.Transaction) []*pb.Receipt
|
ApplyReadonlyTransactions(txs []*pb.Transaction) []*pb.Receipt
|
||||||
|
|
||||||
// SubscribeBlockEvent
|
// SubscribeBlockEvent
|
||||||
SubscribeBlockEvent(chan<- events.NewBlockEvent) event.Subscription
|
SubscribeBlockEvent(chan<- events.ExecutedEvent) event.Subscription
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ type BlockData struct {
|
||||||
Accounts map[string]*Account
|
Accounts map[string]*Account
|
||||||
Journal *BlockJournal
|
Journal *BlockJournal
|
||||||
InterchainMeta *pb.InterchainMeta
|
InterchainMeta *pb.InterchainMeta
|
||||||
|
TxHashList []*types.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
// New create a new ledger instance
|
// New create a new ledger instance
|
||||||
|
|
|
@ -5,9 +5,10 @@ import (
|
||||||
"github.com/meshplus/bitxhub-model/pb"
|
"github.com/meshplus/bitxhub-model/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NewBlockEvent struct {
|
type ExecutedEvent struct {
|
||||||
Block *pb.Block
|
Block *pb.Block
|
||||||
InterchainMeta *pb.InterchainMeta
|
InterchainMeta *pb.InterchainMeta
|
||||||
|
TxHashList []*types.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
type CheckpointEvent struct {
|
type CheckpointEvent struct {
|
||||||
|
|
Loading…
Reference in New Issue