feat(mempool): add mempool feature

Lizen 2020-09-24 15:11:33 +08:00
parent a5d4752606
commit d156957976
20 changed files with 2884 additions and 565 deletions


@ -6,7 +6,13 @@ max_inflight_msgs = 500 # MaxInflightMsgs limits the max number of in-
check_quorum = true # Leader steps down when quorum is not active for an electionTimeout.
pre_vote = true # PreVote prevents reconnected node from disturbing network.
disable_proposal_forwarding = true # This prevents blocks from being accidentally proposed by followers.
[raft.tx_pool]
pack_size = 500 # How many transactions should the primary pack.
[raft.mempool]
batch_size = 200 # How many transactions should the primary pack.
pool_size = 50000 # How many transactions the tx pool can store in total.
block_tick = "500ms" # Block packaging time period.
tx_slice_size = 10 # How many transactions should the node broadcast at once.
batch_tick = "0.3s" # Batch packaging time period.
tx_slice_timeout = "0.1s" # Node broadcasts cached transactions when this timeout expires, even if tx_slice_size isn't reached yet.
fetch_timeout = "3s" # How long to wait before fetching missing transactions finished

go.mod

@ -11,6 +11,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.3
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/btree v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.13.0
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e
@ -22,6 +23,7 @@ require (
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.2.2
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.0
github.com/rs/cors v1.7.0

go.sum

@ -818,6 +818,8 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222/go.mod h1:VyrYX9gd7irzKovcSS6BIIEwPRkP2Wm2m9ufcdFSJ34=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=


@ -8,7 +8,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-core/validator"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/constant"
@ -52,7 +51,7 @@ type BlockExecutor struct {
// New creates executor instance
func New(chainLedger ledger.Ledger, logger logrus.FieldLogger) (*BlockExecutor, error) {
ibtpVerify := proof.New(chainLedger, log.NewWithModule("proof"))
ibtpVerify := proof.New(chainLedger, logger)
boltContracts := registerBoltContracts()


@ -13,10 +13,14 @@ type RAFTConfig struct {
RAFT RAFT
}
type TxPoolConfig struct {
PackSize int `mapstructure:"pack_size"`
BlockTick time.Duration `mapstructure:"block_tick"`
PoolSize int `mapstructure:"pool_size"`
type MempoolConfig struct {
BatchSize uint64 `mapstructure:"batch_size"`
PoolSize uint64 `mapstructure:"pool_size"`
TxSliceSize uint64 `mapstructure:"tx_slice_size"`
BatchTick time.Duration `mapstructure:"batch_tick"`
FetchTimeout time.Duration `mapstructure:"fetch_timeout"`
TxSliceTimeout time.Duration `mapstructure:"tx_slice_timeout"`
}
type RAFT struct {
@ -27,29 +31,21 @@ type RAFT struct {
CheckQuorum bool `mapstructure:"check_quorum"`
PreVote bool `mapstructure:"pre_vote"`
DisableProposalForwarding bool `mapstructure:"disable_proposal_forwarding"`
TxPoolConfig TxPoolConfig `mapstructure:"tx_pool"`
MempoolConfig MempoolConfig `mapstructure:"mempool"`
}
func defaultRaftConfig() raft.Config {
return raft.Config{
ElectionTick: 10, //ElectionTick is the number of Node.Tick invocations that must pass between elections.(s)
HeartbeatTick: 1, //HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.(s)
MaxSizePerMsg: 1024 * 1024, //1024*1024, MaxSizePerMsg limits the max size of each append message.
MaxInflightMsgs: 500, //MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
ElectionTick: 10, // ElectionTick is the number of Node.Tick invocations that must pass between elections.(s)
HeartbeatTick: 1, // HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.(s)
MaxSizePerMsg: 1024 * 1024, // 1024*1024, MaxSizePerMsg limits the max size of each append message.
MaxInflightMsgs: 500, // MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
PreVote: true, // PreVote prevents reconnected node from disturbing network.
CheckQuorum: true, // Leader steps down when quorum is not active for an electionTimeout.
DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
}
}
func defaultTxPoolConfig() TxPoolConfig {
return TxPoolConfig{
PackSize: 500, // How many transactions should the primary pack.
BlockTick: 500 * time.Millisecond, //Block packaging time period.
PoolSize: 50000, //How many transactions could the txPool stores in total.
}
}
func generateRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, ram MemoryStorage) (*raft.Config, error) {
readConfig, err := readConfig(repoRoot)
if err != nil {
@ -77,22 +73,19 @@ func generateRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, r
return &defaultConfig, nil
}
func generateTxPoolConfig(repoRoot string) (*TxPoolConfig, error) {
func generateMempoolConfig(repoRoot string) (*MempoolConfig, error) {
readConfig, err := readConfig(repoRoot)
if err != nil {
return &TxPoolConfig{}, nil
return nil, err
}
defaultTxPoolConfig := defaultTxPoolConfig()
if readConfig.RAFT.TxPoolConfig.BlockTick > 0 {
defaultTxPoolConfig.BlockTick = readConfig.RAFT.TxPoolConfig.BlockTick
}
if readConfig.RAFT.TxPoolConfig.PackSize > 0 {
defaultTxPoolConfig.PackSize = readConfig.RAFT.TxPoolConfig.PackSize
}
if readConfig.RAFT.TxPoolConfig.PoolSize > 0 {
defaultTxPoolConfig.PoolSize = readConfig.RAFT.TxPoolConfig.PoolSize
}
return &defaultTxPoolConfig, nil
mempoolConf := &MempoolConfig{}
mempoolConf.BatchSize = readConfig.RAFT.MempoolConfig.BatchSize
mempoolConf.PoolSize = readConfig.RAFT.MempoolConfig.PoolSize
mempoolConf.TxSliceSize = readConfig.RAFT.MempoolConfig.TxSliceSize
mempoolConf.BatchTick = readConfig.RAFT.MempoolConfig.BatchTick
mempoolConf.FetchTimeout = readConfig.RAFT.MempoolConfig.FetchTimeout
mempoolConf.TxSliceTimeout = readConfig.RAFT.MempoolConfig.TxSliceTimeout
return mempoolConf, nil
}
func readConfig(repoRoot string) (*RAFTConfig, error) {


@ -5,19 +5,21 @@ import (
"encoding/binary"
"fmt"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/order/etcdraft/txpool"
"github.com/meshplus/bitxhub/pkg/order/mempool"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/gogo/protobuf/sortkeys"
"github.com/sirupsen/logrus"
)
@ -37,11 +39,11 @@ type Node struct {
errorC chan<- error // errors from raft session
raftStorage *RaftStorage // the raft backend storage system
tp *txpool.TxPool // transaction pool
storage storage.Storage // db
mempool mempool.MemPool // transaction pool
repoRoot string //project path
logger logrus.FieldLogger //logger
repoRoot string // project path
logger logrus.FieldLogger // logger
blockAppliedIndex sync.Map // mapping of block height and apply index in raft log
appliedIndex uint64 // current apply index in raft log
@ -50,7 +52,7 @@ type Node struct {
lastIndex uint64 // last apply index in raft log
readyPool *sync.Pool // ready pool, avoiding memory growth fast
readyCache sync.Map //ready cache
readyCache sync.Map // ready cache
ctx context.Context // context
haltC chan struct{} // exit signal
}
@ -73,34 +75,42 @@ func NewNode(opts ...order.Option) (order.Order, error) {
return nil, err
}
//generate raft peers
// generate raft peers
peers, err := GenerateRaftPeers(config)
if err != nil {
return nil, fmt.Errorf("generate raft peers: %w", err)
}
//generate txpool config
tpc, err := generateTxPoolConfig(repoRoot)
batchC := make(chan *raftproto.Ready)
memConfig, err := generateMempoolConfig(repoRoot)
if err != nil {
return nil, fmt.Errorf("generate raft txpool config: %w", err)
}
txPoolConfig := &txpool.Config{
PackSize: tpc.PackSize,
BlockTick: tpc.BlockTick,
PoolSize: tpc.PoolSize,
mempoolConf := &mempool.Config{
ID: config.ID,
PeerMgr: config.PeerMgr,
ChainHeight: config.Applied,
GetTransactionFunc: config.GetTransactionFunc,
BatchSize: memConfig.BatchSize,
BatchTick: memConfig.BatchTick,
PoolSize: memConfig.PoolSize,
TxSliceSize: memConfig.TxSliceSize,
FetchTimeout: memConfig.FetchTimeout,
TxSliceTimeout: memConfig.TxSliceTimeout,
}
txPool, proposeC := txpool.New(config, dbStorage, txPoolConfig)
mempoolInst := mempool.NewMempool(mempoolConf, dbStorage, batchC)
readyPool := &sync.Pool{New: func() interface{} {
return new(raftproto.Ready)
}}
return &Node{
id: config.ID,
proposeC: proposeC,
proposeC: batchC,
confChangeC: make(chan raftpb.ConfChange),
commitC: make(chan *pb.Block, 1024),
errorC: make(chan<- error),
tp: txPool,
repoRoot: repoRoot,
snapCount: defaultSnapshotCount,
peerMgr: config.PeerMgr,
@ -110,12 +120,13 @@ func NewNode(opts ...order.Option) (order.Order, error) {
raftStorage: raftStorage,
readyPool: readyPool,
ctx: context.Background(),
mempool: mempoolInst,
}, nil
}
//Start or restart raft node
// Start or restart raft node
func (n *Node) Start() error {
n.blockAppliedIndex.Store(n.tp.GetHeight(), n.loadAppliedIndex())
n.blockAppliedIndex.Store(n.mempool.GetChainHeight(), n.loadAppliedIndex())
rc, err := generateRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
if err != nil {
return fmt.Errorf("generate raft config: %w", err)
@ -127,31 +138,25 @@ func (n *Node) Start() error {
}
go n.run()
n.mempool.Start()
n.logger.Info("Consensus module started")
return nil
}
//Stop the raft node
// Stop the raft node
func (n *Node) Stop() {
n.tp.CheckExecute(false)
n.mempool.Stop()
n.node.Stop()
n.logger.Infof("Consensus stopped")
}
//Add the transaction into txpool and broadcast it to other nodes
// Add the transaction into txpool and broadcast it to other nodes
func (n *Node) Prepare(tx *pb.Transaction) error {
if !n.Ready() {
return nil
}
if err := n.tp.AddPendingTx(tx, false); err != nil {
return err
}
if err := n.tp.Broadcast(tx); err != nil {
return err
}
return nil
return n.mempool.RecvTransaction(tx)
}
func (n *Node) Commit() chan *pb.Block {
@ -170,20 +175,20 @@ func (n *Node) ReportState(height uint64, hash types.Hash) {
n.logger.Errorf("can not found appliedIndex:", height)
return
}
//block already persisted, record the apply index in db
// block already persisted, record the apply index in db
n.writeAppliedIndex(appliedIndex.(uint64))
n.blockAppliedIndex.Delete(height)
n.tp.BuildReqLookUp() //store bloom filter
ready, ok := n.readyCache.Load(height)
// TODO: delete readyCache
readyBytes, ok := n.readyCache.Load(height)
if !ok {
n.logger.Errorf("can not found ready:", height)
return
}
hashes := ready.(*raftproto.Ready).TxHashes
// remove redundant tx
n.tp.BatchDelete(hashes)
ready := readyBytes.(*raftproto.Ready)
// clean related mempool info
n.mempool.CommitTransactions(ready)
n.readyCache.Delete(height)
}
@ -204,47 +209,32 @@ func (n *Node) Step(ctx context.Context, msg []byte) error {
return err
}
return n.node.Step(ctx, *msg)
case raftproto.RaftMessage_GET_TX:
hash := types.Hash{}
if err := hash.Unmarshal(rm.Data); err != nil {
fetchTxnRequest := &mempool.FetchTxnRequest{}
if err := fetchTxnRequest.Unmarshal(rm.Data); err != nil {
return err
}
tx, ok := n.tp.GetTx(hash, true)
if !ok {
return nil
}
v, err := tx.Marshal()
if err != nil {
return err
}
txAck := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_GET_TX_ACK,
Data: v,
}
txAckData, err := txAck.Marshal()
if err != nil {
return err
}
m := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: txAckData,
}
return n.peerMgr.AsyncSend(rm.FromId, m)
n.mempool.RecvFetchTxnRequest(fetchTxnRequest)
case raftproto.RaftMessage_GET_TX_ACK:
tx := &pb.Transaction{}
if err := tx.Unmarshal(rm.Data); err != nil {
fetchTxnResponse := &mempool.FetchTxnResponse{}
if err := fetchTxnResponse.Unmarshal(rm.Data); err != nil {
return err
}
return n.tp.AddPendingTx(tx, true)
n.mempool.RecvFetchTxnResponse(fetchTxnResponse)
case raftproto.RaftMessage_BROADCAST_TX:
tx := &pb.Transaction{}
if err := tx.Unmarshal(rm.Data); err != nil {
txSlice := &mempool.TxSlice{}
if err := txSlice.Unmarshal(rm.Data); err != nil {
return err
}
return n.tp.AddPendingTx(tx, false)
n.mempool.RecvForwardTxs(txSlice)
default:
return fmt.Errorf("unexpected raft message received")
}
return nil
}
func (n *Node) IsLeader() bool {
@ -281,14 +271,14 @@ func (n *Node) run() {
n.proposeC = nil
} else {
if !n.IsLeader() {
n.tp.CheckExecute(false)
n.logger.Warn("Follower node can't propose a proposal")
n.mempool.UpdateLeader(n.leader)
continue
}
data, err := ready.Marshal()
if err != nil {
n.logger.Panic(err)
}
n.tp.BatchStore(ready.TxHashes)
if err := n.node.Propose(n.ctx, data); err != nil {
n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err)
}
@ -332,16 +322,16 @@ func (n *Node) run() {
}
}
if rd.SoftState != nil {
n.leader = rd.SoftState.Lead
n.tp.CheckExecute(n.IsLeader())
newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
n.leader = newLeader
n.mempool.UpdateLeader(newLeader)
}
// 3: AsyncSend all Messages to the nodes named in the To field.
go n.send(rd.Messages)
n.maybeTriggerSnapshot()
// 4: Call Node.Advance() to signal readiness for the next batch of
// updates.
// 4: Call Node.Advance() to signal readiness for the next batch of updates.
n.node.Advance()
case <-n.ctx.Done():
n.Stop()
@ -454,57 +444,61 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool {
return true
}
//mint the block
// mint the block
func (n *Node) mint(ready *raftproto.Ready) {
n.logger.WithFields(logrus.Fields{
"height": ready.Height,
"count": len(ready.TxHashes),
}).Debugln("block will be generated")
//follower node update the block height
if n.tp.GetHeight() == ready.Height-1 {
n.tp.UpdateHeight()
}
loseTxs := make([]types.Hash, 0)
txs := make([]*pb.Transaction, 0, len(ready.TxHashes))
for _, hash := range ready.TxHashes {
_, ok := n.tp.GetTx(hash, false)
if !ok {
loseTxs = append(loseTxs, hash)
// follower node update the block height
expectHeight := n.mempool.GetChainHeight()
if !n.IsLeader() {
if expectHeight != ready.Height-1 {
n.logger.Warningf("Receive batch %d, but not match, expect height: %d", ready.Height, expectHeight+1)
return
}
n.mempool.IncreaseChainHeight()
}
//handle missing txs
if len(loseTxs) != 0 {
var wg sync.WaitGroup
wg.Add(len(loseTxs))
for _, hash := range loseTxs {
go func(hash types.Hash) {
defer wg.Done()
n.tp.FetchTx(hash, ready.Height)
}(hash)
}
wg.Wait()
missingTxsHash, txList := n.mempool.GetBlock(ready)
// handle missing txs
if len(missingTxsHash) != 0 {
waitLostTxnC := make(chan bool)
lostTxnEvent := &mempool.LocalMissingTxnEvent{
Height: ready.Height,
WaitC: waitLostTxnC,
MissingTxnHashList: missingTxsHash,
}
for _, hash := range ready.TxHashes {
tx, _ := n.tp.GetTx(hash, false)
txs = append(txs, tx)
// NOTE: block until the missing txs have been fetched
n.mempool.FetchTxn(lostTxnEvent)
select {
case isSuccess := <-waitLostTxnC:
if !isSuccess {
n.logger.Error("Fetch missing txn failed")
return
}
n.logger.Debug("Fetch missing transactions success")
case <-time.After(mempool.DefaultFetchTxnTimeout):
// TODO: add fetch request resend timer
n.logger.Debugf("Fetch missing transactions timeout, block height: %d", ready.Height)
return
}
if missingTxsHash, txList = n.mempool.GetBlock(ready); len(missingTxsHash) != 0 {
n.logger.Error("Still missing transaction")
return
}
}
n.tp.RemoveTxs(ready.TxHashes, n.IsLeader())
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: ready.Height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.logger.WithFields(logrus.Fields{
"txpool_size": n.tp.PoolSize(),
}).Debugln("current tx pool size")
if len(loseTxs) == len(ready.TxHashes) {
block.Extra = []byte("updateState")
Transactions: txList,
}
n.readyCache.Store(ready.Height, ready)
n.commitC <- block
@ -555,12 +549,17 @@ func (n *Node) maybeTriggerSnapshot() {
func GenerateRaftPeers(config *order.Config) ([]raft.Peer, error) {
nodes := config.Nodes
peers := make([]raft.Peer, 0, len(nodes))
for id, node := range nodes {
peers = append(peers, raft.Peer{ID: id, Context: node.Bytes()})
// sort by node id
idSlice := make([]uint64, 0, len(nodes))
for id := range nodes {
idSlice = append(idSlice, id)
}
sortkeys.Uint64s(idSlice)
for _, id := range idSlice {
addr := nodes[id]
peers = append(peers, raft.Peer{ID: id, Context: addr.Bytes()})
}
sort.Slice(peers, func(i, j int) bool {
return peers[i].ID < peers[j].ID
})
return peers, nil
}


@ -1,411 +0,0 @@
package txpool
import (
"container/list"
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
"github.com/sirupsen/logrus"
)
type getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
type TxPool struct {
sync.RWMutex //lock for the pendingTxs
nodeId uint64 //node id
height uint64 //current block height
isExecuting bool //only raft leader can execute
pendingTxs *list.List //pending tx pool
presenceTxs sync.Map //tx cache
ackTxs map[types.Hash]bool //ack tx means get tx by pb.RaftMessage_GET_TX_ACK
readyC chan *raftproto.Ready //ready channel, receive by raft Propose channel
peerMgr peermgr.PeerManager //network manager
logger logrus.FieldLogger //logger
reqLookUp *order.ReqLookUp //bloom filter
storage storage.Storage //storage pending tx
config *Config //tx pool config
ctx context.Context //context
cancel context.CancelFunc //stop Execute
getTransactionFunc getTransactionFunc //get transaction by ledger
}
type Config struct {
PackSize int //how many transactions should the primary pack
BlockTick time.Duration //block packaging time period
PoolSize int //how many transactions could the txPool stores in total
SetSize int //how many transactions should the node broadcast at once
}
//New txpool
func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*TxPool, chan *raftproto.Ready) {
readyC := make(chan *raftproto.Ready)
reqLookUp, err := order.NewReqLookUp(storage, config.Logger)
if err != nil {
return nil, nil
}
ctx, cancel := context.WithCancel(context.Background())
return &TxPool{
nodeId: config.ID,
peerMgr: config.PeerMgr,
logger: config.Logger,
readyC: readyC,
height: config.Applied,
pendingTxs: list.New(),
ackTxs: make(map[types.Hash]bool),
reqLookUp: reqLookUp,
storage: storage,
getTransactionFunc: config.GetTransactionFunc,
config: txPoolConfig,
ctx: ctx,
cancel: cancel,
}, readyC
}
//AddPendingTx add pending transaction into txpool
func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
if tp.PoolSize() >= tp.config.PoolSize {
tp.logger.Debugf("Tx pool size: %d is full", tp.PoolSize())
return nil
}
hash := tx.TransactionHash
if e := tp.get(hash); e != nil {
return nil
}
//look up by bloom filter
if ok := tp.reqLookUp.LookUp(hash.Bytes()); ok {
//find the tx again by ledger if hash in bloom filter
if tx, _ := tp.getTransactionFunc(hash); tx != nil {
return nil
}
}
//add pending tx
tp.pushBack(hash, tx, isAckTx)
//immediately pack if it is greater than the total amount of block transactions
if tp.isExecuting {
tp.packFullBlock()
}
return nil
}
//packFullBlock immediately pack if it is greater than the total amount of block transactions
func (tp *TxPool) packFullBlock() {
tp.Lock()
defer tp.Unlock()
l := tp.pendingTxs.Len()
if l < tp.config.PackSize {
return
}
if r := tp.ready(tp.config.PackSize); r != nil {
tp.readyC <- r
}
}
//Current txpool's size
func (tp *TxPool) PoolSize() int {
tp.RLock()
defer tp.RUnlock()
return tp.pendingTxs.Len()
}
//RemoveTxs remove txs from the cache
func (tp *TxPool) RemoveTxs(hashes []types.Hash, isLeader bool) {
tp.Lock()
defer tp.Unlock()
for _, hash := range hashes {
if !isLeader {
if e := tp.get(hash); e != nil {
tp.pendingTxs.Remove(e)
}
}
tp.presenceTxs.Delete(hash)
}
}
//BuildReqLookUp store the bloom filter
func (tp *TxPool) BuildReqLookUp() {
if err := tp.reqLookUp.Build(); err != nil {
tp.logger.Errorf("bloom filter persistence error", err)
}
}
//CheckExecute checks the txpool status, only leader node can run Execute()
func (tp *TxPool) CheckExecute(isLeader bool) {
if isLeader {
if !tp.isExecuting {
go tp.execute()
}
} else {
if tp.isExecuting {
tp.cancel()
}
}
}
//execute init
func (tp *TxPool) executeInit() {
tp.Lock()
defer tp.Unlock()
tp.isExecuting = true
tp.pendingTxs.Init()
tp.presenceTxs = sync.Map{}
tp.logger.Infoln("start txpool execute")
}
//execute schedule to collect txs to the ready channel
func (tp *TxPool) execute() {
tp.executeInit()
ticker := time.NewTicker(tp.config.BlockTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tp.periodPackBlock()
case <-tp.ctx.Done():
tp.isExecuting = false
tp.logger.Infoln("done txpool execute")
return
}
}
}
func (tp *TxPool) periodPackBlock() {
tp.Lock()
defer tp.Unlock()
l := tp.pendingTxs.Len()
if l == 0 {
return
}
var size int
if l > tp.config.PackSize {
size = tp.config.PackSize
} else {
size = l
}
if r := tp.ready(size); r != nil {
tp.readyC <- r
}
}
//ready pack the block
func (tp *TxPool) ready(size int) *raftproto.Ready {
hashes := make([]types.Hash, 0, size)
for i := 0; i < size; i++ {
front := tp.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
hash := tx.TransactionHash
tp.pendingTxs.Remove(front)
if _, ok := tp.ackTxs[hash]; ok {
delete(tp.ackTxs, hash)
continue
}
hashes = append(hashes, hash)
}
if len(hashes) == 0 {
return nil
}
height := tp.UpdateHeight()
return &raftproto.Ready{
TxHashes: hashes,
Height: height,
}
}
//UpdateHeight add the block height
func (tp *TxPool) UpdateHeight() uint64 {
return atomic.AddUint64(&tp.height, 1)
}
//GetHeight get current block height
func (tp *TxPool) GetHeight() uint64 {
return atomic.LoadUint64(&tp.height)
}
//GetTx get the transaction by txpool or ledger
func (tp *TxPool) GetTx(hash types.Hash, findByStore bool) (*pb.Transaction, bool) {
if e := tp.get(hash); e != nil {
return e.Value.(*pb.Transaction), true
}
if findByStore {
// find by txpool store
tx, ok := tp.load(hash)
if ok {
return tx, true
}
// find by ledger
tx, err := tp.getTransactionFunc(hash)
if err != nil {
return nil, false
}
return tx, true
}
return nil, false
}
//Broadcast the new transaction to other nodes
func (tp *TxPool) Broadcast(tx *pb.Transaction) error {
data, err := tx.Marshal()
if err != nil {
return err
}
rm := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_BROADCAST_TX,
Data: data,
}
cmData, err := rm.Marshal()
if err != nil {
return err
}
msg := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: cmData,
}
for id := range tp.peerMgr.Peers() {
if id == tp.nodeId {
continue
}
if err := tp.peerMgr.AsyncSend(id, msg); err != nil {
tp.logger.Debugf("send tx to:%d %s", id, err.Error())
continue
}
}
return nil
}
// Fetch tx by local txpool or network
func (tp *TxPool) FetchTx(hash types.Hash, height uint64) *pb.Transaction {
if tx, ok := tp.GetTx(hash, false); ok {
return tx
}
raftMessage := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_GET_TX,
FromId: tp.nodeId,
Data: hash.Bytes(),
}
rmData, err := raftMessage.Marshal()
if err != nil {
return nil
}
m := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: rmData,
}
asyncGet := func() (tx *pb.Transaction, err error) {
if tx, ok := tp.GetTx(hash, false); ok {
return tx, nil
}
if err := tp.peerMgr.Broadcast(m); err != nil {
tp.logger.Debugln(err)
}
return nil, fmt.Errorf("can't get tx: %s, block_height:%d", hash.String(), height)
}
var tx *pb.Transaction
if err := retry.Retry(func(attempt uint) (err error) {
tx, err = asyncGet()
if err != nil {
//retry times > 2
if attempt > 2 {
tp.logger.Debugln(err)
}
return err
}
return nil
}, strategy.Wait(50*time.Millisecond)); err != nil {
tp.logger.Errorln(err)
}
return tx
}
func (tp *TxPool) get(key types.Hash) *list.Element {
e, ok := tp.presenceTxs.Load(key)
if ok {
return e.(*list.Element)
}
return nil
}
func (tp *TxPool) pushBack(key types.Hash, value interface{}, isAckTx bool) *list.Element {
tp.Lock()
defer tp.Unlock()
if e := tp.get(key); e != nil {
return nil
}
if isAckTx {
tp.ackTxs[key] = true
}
e := tp.pendingTxs.PushBack(value)
tp.presenceTxs.Store(key, e)
return e
}
func compositeKey(value interface{}) []byte {
var prefix = []byte("tx-")
return append(prefix, []byte(fmt.Sprintf("%v", value))...)
}
func (tp *TxPool) store(tx *pb.Transaction) {
txKey := compositeKey(tx.TransactionHash.Bytes())
txData, _ := tx.Marshal()
tp.storage.Put(txKey, txData)
}
func (tp *TxPool) load(hash types.Hash) (*pb.Transaction, bool) {
txKey := compositeKey(hash.Bytes())
txData := tp.storage.Get(txKey)
if txData == nil {
return nil, false
}
var tx pb.Transaction
if err := tx.Unmarshal(txData); err != nil {
tp.logger.Error(err)
return nil, false
}
return &tx, true
}
//BatchStore batch store txs
func (tp *TxPool) BatchStore(hashes []types.Hash) {
batch := tp.storage.NewBatch()
for _, hash := range hashes {
e := tp.get(hash)
if e == nil {
tp.logger.Debugln("BatchStore not found tx:", hash.String())
continue
}
tx := e.Value.(*pb.Transaction)
txKey := compositeKey(hash.Bytes())
txData, _ := tx.Marshal()
batch.Put(txKey, txData)
}
batch.Commit()
}
//BatchDelete batch delete txs
func (tp *TxPool) BatchDelete(hashes []types.Hash) {
batch := tp.storage.NewBatch()
for _, hash := range hashes {
txKey := compositeKey(hash.Bytes())
batch.Delete(txKey)
}
batch.Commit()
}


@ -0,0 +1,86 @@
package mempool
import (
"github.com/meshplus/bitxhub-model/pb"
"github.com/google/btree"
)
// the key of priorityIndex and parkingLotIndex.
type orderedIndexKey struct {
account string
nonce uint64
}
// TODO (YH): add expiration time order
// Less should guarantee item can be cast into orderedIndexKey.
func (oik *orderedIndexKey) Less(than btree.Item) bool {
other := than.(*orderedIndexKey)
if oik.account != other.account {
return oik.account < other.account
}
return oik.nonce < other.nonce
}
type sortedNonceKey struct {
nonce uint64
}
// Less should guarantee item can be cast into sortedNonceKey.
func (snk *sortedNonceKey) Less(item btree.Item) bool {
dst, _ := item.(*sortedNonceKey)
return snk.nonce < dst.nonce
}
func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
return &orderedIndexKey{
account: account,
nonce: uint64(tx.Nonce),
}
}
func makeSortedNonceKeyKey(nonce uint64) *sortedNonceKey {
return &sortedNonceKey{
nonce: nonce,
}
}
type btreeIndex struct {
data *btree.BTree
}
func newBtreeIndex() *btreeIndex {
return &btreeIndex{
data: btree.New(btreeDegree),
}
}
func (idx *btreeIndex) insert(tx *pb.Transaction) {
idx.data.ReplaceOrInsert(makeSortedNonceKeyKey(uint64(tx.Nonce)))
}
func (idx *btreeIndex) remove(txs map[string][]*pb.Transaction) {
for _, list := range txs {
for _, tx := range list {
idx.data.Delete(makeSortedNonceKeyKey(uint64(tx.Nonce)))
}
}
}
func (idx *btreeIndex) insertByOrderedQueueKey(account string, tx *pb.Transaction) {
idx.data.ReplaceOrInsert(makeOrderedIndexKey(account, tx))
}
func (idx *btreeIndex) removeByOrderedQueueKey(txs map[string][]*pb.Transaction) {
for account, list := range txs {
for _, tx := range list {
idx.data.Delete(makeOrderedIndexKey(account, tx))
}
}
}
// Size returns the size of the index
func (idx *btreeIndex) size() uint64 {
return uint64(idx.data.Len())
}
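A small self-contained illustration (not part of this commit) of how the (account, nonce) ordering defined by orderedIndexKey.Less behaves with github.com/google/btree: Ascend visits keys grouped by account, with nonces ascending within each account. The account strings and degree below are made up.

package main

import (
    "fmt"

    "github.com/google/btree"
)

// orderedIndexKey mirrors the key type above: ordered by account, then nonce.
type orderedIndexKey struct {
    account string
    nonce   uint64
}

func (oik *orderedIndexKey) Less(than btree.Item) bool {
    other := than.(*orderedIndexKey)
    if oik.account != other.account {
        return oik.account < other.account
    }
    return oik.nonce < other.nonce
}

func main() {
    tree := btree.New(10) // the degree here is arbitrary
    tree.ReplaceOrInsert(&orderedIndexKey{account: "0xb", nonce: 2})
    tree.ReplaceOrInsert(&orderedIndexKey{account: "0xa", nonce: 7})
    tree.ReplaceOrInsert(&orderedIndexKey{account: "0xa", nonce: 1})

    // prints 0xa/1, 0xa/7, 0xb/2: per-account ascending nonce order
    tree.Ascend(func(i btree.Item) bool {
        k := i.(*orderedIndexKey)
        fmt.Printf("%s/%d\n", k.account, k.nonce)
        return true
    })
}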


@ -0,0 +1,28 @@
package mempool
import (
"testing"
"github.com/meshplus/bitxhub-model/pb"
"github.com/stretchr/testify/assert"
)
func TestLess(t *testing.T) {
ast := assert.New(t)
tx := &pb.Transaction{
Nonce: 1,
}
orderedIndexKey := makeOrderedIndexKey("account", tx)
tx.Nonce = 2
orderedIndexKey1 := makeOrderedIndexKey("bitxhub", tx)
isLess := orderedIndexKey.Less(orderedIndexKey1)
if "bitxhub" < "dccount" {
t.Log("yes")
}
ast.Equal(true, isLess)
}


@ -0,0 +1,121 @@
package mempool
import (
"errors"
"github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/storage"
)
var _ MemPool = (*mempoolImpl)(nil)
//go:generate mockgen -destination mock_mempool/mock_mempool.go -package mock_mempool -source types.go
type MemPool interface {
// Start starts mempool service.
Start() error
// Stop stops mempool service.
Stop()
// RecvTransaction receives transaction from API.
RecvTransaction(tx *pb.Transaction) error
// RecvForwardTxs receives transactions from other vp nodes.
RecvForwardTxs(txSlice *TxSlice)
UpdateLeader(uint64)
FetchTxn(lostTxnEvent *LocalMissingTxnEvent)
RecvFetchTxnRequest(fetchTxnRequest *FetchTxnRequest)
RecvFetchTxnResponse(fetchTxnResponse *FetchTxnResponse)
GetChainHeight() uint64
IncreaseChainHeight()
GetBlock(ready *raftproto.Ready) (map[uint64]string, []*pb.Transaction)
// Remove committed transactions from mempool
CommitTransactions(ready *raftproto.Ready)
}
// NewMempool return the mempool instance.
func NewMempool(config *Config, storage storage.Storage, batchC chan *raftproto.Ready) MemPool {
return newMempoolImpl(config, storage, batchC)
}
// RecvTransaction receives transaction from api and other vp nodes.
func (mpi *mempoolImpl) RecvTransaction(tx *pb.Transaction) error {
if mpi.txCache.IsFull() && mpi.poolIsFull() {
return errors.New("transaction cache and pool are full, we will drop this transaction")
}
mpi.txCache.recvTxC <- tx
return nil
}
// RecvForwardTxs receives transactions forwarded from other vp nodes.
func (mpi *mempoolImpl) RecvForwardTxs(txSlice *TxSlice) {
mpi.subscribe.txForwardC <- txSlice
}
// UpdateLeader updates the current leader node id.
func (mpi *mempoolImpl) UpdateLeader(newLeader uint64) {
mpi.subscribe.updateLeaderC <- newLeader
}
// FetchTxn sends the fetch request.
func (mpi *mempoolImpl) FetchTxn(lostTxnEvent *LocalMissingTxnEvent) {
mpi.subscribe.localMissingTxnEvent <- lostTxnEvent
}
func (mpi *mempoolImpl) RecvFetchTxnRequest(fetchTxnRequest *FetchTxnRequest) {
mpi.subscribe.fetchTxnRequestC <- fetchTxnRequest
}
func (mpi *mempoolImpl) RecvFetchTxnResponse(fetchTxnResponse *FetchTxnResponse) {
mpi.subscribe.fetchTxnResponseC <- fetchTxnResponse
}
// Start starts the mempool service.
func (mpi *mempoolImpl) Start() error {
mpi.logger.Debug("Start Listen mempool events")
go mpi.listenEvent()
go mpi.txCache.listenEvent()
return nil
}
func (mpi *mempoolImpl) Stop() {
if mpi.close != nil {
close(mpi.close)
}
if mpi.txCache.close != nil {
close(mpi.txCache.close)
}
}
func (mpi *mempoolImpl) GetChainHeight() uint64 {
return mpi.getBatchSeqNo()
}
func (mpi *mempoolImpl) IncreaseChainHeight() {
mpi.increaseBatchSeqNo()
}
func (mpi *mempoolImpl) GetBlock(ready *raftproto.Ready) (missingTxnHashList map[uint64]string, txList []*pb.Transaction) {
waitC := make(chan *mempoolBatch)
getBlock := &constructBatchEvent{
ready: ready,
result: waitC,
}
mpi.subscribe.getBlockC <- getBlock
// block until finishing constructing related batch
batch := <-waitC
return batch.missingTxnHashList, batch.txList
}
func (mpi *mempoolImpl) CommitTransactions(ready *raftproto.Ready) {
mpi.subscribe.commitTxnC <- ready
}
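To make the interface above easier to follow, here is a hedged wiring sketch that mirrors what NewNode in node.go does; the helper name wireMempool and the literal BatchSize/PoolSize values are illustrative, not part of the commit.

package example

import (
    "github.com/meshplus/bitxhub-kit/types"
    "github.com/meshplus/bitxhub-model/pb"
    raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
    "github.com/meshplus/bitxhub/pkg/order/mempool"
    "github.com/meshplus/bitxhub/pkg/peermgr"
    "github.com/meshplus/bitxhub/pkg/storage"
)

// wireMempool builds the mempool the way NewNode does and returns the channel
// on which the mempool delivers batches (raftproto.Ready) to the consensus loop.
func wireMempool(id uint64, mgr peermgr.PeerManager, db storage.Storage, chainHeight uint64,
    getTx func(hash types.Hash) (*pb.Transaction, error)) (mempool.MemPool, chan *raftproto.Ready) {

    batchC := make(chan *raftproto.Ready)
    conf := &mempool.Config{
        ID:                 id,
        PeerMgr:            mgr,
        ChainHeight:        chainHeight,
        GetTransactionFunc: getTx,
        BatchSize:          200,   // mirrors the defaults in the [raft.mempool] section
        PoolSize:           50000,
    }
    return mempool.NewMempool(conf, db, batchC), batchC
}

The returned batchC is the same channel node.go uses as its proposeC, so every raftproto.Ready emitted by the mempool becomes a raft proposal.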


@ -0,0 +1,577 @@
package mempool
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/loggers"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
"github.com/google/btree"
"github.com/sirupsen/logrus"
)
type mempoolImpl struct {
localID uint64
leader uint64 // leader node id
batchSize uint64
batchSeqNo uint64 // track the sequence number of block
logger logrus.FieldLogger
batchC chan *raftproto.Ready
close chan bool
txStore *transactionStore // store all transactions info
txCache *TxCache // cache the transactions received from api
subscribe *subscribeEvent
storage storage.Storage
peerMgr peermgr.PeerManager //network manager
batchTimerMgr *timerManager
ledgerHelper func(hash types.Hash) (*pb.Transaction, error)
}
func newMempoolImpl(config *Config, storage storage.Storage, batchC chan *raftproto.Ready) *mempoolImpl {
mpi := &mempoolImpl{
localID: config.ID,
peerMgr: config.PeerMgr,
batchSeqNo: config.ChainHeight,
ledgerHelper: config.GetTransactionFunc,
logger: loggers.Logger(loggers.Order),
batchC: batchC,
storage: storage,
}
mpi.txStore = newTransactionStore()
mpi.txCache = newTxCache(config.TxSliceTimeout)
mpi.subscribe = newSubscribe()
if config.BatchSize == 0 {
mpi.batchSize = DefaultBatchSize
} else {
mpi.batchSize = config.BatchSize
}
var batchTick time.Duration
if config.BatchTick == 0 {
batchTick = DefaultBatchTick
} else {
batchTick = config.BatchTick
}
mpi.batchTimerMgr = newTimer(batchTick)
return mpi
}
func (mpi *mempoolImpl) listenEvent() {
waitC := make(chan bool)
for {
select {
case <-mpi.close:
mpi.logger.Info("----- Exit listen loop -----")
return
case newLeader := <-mpi.subscribe.updateLeaderC:
if newLeader == mpi.localID {
mpi.logger.Info("----- Become the leader node -----")
}
mpi.leader = newLeader
case txSet := <-mpi.txCache.txSetC:
// 1. send transactions to other peers
data, err := txSet.Marshal()
if err != nil {
mpi.logger.Errorf("Marshal failed, err: %s", err.Error())
return
}
pbMsg := mpi.msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX)
mpi.broadcast(pbMsg)
// 2. process transactions
if err := mpi.processTransactions(txSet.TxList); err != nil {
mpi.logger.Errorf("Process transactions failed, err: %s", err.Error())
}
case txSlice := <-mpi.subscribe.txForwardC:
if err := mpi.processTransactions(txSlice.TxList); err != nil {
mpi.logger.Errorf("Process transactions failed, err: %s", err.Error())
}
case res := <-mpi.subscribe.getBlockC:
result := mpi.getBlock(res.ready)
res.result <- result
case <-mpi.batchTimerMgr.timeoutEventC:
if mpi.isBatchTimerActive() {
mpi.stopBatchTimer(StopReason1)
mpi.logger.Debug("Batch timer expired, try to create a batch")
if mpi.txStore.priorityNonBatchSize > 0 {
ready, err := mpi.generateBlock(true)
if err != nil {
mpi.logger.Errorf("Generator batch failed")
continue
}
mpi.batchC <- ready
} else {
mpi.logger.Debug("The length of priorityIndex is 0, ")
}
}
case commitReady := <-mpi.subscribe.commitTxnC:
gcStartTime := time.Now()
mpi.processCommitTransactions(commitReady)
duration := time.Since(gcStartTime).Nanoseconds()
mpi.logger.Debugf("GC duration %v", duration)
case lostTxnEvent := <-mpi.subscribe.localMissingTxnEvent:
if err := mpi.sendFetchTxnRequest(lostTxnEvent.Height, lostTxnEvent.MissingTxnHashList); err != nil {
mpi.logger.Errorf("Process fetch txn failed, err: %s", err.Error())
lostTxnEvent.WaitC <- false
} else {
mpi.logger.Debug("Process fetch txn success")
waitC = lostTxnEvent.WaitC
}
case fetchRequest := <-mpi.subscribe.fetchTxnRequestC:
if err := mpi.processFetchTxnRequest(fetchRequest); err != nil {
mpi.logger.Error("Process fetchTxnRequest failed")
}
case fetchRes := <-mpi.subscribe.fetchTxnResponseC:
if err := mpi.processFetchTxnResponse(fetchRes); err != nil {
waitC <- false
continue
}
waitC <- true
}
}
}
func (mpi *mempoolImpl) processTransactions(txs []*pb.Transaction) error {
validTxs := make(map[string][]*pb.Transaction)
for _, tx := range txs {
// check if this tx signature is valid first
ok, _ := asym.Verify(crypto.Secp256k1, tx.Signature, tx.SignHash().Bytes(), tx.From)
if !ok {
return fmt.Errorf("invalid signature")
}
// check the sequence number of tx
// TODO refactor Transaction
txAccount, err := getAccount(tx)
if err != nil {
return fmt.Errorf("get tx account failed, err: %s", err.Error())
}
currentSeqNo := mpi.txStore.nonceCache.getPendingNonce(txAccount)
if tx.Nonce < currentSeqNo {
mpi.logger.Warningf("current sequence number is %d, required %d", tx.Nonce, currentSeqNo+1)
continue
}
// check the existence of hash of this tx
txHash := tx.TransactionHash.Hex()
if txPointer := mpi.txStore.txHashMap[txHash]; txPointer != nil {
mpi.logger.Warningf("Tx %s already received", txHash)
continue
}
_, ok = validTxs[txAccount]
if !ok {
validTxs[txAccount] = make([]*pb.Transaction, 0)
}
validTxs[txAccount] = append(validTxs[txAccount], tx)
}
// Insert all the new transactions and collect the dirty accounts
dirtyAccounts := mpi.txStore.InsertTxs(validTxs)
// send tx to mempool store
mpi.processDirtyAccount(dirtyAccounts)
if mpi.isLeader() {
// start batch timer when this node receives the first transaction set of a batch
if !mpi.isBatchTimerActive() {
mpi.startBatchTimer(StartReason1)
}
// generate a batch once the batch size threshold is reached
if mpi.txStore.priorityNonBatchSize >= mpi.batchSize {
ready, err := mpi.generateBlock(false)
if err != nil {
return errors.New("generator batch fai")
}
// stop batch timer
mpi.stopBatchTimer(StopReason2)
mpi.batchC <- ready
}
}
return nil
}
func (txStore *transactionStore) InsertTxs(txs map[string][]*pb.Transaction) map[string]bool {
dirtyAccounts := make(map[string]bool)
for account, list := range txs {
for _, tx := range list {
txHash := tx.TransactionHash.Hex()
txPointer := &orderedIndexKey{
account: account,
nonce: tx.Nonce,
}
txStore.txHashMap[txHash] = txPointer
list, ok := txStore.allTxs[account]
if !ok {
// if this is new account to send tx, create a new txSortedMap
txStore.allTxs[account] = newTxSortedMap()
}
list = txStore.allTxs[account]
txItem := &txItem{
account: account,
tx: tx,
}
list.items[tx.Nonce] = txItem
list.index.insert(tx)
atomic.AddInt32(&txStore.poolSize, 1)
}
dirtyAccounts[account] = true
}
return dirtyAccounts
}
func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
for account := range dirtyAccounts {
if list, ok := mpi.txStore.allTxs[account]; ok {
// search for related sequential txs in allTxs
// and add these txs into priorityIndex and parkingLotIndex
pendingNonce := mpi.txStore.nonceCache.getPendingNonce(account)
readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce)
mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce)
// insert ready txs into priorityIndex.
for _, tx := range readyTxs {
mpi.txStore.priorityIndex.insertByOrderedQueueKey(account, tx)
}
mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize + uint64(len(readyTxs))
// insert non-ready txs into parkingLotIndex.
for _, tx := range nonReadyTxs {
mpi.txStore.parkingLotIndex.insertByOrderedQueueKey(account, tx)
}
}
}
}
// generateBlock builds the next batch of transactions for consensus.
// batchedTxs are txs already sent to consensus but not yet committed; mempool must filter them out.
func (mpi *mempoolImpl) generateBlock(isTimeout bool) (*raftproto.Ready, error) {
result := make([]orderedIndexKey, 0, mpi.batchSize)
// txs with lower nonces are observed first in the priority index iterator.
mpi.logger.Infof("Length of priority index: %v", mpi.txStore.priorityIndex.data.Len())
mpi.txStore.priorityIndex.data.Ascend(func(a btree.Item) bool {
tx := a.(*orderedIndexKey)
// skip the tx if it already exists in batchedTxs
if _, ok := mpi.txStore.batchedTxs[orderedIndexKey{tx.account, tx.nonce}]; ok {
return true
}
txSeq := tx.nonce
commitNonce := mpi.txStore.nonceCache.getCommitNonce(tx.account)
var seenPrevious bool
if txSeq >= 1 {
_, seenPrevious = mpi.txStore.batchedTxs[orderedIndexKey{account: tx.account, nonce: txSeq - 1}]
}
// include transaction if it's "next" for given account or
// we've already sent its ancestor to Consensus
if seenPrevious || (txSeq == commitNonce) {
ptr := orderedIndexKey{account: tx.account, nonce: tx.nonce}
mpi.txStore.batchedTxs[ptr] = true
result = append(result, ptr)
// batched by batch size or timeout
condition1 := uint64(len(result)) == mpi.batchSize
condition2 := isTimeout && uint64(len(result)) == mpi.txStore.priorityNonBatchSize
if condition1 || condition2 {
return false
}
}
return true
})
// convert transaction pointers to real values
hashList := make([]types.Hash, len(result))
txList := make([]*pb.Transaction, len(result))
for i, v := range result {
rawTransaction := mpi.txStore.getTxByOrderKey(v.account, v.nonce)
hashList[i] = rawTransaction.TransactionHash
txList[i] = rawTransaction
}
mpi.increaseBatchSeqNo()
batchSeqNo := mpi.getBatchSeqNo()
ready := &raftproto.Ready{
TxHashes: hashList,
Height: batchSeqNo,
}
// reject the batch if one has already been generated at this height
if _, ok := mpi.txStore.batchedCache[batchSeqNo]; ok {
mpi.logger.Errorf("Generate block with height %d, but there is already block at this height", batchSeqNo)
return nil, errors.New("wrong block height ")
}
// store the batch to cache
mpi.txStore.batchedCache[batchSeqNo] = txList
// store the batch to db
mpi.batchStore(txList)
mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize - uint64(len(hashList))
mpi.logger.Infof("Generated block %d with %d txs", batchSeqNo, len(txList))
return ready, nil
}
func (mpi *mempoolImpl) getBlock(ready *raftproto.Ready) *mempoolBatch {
res := &mempoolBatch{}
// leader get the block directly from batchedCache
if mpi.isLeader() {
if txList, ok := mpi.txStore.batchedCache[ready.Height]; !ok {
mpi.logger.Warningf("Leader get block failed, can't find block %d from batchedCache", ready.Height)
missingTxnHashList := make(map[uint64]string)
for i, txHash := range ready.TxHashes {
missingTxnHashList[uint64(i)] = txHash.Hex()
}
res.missingTxnHashList = missingTxnHashList
} else {
// TODO (YH): check tx hash and length
res.txList = txList
}
return res
}
// follower construct the same batch by given ready.
return mpi.constructSameBatch(ready)
}
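// constructSameBatch rebuilds the batch described by ready from the follower's local
// mempool, recording the index and hash of any transaction it is missing.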
func (mpi *mempoolImpl) constructSameBatch(ready *raftproto.Ready) *mempoolBatch {
res := &mempoolBatch{}
if txList, ok := mpi.txStore.batchedCache[ready.Height]; ok {
mpi.logger.Warningf("Batch %d already exists in batchedCache", ready.Height)
// TODO (YH): check tx hash and length
res.txList = txList
return res
}
missingTxList := make(map[uint64]string)
txList := make([]*pb.Transaction, 0)
for index, txHash := range ready.TxHashes {
var (
txPointer *orderedIndexKey
txMap *txSortedMap
txItem *txItem
ok bool
)
if txPointer, _ = mpi.txStore.txHashMap[txHash.Hex()]; txPointer == nil {
missingTxList[uint64(index)] = txHash.Hex()
continue
}
if txMap, ok = mpi.txStore.allTxs[txPointer.account]; !ok {
mpi.logger.Warningf("Transaction %s exist in txHashMap but not in allTxs", txHash.Hex())
missingTxList[uint64(index)] = txHash.Hex()
continue
}
if txItem, ok = txMap.items[txPointer.nonce]; !ok {
mpi.logger.Warningf("Transaction %s exist in txHashMap but not in allTxs", txHash.Hex())
missingTxList[uint64(index)] = txHash.Hex()
continue
}
txList = append(txList, txItem.tx)
mpi.txStore.batchedTxs[*txPointer] = true
}
res.missingTxnHashList = missingTxList
res.txList = txList
// store the batch to cache
mpi.txStore.batchedCache[ready.Height] = txList
// store the batch to db
mpi.batchStore(txList)
return res
}
func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) {
dirtyAccounts := make(map[string]bool)
// update current cached commit nonce for account
for _, txHash := range ready.TxHashes {
txHashStr := txHash.Hex()
txPointer := mpi.txStore.txHashMap[txHashStr]
preCommitNonce := mpi.txStore.nonceCache.getCommitNonce(txPointer.account)
newCommitNonce := txPointer.nonce + 1
if preCommitNonce < newCommitNonce {
mpi.txStore.nonceCache.setCommitNonce(txPointer.account, newCommitNonce)
}
delete(mpi.txStore.txHashMap, txHashStr)
delete(mpi.txStore.batchedTxs, *txPointer)
dirtyAccounts[txPointer.account] = true
}
// clean related txs info in cache
for account := range dirtyAccounts {
commitNonce := mpi.txStore.nonceCache.getCommitNonce(account)
if list, ok := mpi.txStore.allTxs[account]; ok {
// remove all previous seq number txs for this account.
removedTxs := list.forward(commitNonce)
// remove index entries whose nonce is smaller than commitNonce.
var wg sync.WaitGroup
wg.Add(3)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
list.index.remove(removedTxs)
}(removedTxs)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
mpi.txStore.priorityIndex.removeByOrderedQueueKey(removedTxs)
}(removedTxs)
go func(ready map[string][]*pb.Transaction) {
defer wg.Done()
mpi.txStore.parkingLotIndex.removeByOrderedQueueKey(removedTxs)
}(removedTxs)
wg.Wait()
delta := int32(0) // decrease pool size by the number of removed txs, not accounts
for _, txs := range removedTxs {
delta += int32(len(txs))
}
atomic.AddInt32(&mpi.txStore.poolSize, -delta)
}
}
mpi.batchDelete(ready.TxHashes)
delete(mpi.txStore.batchedCache, ready.Height)
// restart batch timer for remain txs.
if mpi.isLeader() {
mpi.startBatchTimer(StartReason2)
}
mpi.logger.Debugf("Replica removes batch %d in mempool, and now there are %d batches, "+
"priority len: %d, parkingLot len: %d", ready.Height, len(mpi.txStore.batchedCache),
mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size())
}
// sendFetchTxnRequest sends fetching missing transactions request to leader node.
func (mpi *mempoolImpl) sendFetchTxnRequest(height uint64, lostTxnHashList map[uint64]string) error {
filterFetchTxHashList := &FetchTxnRequest{
ReplicaId: mpi.localID,
Height: height,
MissingTxHashes: lostTxnHashList,
}
missingHashListBytes, err := filterFetchTxHashList.Marshal()
if err != nil {
mpi.logger.Error("Marshal MissingHashList fail")
return err
}
pbMsg := mpi.msgToConsensusPbMsg(missingHashListBytes, raftproto.RaftMessage_GET_TX)
mpi.logger.Debugf("Send fetch transactions request to replica %d", mpi.leader)
mpi.unicast(mpi.leader, pbMsg)
mpi.txStore.missingBatch[height] = lostTxnHashList
return nil
}
// processFetchTxnRequest handles a replica's request for the transactions it is missing from a batch.
func (mpi *mempoolImpl) processFetchTxnRequest(fetchTxnRequest *FetchTxnRequest) error {
txList := make(map[uint64]*pb.Transaction, len(fetchTxnRequest.MissingTxHashes))
var err error
if txList, err = mpi.loadTxnFromCache(fetchTxnRequest); err != nil {
if txList, err = mpi.loadTxnFromStorage(fetchTxnRequest); err != nil {
if txList, err = mpi.loadTxnFromLedger(fetchTxnRequest); err != nil {
mpi.logger.Error("Process fetch txn request failed.")
return err
}
}
}
fetchTxnResponse := &FetchTxnResponse{
ReplicaId: mpi.localID,
Height: fetchTxnRequest.Height,
MissingTxnList: txList,
}
resBytes, err := fetchTxnResponse.Marshal()
if err != nil {
return err
}
pbMsg := mpi.msgToConsensusPbMsg(resBytes, raftproto.RaftMessage_GET_TX_ACK)
mpi.logger.Debugf("Send fetch transactions response to replica %d", fetchTxnRequest.ReplicaId)
mpi.unicast(fetchTxnRequest.ReplicaId, pbMsg)
return nil
}
func (mpi *mempoolImpl) loadTxnFromCache(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) {
missingHashList := fetchTxnRequest.MissingTxHashes
targetHeight := fetchTxnRequest.Height
for _, txHash := range missingHashList {
if txPointer, _ := mpi.txStore.txHashMap[txHash]; txPointer == nil {
return nil, fmt.Errorf("transaction %s dones't exist in txHashMap", txHash)
}
}
var targetBatch []*pb.Transaction
var ok bool
if targetBatch, ok = mpi.txStore.batchedCache[targetHeight]; !ok {
return nil, fmt.Errorf("batch %d dones't exist in batchedCache", targetHeight)
}
targetBatchLen := uint64(len(targetBatch))
txList := make(map[uint64]*pb.Transaction, len(missingHashList))
for index, txHash := range missingHashList {
if index >= targetBatchLen || targetBatch[index].TransactionHash.Hex() != txHash {
return nil, fmt.Errorf("found an invalid transaction, index: %d, targetHash: %s", index, txHash)
}
txList[index] = targetBatch[index]
}
return txList, nil
}
// TODO (YH): restore txn from wal
func (mpi *mempoolImpl) loadTxnFromStorage(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) {
missingHashList := fetchTxnRequest.MissingTxHashes
txList := make(map[uint64]*pb.Transaction)
for index, txHash := range missingHashList {
var (
rawHash types.Hash
err error
)
if rawHash, err = hex2Hash(txHash); err != nil {
return nil, err
}
if tx, ok := mpi.load(rawHash); !ok {
return nil, errors.New("can't load tx from storage")
} else {
txList[index] = tx
}
}
return txList, nil
}
func (mpi *mempoolImpl) loadTxnFromLedger(fetchTxnRequest *FetchTxnRequest) (map[uint64]*pb.Transaction, error) {
missingHashList := fetchTxnRequest.MissingTxHashes
txList := make(map[uint64]*pb.Transaction)
for index, txHash := range missingHashList {
var (
tx *pb.Transaction
rawHash types.Hash
err error
)
if rawHash, err = hex2Hash(txHash); err != nil {
return nil, err
}
if tx, err = mpi.ledgerHelper(rawHash); err != nil {
return nil, err
}
txList[index] = tx
}
return txList, nil
}
func (mpi *mempoolImpl) processFetchTxnResponse(fetchTxnResponse *FetchTxnResponse) error {
mpi.logger.Debugf("Receive fetch transactions response from replica %d", fetchTxnResponse.ReplicaId)
if _, ok := mpi.txStore.missingBatch[fetchTxnResponse.Height]; !ok {
return errors.New("can't find batch %d from missingBatch")
}
expectLen := len(mpi.txStore.missingBatch[fetchTxnResponse.Height])
recvLen := len(fetchTxnResponse.MissingTxnList)
if recvLen != expectLen {
return fmt.Errorf("receive unmatched fetching txn response, expect length: %d, received length: %d", expectLen, recvLen)
}
validTxn := make([]*pb.Transaction, 0)
targetBatch := mpi.txStore.missingBatch[fetchTxnResponse.Height]
for index, tx := range fetchTxnResponse.MissingTxnList {
if tx.Hash().Hex() != targetBatch[index] {
return errors.New("find a hash mismatch tx")
}
validTxn = append(validTxn, tx)
}
if err := mpi.processTransactions(validTxn); err != nil {
return err
}
delete(mpi.txStore.missingBatch, fetchTxnResponse.Height)
return nil
}
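As a side note, the inclusion rule applied in generateBlock above (seenPrevious || txSeq == commitNonce) can be pictured with a toy, self-contained example; the account, nonces, and helper below are made up for illustration and are not part of the commit.

package main

import "fmt"

func main() {
    // next nonce expected for each account (analogous to the commit nonce cache)
    commitNonce := map[string]uint64{"alice": 3}
    // txs already placed into the current batch, per account and nonce
    batched := map[string]map[uint64]bool{"alice": {}}

    include := func(account string, nonce uint64) bool {
        if nonce == commitNonce[account] || batched[account][nonce-1] {
            batched[account][nonce] = true
            return true
        }
        return false
    }

    for _, n := range []uint64{3, 4, 6} { // nonce 5 has not arrived yet
        fmt.Println(n, include("alice", n)) // 3 true, 4 true, 6 false
    }
}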


@ -0,0 +1,264 @@
package mempool
//
//import (
// "encoding/json"
// "fmt"
// "math/rand"
// "sort"
// "testing"
// "time"
//
// "github.com/google/btree"
// "github.com/meshplus/bitxhub-kit/crypto"
// "github.com/meshplus/bitxhub-kit/crypto/asym"
// "github.com/meshplus/bitxhub-kit/types"
// "github.com/meshplus/bitxhub-model/pb"
// "github.com/stretchr/testify/require"
//)
//
//var (
// InterchainContractAddr = types.String2Address("000000000000000000000000000000000000000a")
// appchains = []string{
// "0x3f9d18f7c3a6e5e4c0b877fe3e688ab08840b997",
// "0xa8ae1bbc1105944a84a71b89056930d951d420fe",
// "0x929545f44692178edb7fa468b44c5351596184ba",
// "0x7368022e6659236983eb959b8a1fa22577d48294",
// }
//)
//
//func TestCoreMemPool_RecvTransactions(t *testing.T) {
// readyTxs := make([]*pb.Transaction, 0)
// nonreadyTxs := make([]*pb.Transaction, 0)
// txIndex := make(map[string]*pb.Transaction)
// privKey, err := asym.GenerateKeyPair(crypto.Secp256k1)
// require.Nil(t, err)
// pubKey := privKey.PublicKey()
// addr, err := pubKey.Address()
// require.Nil(t, err)
//
// sort.Strings(appchains)
// readyTxsLen := 2
// for _, appchain := range appchains {
// for i := 1; i <= readyTxsLen; i++ {
// readyTxs = append(readyTxs, mockTxhelper(t, txIndex, appchain, uint64(i)))
// // add unready txs
// nonreadyTxs = append(nonreadyTxs, mockTxhelper(t, txIndex, appchain, uint64(i+readyTxsLen+1)))
// }
// }
//
// // set timestamp and signature for txs
// for _, tx := range readyTxs {
// addSigAndTime(t, tx, addr, privKey)
// }
// for _, tx := range nonreadyTxs {
// addSigAndTime(t, tx, addr, privKey)
// }
//
// // shuffle tx order
// rand.Seed(time.Now().UnixNano())
// rand.Shuffle(len(readyTxs), func(i, j int) {
// readyTxs[i], readyTxs[j] = readyTxs[j], readyTxs[i]
// })
// memPool := newMempoolImpl(nil, nil, nil)
// require.Nil(t, memPool.recvTransactions(readyTxs))
// require.Nil(t, memPool.RecvTransactions(nonreadyTxs))
//
// // check if all txs are indexed in memPool.allTxs
// // and if all txs are indexed by its account and nonce
// require.Equal(t, len(appchains), len(memPool.transactionStore.allTxs))
// checkAllTxs(t, memPool, txIndex, readyTxsLen, readyTxsLen)
// checkHashMap(t, memPool, true, readyTxs, nonreadyTxs)
//
// // check if priorityIndex is correctly recorded
// require.Equal(t, len(readyTxs), memPool.transactionStore.priorityIndex.data.Len())
// for _, tx := range readyTxs {
// ok := memPool.priorityIndex.data.Has(makeKey(tx))
// require.True(t, ok)
// ok = memPool.transactionStore.parkingLotIndex.data.Has(makeKey(tx))
// require.True(t, !ok)
// }
//
// // check if parkingLotIndex is correctly recorded
// require.Equal(t, len(nonreadyTxs), memPool.transactionStore.parkingLotIndex.data.Len())
// for _, tx := range nonreadyTxs {
// ok := memPool.transactionStore.parkingLotIndex.data.Has(makeKey(tx))
// require.True(t, ok)
// ok = memPool.transactionStore.priorityIndex.data.Has(makeKey(tx))
// require.True(t, !ok)
// }
//
// // add the missing tx for each appchain
// missingTxs := make([]*pb.Transaction, 0, len(appchains))
// for _, appchain := range appchains {
// missingTxs = append(missingTxs, mockTxhelper(t, txIndex, appchain, uint64(readyTxsLen+1)))
// }
// for _, tx := range missingTxs {
// addSigAndTime(t, tx, addr, privKey)
// }
//
// require.Nil(t, memPool.RecvTransactions(missingTxs))
//
// // check if parkingLotIndex is empty now
// require.Equal(t, 0, memPool.transactionStore.parkingLotIndex.data.Len())
// // check if priorityIndex has received missingTxs and txs from original parkingLotIndex
// for _, tx := range missingTxs {
// ok := memPool.transactionStore.priorityIndex.data.Has(makeKey(tx))
// require.True(t, ok)
// }
// for _, tx := range nonreadyTxs {
// ok := memPool.transactionStore.priorityIndex.data.Has(makeKey(tx))
// require.True(t, ok)
// }
// checkHashMap(t, memPool, true, readyTxs, nonreadyTxs, missingTxs)
//}
//
//func TestCoreMemPool_RecvTransactions_Margin(t *testing.T) {
// readyTxs := make([]*pb.Transaction, 0)
// identicalNonceTxs := make([]*pb.Transaction, 0)
// replayedTxs := make([]*pb.Transaction, 0)
// readyTxIndex := make(map[string]*pb.Transaction)
// identicalNonceTxIndex := make(map[string]*pb.Transaction)
// privKey, err := asym.GenerateKeyPair(crypto.Secp256k1)
// require.Nil(t, err)
// pubKey := privKey.PublicKey()
// addr, err := pubKey.Address()
// require.Nil(t, err)
//
// sort.Strings(appchains)
// readyTxsLen := 2
// for _, appchain := range appchains {
// for i := 1; i <= readyTxsLen; i++ {
// tx := mockTxhelper(t, readyTxIndex, appchain, uint64(i))
// readyTxs = append(readyTxs, tx)
// // add tx with same index but different content
// identicalNonceTx := mockTxhelper(t, identicalNonceTxIndex, appchain, uint64(i))
// identicalNonceTxs = append(identicalNonceTxs, identicalNonceTx)
// }
// }
//
// // set timestamp and signature for txs
// for _, tx := range readyTxs {
// addSigAndTime(t, tx, addr, privKey)
// // add repeated txs
// replayedTxs = append(replayedTxs, tx)
// }
// for _, tx := range identicalNonceTxs {
// addSigAndTime(t, tx, addr, privKey)
// }
//
// memPool := New()
// require.Nil(t, memPool.RecvTransactions(readyTxs))
// require.NotNil(t, memPool.RecvTransactions(replayedTxs))
// err = memPool.RecvTransactions(identicalNonceTxs)
// require.NotNil(t, err)
//
// require.Equal(t, len(appchains), len(memPool.transactionStore.allTxs))
// checkAllTxs(t, memPool, readyTxIndex, readyTxsLen, 0)
// checkHashMap(t, memPool, true, readyTxs)
// checkHashMap(t, memPool, false, identicalNonceTxs)
//}
//
//func checkAllTxs(t *testing.T, memPool *CoreMemPool,
// txIndex map[string]*pb.Transaction, readyTxsLen, nonReadyTxLen int) {
// for _, appchain := range appchains {
// idx := uint64(1)
// accountAddr := fmt.Sprintf("%s-%s", appchain, appchain)
//
// txMap, ok := memPool.transactionStore.allTxs[accountAddr]
// require.True(t, ok)
// require.NotNil(t, txMap.index)
// require.Equal(t, readyTxsLen+nonReadyTxLen, txMap.index.data.Len())
// require.Equal(t, readyTxsLen+nonReadyTxLen, len(txMap.items))
// txMap.index.data.Ascend(func(i btree.Item) bool {
// orderedKey := i.(*orderedIndexKey)
// if idx <= uint64(readyTxsLen) {
// require.Equal(t, orderedKey.nonce, idx)
// } else {
// require.Equal(t, orderedKey.nonce, idx+1)
// }
// require.Equal(t, orderedKey.accountAddress, accountAddr)
//
// ibtpID := fmt.Sprintf("%s-%s-%d", appchain, appchain, orderedKey.nonce)
// require.Equal(t, txIndex[ibtpID], txMap.items[orderedKey.nonce])
// idx++
// return true
// })
// }
//}
//
//func checkHashMap(t *testing.T, memPool *CoreMemPool, expectedStatus bool, txsSlice ...[]*pb.Transaction) {
// for _, txs := range txsSlice {
// for _, tx := range txs {
// _, ok := memPool.transactionStore.txHashMap[tx.TransactionHash.Hex()]
// require.Equal(t, expectedStatus, ok)
// }
// }
//}
//
//func mockTxhelper(t *testing.T, txIndex map[string]*pb.Transaction, appchainAddr string, index uint64) *pb.Transaction {
// ibtp := mockIBTP(t, appchainAddr, appchainAddr, index)
// tx := mockInterchainTx(t, ibtp)
// txIndex[ibtp.ID()] = tx
// return tx
//}
//
//func addSigAndTime(t *testing.T, tx *pb.Transaction, addr types.Address, privKey crypto.PrivateKey) {
// tx.Timestamp = time.Now().UnixNano()
// tx.From = addr
// sig, err := privKey.Sign(tx.SignHash().Bytes())
// tx.Signature = sig
// require.Nil(t, err)
// tx.TransactionHash = tx.Hash()
//}
//
//func mockInterchainTx(t *testing.T, ibtp *pb.IBTP) *pb.Transaction {
// ib, err := ibtp.Marshal()
// require.Nil(t, err)
//
// ipd := &pb.InvokePayload{
// Method: "HandleIBTP",
// Args: []*pb.Arg{{Value: ib}},
// }
// pd, err := ipd.Marshal()
// require.Nil(t, err)
//
// data := &pb.TransactionData{
// VmType: pb.TransactionData_BVM,
// Type: pb.TransactionData_INVOKE,
// Payload: pd,
// }
//
// return &pb.Transaction{
// To: InterchainContractAddr,
// Nonce: int64(ibtp.Index),
// Data: data,
// Extra: []byte(fmt.Sprintf("%s-%s", ibtp.From, ibtp.To)),
// }
//}
//
//func mockIBTP(t *testing.T, from, to string, nonce uint64) *pb.IBTP {
// content := pb.Content{
// SrcContractId: from,
// DstContractId: from,
// Func: "interchainget",
// Args: [][]byte{[]byte("Alice"), []byte("10")},
// }
//
// bytes, err := content.Marshal()
// require.Nil(t, err)
//
// ibtppd, err := json.Marshal(pb.Payload{
// Encrypted: false,
// Content: bytes,
// })
// require.Nil(t, err)
//
// return &pb.IBTP{
// From: from,
// To: to,
// Payload: ibtppd,
// Index: nonce,
// Type: pb.IBTP_INTERCHAIN,
// Timestamp: time.Now().UnixNano(),
// }
//}

File diff suppressed because it is too large

View File

@ -0,0 +1,23 @@
syntax = "proto3";

package mempool;

import "github.com/meshplus/bitxhub-model/pb/transaction.proto";

message tx_slice {
    repeated pb.Transaction TxList = 1;
}

message fetch_txn_request {
    uint64 replicaId = 1;
    uint64 height = 2;
    map<uint64, string> missing_tx_hashes = 3;
}

message fetch_txn_response {
    uint64 replicaId = 1;
    uint64 height = 2;
    map<uint64, pb.Transaction> missing_txn_list = 3;
}
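A minimal sketch of how a replica might build a fetch request with the Go types generated from these messages. The Go field names (ReplicaId, Height, MissingTxHashes) follow the usual gogo/protobuf CamelCase mapping and are an assumption here, since the generated file is not part of this diff:

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/meshplus/bitxhub/pkg/order/mempool"
)

func main() {
	// Hypothetical scenario: replica 2 is missing two transactions of the batch at height 10.
	req := &mempool.FetchTxnRequest{
		ReplicaId: 2,
		Height:    10,
		MissingTxHashes: map[uint64]string{
			0: "0x1234", // batch index -> tx hash, placeholder values
			3: "0x5678",
		},
	}
	data, err := proto.Marshal(req) // gogo's generic Marshal works for any generated message
	if err != nil {
		panic(err)
	}
	fmt.Printf("fetch_txn_request payload: %d bytes\n", len(data))
}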

View File

@ -0,0 +1,27 @@
package mempool

import (
	"github.com/meshplus/bitxhub-model/pb"
)

// broadcast sends the message to every other node asynchronously.
func (mpi *mempoolImpl) broadcast(m *pb.Message) {
	for id := range mpi.peerMgr.Peers() {
		if id == mpi.localID {
			continue
		}
		go func(id uint64) {
			if err := mpi.peerMgr.AsyncSend(id, m); err != nil {
				mpi.logger.Debugf("Send tx slice to peer %d failed, err: %s", id, err.Error())
			}
		}(id)
	}
}

// unicast sends the message to a single peer asynchronously.
func (mpi *mempoolImpl) unicast(to uint64, m *pb.Message) {
	go func() {
		// Errorf (not Error) so the peer id and error are actually formatted.
		if err := mpi.peerMgr.AsyncSend(to, m); err != nil {
			mpi.logger.Errorf("Send message to peer %d failed, err: %s", to, err.Error())
		}
	}()
}
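For context, a minimal in-package sketch (not part of the original commit) of how a cached transaction slice could be fanned out with these helpers: marshal the slice, wrap it with msgToConsensusPbMsg (defined in utils.go below), and broadcast it. The RaftMessage type constant used here is a placeholder; the real constant lives in the etcdraft proto package and is not shown in this diff, and the gogo-generated Marshal method on TxSlice is assumed:

package mempool

import (
	raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
)

// broadcastTxSlice is an illustrative sketch only.
func (mpi *mempoolImpl) broadcastTxSlice(slice *TxSlice) error {
	data, err := slice.Marshal() // gogo-generated Marshal, assumed available
	if err != nil {
		return err
	}
	// raftproto.RaftMessage_BROADCAST_TX is assumed here for illustration only.
	msg := mpi.msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX)
	mpi.broadcast(msg)
	return nil
}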

View File

@ -0,0 +1,54 @@
package mempool

import (
	"fmt"

	"github.com/meshplus/bitxhub-kit/types"
	"github.com/meshplus/bitxhub-model/pb"
)

// batchStore persists a batch of transactions into the storage in one write batch.
func (mpi *mempoolImpl) batchStore(txList []*pb.Transaction) {
	batch := mpi.storage.NewBatch()
	for _, tx := range txList {
		txKey := compositeKey(tx.TransactionHash.Bytes())
		txData, _ := tx.Marshal()
		batch.Put(txKey, txData)
	}
	batch.Commit()
}

// batchDelete deletes a batch of transactions from the storage in one write batch.
func (mpi *mempoolImpl) batchDelete(hashes []types.Hash) {
	batch := mpi.storage.NewBatch()
	for _, hash := range hashes {
		txKey := compositeKey(hash.Bytes())
		batch.Delete(txKey)
	}
	batch.Commit()
}

// store persists a single transaction.
func (mpi *mempoolImpl) store(tx *pb.Transaction) {
	txKey := compositeKey(tx.TransactionHash.Bytes())
	txData, _ := tx.Marshal()
	mpi.storage.Put(txKey, txData)
}

// load reads a transaction back by its hash; the second return value reports
// whether the transaction was found and decoded successfully.
func (mpi *mempoolImpl) load(hash types.Hash) (*pb.Transaction, bool) {
	txKey := compositeKey(hash.Bytes())
	txData := mpi.storage.Get(txKey)
	if txData == nil {
		return nil, false
	}
	var tx pb.Transaction
	if err := tx.Unmarshal(txData); err != nil {
		mpi.logger.Error(err)
		return nil, false
	}
	return &tx, true
}

// compositeKey prefixes the formatted value with "tx-" to build the storage key.
func compositeKey(value interface{}) []byte {
	var prefix = []byte("tx-")
	return append(prefix, []byte(fmt.Sprintf("%v", value))...)
}
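One detail worth noting: compositeKey formats the raw hash bytes with %v, so the persisted key uses the decimal slice representation rather than hex. A small self-contained sketch of the resulting key layout (the helper body is copied from above):

package main

import "fmt"

// compositeKey mirrors the helper above.
func compositeKey(value interface{}) []byte {
	var prefix = []byte("tx-")
	return append(prefix, []byte(fmt.Sprintf("%v", value))...)
}

func main() {
	hashBytes := []byte{0xde, 0xad, 0xbe, 0xef}
	// Prints "tx-[222 173 190 239]": the byte slice is rendered in decimal, not hex.
	fmt.Println(string(compositeKey(hashBytes)))
}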

View File

@ -0,0 +1,101 @@
package mempool

import (
	"time"

	"github.com/meshplus/bitxhub-model/pb"
	"github.com/meshplus/bitxhub/internal/loggers"
	"github.com/sirupsen/logrus"
)

// TxCache buffers incoming transactions and groups them into slices for broadcasting.
type TxCache struct {
	recvTxC    chan *pb.Transaction
	txSetC     chan *TxSlice
	txSet      []*pb.Transaction
	logger     logrus.FieldLogger
	timerC     chan bool
	stopTimerC chan bool
	close      chan bool
	txSetTick  time.Duration
}

func newTxCache(txSliceTimeout time.Duration) *TxCache {
	txCache := &TxCache{}
	txCache.recvTxC = make(chan *pb.Transaction, DefaultTxCacheSize)
	txCache.close = make(chan bool)
	txCache.txSetC = make(chan *TxSlice)
	txCache.timerC = make(chan bool)
	txCache.stopTimerC = make(chan bool)
	txCache.txSet = make([]*pb.Transaction, 0)
	txCache.logger = loggers.Logger(loggers.Order)
	if txSliceTimeout == 0 {
		txCache.txSetTick = DefaultTxSetTick
	} else {
		txCache.txSetTick = txSliceTimeout
	}
	return txCache
}

func (tc *TxCache) listenEvent() {
	for {
		select {
		case <-tc.close:
			tc.logger.Info("Exit transaction cache")
			// exit the loop once the cache is closed
			return
		case tx := <-tc.recvTxC:
			tc.appendTx(tx)
		case <-tc.timerC:
			tc.stopTxSetTimer()
			tc.postTxSet()
		}
	}
}

// appendTx adds a transaction to the current slice and posts the slice once it
// reaches DefaultTxSetSize; the first transaction of a slice starts the slice timer.
func (tc *TxCache) appendTx(tx *pb.Transaction) {
	if tx == nil {
		tc.logger.Errorf("Transaction is nil")
		return
	}
	if len(tc.txSet) == 0 {
		tc.startTxSetTimer()
	}
	tc.txSet = append(tc.txSet, tx)
	if len(tc.txSet) >= DefaultTxSetSize {
		tc.stopTxSetTimer()
		tc.postTxSet()
	}
}

func (tc *TxCache) postTxSet() {
	dst := make([]*pb.Transaction, len(tc.txSet))
	copy(dst, tc.txSet)
	txSet := &TxSlice{
		TxList: dst,
	}
	tc.txSetC <- txSet
	tc.txSet = make([]*pb.Transaction, 0)
}

func (tc *TxCache) IsFull() bool {
	return len(tc.recvTxC) == DefaultTxCacheSize
}

func (tc *TxCache) startTxSetTimer() {
	go func() {
		timer := time.NewTimer(tc.txSetTick)
		select {
		case <-timer.C:
			tc.timerC <- true
		case <-tc.stopTimerC:
			return
		}
	}()
}

// stopTxSetTimer cancels any pending slice timer by closing the stop channel and
// replacing it with a fresh one.
func (tc *TxCache) stopTxSetTimer() {
	close(tc.stopTimerC)
	tc.stopTimerC = make(chan bool)
}
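A minimal in-package usage sketch (not part of the original commit): transactions are pushed into recvTxC, listenEvent groups them, and complete slices come out of txSetC either when DefaultTxSetSize is reached or when the slice timer fires. It assumes the module loggers have already been initialized, and the helper name txCacheSketch is hypothetical:

package mempool

import (
	"time"

	"github.com/meshplus/bitxhub-model/pb"
)

// txCacheSketch feeds a few transactions through a TxCache and returns the first
// emitted slice (or nil if none is emitted within the wait window).
func txCacheSketch(txs []*pb.Transaction) *TxSlice {
	cache := newTxCache(50 * time.Millisecond)
	go cache.listenEvent()
	defer close(cache.close)

	for _, tx := range txs {
		cache.recvTxC <- tx // recvTxC is buffered with DefaultTxCacheSize slots
	}

	select {
	case slice := <-cache.txSetC:
		return slice // emitted by size or by the 50ms slice timer
	case <-time.After(200 * time.Millisecond):
		return nil
	}
}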

View File

@ -0,0 +1,140 @@
package mempool

import (
	"github.com/google/btree"
	"github.com/meshplus/bitxhub-model/pb"
)

type transactionStore struct {
	// track all valid tx hashes cached in mempool
	txHashMap map[string]*orderedIndexKey
	// track all valid txs, mapping each account to all of its transactions.
	allTxs map[string]*txSortedMap
	// track the commit nonce and pending nonce of each account.
	nonceCache *nonceCache
	// keeps track of "non-ready" txs (txs that can't be included in the next block),
	// only used to help remove some txs if the pool is full.
	parkingLotIndex *btreeIndex
	// keeps track of "ready" txs
	priorityIndex *btreeIndex
	// cache all the batched txs which haven't been executed.
	batchedTxs map[orderedIndexKey]bool
	// cache all batches created by the current primary in order, removed after they have been executed.
	// TODO (YH): change the type of key from height to digest.
	batchedCache map[uint64][]*pb.Transaction
	// trace the missing transactions of each batch
	missingBatch map[uint64]map[uint64]string
	// track the current size of mempool
	poolSize int32
	// track the number of non-batched priority transactions.
	priorityNonBatchSize uint64
}

func newTransactionStore() *transactionStore {
	return &transactionStore{
		txHashMap:       make(map[string]*orderedIndexKey, 0),
		allTxs:          make(map[string]*txSortedMap),
		batchedTxs:      make(map[orderedIndexKey]bool),
		missingBatch:    make(map[uint64]map[uint64]string),
		batchedCache:    make(map[uint64][]*pb.Transaction),
		parkingLotIndex: newBtreeIndex(),
		priorityIndex:   newBtreeIndex(),
		nonceCache:      newNonceCache(),
	}
}

// getTxByOrderKey returns the transaction indexed by account address and nonce,
// or nil if it is not cached.
func (txStore *transactionStore) getTxByOrderKey(account string, seqNo uint64) *pb.Transaction {
	if list, ok := txStore.allTxs[account]; ok {
		res := list.items[seqNo]
		if res == nil {
			return nil
		}
		return res.tx
	}
	return nil
}

type txSortedMap struct {
	items map[uint64]*txItem // map nonce to transaction
	index *btreeIndex        // index for items
}

func newTxSortedMap() *txSortedMap {
	return &txSortedMap{
		items: make(map[uint64]*txItem),
		index: newBtreeIndex(),
	}
}

// filterReady walks the nonces at or above demandNonce: the consecutive run starting
// at demandNonce is returned as ready txs, everything after the first gap as non-ready
// txs, together with the next demanded nonce.
func (m *txSortedMap) filterReady(demandNonce uint64) ([]*pb.Transaction, []*pb.Transaction, uint64) {
	var readyTxs, nonReadyTxs []*pb.Transaction
	if m.index.data.Len() == 0 {
		return nil, nil, demandNonce
	}
	demandKey := makeSortedNonceKeyKey(demandNonce)
	m.index.data.AscendGreaterOrEqual(demandKey, func(i btree.Item) bool {
		nonce := i.(*sortedNonceKey).nonce
		if nonce == demandNonce {
			readyTxs = append(readyTxs, m.items[demandNonce].tx)
			demandNonce++
		} else {
			nonReadyTxs = append(nonReadyTxs, m.items[nonce].tx)
		}
		return true
	})
	return readyTxs, nonReadyTxs, demandNonce
}

// forward removes all txs from the map with a nonce lower than the
// provided commitNonce and returns them grouped by account.
func (m *txSortedMap) forward(commitNonce uint64) map[string][]*pb.Transaction {
	removedTxs := make(map[string][]*pb.Transaction)
	commitNonceKey := makeSortedNonceKeyKey(commitNonce)
	m.index.data.AscendLessThan(commitNonceKey, func(i btree.Item) bool {
		// delete tx from map.
		nonce := i.(*sortedNonceKey).nonce
		txItem := m.items[nonce]
		account := txItem.account
		if _, ok := removedTxs[account]; !ok {
			removedTxs[account] = make([]*pb.Transaction, 0)
		}
		removedTxs[account] = append(removedTxs[account], txItem.tx)
		delete(m.items, nonce)
		return true
	})
	return removedTxs
}

type nonceCache struct {
	// commitNonces records each account's latest committed nonce in the ledger.
	commitNonces map[string]uint64
	// pendingNonces records each account's latest nonce which has been included in the
	// priority queue. Invariant: pendingNonces[account] >= commitNonces[account]
	pendingNonces map[string]uint64
}

func (nc *nonceCache) getCommitNonce(account string) uint64 {
	nonce, ok := nc.commitNonces[account]
	if !ok {
		return 1
	}
	return nonce
}

func (nc *nonceCache) setCommitNonce(account string, nonce uint64) {
	nc.commitNonces[account] = nonce
}

func (nc *nonceCache) getPendingNonce(account string) uint64 {
	nonce, ok := nc.pendingNonces[account]
	if !ok {
		return 1
	}
	return nonce
}

func (nc *nonceCache) setPendingNonce(account string, nonce uint64) {
	nc.pendingNonces[account] = nonce
}
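A small example (not part of the original commit) of the nonce bookkeeping defaults: unknown accounts start at 1 for both the commit and the pending nonce, and callers advance them independently while keeping pendingNonce >= commitNonce. The helper name and account key are illustrative only:

package mempool

import "fmt"

// nonceCacheSketch exercises the getters and setters defined above.
func nonceCacheSketch() {
	nc := newNonceCache()
	account := "appchainA-appchainB" // hypothetical interchain account key

	fmt.Println(nc.getCommitNonce(account))  // 1
	fmt.Println(nc.getPendingNonce(account)) // 1

	nc.setPendingNonce(account, 3) // e.g. after queueing txs into the priority index
	nc.setCommitNonce(account, 2)  // e.g. after a block containing earlier txs commits

	fmt.Println(nc.getPendingNonce(account)) // 3
	fmt.Println(nc.getCommitNonce(account))  // 2
}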

View File

@ -0,0 +1,95 @@
package mempool

import (
	"time"

	"github.com/meshplus/bitxhub-kit/types"
	"github.com/meshplus/bitxhub-model/pb"
	raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
	"github.com/meshplus/bitxhub/pkg/peermgr"
	cmap "github.com/orcaman/concurrent-map"
)

const (
	btreeDegree = 10
)

const (
	DefaultPoolSize        = 50000
	DefaultTxCacheSize     = 10000
	DefaultBatchSize       = 500
	DefaultTxSetSize       = 10
	DefaultBatchTick       = 500 * time.Millisecond
	DefaultTxSetTick       = 100 * time.Millisecond
	DefaultFetchTxnTimeout = 3 * time.Second
)

// batch timer reasons
const (
	StartReason1 = "first transaction set"
	StartReason2 = "finish executing a batch"
	StopReason1  = "generated a batch by batch timer"
	StopReason2  = "generated a batch by batch size"
	StopReason3  = "restart batch timer"
)

// IBTP methods
const (
	IBTPMethod1 = "HandleIBTP"
	IBTPMethod2 = "HandleIBTPs"
)

type LocalMissingTxnEvent struct {
	Height             uint64
	MissingTxnHashList map[uint64]string
	WaitC              chan bool
}

type subscribeEvent struct {
	txForwardC           chan *TxSlice
	localMissingTxnEvent chan *LocalMissingTxnEvent
	fetchTxnRequestC     chan *FetchTxnRequest
	fetchTxnResponseC    chan *FetchTxnResponse
	getBlockC            chan *constructBatchEvent
	commitTxnC           chan *raftproto.Ready
	updateLeaderC        chan uint64
}

type mempoolBatch struct {
	missingTxnHashList map[uint64]string
	txList             []*pb.Transaction
}

type constructBatchEvent struct {
	ready  *raftproto.Ready
	result chan *mempoolBatch
}

type Config struct {
	ID                 uint64
	BatchSize          uint64
	PoolSize           uint64
	TxSliceSize        uint64
	BatchTick          time.Duration
	FetchTimeout       time.Duration
	TxSliceTimeout     time.Duration
	PeerMgr            peermgr.PeerManager
	GetTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
	ChainHeight        uint64
}

type timerManager struct {
	timeout       time.Duration      // default timeout of this timer
	isActive      cmap.ConcurrentMap // tracks the keys of timers started under this manager that are still active
	timeoutEventC chan bool
}

type txItem struct {
	account string
	tx      *pb.Transaction
}
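A minimal sketch (not part of the original commit) of wiring a Config from the default constants above; in the running node these values come from the order plugin's configuration, and the helper name defaultMempoolConfig is hypothetical:

package mempool

import (
	"github.com/meshplus/bitxhub-kit/types"
	"github.com/meshplus/bitxhub-model/pb"
	"github.com/meshplus/bitxhub/pkg/peermgr"
)

// defaultMempoolConfig fills every field of Config with the defaults declared above.
func defaultMempoolConfig(id uint64, peerMgr peermgr.PeerManager,
	getTx func(hash types.Hash) (*pb.Transaction, error), chainHeight uint64) *Config {
	return &Config{
		ID:                 id,
		BatchSize:          DefaultBatchSize,
		PoolSize:           DefaultPoolSize,
		TxSliceSize:        DefaultTxSetSize,
		BatchTick:          DefaultBatchTick,
		FetchTimeout:       DefaultFetchTxnTimeout,
		TxSliceTimeout:     DefaultTxSetTick,
		PeerMgr:            peerMgr,
		GetTransactionFunc: getTx,
		ChainHeight:        chainHeight,
	}
}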

146
pkg/order/mempool/utils.go Normal file
View File

@ -0,0 +1,146 @@
package mempool

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/meshplus/bitxhub-kit/types"
	"github.com/meshplus/bitxhub-model/pb"
	raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
	cmap "github.com/orcaman/concurrent-map"
)

func (mpi *mempoolImpl) getBatchSeqNo() uint64 {
	return atomic.LoadUint64(&mpi.batchSeqNo)
}

func (mpi *mempoolImpl) increaseBatchSeqNo() {
	atomic.AddUint64(&mpi.batchSeqNo, 1)
}

// getTxByTxPointer returns the tx stored in allTxs for the given pointer (account + nonce),
// or nil if it is not cached.
func (mpi *mempoolImpl) getTxByTxPointer(txPointer orderedIndexKey) *pb.Transaction {
	if txnMap, ok := mpi.txStore.allTxs[txPointer.account]; ok {
		if item, ok := txnMap.items[txPointer.nonce]; ok {
			return item.tx
		}
	}
	return nil
}

// msgToConsensusPbMsg wraps raw mempool data into a consensus message of the given raft type.
func (mpi *mempoolImpl) msgToConsensusPbMsg(data []byte, tyr raftproto.RaftMessage_Type) *pb.Message {
	rm := &raftproto.RaftMessage{
		Type:   tyr,
		FromId: mpi.localID,
		Data:   data,
	}
	cmData, err := rm.Marshal()
	if err != nil {
		return nil
	}
	msg := &pb.Message{
		Type: pb.Message_CONSENSUS,
		Data: cmData,
	}
	return msg
}

func newSubscribe() *subscribeEvent {
	return &subscribeEvent{
		txForwardC:           make(chan *TxSlice),
		localMissingTxnEvent: make(chan *LocalMissingTxnEvent),
		fetchTxnRequestC:     make(chan *FetchTxnRequest),
		updateLeaderC:        make(chan uint64),
		fetchTxnResponseC:    make(chan *FetchTxnResponse),
		commitTxnC:           make(chan *raftproto.Ready),
		getBlockC:            make(chan *constructBatchEvent),
	}
}

// TODO (YH): restore commitNonce and pendingNonce from db.
func newNonceCache() *nonceCache {
	return &nonceCache{
		commitNonces:  make(map[string]uint64),
		pendingNonces: make(map[string]uint64),
	}
}

// hex2Hash decodes a hex string into a types.Hash; the input must decode to exactly
// types.HashLength bytes.
func hex2Hash(hash string) (types.Hash, error) {
	var (
		hubHash   types.Hash
		hashBytes []byte
		err       error
	)
	if hashBytes, err = hex.DecodeString(hash); err != nil {
		return types.Hash{}, err
	}
	if len(hashBytes) != types.HashLength {
		return types.Hash{}, errors.New("invalid tx hash")
	}
	copy(hubHash[:], hashBytes)
	return hubHash, nil
}

func (mpi *mempoolImpl) poolIsFull() bool {
	return atomic.LoadInt32(&mpi.txStore.poolSize) >= DefaultPoolSize
}

func (mpi *mempoolImpl) isLeader() bool {
	return mpi.leader == mpi.localID
}

func (mpi *mempoolImpl) isBatchTimerActive() bool {
	return !mpi.batchTimerMgr.isActive.IsEmpty()
}

// startBatchTimer restarts the batch timer and marks it active.
func (mpi *mempoolImpl) startBatchTimer(reason string) {
	// stop old timer
	mpi.stopBatchTimer(StopReason3)
	mpi.logger.Debugf("Start batch timer, reason: %s", reason)
	timestamp := time.Now().UnixNano()
	key := strconv.FormatInt(timestamp, 10)
	mpi.batchTimerMgr.isActive.Set(key, true)
	time.AfterFunc(mpi.batchTimerMgr.timeout, func() {
		if mpi.batchTimerMgr.isActive.Has(key) {
			mpi.batchTimerMgr.timeoutEventC <- true
		}
	})
}

// stopBatchTimer stops the batch timer and clears all active timer keys.
func (mpi *mempoolImpl) stopBatchTimer(reason string) {
	if mpi.batchTimerMgr.isActive.IsEmpty() {
		return
	}
	mpi.logger.Debugf("Stop batch timer, reason: %s", reason)
	mpi.batchTimerMgr.isActive = cmap.New()
}

// newTimer creates a timer manager with the given default timeout.
func newTimer(d time.Duration) *timerManager {
	return &timerManager{
		timeout:       d,
		isActive:      cmap.New(),
		timeoutEventC: make(chan bool),
	}
}

// getAccount derives the mempool account key for a transaction: interchain txs
// (HandleIBTP/HandleIBTPs) are keyed by "ibtp.From-ibtp.To", all others by the sender address.
func getAccount(tx *pb.Transaction) (string, error) {
	payload := &pb.InvokePayload{}
	if err := payload.Unmarshal(tx.Data.Payload); err != nil {
		return "", fmt.Errorf("unmarshal invoke payload: %s", err.Error())
	}
	if payload.Method == IBTPMethod1 || payload.Method == IBTPMethod2 {
		ibtp := &pb.IBTP{}
		if err := ibtp.Unmarshal(payload.Args[0].Value); err != nil {
			return "", fmt.Errorf("unmarshal ibtp from tx: %w", err)
		}
		account := fmt.Sprintf("%s-%s", ibtp.From, ibtp.To)
		return account, nil
	}
	return tx.From.Hex(), nil
}
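To make the account-key convention concrete, a small in-package sketch (not part of the original commit) that builds an interchain transaction the same way the disabled tests above do and feeds it through getAccount:

package mempool

import (
	"fmt"

	"github.com/meshplus/bitxhub-model/pb"
)

// getAccountSketch shows the "from-to" key produced for an interchain transaction.
func getAccountSketch() {
	ibtp := &pb.IBTP{From: "appchainA", To: "appchainB", Index: 1}
	ib, err := ibtp.Marshal()
	if err != nil {
		panic(err)
	}
	ipd := &pb.InvokePayload{
		Method: IBTPMethod1, // "HandleIBTP"
		Args:   []*pb.Arg{{Value: ib}},
	}
	pd, err := ipd.Marshal()
	if err != nil {
		panic(err)
	}
	tx := &pb.Transaction{
		Data: &pb.TransactionData{
			VmType:  pb.TransactionData_BVM,
			Type:    pb.TransactionData_INVOKE,
			Payload: pd,
		},
	}
	account, err := getAccount(tx)
	if err != nil {
		panic(err)
	}
	fmt.Println(account) // "appchainA-appchainB"
}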