371 lines
9.0 KiB
Go
Executable File
371 lines
9.0 KiB
Go
Executable File
package executor
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/cbergoon/merkletree"
|
|
"github.com/meshplus/bitxhub-core/agency"
|
|
"github.com/meshplus/bitxhub-kit/crypto"
|
|
"github.com/meshplus/bitxhub-kit/crypto/asym"
|
|
"github.com/meshplus/bitxhub-kit/types"
|
|
"github.com/meshplus/bitxhub-model/pb"
|
|
"github.com/meshplus/bitxhub/internal/ledger"
|
|
"github.com/meshplus/bitxhub/internal/model/events"
|
|
"github.com/meshplus/bitxhub/pkg/vm"
|
|
"github.com/meshplus/bitxhub/pkg/vm/boltvm"
|
|
"github.com/meshplus/bitxhub/pkg/vm/wasm"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) *ledger.BlockData {
|
|
current := time.Now()
|
|
var txHashList []*types.Hash
|
|
|
|
for _, tx := range block.Transactions {
|
|
txHashList = append(txHashList, tx.TransactionHash)
|
|
}
|
|
|
|
block = exec.verifyProofs(block)
|
|
receipts := exec.txsExecutor.ApplyTransactions(block.Transactions)
|
|
|
|
applyTxsDuration.Observe(float64(time.Since(current)) / float64(time.Second))
|
|
exec.logger.WithFields(logrus.Fields{
|
|
"time": time.Since(current),
|
|
"count": len(block.Transactions),
|
|
}).Debug("Apply transactions elapsed")
|
|
|
|
calcMerkleStart := time.Now()
|
|
l1Root, l2Roots, err := exec.buildTxMerkleTree(block.Transactions)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
receiptRoot, err := exec.calcReceiptMerkleRoot(receipts)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
calcMerkleDuration.Observe(float64(time.Since(calcMerkleStart)) / float64(time.Second))
|
|
|
|
block.BlockHeader.TxRoot = l1Root
|
|
block.BlockHeader.ReceiptRoot = receiptRoot
|
|
block.BlockHeader.ParentHash = exec.currentBlockHash
|
|
|
|
accounts, journal := exec.ledger.FlushDirtyDataAndComputeJournal()
|
|
|
|
block.BlockHeader.StateRoot = journal.ChangedHash
|
|
block.BlockHash = block.Hash()
|
|
|
|
exec.logger.WithFields(logrus.Fields{
|
|
"tx_root": block.BlockHeader.TxRoot.String(),
|
|
"receipt_root": block.BlockHeader.ReceiptRoot.String(),
|
|
"state_root": block.BlockHeader.StateRoot.String(),
|
|
}).Debug("block meta")
|
|
calcBlockSize.Observe(float64(block.Size()))
|
|
executeBlockDuration.Observe(float64(time.Since(current)) / float64(time.Second))
|
|
|
|
counter := make(map[string]*pb.Uint64Slice)
|
|
for k, v := range exec.txsExecutor.GetInterchainCounter() {
|
|
counter[k] = &pb.Uint64Slice{Slice: v}
|
|
}
|
|
interchainMeta := &pb.InterchainMeta{
|
|
Counter: counter,
|
|
L2Roots: l2Roots,
|
|
}
|
|
exec.clear()
|
|
|
|
exec.currentHeight = block.BlockHeader.Number
|
|
exec.currentBlockHash = block.BlockHash
|
|
|
|
return &ledger.BlockData{
|
|
Block: block,
|
|
Receipts: receipts,
|
|
Accounts: accounts,
|
|
Journal: journal,
|
|
InterchainMeta: interchainMeta,
|
|
TxHashList: txHashList,
|
|
}
|
|
}
|
|
|
|
func (exec *BlockExecutor) listenPreExecuteEvent() {
|
|
for {
|
|
select {
|
|
case block := <-exec.preBlockC:
|
|
now := time.Now()
|
|
block = exec.verifySign(block)
|
|
exec.logger.WithFields(logrus.Fields{
|
|
"height": block.BlockHeader.Number,
|
|
"count": len(block.Transactions),
|
|
"elapse": time.Since(now),
|
|
}).Debug("Verified signature")
|
|
exec.blockC <- block
|
|
case <-exec.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (exec *BlockExecutor) buildTxMerkleTree(txs []*pb.Transaction) (*types.Hash, []types.Hash, error) {
|
|
var (
|
|
groupCnt = len(exec.txsExecutor.GetInterchainCounter()) + 1
|
|
wg = sync.WaitGroup{}
|
|
lock = sync.Mutex{}
|
|
l2Roots = make([]types.Hash, 0, groupCnt)
|
|
errorCnt = int32(0)
|
|
)
|
|
|
|
wg.Add(groupCnt - 1)
|
|
for addr, txIndexes := range exec.txsExecutor.GetInterchainCounter() {
|
|
go func(addr string, txIndexes []uint64) {
|
|
defer wg.Done()
|
|
|
|
txHashes := make([]merkletree.Content, 0, len(txIndexes))
|
|
for _, txIndex := range txIndexes {
|
|
txHashes = append(txHashes, txs[txIndex].TransactionHash)
|
|
}
|
|
|
|
hash, err := calcMerkleRoot(txHashes)
|
|
if err != nil {
|
|
atomic.AddInt32(&errorCnt, 1)
|
|
return
|
|
}
|
|
|
|
lock.Lock()
|
|
defer lock.Unlock()
|
|
l2Roots = append(l2Roots, *hash)
|
|
}(addr, txIndexes)
|
|
}
|
|
|
|
txHashes := make([]merkletree.Content, 0, len(exec.txsExecutor.GetNormalTxs()))
|
|
for _, txHash := range exec.txsExecutor.GetNormalTxs() {
|
|
txHashes = append(txHashes, txHash)
|
|
}
|
|
|
|
hash, err := calcMerkleRoot(txHashes)
|
|
if err != nil {
|
|
atomic.AddInt32(&errorCnt, 1)
|
|
}
|
|
|
|
lock.Lock()
|
|
l2Roots = append(l2Roots, *hash)
|
|
lock.Unlock()
|
|
|
|
wg.Wait()
|
|
if errorCnt != 0 {
|
|
return nil, nil, fmt.Errorf("build tx merkle tree error")
|
|
}
|
|
|
|
sort.Slice(l2Roots, func(i, j int) bool {
|
|
return bytes.Compare(l2Roots[i].Bytes(), l2Roots[j].Bytes()) < 0
|
|
})
|
|
|
|
contents := make([]merkletree.Content, 0, groupCnt)
|
|
for _, l2Root := range l2Roots {
|
|
r := l2Root
|
|
contents = append(contents, &r)
|
|
}
|
|
root, err := calcMerkleRoot(contents)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return root, l2Roots, nil
|
|
}
|
|
|
|
func (exec *BlockExecutor) verifySign(block *pb.Block) *pb.Block {
|
|
if block.BlockHeader.Number == 1 {
|
|
return block
|
|
}
|
|
|
|
var (
|
|
wg sync.WaitGroup
|
|
mutex sync.Mutex
|
|
index []int
|
|
)
|
|
txs := block.Transactions
|
|
|
|
wg.Add(len(txs))
|
|
for i, tx := range txs {
|
|
go func(i int, tx *pb.Transaction) {
|
|
defer wg.Done()
|
|
ok, _ := asym.Verify(crypto.Secp256k1, tx.Signature, tx.SignHash().Bytes(), *tx.From)
|
|
if !ok {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
index = append(index, i)
|
|
}
|
|
}(i, tx)
|
|
}
|
|
wg.Wait()
|
|
|
|
if len(index) > 0 {
|
|
sort.Sort(sort.Reverse(sort.IntSlice(index)))
|
|
for _, idx := range index {
|
|
txs = append(txs[:idx], txs[idx+1:]...)
|
|
}
|
|
block.Transactions = txs
|
|
}
|
|
|
|
return block
|
|
}
|
|
|
|
func (exec *BlockExecutor) applyTx(index int, tx *pb.Transaction, opt *agency.TxOpt) *pb.Receipt {
|
|
receipt := &pb.Receipt{
|
|
Version: tx.Version,
|
|
TxHash: tx.TransactionHash,
|
|
}
|
|
normalTx := true
|
|
|
|
ret, err := exec.applyTransaction(index, tx, opt)
|
|
if err != nil {
|
|
receipt.Status = pb.Receipt_FAILED
|
|
receipt.Ret = []byte(err.Error())
|
|
} else {
|
|
receipt.Status = pb.Receipt_SUCCESS
|
|
receipt.Ret = ret
|
|
}
|
|
|
|
events := exec.ledger.Events(tx.TransactionHash.String())
|
|
if len(events) != 0 {
|
|
receipt.Events = events
|
|
for _, ev := range events {
|
|
if ev.Interchain {
|
|
m := make(map[string]uint64)
|
|
err := json.Unmarshal(ev.Data, &m)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
for k, v := range m {
|
|
exec.txsExecutor.AddInterchainCounter(k, v)
|
|
}
|
|
normalTx = false
|
|
}
|
|
}
|
|
}
|
|
|
|
if normalTx {
|
|
exec.txsExecutor.AddNormalTx(tx.TransactionHash)
|
|
}
|
|
|
|
return receipt
|
|
}
|
|
|
|
func (exec *BlockExecutor) postBlockEvent(block *pb.Block, interchainMeta *pb.InterchainMeta, txHashList []*types.Hash) {
|
|
go exec.blockFeed.Send(events.ExecutedEvent{
|
|
Block: block,
|
|
InterchainMeta: interchainMeta,
|
|
TxHashList: txHashList,
|
|
})
|
|
}
|
|
|
|
func (exec *BlockExecutor) applyTransaction(i int, tx *pb.Transaction, opt *agency.TxOpt) ([]byte, error) {
|
|
if tx.IsIBTP() {
|
|
ctx := vm.NewContext(tx, uint64(i), nil, exec.ledger, exec.logger)
|
|
instance := boltvm.New(ctx, exec.validationEngine, exec.getContracts(opt))
|
|
return instance.HandleIBTP(tx.IBTP)
|
|
}
|
|
|
|
if tx.Payload == nil {
|
|
return nil, fmt.Errorf("empty transaction data")
|
|
}
|
|
|
|
data := &pb.TransactionData{}
|
|
if err := data.Unmarshal(tx.Payload); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch data.Type {
|
|
case pb.TransactionData_NORMAL:
|
|
err := exec.transfer(tx.From, tx.To, data.Amount)
|
|
return nil, err
|
|
default:
|
|
var instance vm.VM
|
|
switch data.VmType {
|
|
case pb.TransactionData_BVM:
|
|
ctx := vm.NewContext(tx, uint64(i), data, exec.ledger, exec.logger)
|
|
instance = boltvm.New(ctx, exec.validationEngine, exec.getContracts(opt))
|
|
case pb.TransactionData_XVM:
|
|
ctx := vm.NewContext(tx, uint64(i), data, exec.ledger, exec.logger)
|
|
imports, err := wasm.EmptyImports()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
instance, err = wasm.New(ctx, imports, exec.wasmInstances)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("wrong vm type")
|
|
}
|
|
|
|
return instance.Run(data.Payload)
|
|
}
|
|
}
|
|
|
|
func (exec *BlockExecutor) clear() {
|
|
exec.ledger.Clear()
|
|
}
|
|
|
|
func (exec *BlockExecutor) transfer(from, to *types.Address, value uint64) error {
|
|
if value == 0 {
|
|
return nil
|
|
}
|
|
|
|
fv := exec.ledger.GetBalance(from)
|
|
if fv < value {
|
|
return fmt.Errorf("not sufficient funds for %s", from.String())
|
|
}
|
|
|
|
tv := exec.ledger.GetBalance(to)
|
|
|
|
exec.ledger.SetBalance(from, fv-value)
|
|
exec.ledger.SetBalance(to, tv+value)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (exec *BlockExecutor) calcReceiptMerkleRoot(receipts []*pb.Receipt) (*types.Hash, error) {
|
|
current := time.Now()
|
|
|
|
receiptHashes := make([]merkletree.Content, 0, len(receipts))
|
|
for _, receipt := range receipts {
|
|
receiptHashes = append(receiptHashes, receipt.Hash())
|
|
}
|
|
receiptRoot, err := calcMerkleRoot(receiptHashes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
exec.logger.WithField("time", time.Since(current)).Debug("Calculate receipt merkle roots")
|
|
|
|
return receiptRoot, nil
|
|
}
|
|
|
|
func calcMerkleRoot(contents []merkletree.Content) (*types.Hash, error) {
|
|
if len(contents) == 0 {
|
|
return &types.Hash{}, nil
|
|
}
|
|
|
|
tree, err := merkletree.NewTree(contents)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return types.NewHash(tree.MerkleRoot()), nil
|
|
}
|
|
|
|
func (exec *BlockExecutor) getContracts(opt *agency.TxOpt) map[string]agency.Contract {
|
|
if opt != nil && opt.Contracts != nil {
|
|
return opt.Contracts
|
|
}
|
|
|
|
return exec.txsExecutor.GetBoltContracts()
|
|
}
|