feat(api): add default tx wrapper for return

1. add default tx wrapper for return when a block contains no interchan tx
2. close channel when GetInterchainWrapper and GetBlockHeader finish
This commit is contained in:
Alexader 2020-04-28 18:43:59 +08:00
parent e6229af3db
commit 574f90cc3a
2 changed files with 29 additions and 1 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/model/events"
)
@ -25,7 +26,7 @@ func (cbs *ChainBrokerService) Subscribe(req *pb.SubscriptionRequest, server pb.
case pb.SubscriptionRequest_BLOCK_HEADER.String():
return cbs.handleBlockHeaderSubscription(server)
case pb.SubscriptionRequest_INTERCHAIN_TX_WRAPPER.String():
return cbs.handleInterchainTxWrapperSubscription(server, string(req.Extra))
return cbs.handleInterchainTxWrapperSubscription(server, types.Bytes2Address(req.Extra).String())
}
return nil
@ -111,6 +112,7 @@ func (cbs *ChainBrokerService) handleInterchainTxSubscription(server pb.ChainBro
func (cbs *ChainBrokerService) handleInterchainTxWrapperSubscription(server pb.ChainBroker_SubscribeServer, pid string) error {
ch, err := cbs.api.Broker().AddPier(pid)
defer cbs.api.Broker().RemovePier(pid)
if err != nil {
return err
}

View File

@ -91,11 +91,24 @@ func (router *InterchainRouter) PutBlock(block *pb.Block) {
return true
}
// empty interchain tx in this block
hashes := make([]types.Hash, 0, len(block.Transactions))
for _, tx := range block.Transactions {
hashes = append(hashes, tx.TransactionHash)
}
w <- &pb.InterchainTxWrapper{
Height: block.Height(),
TransactionHashes: hashes,
}
return true
})
}
func (router *InterchainRouter) GetBlockHeader(begin, end uint64, ch chan<- *pb.BlockHeader) error {
defer close(ch)
for i := begin; i <= end; i++ {
block, err := router.ledger.GetBlock(i)
if err != nil {
@ -110,6 +123,8 @@ func (router *InterchainRouter) GetBlockHeader(begin, end uint64, ch chan<- *pb.
}
func (router *InterchainRouter) GetInterchainTxWrapper(pid string, begin, end uint64, ch chan<- *pb.InterchainTxWrapper) error {
defer close(ch)
for i := begin; i <= end; i++ {
block, err := router.ledger.GetBlock(i)
if err != nil {
@ -121,6 +136,17 @@ func (router *InterchainRouter) GetInterchainTxWrapper(pid string, begin, end ui
ch <- ret[pid]
continue
}
// empty interchain tx in this block
hashes := make([]types.Hash, 0, len(block.Transactions))
for _, tx := range block.Transactions {
hashes = append(hashes, tx.TransactionHash)
}
ch <- &pb.InterchainTxWrapper{
Height: block.Height(),
TransactionHashes: hashes,
}
}
return nil