refactor(*):refactor verify interchain tx

This commit is contained in:
jiangzhe 2020-09-14 11:29:14 +08:00
parent 369534d6dd
commit be3ef940fa
10 changed files with 350 additions and 110 deletions

View File

@ -13,7 +13,6 @@ import (
"github.com/meshplus/bitxhub/internal/ledger"
"github.com/meshplus/bitxhub/internal/ledger/genesis"
"github.com/meshplus/bitxhub/internal/loggers"
orderplg "github.com/meshplus/bitxhub/internal/plugins"
"github.com/meshplus/bitxhub/internal/repo"
"github.com/meshplus/bitxhub/internal/router"
"github.com/meshplus/bitxhub/internal/storages"
@ -57,7 +56,7 @@ func NewBitXHub(rep *repo.Repo) (*BitXHub, error) {
}
}
order, err := orderplg.New(
order, err := etcdraft.NewNode(
order.WithRepoRoot(repoRoot),
order.WithStoragePath(repo.GetStoragePath(repoRoot, "order")),
order.WithPluginPath(rep.Config.Plugin),
@ -67,6 +66,7 @@ func NewBitXHub(rep *repo.Repo) (*BitXHub, error) {
order.WithLogger(loggers.Logger(loggers.Order)),
order.WithApplied(chainMeta.Height),
order.WithDigest(chainMeta.BlockHash.Hex()),
order.WithLedger(bxh.Ledger),
order.WithGetChainMetaFunc(bxh.Ledger.GetChainMeta),
order.WithGetTransactionFunc(bxh.Ledger.GetTransaction),
)

View File

@ -57,11 +57,11 @@ func (i *Interchain) UnmarshalJSON(data []byte) error {
func (x *InterchainManager) Register() *boltvm.Response {
interchain := &Interchain{ID: x.Caller()}
ok := x.Has(x.appchainKey(x.Caller()))
ok := x.Has(AppchainKey(x.Caller()))
if ok {
x.GetObject(x.appchainKey(x.Caller()), interchain)
x.GetObject(AppchainKey(x.Caller()), interchain)
} else {
x.SetObject(x.appchainKey(x.Caller()), interchain)
x.SetObject(AppchainKey(x.Caller()), interchain)
}
body, err := json.Marshal(interchain)
if err != nil {
@ -72,13 +72,13 @@ func (x *InterchainManager) Register() *boltvm.Response {
}
func (x *InterchainManager) DeleteInterchain(id string) *boltvm.Response {
x.Delete(x.appchainKey(id))
x.Delete(AppchainKey(id))
return boltvm.Success(nil)
}
// Interchain returns information of the interchain count, Receipt count and SourceReceipt count
func (x *InterchainManager) Interchain() *boltvm.Response {
ok, data := x.Get(x.appchainKey(x.Caller()))
ok, data := x.Get(AppchainKey(x.Caller()))
if !ok {
return boltvm.Error(fmt.Errorf("this appchain does not exist").Error())
}
@ -87,7 +87,7 @@ func (x *InterchainManager) Interchain() *boltvm.Response {
// GetInterchain returns information of the interchain count, Receipt count and SourceReceipt count by id
func (x *InterchainManager) GetInterchain(id string) *boltvm.Response {
ok, data := x.Get(x.appchainKey(id))
ok, data := x.Get(AppchainKey(id))
if !ok {
return boltvm.Error(fmt.Errorf("this appchain does not exist").Error())
}
@ -104,13 +104,13 @@ func (x *InterchainManager) HandleIBTP(data []byte) *boltvm.Response {
return x.handleUnionIBTP(ibtp)
}
ok := x.Has(x.appchainKey(x.Caller()))
ok := x.Has(AppchainKey(x.Caller()))
if !ok {
return boltvm.Error("this appchain does not exist")
}
interchain := &Interchain{}
x.GetObject(x.appchainKey(ibtp.From), &interchain)
x.GetObject(AppchainKey(ibtp.From), &interchain)
if err := x.checkIBTP(ibtp, interchain); err != nil {
return boltvm.Error(err.Error())
@ -138,7 +138,7 @@ func (x *InterchainManager) HandleIBTP(data []byte) *boltvm.Response {
}
func (x *InterchainManager) HandleIBTPs(data []byte) *boltvm.Response {
ok := x.Has(x.appchainKey(x.Caller()))
ok := x.Has(AppchainKey(x.Caller()))
if !ok {
return boltvm.Error("this appchain does not exist")
}
@ -149,7 +149,7 @@ func (x *InterchainManager) HandleIBTPs(data []byte) *boltvm.Response {
}
interchain := &Interchain{}
x.GetObject(x.appchainKey(x.Caller()), &interchain)
x.GetObject(AppchainKey(x.Caller()), &interchain)
for _, ibtp := range ibtps.Iptp {
if err := x.checkIBTP(ibtp, interchain); err != nil {
@ -172,31 +172,31 @@ func (x *InterchainManager) checkIBTP(ibtp *pb.IBTP, interchain *Interchain) err
if ibtp.To == "" {
return fmt.Errorf("empty destination chain id")
}
if ok := x.Has(x.appchainKey(ibtp.To)); !ok {
if ok := x.Has(AppchainKey(ibtp.To)); !ok {
x.Logger().WithField("chain_id", ibtp.To).Debug("target appchain does not exist")
}
app := &appchainMgr.Appchain{}
res := x.CrossInvoke(constant.AppchainMgrContractAddr.String(), "GetAppchain", pb.String(ibtp.From))
if err := json.Unmarshal(res.Result, app); err != nil {
return err
}
// get validation rule contract address
res = x.CrossInvoke(constant.RuleManagerContractAddr.String(), "GetRuleAddress", pb.String(ibtp.From), pb.String(app.ChainType))
if !res.Ok {
return fmt.Errorf("this appchain does not register rule")
}
// handle validation
isValid, err := x.ValidationEngine().Validate(string(res.Result), ibtp.From, ibtp.Proof, ibtp.Payload, app.Validators)
if err != nil {
return err
}
if !isValid {
return fmt.Errorf("invalid interchain transaction")
}
//app := &appchainMgr.Appchain{}
//res := x.CrossInvoke(constant.AppchainMgrContractAddr.String(), "GetAppchain", pb.String(ibtp.From))
//if err := json.Unmarshal(res.Result, app); err != nil {
// return err
//}
//
//// get validation rule contract address
//res = x.CrossInvoke(constant.RuleManagerContractAddr.String(), "GetRuleAddress", pb.String(ibtp.From), pb.String(app.ChainType))
//if !res.Ok {
// return fmt.Errorf("this appchain does not register rule")
//}
//
//// handle validation
//isValid, err := x.ValidationEngine().Validate(string(res.Result), ibtp.From, ibtp.Proof, ibtp.Payload, app.Validators)
//if err != nil {
// return err
//}
//
//if !isValid {
// return fmt.Errorf("invalid interchain transaction")
//}
if pb.IBTP_INTERCHAIN == ibtp.Type ||
pb.IBTP_ASSET_EXCHANGE_INIT == ibtp.Type ||
@ -206,21 +206,21 @@ func (x *InterchainManager) checkIBTP(ibtp *pb.IBTP, interchain *Interchain) err
return fmt.Errorf("ibtp from != caller")
}
idx := interchain.InterchainCounter[ibtp.To]
if idx+1 != ibtp.Index {
return fmt.Errorf(fmt.Sprintf("wrong index, required %d, but %d", idx+1, ibtp.Index))
}
//idx := interchain.InterchainCounter[ibtp.To]
//if idx+1 != ibtp.Index {
// return fmt.Errorf(fmt.Sprintf("wrong index, required %d, but %d", idx+1, ibtp.Index))
//}
} else {
if ibtp.To != x.Caller() {
return fmt.Errorf("ibtp to != caller")
}
idx := interchain.ReceiptCounter[ibtp.To]
if idx+1 != ibtp.Index {
if interchain.SourceReceiptCounter[ibtp.To]+1 != ibtp.Index {
return fmt.Errorf("wrong receipt index, required %d, but %d", idx+1, ibtp.Index)
}
}
//idx := interchain.ReceiptCounter[ibtp.To]
//if idx+1 != ibtp.Index {
// if interchain.SourceReceiptCounter[ibtp.To]+1 != ibtp.Index {
// return fmt.Errorf("wrong receipt index, required %d, but %d", idx+1, ibtp.Index)
// }
//}
}
return nil
@ -234,18 +234,18 @@ func (x *InterchainManager) ProcessIBTP(ibtp *pb.IBTP, interchain *Interchain) {
pb.IBTP_ASSET_EXCHANGE_REDEEM == ibtp.Type ||
pb.IBTP_ASSET_EXCHANGE_REFUND == ibtp.Type {
interchain.InterchainCounter[ibtp.To]++
x.SetObject(x.appchainKey(ibtp.From), interchain)
x.SetObject(AppchainKey(ibtp.From), interchain)
x.SetObject(x.indexMapKey(ibtp.ID()), x.GetTxHash())
m[ibtp.To] = x.GetTxIndex()
} else {
interchain.ReceiptCounter[ibtp.To] = ibtp.Index
x.SetObject(x.appchainKey(ibtp.From), interchain)
x.SetObject(AppchainKey(ibtp.From), interchain)
m[ibtp.From] = x.GetTxIndex()
ic := &Interchain{}
x.GetObject(x.appchainKey(ibtp.To), &ic)
x.GetObject(AppchainKey(ibtp.To), &ic)
ic.SourceReceiptCounter[ibtp.From] = ibtp.Index
x.SetObject(x.appchainKey(ibtp.To), ic)
x.SetObject(AppchainKey(ibtp.To), ic)
}
x.PostInterchainEvent(m)
@ -323,7 +323,7 @@ func (x *InterchainManager) GetIBTPByID(id string) *boltvm.Response {
func (x *InterchainManager) handleUnionIBTP(ibtp *pb.IBTP) *boltvm.Response {
srcRelayChainID := strings.Split(ibtp.From, "-")[0]
ok := x.Has(x.appchainKey(srcRelayChainID))
ok := x.Has(AppchainKey(srcRelayChainID))
if !ok {
return boltvm.Error("this relay chain does not exist")
}
@ -331,7 +331,7 @@ func (x *InterchainManager) handleUnionIBTP(ibtp *pb.IBTP) *boltvm.Response {
if ibtp.To == "" {
return boltvm.Error("empty destination chain id")
}
if ok := x.Has(x.appchainKey(ibtp.To)); !ok {
if ok := x.Has(AppchainKey(ibtp.To)); !ok {
return boltvm.Error(fmt.Sprintf("target appchain does not exist: %s", ibtp.To))
}
@ -344,11 +344,11 @@ func (x *InterchainManager) handleUnionIBTP(ibtp *pb.IBTP) *boltvm.Response {
interchain := &Interchain{
ID: ibtp.From,
}
ok = x.Has(x.appchainKey(ibtp.From))
ok = x.Has(AppchainKey(ibtp.From))
if !ok {
x.SetObject(x.appchainKey(ibtp.From), interchain)
x.SetObject(AppchainKey(ibtp.From), interchain)
}
x.GetObject(x.appchainKey(ibtp.From), &interchain)
x.GetObject(AppchainKey(ibtp.From), &interchain)
if err := x.checkUnionIBTP(app, ibtp, interchain); err != nil {
return boltvm.Error(err.Error())
@ -417,7 +417,7 @@ func (x *InterchainManager) checkUnionIBTP(app *appchainMgr.Appchain, ibtp *pb.I
}
func (x *InterchainManager) appchainKey(id string) string {
func AppchainKey(id string) string {
return appchainMgr.PREFIX + id
}

View File

@ -19,13 +19,13 @@ type RuleManager struct {
boltvm.Stub
}
type rule struct {
type Rule struct {
Address string `json:"address"`
Status int32 `json:"status"` // 0 => registered, 1 => approved, -1 => rejected
}
type ruleRecord struct {
Rule *rule `json:"rule"`
Rule *Rule `json:"rule"`
IsApproved bool `json:"is_approved"`
Desc string `json:"desc"`
}
@ -37,18 +37,18 @@ func (r *RuleManager) RegisterRule(id string, address string) *boltvm.Response {
return boltvm.Error("this appchain does not exist")
}
rl := &rule{
rl := &Rule{
Address: address,
Status: 0,
}
r.SetObject(r.ruleKey(id), rl)
r.SetObject(RuleKey(id), rl)
return boltvm.Success(nil)
}
func (r *RuleManager) GetRuleAddress(id, chainType string) *boltvm.Response {
rl := &rule{}
rl := &Rule{}
ok := r.GetObject(r.ruleKey(id), rl)
ok := r.GetObject(RuleKey(id), rl)
if ok {
return boltvm.Success([]byte(rl.Address))
}
@ -61,8 +61,8 @@ func (r *RuleManager) GetRuleAddress(id, chainType string) *boltvm.Response {
}
func (r *RuleManager) Audit(id string, isApproved int32, desc string) *boltvm.Response {
rl := &rule{}
ok := r.GetObject(r.ruleKey(id), rl)
rl := &Rule{}
ok := r.GetObject(RuleKey(id), rl)
if !ok {
return boltvm.Error(fmt.Errorf("this rule does not exist").Error())
}
@ -79,12 +79,12 @@ func (r *RuleManager) Audit(id string, isApproved int32, desc string) *boltvm.Re
records = append(records, record)
r.SetObject(r.ruleRecordKey(id), records)
r.SetObject(r.ruleKey(id), rl)
r.SetObject(RuleKey(id), rl)
return boltvm.Success(nil)
}
func (r *RuleManager) ruleKey(id string) string {
func RuleKey(id string) string {
return rulePrefix + id
}

View File

@ -3,6 +3,8 @@ package order
import (
"fmt"
"github.com/meshplus/bitxhub/internal/ledger"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
@ -21,6 +23,7 @@ type Config struct {
Nodes map[uint64]types.Address
Applied uint64
Digest string
Ledger ledger.Ledger
GetTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
GetChainMetaFunc func() *pb.ChainMeta
}
@ -98,6 +101,12 @@ func WithGetTransactionFunc(f func(hash types.Hash) (*pb.Transaction, error)) Op
}
}
func WithLedger(l ledger.Ledger) Option {
return func(config *Config) {
config.Ledger = l
}
}
func checkConfig(config *Config) error {
if config.Logger == nil {
return fmt.Errorf("logger is nil")

View File

@ -0,0 +1,8 @@
proto:
cd proto && protoc -I=. \
-I${GOPATH}/src \
-I${GOPATH}/src/github.com/gogo/protobuf/protobuf \
--gogofast_out=:. \
message.proto
.PHONY: proto

View File

@ -180,9 +180,9 @@ func (n *Node) ReportState(height uint64, hash types.Hash) {
n.logger.Errorf("can not found ready:", height)
return
}
hashes := ready.(*raftproto.Ready).TxHashes
txHashes := ready.(*raftproto.Ready).TxHashes
// remove redundant tx
n.tp.BatchDelete(hashes)
n.tp.BatchDelete(txHashes)
n.readyCache.Delete(height)
}
@ -287,6 +287,7 @@ func (n *Node) run() {
if err != nil {
n.logger.Panic(err)
}
n.tp.BatchStore(ready.TxHashes)
if err := n.node.Propose(n.ctx, data); err != nil {
n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err)
@ -463,9 +464,11 @@ func (n *Node) mint(ready *raftproto.Ready) {
if n.tp.GetHeight() == ready.Height-1 {
n.tp.UpdateHeight()
}
txHashes := ready.TxHashes
loseTxs := make([]types.Hash, 0)
txs := make([]*pb.Transaction, 0, len(ready.TxHashes))
for _, hash := range ready.TxHashes {
txs := make([]*pb.Transaction, 0, len(txHashes))
for _, hash := range txHashes {
_, ok := n.tp.GetTx(hash, false)
if !ok {
loseTxs = append(loseTxs, hash)
@ -485,11 +488,15 @@ func (n *Node) mint(ready *raftproto.Ready) {
wg.Wait()
}
for _, hash := range ready.TxHashes {
for i, hash := range txHashes {
tx, _ := n.tp.GetTx(hash, false)
if ready.ProofHashes[i] != (types.Hash{}) {
tx.Extra = nil
}
txs = append(txs, tx)
}
n.tp.RemoveTxs(ready.TxHashes, n.IsLeader())
n.tp.RemoveTxs(txHashes, n.IsLeader())
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),

View File

@ -5,13 +5,12 @@ package proto
import (
fmt "fmt"
io "io"
math "math"
math_bits "math/bits"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
github_com_meshplus_bitxhub_kit_types "github.com/meshplus/bitxhub-kit/types"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -121,7 +120,8 @@ func (m *RaftMessage) GetData() []byte {
type Ready struct {
TxHashes []github_com_meshplus_bitxhub_kit_types.Hash `protobuf:"bytes,1,rep,name=txHashes,proto3,customtype=github.com/meshplus/bitxhub-kit/types.Hash" json:"txHashes"`
Height uint64 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"`
ProofHashes []github_com_meshplus_bitxhub_kit_types.Hash `protobuf:"bytes,2,rep,name=proofHashes,proto3,customtype=github.com/meshplus/bitxhub-kit/types.Hash" json:"proofHashes"`
Height uint64 `protobuf:"varint,3,opt,name=height,proto3" json:"height,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -176,27 +176,28 @@ func init() {
func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) }
var fileDescriptor_33c57e4bae7b9afd = []byte{
// 306 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0x41, 0x4b, 0xc3, 0x30,
0x1c, 0xc5, 0x97, 0xad, 0x1b, 0xfa, 0xb7, 0x1b, 0x25, 0x07, 0x2d, 0x1e, 0xb6, 0xb2, 0x53, 0x51,
0xd6, 0xc2, 0xfc, 0x04, 0x5b, 0x1d, 0x2a, 0xe2, 0x06, 0xe9, 0x04, 0x6f, 0x23, 0x75, 0x59, 0x53,
0xb4, 0xa4, 0x2c, 0x29, 0xac, 0x9f, 0xc9, 0x2f, 0xb2, 0xa3, 0x67, 0x0f, 0x43, 0xfa, 0x49, 0xa4,
0xa9, 0xca, 0x4e, 0x79, 0xbf, 0xf0, 0xde, 0x23, 0x2f, 0xd0, 0x4d, 0x99, 0x94, 0x34, 0x66, 0x5e,
0xb6, 0x15, 0x4a, 0xe0, 0xb6, 0x3e, 0x2e, 0x47, 0x71, 0xa2, 0x78, 0x1e, 0x79, 0xaf, 0x22, 0xf5,
0x63, 0x11, 0x0b, 0x5f, 0x5f, 0x47, 0xf9, 0x46, 0x93, 0x06, 0xad, 0xea, 0xd4, 0xf0, 0x03, 0xc1,
0x19, 0xa1, 0x1b, 0xf5, 0x54, 0x77, 0xe1, 0x6b, 0x30, 0x54, 0x91, 0x31, 0x1b, 0x39, 0xc8, 0xed,
0x8d, 0x2f, 0x6a, 0x97, 0x77, 0xe4, 0xf0, 0x96, 0x45, 0xc6, 0x88, 0x36, 0xe1, 0x73, 0xe8, 0x6c,
0xb6, 0x22, 0x7d, 0x58, 0xdb, 0x4d, 0x07, 0xb9, 0x06, 0xf9, 0x25, 0x8c, 0xc1, 0x58, 0x53, 0x45,
0xed, 0x96, 0x83, 0x5c, 0x93, 0x68, 0x3d, 0x0c, 0xc0, 0xa8, 0x92, 0xb8, 0x0b, 0xa7, 0xc1, 0x62,
0x1e, 0xce, 0xe6, 0xe1, 0x73, 0x68, 0x35, 0xb0, 0x05, 0xe6, 0x94, 0x2c, 0x26, 0xb7, 0xc1, 0x24,
0x5c, 0xae, 0x96, 0x2f, 0x16, 0xc2, 0x00, 0x9d, 0xbb, 0x99, 0xd6, 0x4d, 0xdc, 0x03, 0xa8, 0xf5,
0x6a, 0x12, 0x3c, 0x5a, 0xad, 0xa1, 0x80, 0x36, 0x61, 0x74, 0x5d, 0xe0, 0x39, 0x9c, 0xa8, 0xdd,
0x3d, 0x95, 0x9c, 0x49, 0x1b, 0x39, 0x2d, 0xd7, 0x9c, 0x8e, 0xf7, 0x87, 0x41, 0xe3, 0xeb, 0x30,
0xb8, 0x3a, 0xda, 0x9f, 0x32, 0xc9, 0xb3, 0xf7, 0x5c, 0xfa, 0x51, 0xa2, 0x76, 0x3c, 0x8f, 0x46,
0x6f, 0x89, 0xf2, 0xab, 0x97, 0x4b, 0xaf, 0xca, 0x92, 0xff, 0x8e, 0x6a, 0x09, 0x67, 0x49, 0xcc,
0xd5, 0xdf, 0x92, 0x9a, 0xa6, 0xe6, 0xbe, 0xec, 0xa3, 0xcf, 0xb2, 0x8f, 0xbe, 0xcb, 0x3e, 0x8a,
0x3a, 0xfa, 0x37, 0x6e, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0x7c, 0xb0, 0xe7, 0x45, 0x7a, 0x01,
0x00, 0x00,
// 325 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x90, 0xc1, 0x6a, 0xf2, 0x40,
0x1c, 0xc4, 0x5d, 0x8d, 0xf2, 0x7d, 0x7f, 0xa3, 0x84, 0x3d, 0xb4, 0xa1, 0x07, 0x0d, 0x9e, 0x42,
0x8b, 0x09, 0xd8, 0x27, 0xd0, 0x54, 0xda, 0x52, 0xaa, 0xb0, 0x49, 0xa1, 0x37, 0x49, 0xea, 0x26,
0x1b, 0xda, 0xb0, 0xc1, 0xdd, 0x80, 0x3e, 0x53, 0x9f, 0xa3, 0xe0, 0xb1, 0xe7, 0x1e, 0xa4, 0xf8,
0x24, 0x25, 0x1b, 0x29, 0x39, 0xf7, 0xb4, 0x33, 0xcb, 0xcc, 0x8f, 0x3f, 0x03, 0xbd, 0x8c, 0x0a,
0x11, 0x26, 0xd4, 0xc9, 0x37, 0x5c, 0x72, 0xdc, 0x56, 0xcf, 0xc5, 0x38, 0x49, 0x25, 0x2b, 0x22,
0xe7, 0x85, 0x67, 0x6e, 0xc2, 0x13, 0xee, 0xaa, 0xef, 0xa8, 0x88, 0x95, 0x53, 0x46, 0xa9, 0xaa,
0x35, 0x7a, 0x47, 0xd0, 0x25, 0x61, 0x2c, 0x1f, 0x2b, 0x16, 0xbe, 0x02, 0x4d, 0xee, 0x72, 0x6a,
0x22, 0x0b, 0xd9, 0xfd, 0xc9, 0x79, 0x95, 0x72, 0x6a, 0x09, 0x27, 0xd8, 0xe5, 0x94, 0xa8, 0x10,
0x3e, 0x83, 0x4e, 0xbc, 0xe1, 0xd9, 0xfd, 0xda, 0x6c, 0x5a, 0xc8, 0xd6, 0xc8, 0xc9, 0x61, 0x0c,
0xda, 0x3a, 0x94, 0xa1, 0xd9, 0xb2, 0x90, 0xad, 0x13, 0xa5, 0x47, 0x1e, 0x68, 0x65, 0x13, 0xf7,
0xe0, 0xbf, 0xb7, 0x5c, 0xf8, 0xf3, 0x85, 0xff, 0xe4, 0x1b, 0x0d, 0x6c, 0x80, 0x3e, 0x23, 0xcb,
0xe9, 0x8d, 0x37, 0xf5, 0x83, 0x55, 0xf0, 0x6c, 0x20, 0x0c, 0xd0, 0xb9, 0x9d, 0x2b, 0xdd, 0xc4,
0x7d, 0x80, 0x4a, 0xaf, 0xa6, 0xde, 0x83, 0xd1, 0x1a, 0x7d, 0x20, 0x68, 0x13, 0x1a, 0xae, 0x77,
0x78, 0x01, 0xff, 0xe4, 0xf6, 0x2e, 0x14, 0x8c, 0x0a, 0x13, 0x59, 0x2d, 0x5b, 0x9f, 0x4d, 0xf6,
0x87, 0x61, 0xe3, 0xeb, 0x30, 0xbc, 0xac, 0x0d, 0x90, 0x51, 0xc1, 0xf2, 0xb7, 0x42, 0xb8, 0x51,
0x2a, 0xb7, 0xac, 0x88, 0xc6, 0xaf, 0xa9, 0x74, 0xcb, 0xd3, 0x85, 0x53, 0x76, 0xc9, 0x2f, 0x03,
0x07, 0xd0, 0xcd, 0x37, 0x9c, 0xc7, 0x27, 0x64, 0xf3, 0xcf, 0xc8, 0x3a, 0xa6, 0x1c, 0x88, 0xd1,
0x34, 0x61, 0x52, 0x4d, 0xa1, 0x91, 0x93, 0x9b, 0xe9, 0xfb, 0xe3, 0x00, 0x7d, 0x1e, 0x07, 0xe8,
0xfb, 0x38, 0x40, 0x51, 0x47, 0x8d, 0x7c, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0xcd, 0x50, 0xe3,
0xb8, 0xd1, 0x01, 0x00, 0x00,
}
func (m *RaftMessage) Marshal() (dAtA []byte, err error) {
@ -270,7 +271,21 @@ func (m *Ready) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if m.Height != 0 {
i = encodeVarintMessage(dAtA, i, uint64(m.Height))
i--
dAtA[i] = 0x10
dAtA[i] = 0x18
}
if len(m.ProofHashes) > 0 {
for iNdEx := len(m.ProofHashes) - 1; iNdEx >= 0; iNdEx-- {
{
size := m.ProofHashes[iNdEx].Size()
i -= size
if _, err := m.ProofHashes[iNdEx].MarshalTo(dAtA[i:]); err != nil {
return 0, err
}
i = encodeVarintMessage(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.TxHashes) > 0 {
for iNdEx := len(m.TxHashes) - 1; iNdEx >= 0; iNdEx-- {
@ -334,6 +349,12 @@ func (m *Ready) Size() (n int) {
n += 1 + l + sovMessage(uint64(l))
}
}
if len(m.ProofHashes) > 0 {
for _, e := range m.ProofHashes {
l = e.Size()
n += 1 + l + sovMessage(uint64(l))
}
}
if m.Height != 0 {
n += 1 + sovMessage(uint64(m.Height))
}
@ -540,6 +561,41 @@ func (m *Ready) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ProofHashes", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthMessage
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
var v github_com_meshplus_bitxhub_kit_types.Hash
m.ProofHashes = append(m.ProofHashes, v)
if err := m.ProofHashes[len(m.ProofHashes)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
}

View File

@ -17,5 +17,7 @@ message RaftMessage {
message Ready {
repeated bytes txHashes = 1 [(gogoproto.customtype) = "github.com/meshplus/bitxhub-kit/types.Hash", (gogoproto.nullable) = false];
uint64 height = 2;
}
repeated bytes proofHashes = 2 [(gogoproto.customtype) = "github.com/meshplus/bitxhub-kit/types.Hash", (gogoproto.nullable) = false];
uint64 height = 3;
}

View File

@ -0,0 +1,113 @@
package txpool
import (
"bytes"
"crypto/sha256"
"encoding/json"
"strings"
"sync"
"github.com/sirupsen/logrus"
"github.com/meshplus/bitxhub-kit/types"
appchainMgr "github.com/meshplus/bitxhub-core/appchain-mgr"
"github.com/meshplus/bitxhub-core/validator"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/constant"
"github.com/meshplus/bitxhub/internal/executor/contracts"
"github.com/meshplus/bitxhub/internal/ledger"
)
type ProofPool struct {
proofs sync.Map //ibtp proof cache
ledger ledger.Ledger
ve *validator.ValidationEngine
logger logrus.FieldLogger
}
func (pl *ProofPool) extractIBTP(tx *pb.Transaction) *pb.IBTP {
if strings.ToLower(tx.To.String()) != constant.InterchainContractAddr.String() {
return nil
}
if tx.Data.VmType != pb.TransactionData_BVM {
return nil
}
ip := &pb.InvokePayload{}
if err := ip.Unmarshal(tx.Data.Payload); err != nil {
return nil
}
if ip.Method != "HandleIBTP" {
return nil
}
if len(ip.Args) != 1 {
return nil
}
ibtp := &pb.IBTP{}
if err := ibtp.Unmarshal(ip.Args[0].Value); err != nil {
pl.logger.Error(err)
return nil
}
return ibtp
}
func (pl *ProofPool) verifyProof(txHash types.Hash, ibtp *pb.IBTP, proof []byte) (bool, error) {
if proof == nil {
return false, nil
}
proofHash := sha256.Sum256(proof)
if !bytes.Equal(proofHash[:], ibtp.Proof) {
return false, nil
}
app := &appchainMgr.Appchain{}
ok, data := pl.getAccountState(constant.AppchainMgrContractAddr, contracts.AppchainKey(ibtp.From))
if !ok {
return false, nil
}
err := json.Unmarshal(data, app)
if err != nil {
return false, err
}
validateAddr := validator.FabricRuleAddr
rl := &contracts.Rule{}
ok, data = pl.getAccountState(constant.RuleManagerContractAddr, contracts.RuleKey(ibtp.From))
if ok {
if err := json.Unmarshal(data, rl); err != nil {
return false, err
}
validateAddr = rl.Address
} else {
if app.ChainType != "fabric" {
return false, nil
}
}
ok, err = pl.ve.Validate(validateAddr, ibtp.From, proof, ibtp.Payload, app.Validators)
if err != nil {
return false, err
}
return ok, nil
}
func (pl *ProofPool) getAccountState(address constant.BoltContractAddress, key string) (bool, []byte) {
return pl.ledger.GetAccount(address.Address()).GetState([]byte(key))
}
func (pl *ProofPool) putProofHash(txHash types.Hash, proofHash types.Hash) {
pl.proofs.Store(txHash, proofHash)
}
func (pl *ProofPool) getProofHash(txHash types.Hash) types.Hash {
proofHash, ok := pl.proofs.Load(txHash)
if !ok {
return types.Hash{}
}
return proofHash.(types.Hash)
}
func (pl *ProofPool) deleteProofHash(txHash types.Hash) {
pl.proofs.Delete(txHash)
}

View File

@ -8,6 +8,11 @@ import (
"sync/atomic"
"time"
"github.com/meshplus/bitxhub-core/validator"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub/internal/ledger"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/Rican7/retry"
@ -29,6 +34,7 @@ type TxPool struct {
isExecuting bool //only raft leader can execute
pendingTxs *list.List //pending tx pool
presenceTxs sync.Map //tx cache
ledger ledger.Ledger //ledger
ackTxs map[types.Hash]bool //ack tx means get tx by pb.RaftMessage_GET_TX_ACK
readyC chan *raftproto.Ready //ready channel, receive by raft Propose channel
peerMgr peermgr.PeerManager //network manager
@ -36,9 +42,10 @@ type TxPool struct {
reqLookUp *order.ReqLookUp //bloom filter
storage storage.Storage //storage pending tx
config *Config //tx pool config
ctx context.Context //context
cancel context.CancelFunc //stop Execute
getTransactionFunc getTransactionFunc //get transaction by ledger
proofPool *ProofPool
ctx context.Context //context
cancel context.CancelFunc //stop Execute
}
type Config struct {
@ -55,6 +62,13 @@ func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*
if err != nil {
return nil, nil
}
proofPool := &ProofPool{
ledger: config.Ledger,
logger: config.Logger,
ve: validator.NewValidationEngine(config.Ledger, log.NewWithModule("validator")),
}
ctx, cancel := context.WithCancel(context.Background())
return &TxPool{
nodeId: config.ID,
@ -66,6 +80,7 @@ func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*
ackTxs: make(map[types.Hash]bool),
reqLookUp: reqLookUp,
storage: storage,
proofPool: proofPool,
getTransactionFunc: config.GetTransactionFunc,
config: txPoolConfig,
ctx: ctx,
@ -76,7 +91,7 @@ func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*
//AddPendingTx add pending transaction into txpool
func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
if tp.PoolSize() >= tp.config.PoolSize {
tp.logger.Debugf("Tx pool size: %d is full", tp.PoolSize())
tp.logger.Debugf("Tx pool size: {} is full", tp.PoolSize())
return nil
}
hash := tx.TransactionHash
@ -90,6 +105,12 @@ func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
return nil
}
}
_, err := tp.checkIBTP(tx)
if err != nil {
return err
}
//add pending tx
tp.pushBack(hash, tx, isAckTx)
@ -100,6 +121,23 @@ func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
return nil
}
func (tp *TxPool) checkIBTP(tx *pb.Transaction) (bool, error) {
if ibtp := tp.proofPool.extractIBTP(tx); ibtp != nil {
ok, err := tp.proofPool.verifyProof(tx.TransactionHash, ibtp, tx.Extra)
if err != nil {
return false, err
}
if ok {
hash := types.Bytes2Hash(ibtp.Proof)
tp.proofPool.putProofHash(tx.TransactionHash, hash)
}
if !ok {
return false, fmt.Errorf("ibtp verify fail:%s", ibtp.ID())
}
}
return true, nil
}
//packFullBlock immediately pack if it is greater than the total amount of block transactions
func (tp *TxPool) packFullBlock() {
tp.Lock()
@ -131,6 +169,7 @@ func (tp *TxPool) RemoveTxs(hashes []types.Hash, isLeader bool) {
}
}
tp.presenceTxs.Delete(hash)
tp.proofPool.deleteProofHash(hash)
}
}
@ -205,26 +244,32 @@ func (tp *TxPool) periodPackBlock() {
//ready pack the block
func (tp *TxPool) ready(size int) *raftproto.Ready {
hashes := make([]types.Hash, 0, size)
txHashes := make([]types.Hash, 0, size)
proofHashes := make([]types.Hash, 0, size)
for i := 0; i < size; i++ {
front := tp.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
hash := tx.TransactionHash
txHash := tx.TransactionHash
tp.pendingTxs.Remove(front)
if _, ok := tp.ackTxs[hash]; ok {
delete(tp.ackTxs, hash)
if _, ok := tp.ackTxs[txHash]; ok {
delete(tp.ackTxs, txHash)
continue
}
hashes = append(hashes, hash)
proofHash := tp.proofPool.getProofHash(txHash)
txHashes = append(txHashes, txHash)
proofHashes = append(proofHashes, proofHash)
}
if len(hashes) == 0 {
if len(txHashes) == 0 {
return nil
}
height := tp.UpdateHeight()
return &raftproto.Ready{
TxHashes: hashes,
Height: height,
TxHashes: txHashes,
ProofHashes: proofHashes,
Height: height,
}
}