From d156957976b970abd8bab0cad91de7543cb4955a Mon Sep 17 00:00:00 2001 From: Lizen <“yhmocmoc@gmail.com”> Date: Thu, 24 Sep 2020 15:11:33 +0800 Subject: [PATCH] feat(mempool): add mempool feature --- config/order.toml | 12 +- go.mod | 2 + go.sum | 2 + internal/executor/executor.go | 3 +- pkg/order/etcdraft/config.go | 67 +- pkg/order/etcdraft/node.go | 223 +++--- pkg/order/etcdraft/txpool/txpool.go | 411 ---------- pkg/order/mempool/btree_index.go | 86 ++ pkg/order/mempool/btree_index_test.go | 28 + pkg/order/mempool/mempool.go | 121 +++ pkg/order/mempool/mempool_impl.go | 577 +++++++++++++ pkg/order/mempool/mempool_test.go | 264 ++++++ pkg/order/mempool/message.pb.go | 1067 +++++++++++++++++++++++++ pkg/order/mempool/message.proto | 23 + pkg/order/mempool/network.go | 27 + pkg/order/mempool/storage.go | 54 ++ pkg/order/mempool/tx_cache.go | 101 +++ pkg/order/mempool/tx_store.go | 140 ++++ pkg/order/mempool/types.go | 95 +++ pkg/order/mempool/utils.go | 146 ++++ 20 files changed, 2884 insertions(+), 565 deletions(-) delete mode 100644 pkg/order/etcdraft/txpool/txpool.go create mode 100644 pkg/order/mempool/btree_index.go create mode 100644 pkg/order/mempool/btree_index_test.go create mode 100644 pkg/order/mempool/mempool.go create mode 100644 pkg/order/mempool/mempool_impl.go create mode 100644 pkg/order/mempool/mempool_test.go create mode 100644 pkg/order/mempool/message.pb.go create mode 100644 pkg/order/mempool/message.proto create mode 100644 pkg/order/mempool/network.go create mode 100644 pkg/order/mempool/storage.go create mode 100644 pkg/order/mempool/tx_cache.go create mode 100644 pkg/order/mempool/tx_store.go create mode 100644 pkg/order/mempool/types.go create mode 100644 pkg/order/mempool/utils.go diff --git a/config/order.toml b/config/order.toml index 1b6aa75..5e8de1f 100644 --- a/config/order.toml +++ b/config/order.toml @@ -6,7 +6,13 @@ max_inflight_msgs = 500 # MaxInflightMsgs limits the max number of in- check_quorum = true # Leader steps down when quorum is not active for an electionTimeout. pre_vote = true # PreVote prevents reconnected node from disturbing network. disable_proposal_forwarding = true # This prevents blocks from being accidentally proposed by followers. - [raft.tx_pool] - pack_size = 500 # How many transactions should the primary pack. + + [raft.mempool] + batch_size = 200 # How many transactions should the primary pack. pool_size = 50000 # How many transactions could the txPool stores in total. - block_tick = "500ms" # Block packaging time period. \ No newline at end of file + tx_slice_size = 10 # How many transactions should the node broadcast at once + + batch_tick = "0.3s" # Block packaging time period. + tx_slice_timeout = "0.1s" # Node broadcasts transactions if there are cached transactions, although set_size isn't reached yet + fetch_timeout = "3s" # How long to wait before fetching missing transactions finished + diff --git a/go.mod b/go.mod index 4796ccd..4177328 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/mock v1.4.3 github.com/golang/protobuf v1.4.2 // indirect + github.com/google/btree v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.13.0 github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e @@ -22,6 +23,7 @@ require ( github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.2.2 + github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.5.0 github.com/rs/cors v1.7.0 diff --git a/go.sum b/go.sum index d692b67..38559d6 100644 --- a/go.sum +++ b/go.sum @@ -818,6 +818,8 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw= +github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222/go.mod h1:VyrYX9gd7irzKovcSS6BIIEwPRkP2Wm2m9ufcdFSJ34= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= diff --git a/internal/executor/executor.go b/internal/executor/executor.go index b07e81d..bf343a1 100755 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/meshplus/bitxhub-core/validator" - "github.com/meshplus/bitxhub-kit/log" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub/internal/constant" @@ -52,7 +51,7 @@ type BlockExecutor struct { // New creates executor instance func New(chainLedger ledger.Ledger, logger logrus.FieldLogger) (*BlockExecutor, error) { - ibtpVerify := proof.New(chainLedger, log.NewWithModule("proof")) + ibtpVerify := proof.New(chainLedger, logger) boltContracts := registerBoltContracts() diff --git a/pkg/order/etcdraft/config.go b/pkg/order/etcdraft/config.go index 9ec26f7..99fcb22 100644 --- a/pkg/order/etcdraft/config.go +++ b/pkg/order/etcdraft/config.go @@ -13,43 +13,39 @@ type RAFTConfig struct { RAFT RAFT } -type TxPoolConfig struct { - PackSize int `mapstructure:"pack_size"` - BlockTick time.Duration `mapstructure:"block_tick"` - PoolSize int `mapstructure:"pool_size"` +type MempoolConfig struct { + BatchSize uint64 `mapstructure:"batch_size"` + PoolSize uint64 `mapstructure:"pool_size"` + TxSliceSize uint64 `mapstructure:"tx_slice_size"` + + BatchTick time.Duration `mapstructure:"batch_tick"` + FetchTimeout time.Duration `mapstructure:"fetch_timeout"` + TxSliceTimeout time.Duration `mapstructure:"tx_slice_timeout"` } type RAFT struct { - ElectionTick int `mapstructure:"election_tick"` - HeartbeatTick int `mapstructure:"heartbeat_tick"` - MaxSizePerMsg uint64 `mapstructure:"max_size_per_msg"` - MaxInflightMsgs int `mapstructure:"max_inflight_msgs"` - CheckQuorum bool `mapstructure:"check_quorum"` - PreVote bool `mapstructure:"pre_vote"` - DisableProposalForwarding bool `mapstructure:"disable_proposal_forwarding"` - TxPoolConfig TxPoolConfig `mapstructure:"tx_pool"` + ElectionTick int `mapstructure:"election_tick"` + HeartbeatTick int `mapstructure:"heartbeat_tick"` + MaxSizePerMsg uint64 `mapstructure:"max_size_per_msg"` + MaxInflightMsgs int `mapstructure:"max_inflight_msgs"` + CheckQuorum bool `mapstructure:"check_quorum"` + PreVote bool `mapstructure:"pre_vote"` + DisableProposalForwarding bool `mapstructure:"disable_proposal_forwarding"` + MempoolConfig MempoolConfig `mapstructure:"mempool"` } func defaultRaftConfig() raft.Config { return raft.Config{ - ElectionTick: 10, //ElectionTick is the number of Node.Tick invocations that must pass between elections.(s) - HeartbeatTick: 1, //HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.(s) - MaxSizePerMsg: 1024 * 1024, //1024*1024, MaxSizePerMsg limits the max size of each append message. - MaxInflightMsgs: 500, //MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase. + ElectionTick: 10, // ElectionTick is the number of Node.Tick invocations that must pass between elections.(s) + HeartbeatTick: 1, // HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.(s) + MaxSizePerMsg: 1024 * 1024, // 1024*1024, MaxSizePerMsg limits the max size of each append message. + MaxInflightMsgs: 500, // MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase. PreVote: true, // PreVote prevents reconnected node from disturbing network. CheckQuorum: true, // Leader steps down when quorum is not active for an electionTimeout. DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers } } -func defaultTxPoolConfig() TxPoolConfig { - return TxPoolConfig{ - PackSize: 500, // How many transactions should the primary pack. - BlockTick: 500 * time.Millisecond, //Block packaging time period. - PoolSize: 50000, //How many transactions could the txPool stores in total. - } -} - func generateRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, ram MemoryStorage) (*raft.Config, error) { readConfig, err := readConfig(repoRoot) if err != nil { @@ -77,22 +73,19 @@ func generateRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, r return &defaultConfig, nil } -func generateTxPoolConfig(repoRoot string) (*TxPoolConfig, error) { +func generateMempoolConfig(repoRoot string) (*MempoolConfig, error) { readConfig, err := readConfig(repoRoot) if err != nil { - return &TxPoolConfig{}, nil + return nil, err } - defaultTxPoolConfig := defaultTxPoolConfig() - if readConfig.RAFT.TxPoolConfig.BlockTick > 0 { - defaultTxPoolConfig.BlockTick = readConfig.RAFT.TxPoolConfig.BlockTick - } - if readConfig.RAFT.TxPoolConfig.PackSize > 0 { - defaultTxPoolConfig.PackSize = readConfig.RAFT.TxPoolConfig.PackSize - } - if readConfig.RAFT.TxPoolConfig.PoolSize > 0 { - defaultTxPoolConfig.PoolSize = readConfig.RAFT.TxPoolConfig.PoolSize - } - return &defaultTxPoolConfig, nil + mempoolConf := &MempoolConfig{} + mempoolConf.BatchSize = readConfig.RAFT.MempoolConfig.BatchSize + mempoolConf.PoolSize = readConfig.RAFT.MempoolConfig.PoolSize + mempoolConf.TxSliceSize = readConfig.RAFT.MempoolConfig.TxSliceSize + mempoolConf.BatchTick = readConfig.RAFT.MempoolConfig.BatchTick + mempoolConf.FetchTimeout = readConfig.RAFT.MempoolConfig.FetchTimeout + mempoolConf.TxSliceTimeout = readConfig.RAFT.MempoolConfig.TxSliceTimeout + return mempoolConf, nil } func readConfig(repoRoot string) (*RAFTConfig, error) { diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go index 8fa3a78..2094dd6 100644 --- a/pkg/order/etcdraft/node.go +++ b/pkg/order/etcdraft/node.go @@ -5,19 +5,21 @@ import ( "encoding/binary" "fmt" "path/filepath" - "sort" "sync" + "sync/atomic" "time" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub/pkg/order" raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" - "github.com/meshplus/bitxhub/pkg/order/etcdraft/txpool" + "github.com/meshplus/bitxhub/pkg/order/mempool" "github.com/meshplus/bitxhub/pkg/peermgr" "github.com/meshplus/bitxhub/pkg/storage" + + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/gogo/protobuf/sortkeys" "github.com/sirupsen/logrus" ) @@ -37,11 +39,11 @@ type Node struct { errorC chan<- error // errors from raft session raftStorage *RaftStorage // the raft backend storage system - tp *txpool.TxPool // transaction pool storage storage.Storage // db + mempool mempool.MemPool // transaction pool - repoRoot string //project path - logger logrus.FieldLogger //logger + repoRoot string // project path + logger logrus.FieldLogger // logger blockAppliedIndex sync.Map // mapping of block height and apply index in raft log appliedIndex uint64 // current apply index in raft log @@ -50,7 +52,7 @@ type Node struct { lastIndex uint64 // last apply index in raft log readyPool *sync.Pool // ready pool, avoiding memory growth fast - readyCache sync.Map //ready cache + readyCache sync.Map // ready cache ctx context.Context // context haltC chan struct{} // exit signal } @@ -73,34 +75,42 @@ func NewNode(opts ...order.Option) (order.Order, error) { return nil, err } - //generate raft peers + // generate raft peers peers, err := GenerateRaftPeers(config) + if err != nil { return nil, fmt.Errorf("generate raft peers: %w", err) } - //generate txpool config - tpc, err := generateTxPoolConfig(repoRoot) + batchC := make(chan *raftproto.Ready) + memConfig, err := generateMempoolConfig(repoRoot) if err != nil { return nil, fmt.Errorf("generate raft txpool config: %w", err) } - txPoolConfig := &txpool.Config{ - PackSize: tpc.PackSize, - BlockTick: tpc.BlockTick, - PoolSize: tpc.PoolSize, + mempoolConf := &mempool.Config{ + ID: config.ID, + PeerMgr: config.PeerMgr, + ChainHeight: config.Applied, + GetTransactionFunc: config.GetTransactionFunc, + + BatchSize: memConfig.BatchSize, + BatchTick: memConfig.BatchTick, + PoolSize: memConfig.PoolSize, + TxSliceSize: memConfig.TxSliceSize, + FetchTimeout: memConfig.FetchTimeout, + TxSliceTimeout: memConfig.TxSliceTimeout, } - txPool, proposeC := txpool.New(config, dbStorage, txPoolConfig) + mempoolInst := mempool.NewMempool(mempoolConf, dbStorage, batchC) readyPool := &sync.Pool{New: func() interface{} { return new(raftproto.Ready) }} return &Node{ id: config.ID, - proposeC: proposeC, + proposeC: batchC, confChangeC: make(chan raftpb.ConfChange), commitC: make(chan *pb.Block, 1024), errorC: make(chan<- error), - tp: txPool, repoRoot: repoRoot, snapCount: defaultSnapshotCount, peerMgr: config.PeerMgr, @@ -110,12 +120,13 @@ func NewNode(opts ...order.Option) (order.Order, error) { raftStorage: raftStorage, readyPool: readyPool, ctx: context.Background(), + mempool: mempoolInst, }, nil } -//Start or restart raft node +// Start or restart raft node func (n *Node) Start() error { - n.blockAppliedIndex.Store(n.tp.GetHeight(), n.loadAppliedIndex()) + n.blockAppliedIndex.Store(n.mempool.GetChainHeight(), n.loadAppliedIndex()) rc, err := generateRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram) if err != nil { return fmt.Errorf("generate raft config: %w", err) @@ -127,31 +138,25 @@ func (n *Node) Start() error { } go n.run() + n.mempool.Start() n.logger.Info("Consensus module started") return nil } -//Stop the raft node +// Stop the raft node func (n *Node) Stop() { - n.tp.CheckExecute(false) + n.mempool.Stop() n.node.Stop() n.logger.Infof("Consensus stopped") } -//Add the transaction into txpool and broadcast it to other nodes +// Add the transaction into txpool and broadcast it to other nodes func (n *Node) Prepare(tx *pb.Transaction) error { if !n.Ready() { return nil } - if err := n.tp.AddPendingTx(tx, false); err != nil { - return err - } - if err := n.tp.Broadcast(tx); err != nil { - return err - } - - return nil + return n.mempool.RecvTransaction(tx) } func (n *Node) Commit() chan *pb.Block { @@ -170,20 +175,20 @@ func (n *Node) ReportState(height uint64, hash types.Hash) { n.logger.Errorf("can not found appliedIndex:", height) return } - //block already persisted, record the apply index in db + // block already persisted, record the apply index in db n.writeAppliedIndex(appliedIndex.(uint64)) n.blockAppliedIndex.Delete(height) - n.tp.BuildReqLookUp() //store bloom filter - - ready, ok := n.readyCache.Load(height) + // TODO: delete readyCache + readyBytes, ok := n.readyCache.Load(height) if !ok { n.logger.Errorf("can not found ready:", height) return } - hashes := ready.(*raftproto.Ready).TxHashes - // remove redundant tx - n.tp.BatchDelete(hashes) + ready := readyBytes.(*raftproto.Ready) + + // clean related mempool info + n.mempool.CommitTransactions(ready) n.readyCache.Delete(height) } @@ -204,47 +209,32 @@ func (n *Node) Step(ctx context.Context, msg []byte) error { return err } return n.node.Step(ctx, *msg) + case raftproto.RaftMessage_GET_TX: - hash := types.Hash{} - if err := hash.Unmarshal(rm.Data); err != nil { + fetchTxnRequest := &mempool.FetchTxnRequest{} + if err := fetchTxnRequest.Unmarshal(rm.Data); err != nil { return err } - tx, ok := n.tp.GetTx(hash, true) - if !ok { - return nil - } - v, err := tx.Marshal() - if err != nil { - return err - } - txAck := &raftproto.RaftMessage{ - Type: raftproto.RaftMessage_GET_TX_ACK, - Data: v, - } - txAckData, err := txAck.Marshal() - if err != nil { - return err - } - m := &pb.Message{ - Type: pb.Message_CONSENSUS, - Data: txAckData, - } - return n.peerMgr.AsyncSend(rm.FromId, m) + n.mempool.RecvFetchTxnRequest(fetchTxnRequest) + case raftproto.RaftMessage_GET_TX_ACK: - tx := &pb.Transaction{} - if err := tx.Unmarshal(rm.Data); err != nil { + fetchTxnResponse := &mempool.FetchTxnResponse{} + if err := fetchTxnResponse.Unmarshal(rm.Data); err != nil { return err } - return n.tp.AddPendingTx(tx, true) + n.mempool.RecvFetchTxnResponse(fetchTxnResponse) + case raftproto.RaftMessage_BROADCAST_TX: - tx := &pb.Transaction{} - if err := tx.Unmarshal(rm.Data); err != nil { + txSlice := &mempool.TxSlice{} + if err := txSlice.Unmarshal(rm.Data); err != nil { return err } - return n.tp.AddPendingTx(tx, false) + n.mempool.RecvForwardTxs(txSlice) + default: return fmt.Errorf("unexpected raft message received") } + return nil } func (n *Node) IsLeader() bool { @@ -281,14 +271,14 @@ func (n *Node) run() { n.proposeC = nil } else { if !n.IsLeader() { - n.tp.CheckExecute(false) + n.logger.Warn("Follower node can't propose a proposal") + n.mempool.UpdateLeader(n.leader) continue } data, err := ready.Marshal() if err != nil { n.logger.Panic(err) } - n.tp.BatchStore(ready.TxHashes) if err := n.node.Propose(n.ctx, data); err != nil { n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err) } @@ -332,16 +322,16 @@ func (n *Node) run() { } } if rd.SoftState != nil { - n.leader = rd.SoftState.Lead - n.tp.CheckExecute(n.IsLeader()) + newLeader := atomic.LoadUint64(&rd.SoftState.Lead) + n.leader = newLeader + n.mempool.UpdateLeader(newLeader) } // 3: AsyncSend all Messages to the nodes named in the To field. go n.send(rd.Messages) n.maybeTriggerSnapshot() - // 4: Call Node.Advance() to signal readiness for the next batch of - // updates. + // 4: Call Node.Advance() to signal readiness for the next batch of updates. n.node.Advance() case <-n.ctx.Done(): n.Stop() @@ -454,57 +444,61 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool { return true } -//mint the block +// mint the block func (n *Node) mint(ready *raftproto.Ready) { n.logger.WithFields(logrus.Fields{ "height": ready.Height, "count": len(ready.TxHashes), }).Debugln("block will be generated") - //follower node update the block height - if n.tp.GetHeight() == ready.Height-1 { - n.tp.UpdateHeight() - } - loseTxs := make([]types.Hash, 0) - txs := make([]*pb.Transaction, 0, len(ready.TxHashes)) - for _, hash := range ready.TxHashes { - _, ok := n.tp.GetTx(hash, false) - if !ok { - loseTxs = append(loseTxs, hash) + + // follower node update the block height + expectHeight := n.mempool.GetChainHeight() + if !n.IsLeader() { + if expectHeight != ready.Height-1 { + n.logger.Warningf("Receive batch %d, but not match, expect height: %d", ready.Height, expectHeight+1) + return } + n.mempool.IncreaseChainHeight() } - //handle missing txs - if len(loseTxs) != 0 { - var wg sync.WaitGroup - wg.Add(len(loseTxs)) - for _, hash := range loseTxs { - go func(hash types.Hash) { - defer wg.Done() - n.tp.FetchTx(hash, ready.Height) - }(hash) + missingTxsHash, txList := n.mempool.GetBlock(ready) + // handle missing txs + if len(missingTxsHash) != 0 { + waitLostTxnC := make(chan bool) + lostTxnEvent := &mempool.LocalMissingTxnEvent{ + Height: ready.Height, + WaitC: waitLostTxnC, + MissingTxnHashList: missingTxsHash, } - wg.Wait() - } - for _, hash := range ready.TxHashes { - tx, _ := n.tp.GetTx(hash, false) - txs = append(txs, tx) + // NOTE!!! block until finishing fetching the missing txs + n.mempool.FetchTxn(lostTxnEvent) + select { + case isSuccess := <-waitLostTxnC: + if !isSuccess { + n.logger.Error("Fetch missing txn failed") + return + } + n.logger.Debug("Fetch missing transactions success") + + case <-time.After(mempool.DefaultFetchTxnTimeout): + // TODO: add fetch request resend timer + n.logger.Debugf("Fetch missing transactions timeout, block height: %d", ready.Height) + return + } + + if missingTxsHash, txList = n.mempool.GetBlock(ready); len(missingTxsHash) != 0 { + n.logger.Error("Still missing transaction") + return + } } - n.tp.RemoveTxs(ready.TxHashes, n.IsLeader()) block := &pb.Block{ BlockHeader: &pb.BlockHeader{ Version: []byte("1.0.0"), Number: ready.Height, Timestamp: time.Now().UnixNano(), }, - Transactions: txs, - } - n.logger.WithFields(logrus.Fields{ - "txpool_size": n.tp.PoolSize(), - }).Debugln("current tx pool size") - - if len(loseTxs) == len(ready.TxHashes) { - block.Extra = []byte("updateState") + Transactions: txList, } n.readyCache.Store(ready.Height, ready) n.commitC <- block @@ -555,12 +549,17 @@ func (n *Node) maybeTriggerSnapshot() { func GenerateRaftPeers(config *order.Config) ([]raft.Peer, error) { nodes := config.Nodes peers := make([]raft.Peer, 0, len(nodes)) - for id, node := range nodes { - peers = append(peers, raft.Peer{ID: id, Context: node.Bytes()}) + // sort by node id + idSlice := make([]uint64, len(nodes)) + for id := range nodes { + idSlice = append(idSlice, id) + } + sortkeys.Uint64s(idSlice) + + for _, id := range idSlice { + addr := nodes[id] + peers = append(peers, raft.Peer{ID: id, Context: addr.Bytes()}) } - sort.Slice(peers, func(i, j int) bool { - return peers[i].ID < peers[j].ID - }) return peers, nil } diff --git a/pkg/order/etcdraft/txpool/txpool.go b/pkg/order/etcdraft/txpool/txpool.go deleted file mode 100644 index e5da648..0000000 --- a/pkg/order/etcdraft/txpool/txpool.go +++ /dev/null @@ -1,411 +0,0 @@ -package txpool - -import ( - "container/list" - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/meshplus/bitxhub/pkg/order" - - "github.com/Rican7/retry" - "github.com/Rican7/retry/strategy" - "github.com/meshplus/bitxhub-kit/types" - "github.com/meshplus/bitxhub-model/pb" - raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" - "github.com/meshplus/bitxhub/pkg/peermgr" - "github.com/meshplus/bitxhub/pkg/storage" - "github.com/sirupsen/logrus" -) - -type getTransactionFunc func(hash types.Hash) (*pb.Transaction, error) - -type TxPool struct { - sync.RWMutex //lock for the pendingTxs - nodeId uint64 //node id - height uint64 //current block height - isExecuting bool //only raft leader can execute - pendingTxs *list.List //pending tx pool - presenceTxs sync.Map //tx cache - ackTxs map[types.Hash]bool //ack tx means get tx by pb.RaftMessage_GET_TX_ACK - readyC chan *raftproto.Ready //ready channel, receive by raft Propose channel - peerMgr peermgr.PeerManager //network manager - logger logrus.FieldLogger //logger - reqLookUp *order.ReqLookUp //bloom filter - storage storage.Storage //storage pending tx - config *Config //tx pool config - ctx context.Context //context - cancel context.CancelFunc //stop Execute - getTransactionFunc getTransactionFunc //get transaction by ledger -} - -type Config struct { - PackSize int //how many transactions should the primary pack - BlockTick time.Duration //block packaging time period - PoolSize int //how many transactions could the txPool stores in total - SetSize int //how many transactions should the node broadcast at once -} - -//New txpool -func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*TxPool, chan *raftproto.Ready) { - readyC := make(chan *raftproto.Ready) - reqLookUp, err := order.NewReqLookUp(storage, config.Logger) - if err != nil { - return nil, nil - } - ctx, cancel := context.WithCancel(context.Background()) - return &TxPool{ - nodeId: config.ID, - peerMgr: config.PeerMgr, - logger: config.Logger, - readyC: readyC, - height: config.Applied, - pendingTxs: list.New(), - ackTxs: make(map[types.Hash]bool), - reqLookUp: reqLookUp, - storage: storage, - getTransactionFunc: config.GetTransactionFunc, - config: txPoolConfig, - ctx: ctx, - cancel: cancel, - }, readyC -} - -//AddPendingTx add pending transaction into txpool -func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error { - if tp.PoolSize() >= tp.config.PoolSize { - tp.logger.Debugf("Tx pool size: %d is full", tp.PoolSize()) - return nil - } - hash := tx.TransactionHash - if e := tp.get(hash); e != nil { - return nil - } - //look up by bloom filter - if ok := tp.reqLookUp.LookUp(hash.Bytes()); ok { - //find the tx again by ledger if hash in bloom filter - if tx, _ := tp.getTransactionFunc(hash); tx != nil { - return nil - } - } - //add pending tx - tp.pushBack(hash, tx, isAckTx) - - //immediately pack if it is greater than the total amount of block transactions - if tp.isExecuting { - tp.packFullBlock() - } - return nil -} - -//packFullBlock immediately pack if it is greater than the total amount of block transactions -func (tp *TxPool) packFullBlock() { - tp.Lock() - defer tp.Unlock() - l := tp.pendingTxs.Len() - if l < tp.config.PackSize { - return - } - if r := tp.ready(tp.config.PackSize); r != nil { - tp.readyC <- r - } -} - -//Current txpool's size -func (tp *TxPool) PoolSize() int { - tp.RLock() - defer tp.RUnlock() - return tp.pendingTxs.Len() -} - -//RemoveTxs remove txs from the cache -func (tp *TxPool) RemoveTxs(hashes []types.Hash, isLeader bool) { - tp.Lock() - defer tp.Unlock() - for _, hash := range hashes { - if !isLeader { - if e := tp.get(hash); e != nil { - tp.pendingTxs.Remove(e) - } - } - tp.presenceTxs.Delete(hash) - } - -} - -//BuildReqLookUp store the bloom filter -func (tp *TxPool) BuildReqLookUp() { - if err := tp.reqLookUp.Build(); err != nil { - tp.logger.Errorf("bloom filter persistence error:", err) - } -} - -//CheckExecute checks the txpool status, only leader node can run Execute() -func (tp *TxPool) CheckExecute(isLeader bool) { - if isLeader { - if !tp.isExecuting { - go tp.execute() - } - } else { - if tp.isExecuting { - tp.cancel() - } - } -} - -//execute init -func (tp *TxPool) executeInit() { - tp.Lock() - defer tp.Unlock() - tp.isExecuting = true - tp.pendingTxs.Init() - tp.presenceTxs = sync.Map{} - tp.logger.Infoln("start txpool execute") -} - -//execute schedule to collect txs to the ready channel -func (tp *TxPool) execute() { - tp.executeInit() - ticker := time.NewTicker(tp.config.BlockTick) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - tp.periodPackBlock() - case <-tp.ctx.Done(): - tp.isExecuting = false - tp.logger.Infoln("done txpool execute") - return - } - } - -} - -func (tp *TxPool) periodPackBlock() { - tp.Lock() - defer tp.Unlock() - l := tp.pendingTxs.Len() - if l == 0 { - return - } - - var size int - if l > tp.config.PackSize { - size = tp.config.PackSize - } else { - size = l - } - if r := tp.ready(size); r != nil { - tp.readyC <- r - } -} - -//ready pack the block -func (tp *TxPool) ready(size int) *raftproto.Ready { - hashes := make([]types.Hash, 0, size) - for i := 0; i < size; i++ { - front := tp.pendingTxs.Front() - tx := front.Value.(*pb.Transaction) - hash := tx.TransactionHash - tp.pendingTxs.Remove(front) - if _, ok := tp.ackTxs[hash]; ok { - delete(tp.ackTxs, hash) - continue - } - hashes = append(hashes, hash) - - } - if len(hashes) == 0 { - return nil - } - height := tp.UpdateHeight() - return &raftproto.Ready{ - TxHashes: hashes, - Height: height, - } -} - -//UpdateHeight add the block height -func (tp *TxPool) UpdateHeight() uint64 { - return atomic.AddUint64(&tp.height, 1) -} - -//GetHeight get current block height -func (tp *TxPool) GetHeight() uint64 { - return atomic.LoadUint64(&tp.height) -} - -//GetTx get the transaction by txpool or ledger -func (tp *TxPool) GetTx(hash types.Hash, findByStore bool) (*pb.Transaction, bool) { - if e := tp.get(hash); e != nil { - return e.Value.(*pb.Transaction), true - } - if findByStore { - // find by txpool store - tx, ok := tp.load(hash) - if ok { - return tx, true - } - // find by ledger - tx, err := tp.getTransactionFunc(hash) - if err != nil { - return nil, false - } - return tx, true - } - return nil, false -} - -//Broadcast the new transaction to other nodes -func (tp *TxPool) Broadcast(tx *pb.Transaction) error { - data, err := tx.Marshal() - if err != nil { - return err - } - rm := &raftproto.RaftMessage{ - Type: raftproto.RaftMessage_BROADCAST_TX, - Data: data, - } - cmData, err := rm.Marshal() - if err != nil { - return err - } - msg := &pb.Message{ - Type: pb.Message_CONSENSUS, - Data: cmData, - } - - for id := range tp.peerMgr.Peers() { - if id == tp.nodeId { - continue - } - if err := tp.peerMgr.AsyncSend(id, msg); err != nil { - tp.logger.Debugf("send tx to:%d %s", id, err.Error()) - continue - } - } - return nil -} - -// Fetch tx by local txpool or network -func (tp *TxPool) FetchTx(hash types.Hash, height uint64) *pb.Transaction { - if tx, ok := tp.GetTx(hash, false); ok { - return tx - } - raftMessage := &raftproto.RaftMessage{ - Type: raftproto.RaftMessage_GET_TX, - FromId: tp.nodeId, - Data: hash.Bytes(), - } - rmData, err := raftMessage.Marshal() - if err != nil { - return nil - } - m := &pb.Message{ - Type: pb.Message_CONSENSUS, - Data: rmData, - } - - asyncGet := func() (tx *pb.Transaction, err error) { - if tx, ok := tp.GetTx(hash, false); ok { - return tx, nil - } - if err := tp.peerMgr.Broadcast(m); err != nil { - tp.logger.Debugln(err) - } - return nil, fmt.Errorf("can't get tx: %s, block_height:%d", hash.String(), height) - } - - var tx *pb.Transaction - if err := retry.Retry(func(attempt uint) (err error) { - tx, err = asyncGet() - if err != nil { - //retry times > 2 - if attempt > 2 { - tp.logger.Debugln(err) - } - return err - } - return nil - }, strategy.Wait(50*time.Millisecond)); err != nil { - tp.logger.Errorln(err) - } - return tx -} - -func (tp *TxPool) get(key types.Hash) *list.Element { - e, ok := tp.presenceTxs.Load(key) - if ok { - return e.(*list.Element) - } - return nil -} - -func (tp *TxPool) pushBack(key types.Hash, value interface{}, isAckTx bool) *list.Element { - tp.Lock() - defer tp.Unlock() - if e := tp.get(key); e != nil { - return nil - } - if isAckTx { - tp.ackTxs[key] = true - } - e := tp.pendingTxs.PushBack(value) - tp.presenceTxs.Store(key, e) - return e -} - -func compositeKey(value interface{}) []byte { - var prefix = []byte("tx-") - return append(prefix, []byte(fmt.Sprintf("%v", value))...) -} - -func (tp *TxPool) store(tx *pb.Transaction) { - txKey := compositeKey(tx.TransactionHash.Bytes()) - txData, _ := tx.Marshal() - tp.storage.Put(txKey, txData) -} - -func (tp *TxPool) load(hash types.Hash) (*pb.Transaction, bool) { - txKey := compositeKey(hash.Bytes()) - txData := tp.storage.Get(txKey) - if txData == nil { - return nil, false - } - var tx pb.Transaction - if err := tx.Unmarshal(txData); err != nil { - tp.logger.Error(err) - return nil, false - } - return &tx, true -} - -//BatchStore batch store txs -func (tp *TxPool) BatchStore(hashes []types.Hash) { - batch := tp.storage.NewBatch() - for _, hash := range hashes { - e := tp.get(hash) - if e == nil { - tp.logger.Debugln("BatchStore not found tx:", hash.String()) - continue - } - tx := e.Value.(*pb.Transaction) - txKey := compositeKey(hash.Bytes()) - txData, _ := tx.Marshal() - batch.Put(txKey, txData) - } - batch.Commit() -} - -//BatchDelete batch delete txs -func (tp *TxPool) BatchDelete(hashes []types.Hash) { - batch := tp.storage.NewBatch() - for _, hash := range hashes { - txKey := compositeKey(hash.Bytes()) - batch.Delete(txKey) - } - batch.Commit() -} diff --git a/pkg/order/mempool/btree_index.go b/pkg/order/mempool/btree_index.go new file mode 100644 index 0000000..8813c7d --- /dev/null +++ b/pkg/order/mempool/btree_index.go @@ -0,0 +1,86 @@ +package mempool + +import ( + "github.com/meshplus/bitxhub-model/pb" + + "github.com/google/btree" +) + +// the key of priorityIndex and parkingLotIndex. +type orderedIndexKey struct { + account string + nonce uint64 +} + +// TODO (YH): add expiration time order +// Less should guarantee item can be cast into orderedIndexKey. +func (oik *orderedIndexKey) Less(than btree.Item) bool { + other := than.(*orderedIndexKey) + if oik.account != other.account { + return oik.account < other.account + } + return oik.nonce < other.nonce +} + +type sortedNonceKey struct { + nonce uint64 +} + +// Less should guarantee item can be cast into sortedNonceKey. +func (snk *sortedNonceKey) Less(item btree.Item) bool { + dst, _ := item.(*sortedNonceKey) + return snk.nonce < dst.nonce +} + +func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey { + return &orderedIndexKey{ + account: account, + nonce: uint64(tx.Nonce), + } +} + +func makeSortedNonceKeyKey(nonce uint64) *sortedNonceKey { + return &sortedNonceKey{ + nonce: nonce, + } +} + +type btreeIndex struct { + data *btree.BTree +} + +func newBtreeIndex() *btreeIndex { + return &btreeIndex{ + data: btree.New(btreeDegree), + } +} + +func (idx *btreeIndex) insert(tx *pb.Transaction) { + idx.data.ReplaceOrInsert(makeSortedNonceKeyKey(uint64(tx.Nonce))) +} + +func (idx *btreeIndex) remove(txs map[string][]*pb.Transaction) { + for _, list := range txs { + for _, tx := range list { + idx.data.Delete(makeSortedNonceKeyKey(uint64(tx.Nonce))) + } + } +} + +func (idx *btreeIndex) insertByOrderedQueueKey(account string, tx *pb.Transaction) { + idx.data.ReplaceOrInsert(makeOrderedIndexKey(account, tx)) +} + +func (idx *btreeIndex) removeByOrderedQueueKey(txs map[string][]*pb.Transaction) { + for account, list := range txs { + for _, tx := range list { + idx.data.Delete(makeOrderedIndexKey(account, tx)) + } + } +} + +// Size returns the size of the index +func (idx *btreeIndex) size() uint64 { + return uint64(idx.data.Len()) +} + diff --git a/pkg/order/mempool/btree_index_test.go b/pkg/order/mempool/btree_index_test.go new file mode 100644 index 0000000..608106f --- /dev/null +++ b/pkg/order/mempool/btree_index_test.go @@ -0,0 +1,28 @@ +package mempool + +import ( + "testing" + + "github.com/meshplus/bitxhub-model/pb" + + "github.com/stretchr/testify/assert" +) + +func TestLess(t *testing.T) { + ast := assert.New(t) + tx := &pb.Transaction{ + Nonce: 1, + } + orderedIndexKey := makeOrderedIndexKey("account", tx) + tx.Nonce = 2 + orderedIndexKey1 := makeOrderedIndexKey("bitxhub", tx) + isLess := orderedIndexKey.Less(orderedIndexKey1) + if "bitxhub" < "dccount" { + t.Log("yes") + } + ast.Equal(true, isLess) +} + + + + diff --git a/pkg/order/mempool/mempool.go b/pkg/order/mempool/mempool.go new file mode 100644 index 0000000..15f4502 --- /dev/null +++ b/pkg/order/mempool/mempool.go @@ -0,0 +1,121 @@ +package mempool + +import ( + "errors" + + "github.com/meshplus/bitxhub-model/pb" + raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" + "github.com/meshplus/bitxhub/pkg/storage" +) + +var _ MemPool = (*mempoolImpl)(nil) + +//go:generate mockgen -destination mock_mempool/mock_mempool.go -package mock_mempool -source types.go +type MemPool interface { + // Start starts mempool service. + Start() error + + // Stop stops mempool service. + Stop() + + // RecvTransaction receives transaction from API. + RecvTransaction(tx *pb.Transaction) error + + // RecvForwardTxs receives transactions from other vp nodes. + RecvForwardTxs(txSlice *TxSlice) + + UpdateLeader(uint64) + + FetchTxn(lostTxnEvent *LocalMissingTxnEvent) + + RecvFetchTxnRequest(fetchTxnRequest *FetchTxnRequest) + + RecvFetchTxnResponse(fetchTxnResponse *FetchTxnResponse) + + GetChainHeight() uint64 + + IncreaseChainHeight() + + GetBlock(ready *raftproto.Ready) (map[uint64]string, []*pb.Transaction) + + // Remove committed transactions from mempool + CommitTransactions(ready *raftproto.Ready) +} + +// NewMempool return the mempool instance. +func NewMempool(config *Config, storage storage.Storage, batchC chan *raftproto.Ready) MemPool { + return newMempoolImpl(config, storage, batchC) +} + +// RecvTransaction receives transaction from api and other vp nodes. +func (mpi *mempoolImpl) RecvTransaction(tx *pb.Transaction) error { + if mpi.txCache.IsFull() && mpi.poolIsFull() { + return errors.New("transaction cache and pool are full, we will drop this transaction") + } + mpi.txCache.recvTxC <- tx + return nil +} + +// RecvTransaction receives transaction from api and other vp nodes. +func (mpi *mempoolImpl) RecvForwardTxs(txSlice *TxSlice) { + mpi.subscribe.txForwardC <- txSlice +} + +// UpdateLeader updates the +func (mpi *mempoolImpl) UpdateLeader(newLeader uint64) { + mpi.subscribe.updateLeaderC <- newLeader +} + +// FetchTxn sends the fetch request. +func (mpi *mempoolImpl) FetchTxn(lostTxnEvent *LocalMissingTxnEvent) { + mpi.subscribe.localMissingTxnEvent <- lostTxnEvent +} + +func (mpi *mempoolImpl) RecvFetchTxnRequest(fetchTxnRequest *FetchTxnRequest) { + mpi.subscribe.fetchTxnRequestC <- fetchTxnRequest +} + +func (mpi *mempoolImpl) RecvFetchTxnResponse(fetchTxnResponse *FetchTxnResponse) { + mpi.subscribe.fetchTxnResponseC <- fetchTxnResponse +} + +// Start starts the mempool service. +func (mpi *mempoolImpl) Start() error { + mpi.logger.Debug("Start Listen mempool events") + go mpi.listenEvent() + go mpi.txCache.listenEvent() + return nil +} + +func (mpi *mempoolImpl) Stop() { + if mpi.close != nil { + close(mpi.close) + } + if mpi.txCache.close != nil { + close(mpi.txCache.close) + } +} + +func (mpi *mempoolImpl) GetChainHeight() uint64 { + return mpi.getBatchSeqNo() +} + +func (mpi *mempoolImpl) IncreaseChainHeight() { + mpi.increaseBatchSeqNo() +} + +func (mpi *mempoolImpl) GetBlock(ready *raftproto.Ready) (missingTxnHashList map[uint64]string, txList []*pb.Transaction) { + waitC := make(chan *mempoolBatch) + getBlock := &constructBatchEvent{ + ready: ready, + result: waitC, + } + mpi.subscribe.getBlockC <- getBlock + // block until finishing constructing related batch + batch := <-waitC + return batch.missingTxnHashList, batch.txList +} + +func (mpi *mempoolImpl) CommitTransactions(ready *raftproto.Ready) { + mpi.subscribe.commitTxnC <- ready +} diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go new file mode 100644 index 0000000..6fcd29f --- /dev/null +++ b/pkg/order/mempool/mempool_impl.go @@ -0,0 +1,577 @@ +package mempool + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/meshplus/bitxhub-kit/crypto" + "github.com/meshplus/bitxhub-kit/crypto/asym" + "github.com/meshplus/bitxhub-kit/types" + "github.com/meshplus/bitxhub-model/pb" + "github.com/meshplus/bitxhub/internal/loggers" + raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" + "github.com/meshplus/bitxhub/pkg/peermgr" + "github.com/meshplus/bitxhub/pkg/storage" + + "github.com/google/btree" + "github.com/sirupsen/logrus" +) + +type mempoolImpl struct { + localID uint64 + leader uint64 // leader node id + batchSize uint64 + batchSeqNo uint64 // track the sequence number of block + logger logrus.FieldLogger + batchC chan *raftproto.Ready + close chan bool + + txStore *transactionStore // store all transactions info + txCache *TxCache // cache the transactions received from api + subscribe *subscribeEvent + storage storage.Storage + peerMgr peermgr.PeerManager //network manager + batchTimerMgr *timerManager + ledgerHelper func(hash types.Hash) (*pb.Transaction, error) +} + +func newMempoolImpl(config *Config, storage storage.Storage, batchC chan *raftproto.Ready) *mempoolImpl { + mpi := &mempoolImpl{ + localID: config.ID, + peerMgr: config.PeerMgr, + batchSeqNo: config.ChainHeight, + ledgerHelper: config.GetTransactionFunc, + logger: loggers.Logger(loggers.Order), + batchC: batchC, + storage: storage, + } + mpi.txStore = newTransactionStore() + mpi.txCache = newTxCache(config.TxSliceTimeout) + mpi.subscribe = newSubscribe() + if config.BatchSize == 0 { + mpi.batchSize = DefaultBatchSize + } else { + mpi.batchSize = config.BatchSize + } + var batchTick time.Duration + if config.BatchTick == 0 { + batchTick = DefaultBatchTick + } else { + batchTick = config.BatchTick + } + mpi.batchTimerMgr = newTimer(batchTick) + return mpi +} + +func (mpi *mempoolImpl) listenEvent() { + waitC := make(chan bool) + for { + select { + case <-mpi.close: + mpi.logger.Info("----- Exit listen loop -----") + return + + case newLeader := <-mpi.subscribe.updateLeaderC: + if newLeader == mpi.localID { + mpi.logger.Info("----- Become the leader node -----") + } + mpi.leader = newLeader + + case txSet := <-mpi.txCache.txSetC: + // 1. send transactions to other peer + data, err := txSet.Marshal() + if err != nil { + mpi.logger.Errorf("Marshal failed, err: %s", err.Error()) + return + } + pbMsg := mpi.msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX) + mpi.broadcast(pbMsg) + + // 2. process transactions + if err := mpi.processTransactions(txSet.TxList); err != nil { + mpi.logger.Errorf("Process transactions failed, err: %s", err.Error()) + } + + case txSlice := <-mpi.subscribe.txForwardC: + if err := mpi.processTransactions(txSlice.TxList); err != nil { + mpi.logger.Errorf("Process transactions failed, err: %s", err.Error()) + } + + case res := <-mpi.subscribe.getBlockC: + result := mpi.getBlock(res.ready) + res.result <- result + + case <- mpi.batchTimerMgr.timeoutEventC: + if mpi.isBatchTimerActive() { + mpi.stopBatchTimer(StopReason1) + mpi.logger.Debug("Batch timer expired, try to create a batch") + if mpi.txStore.priorityNonBatchSize > 0 { + ready, err := mpi.generateBlock(true) + if err != nil { + mpi.logger.Errorf("Generator batch failed") + continue + } + mpi.batchC <- ready + } else { + mpi.logger.Debug("The length of priorityIndex is 0, ") + } + } + + case commitReady := <-mpi.subscribe.commitTxnC: + gcStartTime := time.Now() + mpi.processCommitTransactions(commitReady) + duration := time.Now().Sub(gcStartTime).Nanoseconds() + mpi.logger.Debugf("GC duration %v", duration) + + case lostTxnEvent := <-mpi.subscribe.localMissingTxnEvent: + if err := mpi.sendFetchTxnRequest(lostTxnEvent.Height, lostTxnEvent.MissingTxnHashList); err != nil { + mpi.logger.Errorf("Process fetch txn failed, err: %s", err.Error()) + lostTxnEvent.WaitC <- false + } else { + mpi.logger.Debug("Process fetch txn success") + waitC = lostTxnEvent.WaitC + } + + case fetchRequest := <-mpi.subscribe.fetchTxnRequestC: + if err := mpi.processFetchTxnRequest(fetchRequest); err != nil { + mpi.logger.Error("Process fetchTxnRequest failed") + } + + case fetchRes := <-mpi.subscribe.fetchTxnResponseC: + if err := mpi.processFetchTxnResponse(fetchRes); err != nil { + waitC <- false + continue + } + waitC <- true + } + } +} + +func (mpi *mempoolImpl) processTransactions(txs []*pb.Transaction) error { + validTxs := make(map[string][]*pb.Transaction) + for _, tx := range txs { + // check if this tx signature is valid first + ok, _ := asym.Verify(crypto.Secp256k1, tx.Signature, tx.SignHash().Bytes(), tx.From) + + if !ok { + return fmt.Errorf("invalid signature") + } + // check the sequence number of tx + // TODO refactor Transaction + txAccount, err := getAccount(tx) + if err != nil { + return fmt.Errorf("get tx account failed, err: %s", err.Error()) + } + currentSeqNo := mpi.txStore.nonceCache.getPendingNonce(txAccount) + if tx.Nonce < currentSeqNo { + mpi.logger.Warningf("current sequence number is %d, required %d", tx.Nonce, currentSeqNo+1) + continue + } + // check the existence of hash of this tx + txHash := tx.TransactionHash.Hex() + if txPointer := mpi.txStore.txHashMap[txHash]; txPointer != nil { + mpi.logger.Warningf("Tx %s already received", txHash) + continue + } + _, ok = validTxs[txAccount] + if !ok { + validTxs[txAccount] = make([]*pb.Transaction, 0) + } + validTxs[txAccount] = append(validTxs[txAccount], tx) + } + + // Process all the new transaction and merge any errors into the original slice + dirtyAccounts := mpi.txStore.InsertTxs(validTxs) + + // send tx to mempool store + mpi.processDirtyAccount(dirtyAccounts) + + if mpi.isLeader() { + // start batch timer when this node receives the first transaction set of a batch + if !mpi.isBatchTimerActive() { + mpi.startBatchTimer(StartReason1) + } + + // generator batch by block size + if mpi.txStore.priorityNonBatchSize >= mpi.batchSize { + ready, err := mpi.generateBlock(false) + if err != nil { + return errors.New("generator batch fai") + } + // stop batch timer + mpi.stopBatchTimer(StopReason2) + mpi.batchC <- ready + } + } + return nil +} + +func (txStore *transactionStore) InsertTxs(txs map[string][]*pb.Transaction) map[string]bool { + dirtyAccounts := make(map[string]bool) + for account, list := range txs { + for _, tx := range list { + txHash := tx.TransactionHash.Hex() + txPointer := &orderedIndexKey{ + account: account, + nonce: tx.Nonce, + } + txStore.txHashMap[txHash] = txPointer + list, ok := txStore.allTxs[account] + if !ok { + // if this is new account to send tx, create a new txSortedMap + txStore.allTxs[account] = newTxSortedMap() + } + list = txStore.allTxs[account] + txItem := &txItem{ + account: account, + tx: tx, + } + list.items[tx.Nonce] = txItem + list.index.insert(tx) + atomic.AddInt32(&txStore.poolSize, 1) + } + dirtyAccounts[account] = true + } + return dirtyAccounts +} + +func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) { + for account := range dirtyAccounts { + if list, ok := mpi.txStore.allTxs[account]; ok { + // search for related sequential txs in allTxs + // and add these txs into priorityIndex and parkingLotIndex + pendingNonce := mpi.txStore.nonceCache.getPendingNonce(account) + readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce) + mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce) + + // inset ready txs into priorityIndex. + for _, tx := range readyTxs { + mpi.txStore.priorityIndex.insertByOrderedQueueKey(account, tx) + } + mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize + uint64(len(readyTxs)) + + // inset non-ready txs into parkingLotIndex. + for _, tx := range nonReadyTxs { + mpi.txStore.parkingLotIndex.insertByOrderedQueueKey(account, tx) + } + } + } +} + +// getBlock fetches next block of transactions for consensus, +// batchedTx are all txs sent to consensus but were not committed yet, mempool should filter out such txs. +func (mpi *mempoolImpl) generateBlock(isTimeout bool) (*raftproto.Ready, error) { + result := make([]orderedIndexKey, 0, mpi.batchSize) + + // txs has lower nonce will be observed first in priority index iterator. + mpi.logger.Infof("Length of priority index: %v", mpi.txStore.priorityIndex.data.Len()) + mpi.txStore.priorityIndex.data.Ascend(func(a btree.Item) bool { + tx := a.(*orderedIndexKey) + // if tx has existed in bathedTxs, + if _, ok := mpi.txStore.batchedTxs[orderedIndexKey{tx.account, tx.nonce}]; ok { + return true + } + txSeq := tx.nonce + commitNonce := mpi.txStore.nonceCache.getCommitNonce(tx.account) + var seenPrevious bool + if txSeq >= 1 { + _, seenPrevious = mpi.txStore.batchedTxs[orderedIndexKey{account: tx.account, nonce: txSeq - 1}] + } + // include transaction if it's "next" for given account or + // we've already sent its ancestor to Consensus + if seenPrevious || (txSeq == commitNonce) { + ptr := orderedIndexKey{account: tx.account, nonce: tx.nonce} + mpi.txStore.batchedTxs[ptr] = true + result = append(result, ptr) + // batched by batch size or timeout + condition1 := uint64(len(result)) == mpi.batchSize + condition2 := isTimeout && uint64(len(result)) == mpi.txStore.priorityNonBatchSize + if condition1 || condition2 { + return false + } + } + return true + }) + + // convert transaction pointers to real values + hashList := make([]types.Hash, len(result)) + txList := make([]*pb.Transaction, len(result)) + for i, v := range result { + rawTransaction := mpi.txStore.getTxByOrderKey(v.account, v.nonce) + hashList[i] = rawTransaction.TransactionHash + txList[i] = rawTransaction + } + mpi.increaseBatchSeqNo() + batchSeqNo := mpi.getBatchSeqNo() + ready := &raftproto.Ready{ + TxHashes: hashList, + Height: batchSeqNo, + } + // store the batch to cache + if _, ok := mpi.txStore.batchedCache[batchSeqNo]; ok { + mpi.logger.Errorf("Generate block with height %d, but there is already block at this height", batchSeqNo) + return nil, errors.New("wrong block height ") + } + // store the batch to cache + mpi.txStore.batchedCache[batchSeqNo] = txList + // store the batch to db + mpi.batchStore(txList) + mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize - uint64(len(hashList)) + mpi.logger.Infof("Generated block %d with %d txs", batchSeqNo, len(txList)) + return ready, nil +} + +func (mpi *mempoolImpl) getBlock(ready *raftproto.Ready) *mempoolBatch { + res := &mempoolBatch{} + // leader get the block directly from batchedCache + if mpi.isLeader() { + if txList, ok := mpi.txStore.batchedCache[ready.Height]; !ok { + mpi.logger.Warningf("Leader get block failed, can't find block %d from batchedCache", ready.Height) + missingTxnHashList := make(map[uint64]string) + for i, txHash := range ready.TxHashes { + missingTxnHashList[uint64(i)] = txHash.Hex() + } + res.missingTxnHashList = missingTxnHashList + } else { + // TODO (YH): check tx hash and length + res.txList = txList + } + return res + } + // follower construct the same batch by given ready. + return mpi.constructSameBatch(ready) +} + +func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch { + res := &mempoolBatch{} + if txList, ok := mpi.txStore.batchedCache[ready.Height]; ok { + mpi.logger.Warningf("Batch %d already exists in batchedCache", ready.Height) + // TODO (YH): check tx hash and length + res.txList = txList + return res + } + missingTxList := make(map[uint64]string) + txList := make([]*pb.Transaction, 0) + for index, txHash := range ready.TxHashes { + var ( + txPointer *orderedIndexKey + txMap *txSortedMap + txItem *txItem + ok bool + ) + if txPointer, _ = mpi.txStore.txHashMap[txHash.Hex()]; txPointer == nil { + missingTxList[uint64(index)] = txHash.Hex() + continue + } + if txMap, ok = mpi.txStore.allTxs[txPointer.account]; !ok { + mpi.logger.Warningf("Transaction %s exist in txHashMap but not in allTxs", txHash.Hex()) + missingTxList[uint64(index)] = txHash.Hex() + continue + } + if txItem, ok = txMap.items[txPointer.nonce]; !ok { + mpi.logger.Warningf("Transaction %s exist in txHashMap but not in allTxs", txHash.Hex()) + missingTxList[uint64(index)] = txHash.Hex() + continue + } + txList = append(txList, txItem.tx) + mpi.txStore.batchedTxs[*txPointer] = true + } + res.missingTxnHashList = missingTxList + res.txList = txList + // store the batch to cache + mpi.txStore.batchedCache[ready.Height] = txList + // store the batch to db + mpi.batchStore(txList) + return res +} + +func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) { + dirtyAccounts := make(map[string]bool) + // update current cached commit nonce for account + for _, txHash := range ready.TxHashes { + txHashStr := txHash.Hex() + txPointer := mpi.txStore.txHashMap[txHashStr] + preCommitNonce := mpi.txStore.nonceCache.getCommitNonce(txPointer.account) + newCommitNonce := txPointer.nonce + 1 + if preCommitNonce < newCommitNonce { + mpi.txStore.nonceCache.setCommitNonce(txPointer.account, newCommitNonce) + } + delete(mpi.txStore.txHashMap, txHashStr) + delete(mpi.txStore.batchedTxs, *txPointer) + dirtyAccounts[txPointer.account] = true + } + // clean related txs info in cache + for account := range dirtyAccounts { + commitNonce := mpi.txStore.nonceCache.getCommitNonce(account) + if list, ok := mpi.txStore.allTxs[account]; ok { + // remove all previous seq number txs for this account. + removedTxs := list.forward(commitNonce) + // remove index smaller than commitNonce delete index. + var wg sync.WaitGroup + wg.Add(3) + go func(ready map[string][]*pb.Transaction) { + defer wg.Done() + list.index.remove(removedTxs) + }(removedTxs) + go func(ready map[string][]*pb.Transaction) { + defer wg.Done() + mpi.txStore.priorityIndex.removeByOrderedQueueKey(removedTxs) + }(removedTxs) + go func(ready map[string][]*pb.Transaction) { + defer wg.Done() + mpi.txStore.parkingLotIndex.removeByOrderedQueueKey(removedTxs) + }(removedTxs) + wg.Wait() + delta := int32(len(removedTxs)) + atomic.AddInt32(&mpi.txStore.poolSize, -delta) + } + } + mpi.batchDelete(ready.TxHashes) + delete(mpi.txStore.batchedCache, ready.Height) + // restart batch timer for remain txs. + if mpi.isLeader(){ + mpi.startBatchTimer(StartReason2) + } + mpi.logger.Debugf("Replica removes batch %d in mempool, and now there are %d batches, "+ + "priority len: %d, parkingLot len: %d", ready.Height, len(mpi.txStore.batchedCache), + mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size()) +} + +// sendFetchTxnRequest sends fetching missing transactions request to leader node. +func (mpi *mempoolImpl) sendFetchTxnRequest(height uint64, lostTxnHashList map[uint64]string) error { + filterFetchTxHashList := &FetchTxnRequest{ + ReplicaId: mpi.localID, + Height: height, + MissingTxHashes: lostTxnHashList, + } + missingHashListBytes, err := filterFetchTxHashList.Marshal() + if err != nil { + mpi.logger.Error("Marshal MissingHashList fail") + return err + } + pbMsg := mpi.msgToConsensusPbMsg(missingHashListBytes, raftproto.RaftMessage_GET_TX) + mpi.logger.Debugf("Send fetch transactions request to replica %d", mpi.leader) + mpi.unicast(mpi.leader, pbMsg) + mpi.txStore.missingBatch[height] = lostTxnHashList + return nil +} + +// processFetchTxnRequest processes fetch request... +func (mpi *mempoolImpl) processFetchTxnRequest(fetchTxnRequest *FetchTxnRequest) error { + txList := make(map[uint64]*pb.Transaction, len(fetchTxnRequest.MissingTxHashes)) + var err error + if txList, err = mpi.loadTxnFromCache(fetchTxnRequest); err != nil { + if txList, err = mpi.loadTxnFromStorage(fetchTxnRequest); err != nil { + if txList, err = mpi.loadTxnFromLedger(fetchTxnRequest); err != nil { + mpi.logger.Error("Process fetch txn request failed.") + return err + } + } + } + fetchTxnResponse := &FetchTxnResponse{ + ReplicaId: mpi.localID, + Height: fetchTxnRequest.Height, + MissingTxnList: txList, + } + resBytes, err := fetchTxnResponse.Marshal() + if err != nil { + return err + } + pbMsg := mpi.msgToConsensusPbMsg(resBytes, raftproto.RaftMessage_GET_TX_ACK) + mpi.logger.Debugf("Send fetch transactions response to replica %d", fetchTxnRequest.ReplicaId) + mpi.unicast(fetchTxnRequest.ReplicaId, pbMsg) + return nil +} + +func (mpi *mempoolImpl) loadTxnFromCache(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) { + missingHashList := fetchTxnRequest.MissingTxHashes + targetHeight := fetchTxnRequest.Height + for _, txHash := range missingHashList { + if txPointer, _ := mpi.txStore.txHashMap[txHash]; txPointer == nil { + return nil, fmt.Errorf("transaction %s dones't exist in txHashMap", txHash) + } + } + var targetBatch []*pb.Transaction + var ok bool + if targetBatch, ok = mpi.txStore.batchedCache[targetHeight]; !ok { + return nil, fmt.Errorf("batch %d dones't exist in batchedCache", targetHeight) + } + targetBatchLen := uint64(len(targetBatch)) + txList := make(map[uint64]*pb.Transaction, len(missingHashList)) + for index, txHash := range missingHashList { + if index > targetBatchLen || targetBatch[index].TransactionHash.Hex() != txHash { + return nil, fmt.Errorf("find invaild transaction, index: %d, targetHash: %s", index, txHash) + } + txList[index] = targetBatch[index] + } + return txList, nil +} + +// TODO (YH): restore txn from wal +func (mpi *mempoolImpl) loadTxnFromStorage(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) { + missingHashList := fetchTxnRequest.MissingTxHashes + txList := make(map[uint64]*pb.Transaction) + for index, txHash := range missingHashList { + var ( + rawHash types.Hash + err error + ) + if rawHash, err = hex2Hash(txHash); err != nil { + return nil, err + } + if tx, ok := mpi.load(rawHash); !ok { + return nil, errors.New("can't load tx from storage") + } else { + txList[index] = tx + } + } + return txList, nil +} + +func (mpi *mempoolImpl) loadTxnFromLedger(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) { + missingHashList := fetchTxnRequest.MissingTxHashes + txList := make(map[uint64]*pb.Transaction) + for index, txHash := range missingHashList { + var ( + tx *pb.Transaction + rawHash types.Hash + err error + ) + if rawHash, err = hex2Hash(txHash); err != nil { + return nil, err + } + if tx, err = mpi.ledgerHelper(rawHash); err != nil { + return nil, err + } + txList[index] = tx + } + return txList, nil +} + +func (mpi *mempoolImpl) processFetchTxnResponse(fetchTxnResponse *FetchTxnResponse) error { + mpi.logger.Debugf("Receive fetch transactions response from replica %d", fetchTxnResponse.ReplicaId) + if _, ok := mpi.txStore.missingBatch[fetchTxnResponse.Height]; !ok { + return errors.New("can't find batch %d from missingBatch") + } + expectLen := len(mpi.txStore.missingBatch[fetchTxnResponse.Height]) + recvLen := len(fetchTxnResponse.MissingTxnList) + if recvLen != expectLen { + return fmt.Errorf("receive unmatched fetching txn response, expect length: %d, received length: %d", expectLen, recvLen) + } + validTxn := make([]*pb.Transaction, 0) + targetBatch := mpi.txStore.missingBatch[fetchTxnResponse.Height] + for index, tx := range fetchTxnResponse.MissingTxnList { + if tx.Hash().Hex() != targetBatch[index] { + return errors.New("find a hash mismatch tx") + } + validTxn = append(validTxn, tx) + } + if err := mpi.processTransactions(validTxn); err != nil { + return err + } + delete(mpi.txStore.missingBatch, fetchTxnResponse.Height) + return nil +} diff --git a/pkg/order/mempool/mempool_test.go b/pkg/order/mempool/mempool_test.go new file mode 100644 index 0000000..00d8006 --- /dev/null +++ b/pkg/order/mempool/mempool_test.go @@ -0,0 +1,264 @@ +package mempool +// +//import ( +// "encoding/json" +// "fmt" +// "math/rand" +// "sort" +// "testing" +// "time" +// +// "github.com/google/btree" +// "github.com/meshplus/bitxhub-kit/crypto" +// "github.com/meshplus/bitxhub-kit/crypto/asym" +// "github.com/meshplus/bitxhub-kit/types" +// "github.com/meshplus/bitxhub-model/pb" +// "github.com/stretchr/testify/require" +//) +// +//var ( +// InterchainContractAddr = types.String2Address("000000000000000000000000000000000000000a") +// appchains = []string{ +// "0x3f9d18f7c3a6e5e4c0b877fe3e688ab08840b997", +// "0xa8ae1bbc1105944a84a71b89056930d951d420fe", +// "0x929545f44692178edb7fa468b44c5351596184ba", +// "0x7368022e6659236983eb959b8a1fa22577d48294", +// } +//) +// +//func TestCoreMemPool_RecvTransactions(t *testing.T) { +// readyTxs := make([]*pb.Transaction, 0) +// nonreadyTxs := make([]*pb.Transaction, 0) +// txIndex := make(map[string]*pb.Transaction) +// privKey, err := asym.GenerateKeyPair(crypto.Secp256k1) +// require.Nil(t, err) +// pubKey := privKey.PublicKey() +// addr, err := pubKey.Address() +// require.Nil(t, err) +// +// sort.Strings(appchains) +// readyTxsLen := 2 +// for _, appchain := range appchains { +// for i := 1; i <= readyTxsLen; i++ { +// readyTxs = append(readyTxs, mockTxhelper(t, txIndex, appchain, uint64(i))) +// // add unready txs +// nonreadyTxs = append(nonreadyTxs, mockTxhelper(t, txIndex, appchain, uint64(i+readyTxsLen+1))) +// } +// } +// +// // set timestamp and signature for txs +// for _, tx := range readyTxs { +// addSigAndTime(t, tx, addr, privKey) +// } +// for _, tx := range nonreadyTxs { +// addSigAndTime(t, tx, addr, privKey) +// } +// +// // shuffle tx order +// rand.Seed(time.Now().UnixNano()) +// rand.Shuffle(len(readyTxs), func(i, j int) { +// readyTxs[i], readyTxs[j] = readyTxs[j], readyTxs[i] +// }) +// memPool := newMempoolImpl(nil, nil, nil) +// require.Nil(t, memPool.recvTransactions(readyTxs)) +// require.Nil(t, memPool.RecvTransactions(nonreadyTxs)) +// +// // check if all txs are indexed in memPool.allTxs +// // and if all txs are indexed by its account and nonce +// require.Equal(t, len(appchains), len(memPool.transactionStore.allTxs)) +// checkAllTxs(t, memPool, txIndex, readyTxsLen, readyTxsLen) +// checkHashMap(t, memPool, true, readyTxs, nonreadyTxs) +// +// // check if priorityIndex is correctly recorded +// require.Equal(t, len(readyTxs), memPool.transactionStore.priorityIndex.data.Len()) +// for _, tx := range readyTxs { +// ok := memPool.priorityIndex.data.Has(makeKey(tx)) +// require.True(t, ok) +// ok = memPool.transactionStore.parkingLotIndex.data.Has(makeKey(tx)) +// require.True(t, !ok) +// } +// +// // check if parkingLotIndex is correctly recorded +// require.Equal(t, len(nonreadyTxs), memPool.transactionStore.parkingLotIndex.data.Len()) +// for _, tx := range nonreadyTxs { +// ok := memPool.transactionStore.parkingLotIndex.data.Has(makeKey(tx)) +// require.True(t, ok) +// ok = memPool.transactionStore.priorityIndex.data.Has(makeKey(tx)) +// require.True(t, !ok) +// } +// +// // add the missing tx for each appchain +// missingTxs := make([]*pb.Transaction, 0, len(appchains)) +// for _, appchain := range appchains { +// missingTxs = append(missingTxs, mockTxhelper(t, txIndex, appchain, uint64(readyTxsLen+1))) +// } +// for _, tx := range missingTxs { +// addSigAndTime(t, tx, addr, privKey) +// } +// +// require.Nil(t, memPool.RecvTransactions(missingTxs)) +// +// // check if parkingLotIndex is empty now +// require.Equal(t, 0, memPool.transactionStore.parkingLotIndex.data.Len()) +// // check if priorityIndex has received missingTxs and txs from original parkingLotIndex +// for _, tx := range missingTxs { +// ok := memPool.transactionStore.priorityIndex.data.Has(makeKey(tx)) +// require.True(t, ok) +// } +// for _, tx := range nonreadyTxs { +// ok := memPool.transactionStore.priorityIndex.data.Has(makeKey(tx)) +// require.True(t, ok) +// } +// checkHashMap(t, memPool, true, readyTxs, nonreadyTxs, missingTxs) +//} +// +//func TestCoreMemPool_RecvTransactions_Margin(t *testing.T) { +// readyTxs := make([]*pb.Transaction, 0) +// identicalNonceTxs := make([]*pb.Transaction, 0) +// replayedTxs := make([]*pb.Transaction, 0) +// readyTxIndex := make(map[string]*pb.Transaction) +// identicalNonceTxIndex := make(map[string]*pb.Transaction) +// privKey, err := asym.GenerateKeyPair(crypto.Secp256k1) +// require.Nil(t, err) +// pubKey := privKey.PublicKey() +// addr, err := pubKey.Address() +// require.Nil(t, err) +// +// sort.Strings(appchains) +// readyTxsLen := 2 +// for _, appchain := range appchains { +// for i := 1; i <= readyTxsLen; i++ { +// tx := mockTxhelper(t, readyTxIndex, appchain, uint64(i)) +// readyTxs = append(readyTxs, tx) +// // add tx with same index but different content +// identicalNonceTx := mockTxhelper(t, identicalNonceTxIndex, appchain, uint64(i)) +// identicalNonceTxs = append(identicalNonceTxs, identicalNonceTx) +// } +// } +// +// // set timestamp and signature for txs +// for _, tx := range readyTxs { +// addSigAndTime(t, tx, addr, privKey) +// // add repeated txs +// replayedTxs = append(replayedTxs, tx) +// } +// for _, tx := range identicalNonceTxs { +// addSigAndTime(t, tx, addr, privKey) +// } +// +// memPool := New() +// require.Nil(t, memPool.RecvTransactions(readyTxs)) +// require.NotNil(t, memPool.RecvTransactions(replayedTxs)) +// err = memPool.RecvTransactions(identicalNonceTxs) +// require.NotNil(t, err) +// +// require.Equal(t, len(appchains), len(memPool.transactionStore.allTxs)) +// checkAllTxs(t, memPool, readyTxIndex, readyTxsLen, 0) +// checkHashMap(t, memPool, true, readyTxs) +// checkHashMap(t, memPool, false, identicalNonceTxs) +//} +// +//func checkAllTxs(t *testing.T, memPool *CoreMemPool, +// txIndex map[string]*pb.Transaction, readyTxsLen, nonReadyTxLen int) { +// for _, appchain := range appchains { +// idx := uint64(1) +// accountAddr := fmt.Sprintf("%s-%s", appchain, appchain) +// +// txMap, ok := memPool.transactionStore.allTxs[accountAddr] +// require.True(t, ok) +// require.NotNil(t, txMap.index) +// require.Equal(t, readyTxsLen+nonReadyTxLen, txMap.index.data.Len()) +// require.Equal(t, readyTxsLen+nonReadyTxLen, len(txMap.items)) +// txMap.index.data.Ascend(func(i btree.Item) bool { +// orderedKey := i.(*orderedIndexKey) +// if idx <= uint64(readyTxsLen) { +// require.Equal(t, orderedKey.nonce, idx) +// } else { +// require.Equal(t, orderedKey.nonce, idx+1) +// } +// require.Equal(t, orderedKey.accountAddress, accountAddr) +// +// ibtpID := fmt.Sprintf("%s-%s-%d", appchain, appchain, orderedKey.nonce) +// require.Equal(t, txIndex[ibtpID], txMap.items[orderedKey.nonce]) +// idx++ +// return true +// }) +// } +//} +// +//func checkHashMap(t *testing.T, memPool *CoreMemPool, expectedStatus bool, txsSlice ...[]*pb.Transaction) { +// for _, txs := range txsSlice { +// for _, tx := range txs { +// _, ok := memPool.transactionStore.txHashMap[tx.TransactionHash.Hex()] +// require.Equal(t, expectedStatus, ok) +// } +// } +//} +// +//func mockTxhelper(t *testing.T, txIndex map[string]*pb.Transaction, appchainAddr string, index uint64) *pb.Transaction { +// ibtp := mockIBTP(t, appchainAddr, appchainAddr, index) +// tx := mockInterchainTx(t, ibtp) +// txIndex[ibtp.ID()] = tx +// return tx +//} +// +//func addSigAndTime(t *testing.T, tx *pb.Transaction, addr types.Address, privKey crypto.PrivateKey) { +// tx.Timestamp = time.Now().UnixNano() +// tx.From = addr +// sig, err := privKey.Sign(tx.SignHash().Bytes()) +// tx.Signature = sig +// require.Nil(t, err) +// tx.TransactionHash = tx.Hash() +//} +// +//func mockInterchainTx(t *testing.T, ibtp *pb.IBTP) *pb.Transaction { +// ib, err := ibtp.Marshal() +// require.Nil(t, err) +// +// ipd := &pb.InvokePayload{ +// Method: "HandleIBTP", +// Args: []*pb.Arg{{Value: ib}}, +// } +// pd, err := ipd.Marshal() +// require.Nil(t, err) +// +// data := &pb.TransactionData{ +// VmType: pb.TransactionData_BVM, +// Type: pb.TransactionData_INVOKE, +// Payload: pd, +// } +// +// return &pb.Transaction{ +// To: InterchainContractAddr, +// Nonce: int64(ibtp.Index), +// Data: data, +// Extra: []byte(fmt.Sprintf("%s-%s", ibtp.From, ibtp.To)), +// } +//} +// +//func mockIBTP(t *testing.T, from, to string, nonce uint64) *pb.IBTP { +// content := pb.Content{ +// SrcContractId: from, +// DstContractId: from, +// Func: "interchainget", +// Args: [][]byte{[]byte("Alice"), []byte("10")}, +// } +// +// bytes, err := content.Marshal() +// require.Nil(t, err) +// +// ibtppd, err := json.Marshal(pb.Payload{ +// Encrypted: false, +// Content: bytes, +// }) +// require.Nil(t, err) +// +// return &pb.IBTP{ +// From: from, +// To: to, +// Payload: ibtppd, +// Index: nonce, +// Type: pb.IBTP_INTERCHAIN, +// Timestamp: time.Now().UnixNano(), +// } +//} diff --git a/pkg/order/mempool/message.pb.go b/pkg/order/mempool/message.pb.go new file mode 100644 index 0000000..0526cc6 --- /dev/null +++ b/pkg/order/mempool/message.pb.go @@ -0,0 +1,1067 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: message.proto + +package mempool + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + pb "github.com/meshplus/bitxhub-model/pb" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type TxSlice struct { + TxList []*pb.Transaction `protobuf:"bytes,1,rep,name=TxList,proto3" json:"TxList,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TxSlice) Reset() { *m = TxSlice{} } +func (m *TxSlice) String() string { return proto.CompactTextString(m) } +func (*TxSlice) ProtoMessage() {} +func (*TxSlice) Descriptor() ([]byte, []int) { + return fileDescriptor_33c57e4bae7b9afd, []int{0} +} +func (m *TxSlice) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TxSlice) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TxSlice.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TxSlice) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxSlice.Merge(m, src) +} +func (m *TxSlice) XXX_Size() int { + return m.Size() +} +func (m *TxSlice) XXX_DiscardUnknown() { + xxx_messageInfo_TxSlice.DiscardUnknown(m) +} + +var xxx_messageInfo_TxSlice proto.InternalMessageInfo + +func (m *TxSlice) GetTxList() []*pb.Transaction { + if m != nil { + return m.TxList + } + return nil +} + +type FetchTxnRequest struct { + ReplicaId uint64 `protobuf:"varint,1,opt,name=replicaId,proto3" json:"replicaId,omitempty"` + Height uint64 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"` + MissingTxHashes map[uint64]string `protobuf:"bytes,3,rep,name=missing_tx_hashes,json=missingTxHashes,proto3" json:"missing_tx_hashes,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FetchTxnRequest) Reset() { *m = FetchTxnRequest{} } +func (m *FetchTxnRequest) String() string { return proto.CompactTextString(m) } +func (*FetchTxnRequest) ProtoMessage() {} +func (*FetchTxnRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_33c57e4bae7b9afd, []int{1} +} +func (m *FetchTxnRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *FetchTxnRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_FetchTxnRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *FetchTxnRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FetchTxnRequest.Merge(m, src) +} +func (m *FetchTxnRequest) XXX_Size() int { + return m.Size() +} +func (m *FetchTxnRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FetchTxnRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FetchTxnRequest proto.InternalMessageInfo + +func (m *FetchTxnRequest) GetReplicaId() uint64 { + if m != nil { + return m.ReplicaId + } + return 0 +} + +func (m *FetchTxnRequest) GetHeight() uint64 { + if m != nil { + return m.Height + } + return 0 +} + +func (m *FetchTxnRequest) GetMissingTxHashes() map[uint64]string { + if m != nil { + return m.MissingTxHashes + } + return nil +} + +type FetchTxnResponse struct { + ReplicaId uint64 `protobuf:"varint,1,opt,name=replicaId,proto3" json:"replicaId,omitempty"` + Height uint64 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"` + MissingTxnList map[uint64]*pb.Transaction `protobuf:"bytes,3,rep,name=missing_txn_list,json=missingTxnList,proto3" json:"missing_txn_list,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FetchTxnResponse) Reset() { *m = FetchTxnResponse{} } +func (m *FetchTxnResponse) String() string { return proto.CompactTextString(m) } +func (*FetchTxnResponse) ProtoMessage() {} +func (*FetchTxnResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_33c57e4bae7b9afd, []int{2} +} +func (m *FetchTxnResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *FetchTxnResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_FetchTxnResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *FetchTxnResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_FetchTxnResponse.Merge(m, src) +} +func (m *FetchTxnResponse) XXX_Size() int { + return m.Size() +} +func (m *FetchTxnResponse) XXX_DiscardUnknown() { + xxx_messageInfo_FetchTxnResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_FetchTxnResponse proto.InternalMessageInfo + +func (m *FetchTxnResponse) GetReplicaId() uint64 { + if m != nil { + return m.ReplicaId + } + return 0 +} + +func (m *FetchTxnResponse) GetHeight() uint64 { + if m != nil { + return m.Height + } + return 0 +} + +func (m *FetchTxnResponse) GetMissingTxnList() map[uint64]*pb.Transaction { + if m != nil { + return m.MissingTxnList + } + return nil +} + +func init() { + proto.RegisterType((*TxSlice)(nil), "mempool.tx_slice") + proto.RegisterType((*FetchTxnRequest)(nil), "mempool.fetch_txn_request") + proto.RegisterMapType((map[uint64]string)(nil), "mempool.fetch_txn_request.MissingTxHashesEntry") + proto.RegisterType((*FetchTxnResponse)(nil), "mempool.fetch_txn_response") + proto.RegisterMapType((map[uint64]*pb.Transaction)(nil), "mempool.fetch_txn_response.MissingTxnListEntry") +} + +func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) } + +var fileDescriptor_33c57e4bae7b9afd = []byte{ + // 357 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcd, 0x4a, 0xeb, 0x40, + 0x14, 0xc7, 0x99, 0xf6, 0xde, 0xde, 0xdb, 0xf1, 0xa3, 0xed, 0x58, 0x24, 0x14, 0x29, 0xa5, 0x20, + 0x76, 0x63, 0x02, 0x16, 0x44, 0x5c, 0x16, 0x04, 0x05, 0xdd, 0x84, 0x6e, 0xc4, 0x45, 0x48, 0xd2, + 0x63, 0x66, 0x30, 0xf3, 0x61, 0x66, 0x22, 0xe9, 0x1b, 0xba, 0xf4, 0x11, 0xa4, 0x4b, 0xdf, 0xc0, + 0x9d, 0x34, 0x09, 0xa4, 0xc5, 0xe0, 0xc2, 0xdd, 0x9c, 0x8f, 0xff, 0xfc, 0xcf, 0xef, 0xcc, 0xe0, + 0x3d, 0x0e, 0x5a, 0xfb, 0x11, 0xd8, 0x2a, 0x91, 0x46, 0x92, 0x7f, 0x1c, 0xb8, 0x92, 0x32, 0x1e, + 0x9c, 0x47, 0xcc, 0xd0, 0x34, 0xb0, 0x43, 0xc9, 0x1d, 0x0e, 0x9a, 0xaa, 0x38, 0xd5, 0x4e, 0xc0, + 0x4c, 0x46, 0xd3, 0xe0, 0x94, 0xcb, 0x05, 0xc4, 0x8e, 0x0a, 0x1c, 0x93, 0xf8, 0x42, 0xfb, 0xa1, + 0x61, 0x52, 0x14, 0x17, 0x8c, 0xa7, 0xf8, 0xbf, 0xc9, 0x3c, 0x1d, 0xb3, 0x10, 0xc8, 0x09, 0x6e, + 0xcd, 0xb3, 0x5b, 0xa6, 0x8d, 0x85, 0x46, 0xcd, 0xc9, 0xce, 0x59, 0xc7, 0x56, 0x81, 0x3d, 0xaf, + 0x24, 0x6e, 0x59, 0x1e, 0x7f, 0x20, 0xdc, 0x7b, 0x04, 0x13, 0x52, 0xcf, 0x64, 0xc2, 0x4b, 0xe0, + 0x39, 0x05, 0x6d, 0xc8, 0x11, 0x6e, 0x27, 0xa0, 0x62, 0x16, 0xfa, 0x37, 0x0b, 0x0b, 0x8d, 0xd0, + 0xe4, 0x8f, 0x5b, 0x25, 0xc8, 0x21, 0x6e, 0x51, 0x60, 0x11, 0x35, 0x56, 0x23, 0x2f, 0x95, 0x11, + 0x79, 0xc0, 0x3d, 0xce, 0xb4, 0x66, 0x22, 0xf2, 0x4c, 0xe6, 0x51, 0x5f, 0x53, 0xd0, 0x56, 0x33, + 0xf7, 0x77, 0xec, 0x92, 0xce, 0xfe, 0x66, 0x66, 0xdf, 0x15, 0x9a, 0x79, 0x76, 0x9d, 0x2b, 0xae, + 0x84, 0x49, 0x96, 0x6e, 0x87, 0x6f, 0x67, 0x07, 0x33, 0xdc, 0xaf, 0x6b, 0x24, 0x5d, 0xdc, 0x7c, + 0x82, 0x65, 0x39, 0xe4, 0xfa, 0x48, 0xfa, 0xf8, 0xef, 0x8b, 0x1f, 0xa7, 0x90, 0x4f, 0xd7, 0x76, + 0x8b, 0xe0, 0xb2, 0x71, 0x81, 0xc6, 0x9f, 0x08, 0x93, 0x4d, 0x7f, 0xad, 0xa4, 0xd0, 0xf0, 0x4b, + 0xda, 0x7b, 0xdc, 0xad, 0x68, 0x85, 0x17, 0xaf, 0x97, 0xfd, 0x13, 0x6c, 0x61, 0x56, 0xd1, 0x8a, + 0xf5, 0xfe, 0x0b, 0xd8, 0x7d, 0xbe, 0x95, 0x1c, 0xb8, 0xf8, 0xa0, 0xa6, 0xad, 0x06, 0xf5, 0x78, + 0x13, 0xb5, 0xe6, 0x95, 0x2b, 0xf6, 0xd9, 0xee, 0xeb, 0x6a, 0x88, 0xde, 0x56, 0x43, 0xf4, 0xbe, + 0x1a, 0xa2, 0xa0, 0x95, 0x7f, 0x99, 0xe9, 0x57, 0x00, 0x00, 0x00, 0xff, 0xff, 0x1d, 0xc0, 0xd7, + 0xb5, 0x84, 0x02, 0x00, 0x00, +} + +func (m *TxSlice) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TxSlice) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TxSlice) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.TxList) > 0 { + for iNdEx := len(m.TxList) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.TxList[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintMessage(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *FetchTxnRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FetchTxnRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *FetchTxnRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.MissingTxHashes) > 0 { + for k := range m.MissingTxHashes { + v := m.MissingTxHashes[k] + baseI := i + i -= len(v) + copy(dAtA[i:], v) + i = encodeVarintMessage(dAtA, i, uint64(len(v))) + i-- + dAtA[i] = 0x12 + i = encodeVarintMessage(dAtA, i, uint64(k)) + i-- + dAtA[i] = 0x8 + i = encodeVarintMessage(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x1a + } + } + if m.Height != 0 { + i = encodeVarintMessage(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x10 + } + if m.ReplicaId != 0 { + i = encodeVarintMessage(dAtA, i, uint64(m.ReplicaId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *FetchTxnResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FetchTxnResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *FetchTxnResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.MissingTxnList) > 0 { + for k := range m.MissingTxnList { + v := m.MissingTxnList[k] + baseI := i + if v != nil { + { + size, err := v.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintMessage(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + i = encodeVarintMessage(dAtA, i, uint64(k)) + i-- + dAtA[i] = 0x8 + i = encodeVarintMessage(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x1a + } + } + if m.Height != 0 { + i = encodeVarintMessage(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x10 + } + if m.ReplicaId != 0 { + i = encodeVarintMessage(dAtA, i, uint64(m.ReplicaId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintMessage(dAtA []byte, offset int, v uint64) int { + offset -= sovMessage(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *TxSlice) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.TxList) > 0 { + for _, e := range m.TxList { + l = e.Size() + n += 1 + l + sovMessage(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *FetchTxnRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.ReplicaId != 0 { + n += 1 + sovMessage(uint64(m.ReplicaId)) + } + if m.Height != 0 { + n += 1 + sovMessage(uint64(m.Height)) + } + if len(m.MissingTxHashes) > 0 { + for k, v := range m.MissingTxHashes { + _ = k + _ = v + mapEntrySize := 1 + sovMessage(uint64(k)) + 1 + len(v) + sovMessage(uint64(len(v))) + n += mapEntrySize + 1 + sovMessage(uint64(mapEntrySize)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *FetchTxnResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.ReplicaId != 0 { + n += 1 + sovMessage(uint64(m.ReplicaId)) + } + if m.Height != 0 { + n += 1 + sovMessage(uint64(m.Height)) + } + if len(m.MissingTxnList) > 0 { + for k, v := range m.MissingTxnList { + _ = k + _ = v + l = 0 + if v != nil { + l = v.Size() + l += 1 + sovMessage(uint64(l)) + } + mapEntrySize := 1 + sovMessage(uint64(k)) + l + n += mapEntrySize + 1 + sovMessage(uint64(mapEntrySize)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovMessage(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozMessage(x uint64) (n int) { + return sovMessage(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *TxSlice) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: tx_slice: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: tx_slice: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TxList", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMessage + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TxList = append(m.TxList, &pb.Transaction{}) + if err := m.TxList[len(m.TxList)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessage(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *FetchTxnRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: fetch_txn_request: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: fetch_txn_request: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplicaId", wireType) + } + m.ReplicaId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ReplicaId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MissingTxHashes", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMessage + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MissingTxHashes == nil { + m.MissingTxHashes = make(map[uint64]string) + } + var mapkey uint64 + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthMessage + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthMessage + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipMessage(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.MissingTxHashes[mapkey] = mapvalue + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessage(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *FetchTxnResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: fetch_txn_response: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: fetch_txn_response: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplicaId", wireType) + } + m.ReplicaId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ReplicaId |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MissingTxnList", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMessage + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MissingTxnList == nil { + m.MissingTxnList = make(map[uint64]*pb.Transaction) + } + var mapkey uint64 + var mapvalue *pb.Transaction + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthMessage + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthMessage + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &pb.Transaction{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipMessage(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.MissingTxnList[mapkey] = mapvalue + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessage(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMessage(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessage + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessage + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessage + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthMessage + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupMessage + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthMessage + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthMessage = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMessage = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupMessage = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/order/mempool/message.proto b/pkg/order/mempool/message.proto new file mode 100644 index 0000000..58ef4ea --- /dev/null +++ b/pkg/order/mempool/message.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package mempool; + +import "github.com/meshplus/bitxhub-model/pb/transaction.proto"; + +message tx_slice { + repeated pb.Transaction TxList = 1; +} + +message fetch_txn_request { + uint64 replicaId = 1; + uint64 height = 2; + map missing_tx_hashes = 3; +} + +message fetch_txn_response { + uint64 replicaId = 1; + uint64 height = 2; + map missing_txn_list = 3; +} + + diff --git a/pkg/order/mempool/network.go b/pkg/order/mempool/network.go new file mode 100644 index 0000000..72d3a3d --- /dev/null +++ b/pkg/order/mempool/network.go @@ -0,0 +1,27 @@ +package mempool + +import ( + "github.com/meshplus/bitxhub-model/pb" +) + +// broadcast the new transaction to other nodes +func (mpi *mempoolImpl) broadcast(m *pb.Message) { + for id := range mpi.peerMgr.Peers() { + if id == mpi.localID { + continue + } + go func(id uint64) { + if err := mpi.peerMgr.AsyncSend(id, m); err != nil { + mpi.logger.Debugf("Send tx slice to peer %d failed, err: %s", id, err.Error()) + } + }(id) + } +} + +func (mpi *mempoolImpl) unicast(to uint64, m *pb.Message) { + go func() { + if err := mpi.peerMgr.AsyncSend(to, m); err != nil { + mpi.logger.Error("Send message to peer %d failed, err: %s", to, err.Error()) + } + }() +} diff --git a/pkg/order/mempool/storage.go b/pkg/order/mempool/storage.go new file mode 100644 index 0000000..bb471f6 --- /dev/null +++ b/pkg/order/mempool/storage.go @@ -0,0 +1,54 @@ +package mempool + +import ( + "fmt" + + "github.com/meshplus/bitxhub-kit/types" + "github.com/meshplus/bitxhub-model/pb" +) + +// batchStore persists batch into DB, which +func (mpi *mempoolImpl) batchStore(txList []*pb.Transaction) { + batch := mpi.storage.NewBatch() + for _, tx := range txList { + txKey := compositeKey(tx.TransactionHash.Bytes()) + txData, _ := tx.Marshal() + batch.Put(txKey, txData) + } + batch.Commit() +} + +// batchDelete batch delete txs +func (mpi *mempoolImpl) batchDelete(hashes []types.Hash) { + batch := mpi.storage.NewBatch() + for _, hash := range hashes { + txKey := compositeKey(hash.Bytes()) + batch.Delete(txKey) + } + batch.Commit() +} + +func (mpi *mempoolImpl) store(tx *pb.Transaction) { + txKey := compositeKey(tx.TransactionHash.Bytes()) + txData, _ := tx.Marshal() + mpi.storage.Put(txKey, txData) +} + +func (mpi *mempoolImpl) load(hash types.Hash) (*pb.Transaction, bool) { + txKey := compositeKey(hash.Bytes()) + txData := mpi.storage.Get(txKey) + if txData == nil { + return nil, false + } + var tx pb.Transaction + if err := tx.Unmarshal(txData); err != nil { + mpi.logger.Error(err) + return nil, false + } + return &tx, true +} + +func compositeKey(value interface{}) []byte { + var prefix = []byte("tx-") + return append(prefix, []byte(fmt.Sprintf("%v", value))...) +} diff --git a/pkg/order/mempool/tx_cache.go b/pkg/order/mempool/tx_cache.go new file mode 100644 index 0000000..1eea6ac --- /dev/null +++ b/pkg/order/mempool/tx_cache.go @@ -0,0 +1,101 @@ +package mempool + +import ( + "time" + + "github.com/meshplus/bitxhub-model/pb" + "github.com/meshplus/bitxhub/internal/loggers" + + "github.com/sirupsen/logrus" +) + +type TxCache struct { + recvTxC chan *pb.Transaction + txSetC chan *TxSlice + txSet []*pb.Transaction + logger logrus.FieldLogger + + timerC chan bool + stopTimerC chan bool + close chan bool + txSetTick time.Duration +} + +func newTxCache(txSliceTimeout time.Duration) *TxCache { + txCache := &TxCache{} + txCache.recvTxC = make(chan *pb.Transaction, DefaultTxCacheSize) + txCache.close = make(chan bool) + txCache.txSetC = make(chan *TxSlice) + txCache.timerC = make(chan bool) + txCache.stopTimerC = make(chan bool) + txCache.txSet = make([]*pb.Transaction, 0) + txCache.logger = loggers.Logger(loggers.Order) + if txSliceTimeout == 0 { + txCache.txSetTick = DefaultTxSetTick + } else { + txCache.txSetTick = txSliceTimeout + } + return txCache +} + +func (tc *TxCache) listenEvent() { + for { + select { + case <-tc.close: + tc.logger.Info("Exit transaction cache") + + case tx := <-tc.recvTxC: + tc.appendTx(tx) + + case <-tc.timerC: + tc.stopTxSetTimer() + tc.postTxSet() + } + } +} + +func (tc *TxCache) appendTx(tx *pb.Transaction) { + if tx == nil { + tc.logger.Errorf("Transaction is nil") + return + } + if len(tc.txSet) == 0 { + tc.startTxSetTimer() + } + tc.txSet = append(tc.txSet, tx) + if len(tc.txSet) >= DefaultTxSetSize { + tc.stopTxSetTimer() + tc.postTxSet() + } +} + +func (tc *TxCache) postTxSet() { + dst := make([]*pb.Transaction, len(tc.txSet)) + copy(dst, tc.txSet) + txSet := &TxSlice{ + TxList: dst, + } + tc.txSetC <- txSet + tc.txSet = make([]*pb.Transaction, 0) +} + +func (tc *TxCache) IsFull() bool { + return len(tc.recvTxC) == DefaultTxCacheSize +} + +func (tc *TxCache) startTxSetTimer() { + go func() { + timer := time.NewTimer(tc.txSetTick) + select { + case <-timer.C: + tc.timerC <- true + case <-tc.stopTimerC: + return + } + }() +} + +func (tc *TxCache) stopTxSetTimer() { + close(tc.stopTimerC) + tc.stopTimerC = make(chan bool) +} diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go new file mode 100644 index 0000000..2d55c5f --- /dev/null +++ b/pkg/order/mempool/tx_store.go @@ -0,0 +1,140 @@ +package mempool + +import ( + "github.com/google/btree" + "github.com/meshplus/bitxhub-model/pb" +) + +type transactionStore struct { + // track all valid tx hashes cached in mempool + txHashMap map[string]*orderedIndexKey + // track all valid tx, mapping user' account to all related transactions. + allTxs map[string]*txSortedMap + // track the commit nonce and pending nonce of each account. + nonceCache *nonceCache + // keeps track of "non-ready" txs (txs that can't be included in next block) + // only used to help remove some txs if pool is full. + parkingLotIndex *btreeIndex + // keeps track of "ready" txs + priorityIndex *btreeIndex + // cache all the batched txs which haven't executed. + batchedTxs map[orderedIndexKey]bool + // cache all batches created by current primary in order, removed after they are been executed. + // TODO (YH): change the type of key from height to digest. + batchedCache map[uint64][]*pb.Transaction + // trace the missing transaction + missingBatch map[uint64]map[uint64]string + // track the current size of mempool + poolSize int32 + // track the non-batch priority transaction. + priorityNonBatchSize uint64 +} + +func newTransactionStore() *transactionStore { + return &transactionStore{ + txHashMap: make(map[string]*orderedIndexKey, 0), + allTxs: make(map[string]*txSortedMap), + batchedTxs: make(map[orderedIndexKey]bool), + missingBatch: make(map[uint64]map[uint64]string), + batchedCache: make(map[uint64][]*pb.Transaction), + parkingLotIndex: newBtreeIndex(), + priorityIndex: newBtreeIndex(), + nonceCache: newNonceCache(), + } +} + +// Get transaction by account address + nonce +func (txStore *transactionStore) getTxByOrderKey(account string, seqNo uint64) *pb.Transaction { + if list, ok := txStore.allTxs[account]; ok { + res := list.items[seqNo] + if res == nil { + return nil + } + return res.tx + } + return nil +} + +type txSortedMap struct { + items map[uint64]*txItem // map nonce to transaction + index *btreeIndex // index for items +} + +func newTxSortedMap() *txSortedMap { + return &txSortedMap{ + items: make(map[uint64]*txItem), + index: newBtreeIndex(), + } +} + +func (m *txSortedMap) filterReady(demandNonce uint64) ([]*pb.Transaction, []*pb.Transaction, uint64) { + var readyTxs, nonReadyTxs []*pb.Transaction + if m.index.data.Len() == 0 { + return nil, nil, demandNonce + } + demandKey := makeSortedNonceKeyKey(demandNonce) + m.index.data.AscendGreaterOrEqual(demandKey, func(i btree.Item) bool { + nonce := i.(*sortedNonceKey).nonce + if nonce == demandNonce { + readyTxs = append(readyTxs, m.items[demandNonce].tx) + demandNonce++ + } else { + nonReadyTxs = append(nonReadyTxs, m.items[nonce].tx) + } + return true + }) + + return readyTxs, nonReadyTxs, demandNonce +} + +// forward removes all allTxs from the map with a nonce lower than the +// provided commitNonce. +func (m *txSortedMap) forward(commitNonce uint64) map[string][]*pb.Transaction { + removedTxs := make(map[string][]*pb.Transaction) + commitNonceKey := makeSortedNonceKeyKey(commitNonce) + m.index.data.AscendLessThan(commitNonceKey, func(i btree.Item) bool { + // delete tx from map. + nonce := i.(*sortedNonceKey).nonce + txItem := m.items[nonce] + account := txItem.account + if _, ok := removedTxs[account]; ! ok { + removedTxs[account] = make([]*pb.Transaction, 0) + } + removedTxs[account] = append(removedTxs[account], txItem.tx) + delete(m.items, nonce) + return true + }) + return removedTxs +} + +type nonceCache struct { + // commitNonces records each account's latest committed nonce in ledger. + commitNonces map[string]uint64 + // pendingNonces records each account's latest nonce which has been included in + // priority queue. Invariant: pendingNonces[account] >= commitNonces[account] + pendingNonces map[string]uint64 +} + +func (nc *nonceCache) getCommitNonce(account string) uint64 { + nonce, ok := nc.commitNonces[account] + if !ok { + return 1 + } + return nonce +} + +func (nc *nonceCache) setCommitNonce(account string, nonce uint64) { + nc.commitNonces[account] = nonce +} + +func (nc *nonceCache) getPendingNonce(account string) uint64 { + nonce, ok := nc.pendingNonces[account] + if !ok { + return 1 + } + return nonce +} + +func (nc *nonceCache) setPendingNonce(account string, nonce uint64) { + nc.pendingNonces[account] = nonce +} diff --git a/pkg/order/mempool/types.go b/pkg/order/mempool/types.go new file mode 100644 index 0000000..6e87aa7 --- /dev/null +++ b/pkg/order/mempool/types.go @@ -0,0 +1,95 @@ +package mempool + +import ( + "time" + + "github.com/meshplus/bitxhub-kit/types" + "github.com/meshplus/bitxhub-model/pb" + raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" + "github.com/meshplus/bitxhub/pkg/peermgr" + + cmap "github.com/orcaman/concurrent-map" +) + +const ( + btreeDegree = 10 +) + +const ( + DefaultPoolSize = 50000 + DefaultTxCacheSize = 10000 + DefaultBatchSize = 500 + DefaultTxSetSize = 10 + + DefaultBatchTick = 500 * time.Millisecond + DefaultTxSetTick = 100 * time.Millisecond + DefaultFetchTxnTimeout = 3 * time.Second +) + +// batch timer reasons +const ( + StartReason1 = "first transaction set" + StartReason2 = "finish executing a batch" + + StopReason1 = "generated a batch by batch timer" + StopReason2 = "generated a batch by batch size" + StopReason3 = "restart batch timer" +) + +// IBTP methods +const ( + IBTPMethod1 = "HandleIBTP" + IBTPMethod2 = "HandleIBTPs" +) + +type LocalMissingTxnEvent struct { + Height uint64 + MissingTxnHashList map[uint64]string + WaitC chan bool +} + +type subscribeEvent struct { + txForwardC chan *TxSlice + localMissingTxnEvent chan *LocalMissingTxnEvent + fetchTxnRequestC chan *FetchTxnRequest + fetchTxnResponseC chan *FetchTxnResponse + getBlockC chan *constructBatchEvent + commitTxnC chan *raftproto.Ready + updateLeaderC chan uint64 +} + +type mempoolBatch struct { + missingTxnHashList map[uint64]string + txList []*pb.Transaction +} + +type constructBatchEvent struct { + ready *raftproto.Ready + result chan *mempoolBatch +} + +type Config struct { + ID uint64 + + BatchSize uint64 + PoolSize uint64 + TxSliceSize uint64 + BatchTick time.Duration + FetchTimeout time.Duration + TxSliceTimeout time.Duration + + PeerMgr peermgr.PeerManager + GetTransactionFunc func(hash types.Hash) (*pb.Transaction, error) + ChainHeight uint64 +} + +type timerManager struct { + timeout time.Duration // default timeout of this timer + isActive cmap.ConcurrentMap // track all the timers with this timerName if it is active now + timeoutEventC chan bool +} + +type txItem struct { + account string + tx *pb.Transaction +} diff --git a/pkg/order/mempool/utils.go b/pkg/order/mempool/utils.go new file mode 100644 index 0000000..1c9a30c --- /dev/null +++ b/pkg/order/mempool/utils.go @@ -0,0 +1,146 @@ +package mempool + +import ( + "encoding/hex" + "errors" + "fmt" + cmap "github.com/orcaman/concurrent-map" + "strconv" + "sync/atomic" + "time" + + "github.com/meshplus/bitxhub-kit/types" + "github.com/meshplus/bitxhub-model/pb" + raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" +) + +func (mpi *mempoolImpl) getBatchSeqNo() uint64 { + return atomic.LoadUint64(&mpi.batchSeqNo) +} + +func (mpi *mempoolImpl) increaseBatchSeqNo() { + atomic.AddUint64(&mpi.batchSeqNo, 1) +} + +// getTxByTxPointer returns the tx stored in allTxs by given TxPointer. +func (mpi *mempoolImpl) getTxByTxPointer(txPointer orderedIndexKey) *pb.Transaction { + if txnMap, ok := mpi.txStore.allTxs[txPointer.account]; ok { + return txnMap.items[txPointer.nonce].tx + } + return nil +} + +func (mpi *mempoolImpl) msgToConsensusPbMsg(data []byte, tyr raftproto.RaftMessage_Type) *pb.Message { + rm := &raftproto.RaftMessage{ + Type: tyr, + FromId: mpi.localID, + Data: data, + } + cmData, err := rm.Marshal() + if err != nil { + return nil + } + msg := &pb.Message{ + Type: pb.Message_CONSENSUS, + Data: cmData, + } + return msg +} + +func newSubscribe() *subscribeEvent { + return &subscribeEvent{ + txForwardC: make(chan *TxSlice), + localMissingTxnEvent: make(chan *LocalMissingTxnEvent), + fetchTxnRequestC: make(chan *FetchTxnRequest), + updateLeaderC: make(chan uint64), + fetchTxnResponseC: make(chan *FetchTxnResponse), + commitTxnC: make(chan *raftproto.Ready), + getBlockC: make(chan *constructBatchEvent), + } +} + +// TODO (YH): restore commitNonce and pendingNonce from db. +func newNonceCache() *nonceCache { + return &nonceCache{ + commitNonces: make(map[string]uint64), + pendingNonces: make(map[string]uint64), + } +} + +func hex2Hash(hash string) (types.Hash, error) { + var ( + hubHash types.Hash + hashBytes []byte + err error + ) + if hashBytes, err = hex.DecodeString(hash); err != nil { + return types.Hash{}, err + } + if len(hashBytes) != types.HashLength { + return types.Hash{}, errors.New("invalid tx hash") + } + copy(hubHash[:], hashBytes) + return hubHash, nil +} + +func (mpi *mempoolImpl) poolIsFull() bool { + return atomic.LoadInt32(&mpi.txStore.poolSize) >= DefaultPoolSize +} + +func (mpi *mempoolImpl) isLeader() bool { + return mpi.leader == mpi.localID +} + +func (mpi *mempoolImpl) isBatchTimerActive() bool { + return !mpi.batchTimerMgr.isActive.IsEmpty() +} + +// startBatchTimer starts the batch timer and reset the batchTimerActive to true. +func (mpi *mempoolImpl) startBatchTimer(reason string) { + // stop old timer + mpi.stopBatchTimer(StopReason3) + mpi.logger.Debugf("Start batch timer, reason: %s",reason) + timestamp := time.Now().UnixNano() + key := strconv.FormatInt(timestamp, 10) + mpi.batchTimerMgr.isActive.Set(key, true) + + time.AfterFunc(mpi.batchTimerMgr.timeout, func() { + if mpi.batchTimerMgr.isActive.Has(key) { + mpi.batchTimerMgr.timeoutEventC <- true + } + }) +} + +// stopBatchTimer stops the batch timer and reset the batchTimerActive to false. +func (mpi *mempoolImpl) stopBatchTimer(reason string) { + if mpi.batchTimerMgr.isActive.IsEmpty() { + return + } + mpi.logger.Debugf("Stop batch timer, reason: %s", reason) + mpi.batchTimerMgr.isActive = cmap.New() +} + +// newTimer news a timer with default timeout. +func newTimer(d time.Duration) *timerManager { + return &timerManager{ + timeout: d, + isActive: cmap.New(), + timeoutEventC: make(chan bool), + } +} + +func getAccount(tx *pb.Transaction) (string, error) { + payload := &pb.InvokePayload{} + if err := payload.Unmarshal(tx.Data.Payload); err != nil { + return "",fmt.Errorf("unmarshal invoke payload: %s", err.Error()) + } + if payload.Method == IBTPMethod1 || payload.Method == IBTPMethod2 { + ibtp := &pb.IBTP{} + if err := ibtp.Unmarshal(payload.Args[0].Value); err != nil { + return "", fmt.Errorf("unmarshal ibtp from tx :%w", err) + } + account := fmt.Sprintf("%s-%s",ibtp.From,ibtp.To) + return account, nil + } + return tx.From.Hex(), nil +} \ No newline at end of file