From 6b52259ad373026482c6c679a6871e8c1e114223 Mon Sep 17 00:00:00 2001
From: Alexader
Date: Thu, 31 Dec 2020 15:57:25 +0800
Subject: [PATCH 1/5] feat(order): add timeout rebroadcast mechanism in mempool

---
 config/order.toml                 |   1 +
 go.sum                            |   4 ++
 pkg/order/etcdraft/config.go      |   7 +-
 pkg/order/etcdraft/node.go        |  59 ++++++++++------
 pkg/order/etcdraft/util.go        |   2 +-
 pkg/order/mempool/btree_index.go  |  48 +++++++++++++
 pkg/order/mempool/mempool.go      |  61 ++++++++++++++++
 pkg/order/mempool/mempool_impl.go |  66 ++++++++++++------
 pkg/order/mempool/mempool_test.go | 112 ++++++++++++++++++++++++++++--
 pkg/order/mempool/tx_store.go     |  42 ++++++++++-
 pkg/order/mempool/types.go        |  19 ++---
 11 files changed, 361 insertions(+), 60 deletions(-)

diff --git a/config/order.toml b/config/order.toml
index b99ff9d..6b151e2 100644
--- a/config/order.toml
+++ b/config/order.toml
@@ -1,5 +1,6 @@
 [raft]
 batch_timeout = "0.3s" # Block packaging time period.
+rebroadcast_timeout = "3m" # Node timeout tick to check if there exist txs to be rebroadcast
 tick_timeout = "0.1s" # TickTimeout is the internal logical clock for the Node by a single tick, Election timeouts and heartbeat timeouts are in units of ticks.
 election_tick = 10 # ElectionTick is the number of Node.Tick invocations that must pass between elections.
 heartbeat_tick = 1 # HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.
diff --git a/go.sum b/go.sum
index 4632438..9e8591b 100644
--- a/go.sum
+++ b/go.sum
@@ -40,6 +40,7 @@ github.com/Shopify/sarama v1.27.0/go.mod h1:aCdj6ymI8uyPEux1JJ9gcaDT6cinjGhNCAhs
 github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
 github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
+github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw=
 github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8=
 github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -793,6 +794,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shirou/gopsutil v2.20.5+incompatible h1:tYH07UPoQt0OCQdgWWMgYHy3/a9bcxNpBIysykNIP7I=
 github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
@@ -831,7 +833,9 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
 github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
 github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
 github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
+github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
 github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
+github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
 github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU=
 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pkg/order/etcdraft/config.go b/pkg/order/etcdraft/config.go
index 87f0d12..8229f45 100644
--- a/pkg/order/etcdraft/config.go
+++ b/pkg/order/etcdraft/config.go
@@ -27,6 +27,7 @@ type SyncerConfig struct {
 
 type RAFT struct {
 	BatchTimeout       time.Duration `mapstructure:"batch_timeout"`
+	RebroadcastTimeout time.Duration `mapstructure:"rebroadcast_timeout"`
 	TickTimeout        time.Duration `mapstructure:"tick_timeout"`
 	ElectionTick       int           `mapstructure:"election_tick"`
 	HeartbeatTick      int           `mapstructure:"heartbeat_tick"`
@@ -51,10 +52,10 @@ func defaultRaftConfig() raft.Config {
 	}
 }
 
-func generateEtcdRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, ram MemoryStorage) (*raft.Config, time.Duration, error) {
+func generateEtcdRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, ram MemoryStorage) (*raft.Config, time.Duration, time.Duration, error) {
 	readConfig, err := readConfig(repoRoot)
 	if err != nil {
-		return &raft.Config{}, 100 * time.Millisecond, nil
+		return &raft.Config{}, 100 * time.Millisecond, 3 * time.Minute, nil
 	}
 	defaultConfig := defaultRaftConfig()
 	defaultConfig.ID = id
@@ -75,7 +76,7 @@ func generateEtcdRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogge
 	defaultConfig.PreVote = readConfig.RAFT.PreVote
 	defaultConfig.CheckQuorum = readConfig.RAFT.CheckQuorum
 	defaultConfig.DisableProposalForwarding = readConfig.RAFT.DisableProposalForwarding
-	return &defaultConfig, readConfig.RAFT.TickTimeout, nil
+	return &defaultConfig, readConfig.RAFT.TickTimeout, readConfig.RAFT.RebroadcastTimeout, nil
 }
 
 func generateRaftConfig(repoRoot string) (*RAFTConfig, error) {
diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go
index daacb60..f3077a1 100644
--- a/pkg/order/etcdraft/node.go
+++ b/pkg/order/etcdraft/node.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/meshplus/bitxhub-kit/log"
 	"path/filepath"
 	"sync"
 	"sync/atomic"
@@ -12,6 +11,7 @@ import (
 
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/meshplus/bitxhub-kit/log"
 	"github.com/meshplus/bitxhub-kit/storage"
 	"github.com/meshplus/bitxhub-kit/types"
 	"github.com/meshplus/bitxhub-model/pb"
@@ -39,26 +39,28 @@ type Node struct {
 	txCache       *mempool.TxCache // cache the transactions received from api
 	batchTimerMgr *BatchTimer
 
-	proposeC    chan *raftproto.RequestBatch // proposed ready, input channel
-	confChangeC <-chan raftpb.ConfChange     // proposed cluster config changes
-	commitC     chan *pb.CommitEvent         // the hash commit channel
-	errorC      chan<- error                 // errors from raft session
-	tickTimeout time.Duration                // tick timeout
-	msgC        chan []byte                  // receive messages from remote peer
-	stateC      chan *mempool.ChainState     // receive the executed block state
+	proposeC           chan *raftproto.RequestBatch // proposed ready, input channel
+	confChangeC        <-chan raftpb.ConfChange     // proposed cluster config changes
+	commitC            chan *pb.CommitEvent         // the hash commit channel
+	errorC             chan<- error                 // errors from raft session
+	tickTimeout        time.Duration                // tick timeout
+	rebroadcastTimeout time.Duration                // rebroadcast timeout
+	msgC               chan []byte                  // receive messages from remote peer
+	stateC             chan *mempool.ChainState     // receive the executed block state
+	rebroadcastTicker  chan *raftproto.TxSlice      // receive the executed block state
 
-	confState         raftpb.ConfState // raft requires ConfState to be persisted within snapshot
-	blockAppliedIndex sync.Map         // mapping of block height and apply index in raft log
-	appliedIndex      uint64           // current apply index in raft log
-	snapCount         uint64           // snapshot count
-	snapshotIndex     uint64           // current snapshot apply index in raft log
-	lastIndex         uint64           // last apply index in raft log
-	lastExec          uint64           // the index of the last-applied block
-	readyPool         *sync.Pool       // ready pool, avoiding memory growth fast
-	justElected       bool             // track new leader status
+	confState         raftpb.ConfState // raft requires ConfState to be persisted within snapshot
+	blockAppliedIndex sync.Map         // mapping of block height and apply index in raft log
+	appliedIndex      uint64           // current apply index in raft log
+	snapCount         uint64           // snapshot count
+	snapshotIndex     uint64           // current snapshot apply index in raft log
+	lastIndex         uint64           // last apply index in raft log
+	lastExec          uint64           // the index of the last-applied block
+	readyPool         *sync.Pool       // ready pool, avoiding memory growth fast
+	justElected       bool             // track new leader status
 	getChainMetaFunc  func() *pb.ChainMeta // current chain meta
-	ctx   context.Context // context
-	haltC chan struct{}   // exit signal
+	ctx               context.Context // context
+	haltC             chan struct{}   // exit signal
 }
 
 // NewNode new raft node
@@ -165,7 +167,7 @@ func NewNode(opts ...order.Option) (order.Order, error) {
 // Start or restart raft node
 func (n *Node) Start() error {
 	n.blockAppliedIndex.Store(n.lastExec, n.loadAppliedIndex())
-	rc, tickTimeout, err := generateEtcdRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
+	rc, tickTimeout, rebroadcastTimeout, err := generateEtcdRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
 	if err != nil {
 		return fmt.Errorf("generate raft config: %w", err)
 	}
@@ -175,6 +177,8 @@ func (n *Node) Start() error {
 		n.node = raft.StartNode(rc, n.peers)
 	}
 	n.tickTimeout = tickTimeout
+	n.rebroadcastTimeout = rebroadcastTimeout
+
 	go n.run()
 	go n.txCache.ListenEvent()
 	n.logger.Info("Consensus module started")
@@ -249,7 +253,9 @@ func (n *Node) run() {
 	n.snapshotIndex = snap.Metadata.Index
 	n.appliedIndex = snap.Metadata.Index
 	ticker := time.NewTicker(n.tickTimeout)
+	rebroadcastTicker := time.NewTicker(n.rebroadcastTimeout)
 	defer ticker.Stop()
+	defer rebroadcastTicker.Stop()
 
 	// handle input request
 	go func() {
@@ -313,6 +319,19 @@ func (n *Node) run() {
 		case state := <-n.stateC:
 			n.reportState(state)
 
+		case <-rebroadcastTicker.C:
+			// check periodically if there are long-pending txs in mempool
+			rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.rebroadcastTimeout)
+			for _, txSlice := range rebroadcastTxs {
+				txSet := &raftproto.TxSlice{TxList: txSlice}
+				data, err := txSet.Marshal()
+				if err != nil {
+					n.logger.Errorf("Marshal failed, err: %s", err.Error())
+					continue
+				}
+				pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id)
+				_ = n.peerMgr.Broadcast(pbMsg)
+			}
+
 		case <-n.batchTimerMgr.BatchTimeoutEvent():
 			n.batchTimerMgr.StopBatchTimer()
 			// call txPool module to generate a tx batch
diff --git a/pkg/order/etcdraft/util.go b/pkg/order/etcdraft/util.go
index da8ed8b..80dfb2c 100644
--- a/pkg/order/etcdraft/util.go
+++ b/pkg/order/etcdraft/util.go
@@ -14,7 +14,7 @@ import (
 )
 
 const (
-	DefaultBatchTick = 500 * time.Millisecond
+	DefaultBatchTick     = 500 * time.Millisecond
 	DefaultSnapshotCount = 1000
 )
 
diff --git a/pkg/order/mempool/btree_index.go b/pkg/order/mempool/btree_index.go
index 964e938..eb15eb0 100644
--- a/pkg/order/mempool/btree_index.go
+++ b/pkg/order/mempool/btree_index.go
@@ -22,6 +22,17 @@ func (oik *orderedIndexKey) Less(than btree.Item) bool {
 	return oik.nonce < other.nonce
 }
 
+type sortedTtlKey struct {
+	account  string
+	nonce    uint64
+	liveTime int64 // the latest live time of tx after it is received or rebroadcast
+}
+
+func (stk *sortedTtlKey) Less(than btree.Item) bool {
+	other := than.(*sortedTtlKey)
+	return stk.liveTime < other.liveTime
+}
+
 type sortedNonceKey struct {
 	nonce uint64
 }
@@ -32,6 +43,23 @@ func (snk *sortedNonceKey) Less(item btree.Item) bool {
 	return snk.nonce < dst.nonce
 }
 
+type orderedTimeoutKey struct {
+	account   string
+	nonce     uint64
+	timestamp int64 // the timestamp of tx when it is received
+}
+
+func (otk *orderedTimeoutKey) Less(than btree.Item) bool {
+	other := than.(*orderedTimeoutKey)
+	if otk.timestamp != other.timestamp {
+		return otk.timestamp < other.timestamp
+	}
+	if otk.account != other.account {
+		return otk.account < other.account
+	}
+	return otk.nonce < other.nonce
+}
+
 func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
 	return &orderedIndexKey{
 		account: account,
@@ -39,6 +67,14 @@ func makeOrderedIndexKey(account string, tx *pb.Transaction) *orderedIndexKey {
 	}
 }
 
+func makeTimeoutKey(account string, tx *pb.Transaction) *orderedTimeoutKey {
+	return &orderedTimeoutKey{
+		account:   account,
+		nonce:     tx.Nonce,
+		timestamp: tx.Timestamp,
+	}
+}
+
 func makeSortedNonceKey(nonce uint64) *sortedNonceKey {
 	return &sortedNonceKey{
 		nonce: nonce,
@@ -79,6 +115,18 @@ func (idx *btreeIndex) removeByOrderedQueueKey(txs map[string][]*pb.Transaction)
 	}
 }
 
+func (idx *btreeIndex) insertByTimeoutKey(account string, tx *pb.Transaction) {
+	idx.data.ReplaceOrInsert(makeTimeoutKey(account, tx))
+}
+
+func (idx *btreeIndex) removeByTimeoutKey(txs map[string][]*pb.Transaction) {
+	for account, list := range txs {
+		for _, tx := range list {
+			idx.data.Delete(makeTimeoutKey(account, tx))
+		}
+	}
+}
+
 // Size returns the size of the index
 func (idx *btreeIndex) size() int {
 	return idx.data.Len()
diff --git a/pkg/order/mempool/mempool.go b/pkg/order/mempool/mempool.go
index 71fb233..feb7c95 100644
--- a/pkg/order/mempool/mempool.go
+++ b/pkg/order/mempool/mempool.go
@@ -3,6 +3,8 @@ package mempool
 import (
 	"time"
 
+	"github.com/google/btree"
+	"github.com/meshplus/bitxhub-model/pb"
 	raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
 )
 
@@ -24,6 +26,8 @@ type MemPool interface {
 
 	SetBatchSeqNo(batchSeq uint64)
 
+	GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction
+
 	External
 }
 
@@ -79,3 +83,60 @@ func (mpi *mempoolImpl) IsPoolFull() bool {
 func (mpi *mempoolImpl) SetBatchSeqNo(batchSeq uint64) {
 	mpi.batchSeqNo = batchSeq
 }
+
+func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction {
+	txList := make([]*pb.Transaction, 0, mpi.txStore.ttlIndex.index.Len())
+	// all the tx whose live time is less than lowBoundTime should be rebroadcast
+	mpi.logger.Debugf("------- Start gathering timeout txs, ttl index len is %d -----------", mpi.txStore.ttlIndex.index.Len())
+	allItem := make([]*sortedTtlKey, 0, mpi.txStore.ttlIndex.index.Len())
+	currentTime := time.Now().UnixNano()
+	mpi.txStore.ttlIndex.index.Ascend(func(i btree.Item) bool {
+		item := i.(*sortedTtlKey)
+		timeoutTime := item.liveTime + rebroadcastDuration.Nanoseconds()
+		if txMap, ok := mpi.txStore.allTxs[item.account]; ok && currentTime > timeoutTime {
+			tx := txMap.items[item.nonce].tx
+			txList = append(txList, tx)
+			allItem = append(allItem, item)
+		}
+		return true
+	})
+	if len(txList) == 0 {
+		return nil
+	}
+	for _, item := range allItem {
+		// update the liveTime of each tx
+		item.liveTime = currentTime
+		mpi.txStore.ttlIndex.items[makeKey(item.account, item.nonce)] = currentTime
+	}
+	// shard txList into fixed size in case txList is too large to broadcast one time
+	return shardTxList(txList, mpi.txSliceSize)
+}
+
+func shardTxList(txs []*pb.Transaction, batchLen uint64) [][]*pb.Transaction {
+	begin := uint64(0)
+	end := uint64(len(txs)) - 1
+	totalLen := uint64(len(txs))
+
+	// shape txs to fixed size in case totalLen is too large
+	batchNums := totalLen / batchLen
+	if totalLen%batchLen != 0 {
+		batchNums++
+	}
+	shardedLists := make([][]*pb.Transaction, 0, batchNums)
+	for i := uint64(0); i <= batchNums; i++ {
+		actualLen := batchLen
+		if end-begin+1 < batchLen {
+			actualLen = end - begin + 1
+		}
+		if actualLen == 0 {
+			continue
+		}
+		shardedList := make([]*pb.Transaction, actualLen)
+		for j := uint64(0); j < batchLen && begin <= end; j++ {
+			shardedList[j] = txs[begin]
+			begin++
+		}
+		shardedLists = append(shardedLists, shardedList)
+	}
+	return shardedLists
+}
diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go
index 98879f4..582cfcb 100644
--- a/pkg/order/mempool/mempool_impl.go
+++ b/pkg/order/mempool/mempool_impl.go
@@ -1,6 +1,7 @@
 package mempool
 
 import (
+	"fmt"
 	"sync"
 
 	"github.com/google/btree"
@@ -10,19 +11,21 @@ import (
 )
 
 type mempoolImpl struct {
-	localID    uint64
-	batchSize  uint64
-	batchSeqNo uint64 // track the sequence number of block
-	poolSize   uint64
-	logger     logrus.FieldLogger
-	txStore    *transactionStore // store all transactions info
+	localID     uint64
+	batchSize   uint64
+	txSliceSize uint64
+	batchSeqNo  uint64 // track the sequence number of block
+	poolSize    uint64
+	logger      logrus.FieldLogger
+	txStore     *transactionStore // store all transactions info
 }
 
 func newMempoolImpl(config *Config) *mempoolImpl {
 	mpi := &mempoolImpl{
-		localID:    config.ID,
-		batchSeqNo: config.ChainHeight,
-		logger:     config.Logger,
+		localID:     config.ID,
+		batchSeqNo:  config.ChainHeight,
+		logger:      config.Logger,
+		txSliceSize: config.TxSliceSize,
 	}
 	mpi.txStore = newTransactionStore()
 	if config.BatchSize == 0 {
@@ -35,7 +38,13 @@ func newMempoolImpl(config *Config) *mempoolImpl {
 	} else {
 		mpi.poolSize = config.PoolSize
 	}
+	if config.TxSliceSize == 0 {
+		mpi.txSliceSize = DefaultTxSetSize
+	} else {
+		mpi.txSliceSize = config.TxSliceSize
+	}
 	mpi.logger.Infof("MemPool batch size = %d", mpi.batchSize)
+	mpi.logger.Infof("MemPool tx slice size = %d", mpi.txSliceSize)
 	mpi.logger.Infof("MemPool batch seqNo = %d", mpi.batchSeqNo)
 	mpi.logger.Infof("MemPool pool size = %d", mpi.poolSize)
 	return mpi
@@ -91,9 +100,12 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
 		readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce)
 		mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce)
 
-		// inset ready txs into priorityIndex.
+		// inset ready txs into priorityIndex and set ttlIndex for these txs.
 		for _, tx := range readyTxs {
-			mpi.txStore.priorityIndex.insertByOrderedQueueKey(account, tx)
+			if !mpi.txStore.priorityIndex.data.Has(makeTimeoutKey(account, tx)) {
+				mpi.txStore.priorityIndex.insertByTimeoutKey(account, tx)
+				mpi.txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp)
+			}
 		}
 		mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize + uint64(len(readyTxs))
 
@@ -108,7 +120,9 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
 // getBlock fetches next block of transactions for consensus,
 // batchedTx are all txs sent to consensus but were not committed yet, mempool should filter out such txs.
 func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
-	// txs has lower nonce will be observed first in priority index iterator.
+	// txs with lower timestamps will be observed first in the priority index iterator;
+	// if the first-seen tx's nonce isn't the next required nonce for its account,
+	// it is recorded in skippedTxs until its predecessors are selected.
 	mpi.logger.Debugf("Length of non-batched transactions: %d", mpi.txStore.priorityNonBatchSize)
 	var batchSize uint64
 	if poolLen := mpi.txStore.priorityNonBatchSize; poolLen > mpi.batchSize {
@@ -120,8 +134,8 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
 	skippedTxs := make(map[orderedIndexKey]bool)
 	result := make([]orderedIndexKey, 0, mpi.batchSize)
 	mpi.txStore.priorityIndex.data.Ascend(func(a btree.Item) bool {
-		tx := a.(*orderedIndexKey)
-		// if tx has existed in bathedTxs,
+		tx := a.(*orderedTimeoutKey)
+		// if tx already exists in batchedTxs, ignore it
 		if _, ok := mpi.txStore.batchedTxs[orderedIndexKey{tx.account, tx.nonce}]; ok {
 			return true
 		}
@@ -131,10 +145,10 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
 		if txSeq >= 1 {
 			_, seenPrevious = mpi.txStore.batchedTxs[orderedIndexKey{account: tx.account, nonce: txSeq - 1}]
 		}
 		// include transaction if it's "next" for given account or
 		// we've already sent its ancestor to Consensus
 		if seenPrevious || (txSeq == commitNonce) {
-			ptr := orderedIndexKey{account: tx.account, nonce: tx.nonce}
+			ptr := orderedIndexKey{account: tx.account, nonce: txSeq}
 			mpi.txStore.batchedTxs[ptr] = true
 			result = append(result, ptr)
 			if uint64(len(result)) == batchSize {
@@ -142,7 +156,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
 
 			// check if we can now include some txs that were skipped before for given account
-			skippedTxn := orderedIndexKey{account: tx.account, nonce: tx.nonce + 1}
+			skippedTxn := orderedIndexKey{account: tx.account, nonce: txSeq + 1}
 			for {
 				if _, ok := skippedTxs[skippedTxn]; !ok {
 					break
				}
@@ -155,7 +169,7 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
 				skippedTxn.nonce++
 			}
 		} else {
-			skippedTxs[orderedIndexKey{tx.account, tx.nonce}] = true
+			skippedTxs[orderedIndexKey{tx.account, txSeq}] = true
 		}
 		return true
 	})
@@ -222,14 +236,18 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
 		removedTxs := list.forward(commitNonce)
 		// remove index entries smaller than commitNonce, then delete them from each index.
 		var wg sync.WaitGroup
-		wg.Add(3)
+		wg.Add(4)
 		go func(ready map[string][]*pb.Transaction) {
 			defer wg.Done()
 			list.index.removeBySortedNonceKey(removedTxs)
 		}(removedTxs)
 		go func(ready map[string][]*pb.Transaction) {
 			defer wg.Done()
-			mpi.txStore.priorityIndex.removeByOrderedQueueKey(removedTxs)
+			mpi.txStore.priorityIndex.removeByTimeoutKey(removedTxs)
+		}(removedTxs)
+		go func(ready map[string][]*pb.Transaction) {
+			defer wg.Done()
+			mpi.txStore.ttlIndex.removeByTtlKey(removedTxs)
 		}(removedTxs)
 		go func(ready map[string][]*pb.Transaction) {
 			defer wg.Done()
@@ -248,5 +266,9 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
 	}
 	mpi.logger.Debugf("Replica %d removes batches in mempool, and now there are %d non-batched txs,"+
 		"priority len: %d, parkingLot len: %d, batchedTx len: %d, txHashMap len: %d", mpi.localID, mpi.txStore.priorityNonBatchSize,
-		mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap))
+		mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap))
+}
+
+func makeKey(account string, nonce uint64) string {
+	return fmt.Sprintf("%s-%d", account, nonce)
 }
diff --git a/pkg/order/mempool/mempool_test.go b/pkg/order/mempool/mempool_test.go
index fb993cc..02d68da 100644
--- a/pkg/order/mempool/mempool_test.go
+++ b/pkg/order/mempool/mempool_test.go
@@ -1,6 +1,7 @@
 package mempool
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -18,6 +19,12 @@ func TestGetBlock(t *testing.T) {
 	tx2 := constructTx(uint64(2), &privKey1)
 	tx3 := constructTx(uint64(2), &privKey2)
 	tx4 := constructTx(uint64(4), &privKey2)
+	from1, err := privKey1.PublicKey().Address()
+	ast.Nil(err)
+	fmt.Printf("from 1 is %s\n", from1)
+	from2, err := privKey2.PublicKey().Address()
+	ast.Nil(err)
+	fmt.Printf("from 2 is %s\n", from2)
 	var txList []*pb.Transaction
 	txList = append(txList, tx1, tx2, tx3, tx4)
 	// mock follower
@@ -28,7 +35,7 @@ func TestGetBlock(t *testing.T) {
 	ast.Nil(batch)
 
 	// mock leader to getBlock
-	txList = make([]*pb.Transaction,0)
+	txList = make([]*pb.Transaction, 0)
 	tx5 := constructTx(uint64(1), &privKey2)
 	txList = append(txList, tx5)
 	batch = mpi.ProcessTransactions(txList, true)
@@ -89,13 +96,13 @@ func TestCommitTransactions(t *testing.T) {
 	ast.Equal(4, len(batch.TxList))
 	ast.Equal(4, mpi.txStore.priorityIndex.size())
 	ast.Equal(1, mpi.txStore.parkingLotIndex.size())
-	ast.Equal(uint64(2), mpi.batchSeqNo) 
+	ast.Equal(uint64(2), mpi.batchSeqNo)
 
 	var txHashList []*types.Hash
 	txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash)
 	state := &ChainState{
 		TxHashList: txHashList,
-		Height: uint64(2),
+		Height:     uint64(2),
 	}
 	mpi.CommitTransactions(state)
 	time.Sleep(100 * time.Millisecond)
@@ -156,7 +163,6 @@ func TestProcessTransactions(t *testing.T) {
 	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
 }
 
-
 func TestForward(t *testing.T) {
 	ast := assert.New(t)
 	mpi, _ := mockMempoolImpl()
@@ -181,4 +187,100 @@ func TestForward(t *testing.T) {
 	ast.Equal(2, len(removeList[account1.String()]))
 	ast.Equal(uint64(1), removeList[account1.String()][0].Nonce)
 	ast.Equal(uint64(2), removeList[account1.String()][1].Nonce)
-}
\ No newline at end of file
+}
+
+func TestGetTimeoutTransaction(t *testing.T) {
+	ast := assert.New(t)
+	mpi, _ := mockMempoolImpl()
+	mpi.batchSize = 5
+
+	txList := make([]*pb.Transaction, 0)
+	privKey1 := genPrivKey()
+	account1, _ := privKey1.PublicKey().Address()
+	privKey2 := genPrivKey()
+	account2, _ := privKey2.PublicKey().Address()
+	nonceArr := []uint64{4, 2, 1}
+	for _, i := range nonceArr {
+		tx1 := constructTx(i, &privKey1)
+		tx2 := constructTx(i, &privKey2)
+		txList = append(txList, tx1, tx2)
+	}
+	batch := mpi.ProcessTransactions(txList, true)
+	ast.Nil(batch) // not enough for 5 txs to generate batch
+	ast.Equal(4, mpi.txStore.priorityIndex.size())
+	ast.Equal(2, mpi.txStore.parkingLotIndex.size())
+	ast.Equal(6, len(mpi.txStore.txHashMap))
+	ast.Equal(3, mpi.txStore.allTxs[account1.String()].index.size())
+	ast.Equal(3, mpi.txStore.allTxs[account2.String()].index.size())
+	ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
+	ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
+
+	tx7 := constructTx(uint64(3), &privKey1)
+	tx8 := constructTx(uint64(5), &privKey2)
+	txList = make([]*pb.Transaction, 0)
+	txList = append(txList, tx7, tx8)
+	batch = mpi.ProcessTransactions(txList, true)
+	ast.NotNil(batch)
+
+	var hashes []types.Hash
+	// this batch will contain tx{1,2,3} for privKey1 and tx{1,2} for privKey2
+	ast.Equal(5, len(batch.TxList))
+	ast.Equal(uint64(2), batch.Height)
+	ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
+	ast.Equal(6, mpi.txStore.priorityIndex.size())
+	ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor")
+	ast.Equal(8, len(mpi.txStore.txHashMap))
+	ast.Equal(4, mpi.txStore.allTxs[account1.String()].index.size())
+	ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size())
+	ast.Equal(uint64(5), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
+
+	// process committed txs
+	hashList := make([]*types.Hash, 0, len(hashes))
+	for _, tx := range batch.TxList {
+		hashList = append(hashList, tx.Hash())
+	}
+	ready := &ChainState{
+		TxHashList: hashList,
+		Height:     2,
+	}
+	mpi.processCommitTransactions(ready)
+	time.Sleep(100 * time.Millisecond)
+
+	ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
+	ast.Equal(1, mpi.txStore.priorityIndex.size())
+	ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor")
+	ast.Equal(3, len(mpi.txStore.txHashMap))
+	ast.Equal(1, mpi.txStore.allTxs[account1.String()].index.size())
+	ast.Equal(2, mpi.txStore.allTxs[account2.String()].index.size())
+	ast.Equal(uint64(5), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
+
+	// generate block3
+	txList = make([]*pb.Transaction, 0)
+	account1NonceArr2 := []uint64{5, 6}
+	for _, i := range account1NonceArr2 {
+		tx1 := constructTx(i, &privKey1)
+		txList = append(txList, tx1)
+	}
+	account2NonceArr2 := []uint64{3, 6}
+	for _, i := range account2NonceArr2 {
+		tx1 := constructTx(i, &privKey2)
+		txList = append(txList, tx1)
+	}
+	batch = mpi.ProcessTransactions(txList, true)
+	ast.NotNil(batch)
+	ast.Equal(uint64(2), mpi.txStore.priorityNonBatchSize)
+	ast.Equal(7, mpi.txStore.priorityIndex.size())
+	ast.Equal(3, mpi.txStore.parkingLotIndex.size(), "delete parkingLot until finishing executor")
+	ast.Equal(7, len(mpi.txStore.txHashMap))
+	ast.Equal(3, mpi.txStore.allTxs[account1.String()].index.size())
+	ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size())
+	ast.Equal(uint64(4), mpi.txStore.nonceCache.getCommitNonce(account1.String()))
+	ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
+	ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
+}
diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go
index 8b05c0e..07f5ef8 100644
--- a/pkg/order/mempool/tx_store.go
+++ b/pkg/order/mempool/tx_store.go
@@ -1,9 +1,11 @@
 package mempool
 
 import (
+	"fmt"
+	"sync"
+
 	"github.com/google/btree"
 	"github.com/meshplus/bitxhub-model/pb"
-	"sync"
 )
 
 type transactionStore struct {
@@ -13,6 +15,8 @@ type transactionStore struct {
 	allTxs map[string]*txSortedMap
 	// track the commit nonce and pending nonce of each account.
 	nonceCache *nonceCache
+	// keep track of the livetime of ready txs in priorityIndex
+	ttlIndex *txLiveTimeMap
 	// keeps track of "non-ready" txs (txs that can't be included in next block)
 	// only used to help remove some txs if pool is full.
 	parkingLotIndex *btreeIndex
@@ -31,6 +35,7 @@ func newTransactionStore() *transactionStore {
 		batchedTxs:      make(map[orderedIndexKey]bool),
 		parkingLotIndex: newBtreeIndex(),
 		priorityIndex:   newBtreeIndex(),
+		ttlIndex:        newTxLiveTimeMap(),
 		nonceCache:      newNonceCache(),
 	}
 }
@@ -134,7 +139,7 @@ type nonceCache struct {
 	// pendingNonces records each account's latest nonce which has been included in
 	// priority queue. Invariant: pendingNonces[account] >= commitNonces[account]
 	pendingNonces map[string]uint64
-	pendingMu sync.RWMutex
+	pendingMu     sync.RWMutex
 }
 
 func newNonceCache() *nonceCache {
@@ -171,3 +176,36 @@ func (nc *nonceCache) setPendingNonce(account string, nonce uint64) {
 	defer nc.pendingMu.Unlock()
 	nc.pendingNonces[account] = nonce
 }
+
+// since the live time field in sortedTtlKey may vary during processing,
+// we need to track the latest live time since the tx's last broadcast.
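+// (Editorial note, added for clarity: the btree below is ordered by
+// sortedTtlKey.Less, i.e. by liveTime, so timeout scans visit the
+// longest-waiting txs first, while the items map is keyed by
+// makeKey(account, nonce) for direct lookup on removal.)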
+type txLiveTimeMap struct {
+	items map[string]int64 // map the account-nonce key to the tx's latest live time
+	index *btree.BTree     // index for txs
+}
+
+func newTxLiveTimeMap() *txLiveTimeMap {
+	return &txLiveTimeMap{
+		index: btree.New(btreeDegree),
+		items: make(map[string]int64),
+	}
+}
+
+func (tlm *txLiveTimeMap) insertByTtlKey(account string, nonce uint64, liveTime int64) {
+	tlm.index.ReplaceOrInsert(&sortedTtlKey{account, nonce, liveTime})
+	tlm.items[makeKey(account, nonce)] = liveTime
+}
+
+func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) {
+	for account, list := range txs {
+		for _, tx := range list {
+			liveTime, ok := tlm.items[makeKey(account, tx.Nonce)]
+			if !ok {
+				fmt.Printf("ttl key for %s not found\n", account)
+				continue
+			}
+			tlm.index.Delete(&sortedTtlKey{account, tx.Nonce, liveTime})
+			delete(tlm.items, makeKey(account, tx.Nonce))
+		}
+	}
+}
diff --git a/pkg/order/mempool/types.go b/pkg/order/mempool/types.go
index 5fd8223..fa696cb 100644
--- a/pkg/order/mempool/types.go
+++ b/pkg/order/mempool/types.go
@@ -17,15 +17,16 @@ const (
 	DefaultTxCacheSize = 10000
 	DefaultBatchSize   = 500
 	DefaultTxSetSize   = 10
-	DefaultTxSetTick = 100 * time.Millisecond
+	DefaultTxSetTick   = 100 * time.Millisecond
 )
 
 type Config struct {
-	ID             uint64
-	BatchSize      uint64
-	PoolSize       uint64
-	TxSliceSize    uint64
-	TxSliceTimeout time.Duration
+	ID                 uint64
+	BatchSize          uint64
+	PoolSize           uint64
+	RebroadcastTimeout time.Duration
+	TxSliceSize        uint64
+	TxSliceTimeout     time.Duration
 	ChainHeight uint64
 	Logger      logrus.FieldLogger
 }
@@ -36,7 +37,7 @@ type txItem struct {
 }
 
 type ChainState struct {
-	Height     uint64
-	BlockHash  *types.Hash
+	Height     uint64
+	BlockHash  *types.Hash
 	TxHashList []*types.Hash
-}
\ No newline at end of file
+}

From abd840d4873623be8d5cf146fb8fead1e4ffefba Mon Sep 17 00:00:00 2001
From: Alexader
Date: Tue, 5 Jan 2021 13:38:29 +0800
Subject: [PATCH 2/5] feat(order): add ttl index and local indicator for mempool tx

1. add ttl index to track txs that need to be rebroadcast
2. add a local indicator for each tx in mempool to judge whether the tx
   comes from the api module
---
 pkg/order/etcdraft/node.go               | 10 +--
 pkg/order/mempool/btree_index.go         |  6 ++
 pkg/order/mempool/mempool.go             | 61 +---------------
 pkg/order/mempool/mempool_impl.go        | 86 ++++++++++++++++++++--
 pkg/order/mempool/mempool_test.go        | 92 ++++++++++++++++++++----
 pkg/order/mempool/tx_store.go            | 24 +++++--
 pkg/order/mempool/types.go               |  1 +
 pkg/order/solo/node.go                   |  2 +-
 pkg/peermgr/mock_peermgr/mock_peermgr.go |  4 ----
 9 files changed, 190 insertions(+), 96 deletions(-)

diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go
index f3077a1..71c5905 100644
--- a/pkg/order/etcdraft/node.go
+++ b/pkg/order/etcdraft/node.go
@@ -314,7 +314,7 @@ func (n *Node) run() {
 			_ = n.peerMgr.Broadcast(pbMsg)
 
 			// 2. process transactions
-			n.processTransactions(txSet.TxList)
+			n.processTransactions(txSet.TxList, true)
 
 		case state := <-n.stateC:
 			n.reportState(state)
@@ -405,7 +405,7 @@ func (n *Node) run() {
 	}
 }
 
-func (n *Node) processTransactions(txList []*pb.Transaction) {
+func (n *Node) processTransactions(txList []*pb.Transaction, isLocal bool) {
 	// leader node would check if this transaction triggered generating a batch or not
 	if n.isLeader() {
 		// start batch timer when this node receives the first transaction
@@ -413,12 +413,12 @@ func (n *Node) processTransactions(txList []*pb.Transaction) {
 			n.batchTimerMgr.StartBatchTimer()
 		}
 		// If this transaction triggers generating a batch, stop batch timer
-		if batch := n.mempool.ProcessTransactions(txList, true); batch != nil {
+		if batch := n.mempool.ProcessTransactions(txList, true, isLocal); batch != nil {
 			n.batchTimerMgr.StopBatchTimer()
 			n.postProposal(batch)
 		}
 	} else {
-		n.mempool.ProcessTransactions(txList, false)
+		n.mempool.ProcessTransactions(txList, false, isLocal)
 	}
 }
 
@@ -579,7 +579,7 @@ func (n *Node) processRaftCoreMsg(msg []byte) error {
 		if err := txSlice.Unmarshal(rm.Data); err != nil {
 			return err
 		}
-		n.processTransactions(txSlice.TxList)
+		n.processTransactions(txSlice.TxList, false)
 
 	default:
 		return fmt.Errorf("unexpected raft message received")
diff --git a/pkg/order/mempool/btree_index.go b/pkg/order/mempool/btree_index.go
index eb15eb0..1618403 100644
--- a/pkg/order/mempool/btree_index.go
+++ b/pkg/order/mempool/btree_index.go
@@ -1,6 +1,8 @@
 package mempool
 
 import (
+	"fmt"
+
 	"github.com/meshplus/bitxhub-model/pb"
 
 	"github.com/google/btree"
@@ -81,6 +83,10 @@ func makeSortedNonceKey(nonce uint64) *sortedNonceKey {
 	}
 }
 
+func makeAccountNonceKey(account string, nonce uint64) string {
+	return fmt.Sprintf("%s-%d", account, nonce)
+}
+
 type btreeIndex struct {
 	data *btree.BTree
 }
diff --git a/pkg/order/mempool/mempool.go b/pkg/order/mempool/mempool.go
index feb7c95..52da959 100644
--- a/pkg/order/mempool/mempool.go
+++ b/pkg/order/mempool/mempool.go
@@ -3,8 +3,6 @@ package mempool
 import (
 	"time"
 
-	"github.com/google/btree"
-	"github.com/meshplus/bitxhub-model/pb"
 	raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
 )
 
@@ -13,7 +11,7 @@ var _ MemPool = (*mempoolImpl)(nil)
 
 type MemPool interface {
 	// ProcessTransactions processes transactions from the api and other vp nodes.
-	ProcessTransactions(txs []*pb.Transaction, isLeader bool) *raftproto.RequestBatch
+	ProcessTransactions(txs []*pb.Transaction, isLeader, isLocal bool) *raftproto.RequestBatch
 
 	// GenerateBlock generate a block
 	GenerateBlock() *raftproto.RequestBatch
@@ -83,60 +81,3 @@ func (mpi *mempoolImpl) IsPoolFull() bool {
 func (mpi *mempoolImpl) SetBatchSeqNo(batchSeq uint64) {
 	mpi.batchSeqNo = batchSeq
 }
-
-func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction {
-	txList := make([]*pb.Transaction, 0, mpi.txStore.ttlIndex.index.Len())
-	// all the tx whose live time is less than lowBoundTime should be rebroadcast
-	mpi.logger.Debugf("------- Start gathering timeout txs, ttl index len is %d -----------", mpi.txStore.ttlIndex.index.Len())
-	allItem := make([]*sortedTtlKey, 0, mpi.txStore.ttlIndex.index.Len())
-	currentTime := time.Now().UnixNano()
-	mpi.txStore.ttlIndex.index.Ascend(func(i btree.Item) bool {
-		item := i.(*sortedTtlKey)
-		timeoutTime := item.liveTime + rebroadcastDuration.Nanoseconds()
-		if txMap, ok := mpi.txStore.allTxs[item.account]; ok && currentTime > timeoutTime {
-			tx := txMap.items[item.nonce].tx
-			txList = append(txList, tx)
-			allItem = append(allItem, item)
-		}
-		return true
-	})
-	if len(txList) == 0 {
-		return nil
-	}
-	for _, item := range allItem {
-		// update the liveTime of each tx
-		item.liveTime = currentTime
-		mpi.txStore.ttlIndex.items[makeKey(item.account, item.nonce)] = currentTime
-	}
-	// shard txList into fixed size in case txList is too large to broadcast one time
-	return shardTxList(txList, mpi.txSliceSize)
-}
-
-func shardTxList(txs []*pb.Transaction, batchLen uint64) [][]*pb.Transaction {
-	begin := uint64(0)
-	end := uint64(len(txs)) - 1
-	totalLen := uint64(len(txs))
-
-	// shape txs to fixed size in case totalLen is too large
-	batchNums := totalLen / batchLen
-	if totalLen%batchLen != 0 {
-		batchNums++
-	}
-	shardedLists := make([][]*pb.Transaction, 0, batchNums)
-	for i := uint64(0); i <= batchNums; i++ {
-		actualLen := batchLen
-		if end-begin+1 < batchLen {
-			actualLen = end - begin + 1
-		}
-		if actualLen == 0 {
-			continue
-		}
-		shardedList := make([]*pb.Transaction, actualLen)
-		for j := uint64(0); j < batchLen && begin <= end; j++ {
-			shardedList[j] = txs[begin]
-			begin++
-		}
-		shardedLists = append(shardedLists, shardedList)
-	}
-	return shardedLists
-}
diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go
index 582cfcb..dcbeb7c 100644
--- a/pkg/order/mempool/mempool_impl.go
+++ b/pkg/order/mempool/mempool_impl.go
@@ -1,8 +1,9 @@
 package mempool
 
 import (
-	"fmt"
+	"math"
 	"sync"
+	"time"
 
 	"github.com/google/btree"
 	"github.com/meshplus/bitxhub-model/pb"
@@ -50,7 +51,7 @@ func newMempoolImpl(config *Config) *mempoolImpl {
 	return mpi
 }
 
-func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader bool) *raftproto.RequestBatch {
+func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader, isLocal bool) *raftproto.RequestBatch {
 	validTxs := make(map[string][]*pb.Transaction)
 	for _, tx := range txs {
 		// check the sequence number of tx
@@ -74,7 +75,7 @@ func (mpi *mempoolImpl) ProcessTransactions(txs []*pb.Transaction, isLeader bool
 	}
 
 	// Process all the new transaction and merge any errors into the original slice
-	dirtyAccounts := mpi.txStore.insertTxs(validTxs)
+	dirtyAccounts := mpi.txStore.insertTxs(validTxs, isLocal)
 
 	// send tx to mempool store
 	mpi.processDirtyAccount(dirtyAccounts)
@@ -100,13 +101,17 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
 		readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce)
 		mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce)
 
-		// inset ready txs into priorityIndex and set ttlIndex for these txs.
+		// insert ready txs into priorityIndex and set ttlIndex for these txs.
 		for _, tx := range readyTxs {
 			if !mpi.txStore.priorityIndex.data.Has(makeTimeoutKey(account, tx)) {
 				mpi.txStore.priorityIndex.insertByTimeoutKey(account, tx)
-				mpi.txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp)
+				if list.items[tx.Nonce].local {
+					// no need to rebroadcast tx from other nodes to reduce network overhead
+					mpi.txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp)
+				}
 			}
 		}
+		mpi.txStore.updateEarliestTimestamp()
 		mpi.txStore.priorityNonBatchSize = mpi.txStore.priorityNonBatchSize + uint64(len(readyTxs))
 
 		// inset non-ready txs into parkingLotIndex.
@@ -248,6 +253,7 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
 		go func(ready map[string][]*pb.Transaction) {
 			defer wg.Done()
 			mpi.txStore.ttlIndex.removeByTtlKey(removedTxs)
+			mpi.txStore.updateEarliestTimestamp()
 		}(removedTxs)
 		go func(ready map[string][]*pb.Transaction) {
 			defer wg.Done()
@@ -269,6 +275,72 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
 		mpi.txStore.priorityIndex.size(), mpi.txStore.parkingLotIndex.size(), len(mpi.txStore.batchedTxs), len(mpi.txStore.txHashMap))
 }
 
-func makeKey(account string, nonce uint64) string {
-	return fmt.Sprintf("%s-%d", account, nonce)
+func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction {
+	// any tx that has lived longer than rebroadcastDuration should be rebroadcast
+	mpi.logger.Debugf("------- Start gathering timeout txs, ttl index len is %d -----------", mpi.txStore.ttlIndex.index.Len())
+	currentTime := time.Now().UnixNano()
+	if currentTime < mpi.txStore.earliestTimestamp+rebroadcastDuration.Nanoseconds() {
+		// if even the earliest tx has not exceeded the timeout limit, then none has timed out
+		return [][]*pb.Transaction{}
+	}
+
+	txList := make([]*pb.Transaction, 0)
+	timeoutItems := make([]*sortedTtlKey, 0)
+	mpi.txStore.ttlIndex.index.Ascend(func(i btree.Item) bool {
+		item := i.(*sortedTtlKey)
+		if item.liveTime > math.MaxInt64 {
+			// TODO(tyx): if this tx has rebroadcast many times and exceeded a final limit,
+			// it is expired and will be removed from mempool
+			return true
+		}
+		// if this tx has not exceeded the rebroadcast duration, break iteration
+		timeoutTime := item.liveTime + rebroadcastDuration.Nanoseconds()
+		txMap, ok := mpi.txStore.allTxs[item.account]
+		if !ok || currentTime < timeoutTime {
+			return false
+		}
+		tx := txMap.items[item.nonce].tx
+		txList = append(txList, tx)
+		timeoutItems = append(timeoutItems, item)
+		return true
+	})
+	if len(txList) == 0 {
+		return [][]*pb.Transaction{}
+	}
+	for _, item := range timeoutItems {
+		// update the liveTime of timeout txs
+		item.liveTime = currentTime
+		mpi.txStore.ttlIndex.items[makeAccountNonceKey(item.account, item.nonce)] = currentTime
+	}
+	// shard txList into fixed size in case txList is too large to broadcast one time
+	return shardTxList(txList, mpi.txSliceSize)
+}
+
+func shardTxList(txs []*pb.Transaction, batchLen uint64) [][]*pb.Transaction {
+	begin := uint64(0)
+	end := uint64(len(txs)) - 1
+	totalLen := uint64(len(txs))
+
+	// shard txs into fixed-size slices in case totalLen is too large
+	batchNums := totalLen / batchLen
+	if totalLen%batchLen != 0 {
+		batchNums++
+	}
+	shardedLists := make([][]*pb.Transaction, 0, batchNums)
+	for i := uint64(0); i < batchNums; i++ {
+		actualLen := batchLen
+		if end-begin+1 < batchLen {
+			actualLen = end - begin + 1
+		}
+		if actualLen == 0 {
+			continue
+		}
+		shardedList := make([]*pb.Transaction, actualLen)
+		for j := uint64(0); j < batchLen && begin <= end; j++ {
+			shardedList[j] = txs[begin]
+			begin++
+		}
+		shardedLists = append(shardedLists, shardedList)
+	}
+	return shardedLists
 }
diff --git a/pkg/order/mempool/mempool_test.go b/pkg/order/mempool/mempool_test.go
index 02d68da..b1993fc 100644
--- a/pkg/order/mempool/mempool_test.go
+++ b/pkg/order/mempool/mempool_test.go
@@ -28,17 +28,17 @@ func TestGetBlock(t *testing.T) {
 	var txList []*pb.Transaction
 	txList = append(txList, tx1, tx2, tx3, tx4)
 	// mock follower
-	batch := mpi.ProcessTransactions(txList, false)
+	batch := mpi.ProcessTransactions(txList, false, true)
 	ast.Nil(batch)
 	// mock leader
-	batch = mpi.ProcessTransactions(txList, true)
+	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.Nil(batch)
 
 	// mock leader to getBlock
 	txList = make([]*pb.Transaction, 0)
 	tx5 := constructTx(uint64(1), &privKey2)
 	txList = append(txList, tx5)
-	batch = mpi.ProcessTransactions(txList, true)
+	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.Equal(4, len(batch.TxList))
 }
 
@@ -59,7 +59,7 @@ func TestGetPendingNonceByAccount(t *testing.T) {
 	tx5 := constructTx(uint64(4), &privKey2)
 	var txList []*pb.Transaction
 	txList = append(txList, tx1, tx2, tx3, tx4, tx5)
-	batch := mpi.ProcessTransactions(txList, false)
+	batch := mpi.ProcessTransactions(txList, false, true)
 	ast.Nil(batch)
 	nonce = mpi.GetPendingNonceByAccount(account1.String())
 	ast.Equal(uint64(3), nonce)
@@ -81,7 +81,7 @@ func TestCommitTransactions(t *testing.T) {
 	tx4 := constructTx(uint64(4), &privKey2)
 	var txList []*pb.Transaction
 	txList = append(txList, tx1, tx2, tx3, tx4)
-	batch := mpi.ProcessTransactions(txList, true)
+	batch := mpi.ProcessTransactions(txList, true, true)
 	ast.Nil(batch)
 	ast.Equal(3, mpi.txStore.priorityIndex.size())
 	ast.Equal(1, mpi.txStore.parkingLotIndex.size())
@@ -92,7 +92,7 @@ func TestCommitTransactions(t *testing.T) {
 	tx5 := constructTx(uint64(2), &privKey2)
 	txList = []*pb.Transaction{}
 	txList = append(txList, tx5)
-	batch = mpi.ProcessTransactions(txList, true)
+	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.Equal(4, len(batch.TxList))
 	ast.Equal(4, mpi.txStore.priorityIndex.size())
 	ast.Equal(1, mpi.txStore.parkingLotIndex.size())
@@ -133,7 +133,7 @@ func TestProcessTransactions(t *testing.T) {
 	tx4 := constructTx(uint64(2), &privKey2)
 	tx5 := constructTx(uint64(4), &privKey2)
 	txList = append(txList, tx1, tx2, tx3, tx4, tx5)
-	batch := mpi.ProcessTransactions(txList, false)
+	batch := mpi.ProcessTransactions(txList, false, true)
 	ast.Nil(batch)
 	ast.Equal(4, mpi.txStore.priorityIndex.size())
 	ast.Equal(1, mpi.txStore.parkingLotIndex.size())
@@ -150,7 +150,7 @@ func TestProcessTransactions(t *testing.T) {
 	tx7 := constructTx(uint64(5), &privKey2)
 	txList = make([]*pb.Transaction, 0)
 	txList = append(txList, tx6, tx7)
-	batch = mpi.ProcessTransactions(txList, true)
+	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.Equal(4, len(batch.TxList))
 	ast.Equal(uint64(2), batch.Height)
 	ast.Equal(uint64(1), mpi.txStore.priorityNonBatchSize)
@@ -175,7 +175,7 @@ func TestForward(t *testing.T) {
 	tx4 := constructTx(uint64(4), &privKey1)
 	tx5 := constructTx(uint64(6), &privKey1)
 	txList = append(txList, tx1, tx2, tx3, tx4, tx5)
-	batch := mpi.ProcessTransactions(txList, false)
+	batch := mpi.ProcessTransactions(txList, false, true)
 	ast.Nil(batch)
 	list := mpi.txStore.allTxs[account1.String()]
 	ast.Equal(5, list.index.size())
@@ -189,7 +189,7 @@ func TestForward(t *testing.T) {
 	ast.Equal(uint64(2), removeList[account1.String()][1].Nonce)
 }
 
-func TestGetTimeoutTransaction(t *testing.T) {
+func TestUnorderedIncomingTxs(t *testing.T) {
 	ast := assert.New(t)
 	mpi, _ := mockMempoolImpl()
 	mpi.batchSize = 5
@@ -205,7 +205,7 @@ func TestUnorderedIncomingTxs(t *testing.T) {
 		tx2 := constructTx(i, &privKey2)
 		txList = append(txList, tx1, tx2)
 	}
-	batch := mpi.ProcessTransactions(txList, true)
+	batch := mpi.ProcessTransactions(txList, true, true)
 	ast.Nil(batch) // not enough for 5 txs to generate batch
 	ast.Equal(4, mpi.txStore.priorityIndex.size())
 	ast.Equal(2, mpi.txStore.parkingLotIndex.size())
@@ -221,7 +221,7 @@ func TestUnorderedIncomingTxs(t *testing.T) {
 	tx8 := constructTx(uint64(5), &privKey2)
 	txList = make([]*pb.Transaction, 0)
 	txList = append(txList, tx7, tx8)
-	batch = mpi.ProcessTransactions(txList, true)
+	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.NotNil(batch)
 
 	var hashes []types.Hash
@@ -271,7 +271,7 @@ func TestUnorderedIncomingTxs(t *testing.T) {
 		tx1 := constructTx(i, &privKey2)
 		txList = append(txList, tx1)
 	}
-	batch = mpi.ProcessTransactions(txList, true)
+	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.NotNil(batch)
 	ast.Equal(uint64(2), mpi.txStore.priorityNonBatchSize)
 	ast.Equal(7, mpi.txStore.priorityIndex.size())
@@ -283,4 +283,70 @@ func TestUnorderedIncomingTxs(t *testing.T) {
 	ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
 	ast.Equal(uint64(3), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
 	ast.Equal(uint64(7), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
+
 }
+
+func TestGetTimeoutTransaction(t *testing.T) {
+	ast := assert.New(t)
+	mpi, _ := mockMempoolImpl()
+	mpi.txSliceSize = 3
+
+	txList := make([]*pb.Transaction, 0)
+	privKey1 := genPrivKey()
+	account1, _ := privKey1.PublicKey().Address()
+	privKey2 := genPrivKey()
+	account2, _ := privKey2.PublicKey().Address()
+	nonceArr := []uint64{4, 5}
+	for _, i := range nonceArr {
+		tx1 := constructTx(i, &privKey1)
+		tx2 := constructTx(i, &privKey2)
+		txList = append(txList, tx1, tx2)
+	}
+	batch := mpi.ProcessTransactions(txList, true, true)
+	ast.Nil(batch)
+
+	// set another incoming list for timeout
+	time.Sleep(150 * time.Millisecond)
+	nonceArr = []uint64{1, 2}
+	for _, i := range nonceArr {
+		tx1 := constructTx(i, &privKey1)
+		tx2 := constructTx(i, &privKey2)
+		txList = append(txList, tx1, tx2)
+	}
+	batch = mpi.ProcessTransactions(txList, true, false)
+	ast.NotNil(batch)
+	// tx1,tx2 for account1 and account2 will be batched.
+	// all txs for account1 and account2 will be in priorityIndex.
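+	// (Editorial note: a tx's liveTime starts from its Timestamp and only local
+	// txs enter ttlIndex, which is why the earlier, locally-submitted tx4,tx5
+	// time out first below.)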
+	// And tx4,tx5 for account1 and account2 will have timed out while tx3 will not
+	ast.Equal(4, mpi.txStore.priorityIndex.size())
+	ast.Equal(4, mpi.txStore.parkingLotIndex.size())
+	ast.Equal(8, len(mpi.txStore.txHashMap))
+	ast.Equal(4, mpi.txStore.allTxs[account1.String()].index.size())
+	ast.Equal(4, mpi.txStore.allTxs[account2.String()].index.size())
+	ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account1.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account1.String()))
+	ast.Equal(uint64(1), mpi.txStore.nonceCache.getCommitNonce(account2.String()))
+	ast.Equal(uint64(3), mpi.txStore.nonceCache.getPendingNonce(account2.String()))
+
+	tx1 := constructTx(3, &privKey1)
+	tx2 := constructTx(3, &privKey2)
+	txList = append(txList, tx1, tx2)
+	batch = mpi.ProcessTransactions(txList, true, true)
+	ast.NotNil(batch)
+
+	// tx4,tx5 should have timed out
+	timeoutList := mpi.GetTimeoutTransactions(100 * time.Millisecond)
+	ast.NotNil(timeoutList)
+	ast.Equal(2, len(timeoutList))
+	ast.Equal(3, len(timeoutList[0]))
+	ast.Equal(1, len(timeoutList[1]))
+
+	// wait another 150 milliseconds so that tx3 times out too;
+	// though tx1,tx2 have waited a long time, they are not local and won't be rebroadcast
+	time.Sleep(150 * time.Millisecond)
+	timeoutList = mpi.GetTimeoutTransactions(100 * time.Millisecond)
+	ast.NotNil(timeoutList)
+	ast.Equal(2, len(timeoutList))
+	ast.Equal(3, len(timeoutList[0]))
+	ast.Equal(3, len(timeoutList[1]))
 }
diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go
index 07f5ef8..afcf52a 100644
--- a/pkg/order/mempool/tx_store.go
+++ b/pkg/order/mempool/tx_store.go
@@ -1,7 +1,7 @@
 package mempool
 
 import (
-	"fmt"
+	"math"
 	"sync"
 
 	"github.com/google/btree"
@@ -15,6 +15,8 @@ type transactionStore struct {
 	allTxs map[string]*txSortedMap
 	// track the commit nonce and pending nonce of each account.
nonceCache *nonceCache + // keep track of the latest timestamp of ready txs in ttlIndex + earliestTimestamp int64 // keep track of the livetime of ready txs in priorityIndex ttlIndex *txLiveTimeMap // keeps track of "non-ready" txs (txs that can't be included in next block) @@ -40,7 +42,7 @@ func newTransactionStore() *transactionStore { } } -func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction) map[string]bool { +func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction, isLocal bool) map[string]bool { dirtyAccounts := make(map[string]bool) for account, list := range txs { for _, tx := range list { @@ -59,6 +61,7 @@ func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction) map txItem := &txItem{ account: account, tx: tx, + local: isLocal, } txList.items[tx.Nonce] = txItem txList.index.insertBySortedNonceKey(tx) @@ -80,6 +83,16 @@ func (txStore *transactionStore) getTxByOrderKey(account string, seqNo uint64) * return nil } +func (txStore *transactionStore) updateEarliestTimestamp() { + // find the earliest tx in ttlIndex + earliestTime := int64(math.MaxInt64) + latestItem := txStore.ttlIndex.index.Min() + if latestItem != nil { + earliestTime = latestItem.(*sortedTtlKey).liveTime + } + txStore.earliestTimestamp = earliestTime +} + type txSortedMap struct { items map[uint64]*txItem // map nonce to transaction index *btreeIndex // index for items @@ -193,19 +206,18 @@ func newTxLiveTimeMap() *txLiveTimeMap { func (tlm *txLiveTimeMap) insertByTtlKey(account string, nonce uint64, liveTime int64) { tlm.index.ReplaceOrInsert(&sortedTtlKey{account, nonce, liveTime}) - tlm.items[makeKey(account, nonce)] = liveTime + tlm.items[makeAccountNonceKey(account, nonce)] = liveTime } func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) { for account, list := range txs { for _, tx := range list { - liveTime, ok := tlm.items[makeKey(account, tx.Nonce)] + liveTime, ok := tlm.items[makeAccountNonceKey(account, tx.Nonce)] if !ok { - fmt.Printf("ttl key for %s not found\n", account) return } tlm.index.Delete(&sortedTtlKey{account, tx.Nonce, liveTime}) - delete(tlm.items, makeKey(account, tx.Nonce)) + delete(tlm.items, makeAccountNonceKey(account, tx.Nonce)) } } } diff --git a/pkg/order/mempool/types.go b/pkg/order/mempool/types.go index fa696cb..43d44ab 100644 --- a/pkg/order/mempool/types.go +++ b/pkg/order/mempool/types.go @@ -34,6 +34,7 @@ type Config struct { type txItem struct { account string tx *pb.Transaction + local bool } type ChainState struct { diff --git a/pkg/order/solo/node.go b/pkg/order/solo/node.go index 7f025b4..d09cc3d 100644 --- a/pkg/order/solo/node.go +++ b/pkg/order/solo/node.go @@ -179,7 +179,7 @@ func (n *Node) listenReadyBlock() { if !n.batchMgr.IsBatchTimerActive() { n.batchMgr.StartBatchTimer() } - if batch := n.mempool.ProcessTransactions(txSet.TxList, true); batch != nil { + if batch := n.mempool.ProcessTransactions(txSet.TxList, true, true); batch != nil { n.batchMgr.StopBatchTimer() n.proposeC <- batch } diff --git a/pkg/peermgr/mock_peermgr/mock_peermgr.go b/pkg/peermgr/mock_peermgr/mock_peermgr.go index 9ec924e..ab4ff65 100644 --- a/pkg/peermgr/mock_peermgr/mock_peermgr.go +++ b/pkg/peermgr/mock_peermgr/mock_peermgr.go @@ -20,10 +20,6 @@ type MockPeerManager struct { recorder *MockPeerManagerMockRecorder } -func (m *MockPeerManager) CountConnectedPeers() uint64 { - return 0 -} - // MockPeerManagerMockRecorder is the mock recorder for MockPeerManager type MockPeerManagerMockRecorder struct { mock 
*MockPeerManager From f328d1d6a157f9e794d66153a683741c0292f450 Mon Sep 17 00:00:00 2001 From: Alexader Date: Tue, 5 Jan 2021 15:18:32 +0800 Subject: [PATCH 3/5] fix(order): add rebroadcast timeout non-zero check --- pkg/order/etcdraft/config.go | 6 ++-- pkg/order/etcdraft/node.go | 53 ++++++++++++++++++++---------------- pkg/order/etcdraft/util.go | 5 ++-- 3 files changed, 36 insertions(+), 28 deletions(-) diff --git a/pkg/order/etcdraft/config.go b/pkg/order/etcdraft/config.go index 8229f45..cacee28 100644 --- a/pkg/order/etcdraft/config.go +++ b/pkg/order/etcdraft/config.go @@ -52,10 +52,10 @@ func defaultRaftConfig() raft.Config { } } -func generateEtcdRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, ram MemoryStorage) (*raft.Config, time.Duration, time.Duration, error) { +func generateEtcdRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogger, ram MemoryStorage) (*raft.Config, time.Duration, error) { readConfig, err := readConfig(repoRoot) if err != nil { - return &raft.Config{}, 100 * time.Millisecond, 3 * time.Minute, nil + return &raft.Config{}, 100 * time.Millisecond, nil } defaultConfig := defaultRaftConfig() defaultConfig.ID = id @@ -76,7 +76,7 @@ func generateEtcdRaftConfig(id uint64, repoRoot string, logger logrus.FieldLogge defaultConfig.PreVote = readConfig.RAFT.PreVote defaultConfig.CheckQuorum = readConfig.RAFT.CheckQuorum defaultConfig.DisableProposalForwarding = readConfig.RAFT.DisableProposalForwarding - return &defaultConfig, readConfig.RAFT.TickTimeout, readConfig.RAFT.RebroadcastTimeout, nil + return &defaultConfig, readConfig.RAFT.TickTimeout, nil } func generateRaftConfig(repoRoot string) (*RAFTConfig, error) { diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go index 71c5905..fda7510 100644 --- a/pkg/order/etcdraft/node.go +++ b/pkg/order/etcdraft/node.go @@ -122,28 +122,36 @@ func NewNode(opts ...order.Option) (order.Order, error) { snapCount = DefaultSnapshotCount } + var rebroadcastTimeout time.Duration + if raftConfig.RAFT.RebroadcastTimeout == 0 { + rebroadcastTimeout = DefaultRebroadcastTimeout + } else { + rebroadcastTimeout = raftConfig.RAFT.RebroadcastTimeout + } + node := &Node{ - id: config.ID, - lastExec: config.Applied, - confChangeC: make(chan raftpb.ConfChange), - commitC: make(chan *pb.CommitEvent, 1024), - errorC: make(chan<- error), - msgC: make(chan []byte), - stateC: make(chan *mempool.ChainState), - proposeC: make(chan *raftproto.RequestBatch), - snapCount: snapCount, - repoRoot: repoRoot, - peerMgr: config.PeerMgr, - txCache: txCache, - batchTimerMgr: batchTimerMgr, - peers: peers, - logger: config.Logger, - getChainMetaFunc: config.GetChainMetaFunc, - storage: dbStorage, - raftStorage: raftStorage, - readyPool: readyPool, - ctx: context.Background(), - mempool: mempoolInst, + id: config.ID, + lastExec: config.Applied, + confChangeC: make(chan raftpb.ConfChange), + commitC: make(chan *pb.CommitEvent, 1024), + errorC: make(chan<- error), + msgC: make(chan []byte), + stateC: make(chan *mempool.ChainState), + proposeC: make(chan *raftproto.RequestBatch), + snapCount: snapCount, + repoRoot: repoRoot, + peerMgr: config.PeerMgr, + txCache: txCache, + batchTimerMgr: batchTimerMgr, + peers: peers, + logger: config.Logger, + getChainMetaFunc: config.GetChainMetaFunc, + storage: dbStorage, + raftStorage: raftStorage, + readyPool: readyPool, + ctx: context.Background(), + mempool: mempoolInst, + rebroadcastTimeout: rebroadcastTimeout, } node.raftStorage.SnapshotCatchUpEntries = node.snapCount @@ -167,7 +175,7 
 // Start or restart raft node
 func (n *Node) Start() error {
 	n.blockAppliedIndex.Store(n.lastExec, n.loadAppliedIndex())
-	rc, tickTimeout, rebroadcastTimeout, err := generateEtcdRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
+	rc, tickTimeout, err := generateEtcdRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
 	if err != nil {
 		return fmt.Errorf("generate raft config: %w", err)
 	}
@@ -177,7 +185,6 @@ func (n *Node) Start() error {
 		n.node = raft.StartNode(rc, n.peers)
 	}
 	n.tickTimeout = tickTimeout
-	n.rebroadcastTimeout = rebroadcastTimeout
 
 	go n.run()
 	go n.txCache.ListenEvent()
diff --git a/pkg/order/etcdraft/util.go b/pkg/order/etcdraft/util.go
index 80dfb2c..233c2a1 100644
--- a/pkg/order/etcdraft/util.go
+++ b/pkg/order/etcdraft/util.go
@@ -14,8 +14,9 @@ import (
 )
 
 const (
-	DefaultBatchTick     = 500 * time.Millisecond
-	DefaultSnapshotCount = 1000
+	DefaultBatchTick          = 500 * time.Millisecond
+	DefaultSnapshotCount      = 1000
+	DefaultRebroadcastTimeout = 3 * time.Minute
 )
 
 func generateRaftPeers(config *order.Config) ([]raft.Peer, error) {

From 2aed7f6ecdde2433120950821e977edf4d3e81e0 Mon Sep 17 00:00:00 2001
From: Alexader
Date: Mon, 18 Jan 2021 20:17:04 +0800
Subject: [PATCH 4/5] refactor(mempool): refactor code according to review
 comments

---
 config/order.toml                        |  2 +-
 pkg/order/etcdraft/config.go             |  2 +-
 pkg/order/etcdraft/node.go               | 74 ++++++++++++------------
 pkg/order/etcdraft/util.go               |  6 +-
 pkg/order/mempool/btree_index.go         | 13 +----
 pkg/order/mempool/mempool_impl.go        | 45 ++++++--------
 pkg/order/mempool/tx_store.go            | 10 +++-
 pkg/peermgr/mock_peermgr/mock_peermgr.go | 67 +++++++++++++++++++++
 8 files changed, 134 insertions(+), 85 deletions(-)

diff --git a/config/order.toml b/config/order.toml
index 6b151e2..7bf048e 100644
--- a/config/order.toml
+++ b/config/order.toml
@@ -1,6 +1,6 @@
 [raft]
 batch_timeout = "0.3s" # Block packaging time period.
-rebroadcast_timeout = "3m" # Node timeout tick to check if there exist txs to be rebroadcast
+check_interval = "3m" # Interval at which the node checks the mempool for txs that need to be rebroadcast.
 tick_timeout = "0.1s" # TickTimeout is the internal logical clock for the Node by a single tick, Election timeouts and heartbeat timeouts are in units of ticks.
 election_tick = 10 # ElectionTick is the number of Node.Tick invocations that must pass between elections.
 heartbeat_tick = 1 # HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.
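For reference, the check_interval value above is decoded into the RAFT config struct through its mapstructure tag, and NewNode falls back to DefaultCheckInterval when the field is left unset or zero. The standalone sketch below illustrates that fallback pattern; it reuses the names RAFT, CheckInterval, and DefaultCheckInterval from this series, but the program itself is illustrative and not part of the patch:

package main

import (
	"fmt"
	"time"
)

// RAFT mirrors the config struct from pkg/order/etcdraft/config.go,
// trimmed to the one field relevant here.
type RAFT struct {
	CheckInterval time.Duration `mapstructure:"check_interval"`
}

const DefaultCheckInterval = 3 * time.Minute

// resolveCheckInterval reproduces the zero-value fallback used in NewNode:
// a missing or zero check_interval silently falls back to the default.
func resolveCheckInterval(cfg RAFT) time.Duration {
	if cfg.CheckInterval == 0 {
		return DefaultCheckInterval
	}
	return cfg.CheckInterval
}

func main() {
	fmt.Println(resolveCheckInterval(RAFT{}))                                // 3m0s, the default
	fmt.Println(resolveCheckInterval(RAFT{CheckInterval: 30 * time.Second})) // 30s, from config
}

A duration-typed field plus a zero-value check keeps the TOML entry optional without any extra "is set" bookkeeping.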
diff --git a/pkg/order/etcdraft/config.go b/pkg/order/etcdraft/config.go
index cacee28..3ed5f20 100644
--- a/pkg/order/etcdraft/config.go
+++ b/pkg/order/etcdraft/config.go
@@ -27,7 +27,7 @@ type SyncerConfig struct {
 
 type RAFT struct {
 	BatchTimeout               time.Duration `mapstructure:"batch_timeout"`
-	RebroadcastTimeout         time.Duration `mapstructure:"rebroadcast_timeout"`
+	CheckInterval              time.Duration `mapstructure:"check_interval"`
 	TickTimeout                time.Duration `mapstructure:"tick_timeout"`
 	ElectionTick               int           `mapstructure:"election_tick"`
 	HeartbeatTick              int           `mapstructure:"heartbeat_tick"`
diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go
index fda7510..ab50c67 100644
--- a/pkg/order/etcdraft/node.go
+++ b/pkg/order/etcdraft/node.go
@@ -39,15 +39,15 @@ type Node struct {
 	txCache       *mempool.TxCache // cache the transactions received from api
 	batchTimerMgr *BatchTimer
 
-	proposeC           chan *raftproto.RequestBatch // proposed ready, input channel
-	confChangeC        <-chan raftpb.ConfChange     // proposed cluster config changes
-	commitC            chan *pb.CommitEvent         // the hash commit channel
-	errorC             chan<- error                 // errors from raft session
-	tickTimeout        time.Duration                // tick timeout
-	rebroadcastTimeout time.Duration                // rebroadcast timeout
-	msgC               chan []byte                  // receive messages from remote peer
-	stateC             chan *mempool.ChainState     // receive the executed block state
-	rebroadcastTicker  chan *raftproto.TxSlice      // receive the executed block state
+	proposeC          chan *raftproto.RequestBatch // proposed ready, input channel
+	confChangeC       <-chan raftpb.ConfChange     // proposed cluster config changes
+	commitC           chan *pb.CommitEvent         // the hash commit channel
+	errorC            chan<- error                 // errors from raft session
+	tickTimeout       time.Duration                // tick timeout
+	checkInterval     time.Duration                // interval for rebroadcast check
+	msgC              chan []byte                  // receive messages from remote peer
+	stateC            chan *mempool.ChainState     // receive the executed block state
+	rebroadcastTicker chan *raftproto.TxSlice      // receive timeout txs to be rebroadcast
 
 	confState         raftpb.ConfState // raft requires ConfState to be persisted within snapshot
 	blockAppliedIndex sync.Map         // mapping of block height and apply index in raft log
@@ -122,36 +122,36 @@ func NewNode(opts ...order.Option) (order.Order, error) {
 		snapCount = DefaultSnapshotCount
 	}
 
-	var rebroadcastTimeout time.Duration
-	if raftConfig.RAFT.RebroadcastTimeout == 0 {
-		rebroadcastTimeout = DefaultRebroadcastTimeout
+	var checkInterval time.Duration
+	if raftConfig.RAFT.CheckInterval == 0 {
+		checkInterval = DefaultCheckInterval
 	} else {
-		rebroadcastTimeout = raftConfig.RAFT.RebroadcastTimeout
+		checkInterval = raftConfig.RAFT.CheckInterval
 	}
 
 	node := &Node{
-		id:                 config.ID,
-		lastExec:           config.Applied,
-		confChangeC:        make(chan raftpb.ConfChange),
-		commitC:            make(chan *pb.CommitEvent, 1024),
-		errorC:             make(chan<- error),
-		msgC:               make(chan []byte),
-		stateC:             make(chan *mempool.ChainState),
-		proposeC:           make(chan *raftproto.RequestBatch),
-		snapCount:          snapCount,
-		repoRoot:           repoRoot,
-		peerMgr:            config.PeerMgr,
-		txCache:            txCache,
-		batchTimerMgr:      batchTimerMgr,
-		peers:              peers,
-		logger:             config.Logger,
-		getChainMetaFunc:   config.GetChainMetaFunc,
-		storage:            dbStorage,
-		raftStorage:        raftStorage,
-		readyPool:          readyPool,
-		ctx:                context.Background(),
-		mempool:            mempoolInst,
-		rebroadcastTimeout: rebroadcastTimeout,
+		id:               config.ID,
+		lastExec:         config.Applied,
+		confChangeC:      make(chan raftpb.ConfChange),
+		commitC:          make(chan *pb.CommitEvent, 1024),
+		errorC:           make(chan<- error),
+		msgC:             make(chan []byte),
+		stateC:           make(chan *mempool.ChainState),
+		proposeC:         make(chan *raftproto.RequestBatch),
+		snapCount:        snapCount,
+		repoRoot:         repoRoot,
+		peerMgr:          config.PeerMgr,
+		txCache:          txCache,
+		batchTimerMgr:    batchTimerMgr,
+		peers:            peers,
+		logger:           config.Logger,
+		getChainMetaFunc: config.GetChainMetaFunc,
+		storage:          dbStorage,
+		raftStorage:      raftStorage,
+		readyPool:        readyPool,
+		ctx:              context.Background(),
+		mempool:          mempoolInst,
+		checkInterval:    checkInterval,
 	}
 
 	node.raftStorage.SnapshotCatchUpEntries = node.snapCount
@@ -260,7 +260,7 @@ func (n *Node) run() {
 	n.snapshotIndex = snap.Metadata.Index
 	n.appliedIndex = snap.Metadata.Index
 	ticker := time.NewTicker(n.tickTimeout)
-	rebroadcastTicker := time.NewTicker(n.rebroadcastTimeout)
+	rebroadcastTicker := time.NewTicker(n.checkInterval)
 	defer ticker.Stop()
 	defer rebroadcastTicker.Stop()
 
@@ -328,7 +328,7 @@ func (n *Node) run() {
 
 		case <-rebroadcastTicker.C:
 			// check periodically if there are long-pending txs in mempool
-			rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.rebroadcastTimeout)
+			rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.checkInterval)
 			for _, txSlice := range rebroadcastTxs {
 				txSet := &raftproto.TxSlice{TxList: txSlice}
 				data, err := txSet.Marshal()
diff --git a/pkg/order/etcdraft/util.go b/pkg/order/etcdraft/util.go
index 233c2a1..de6f26a 100644
--- a/pkg/order/etcdraft/util.go
+++ b/pkg/order/etcdraft/util.go
@@ -14,9 +14,9 @@ import (
 )
 
 const (
-	DefaultBatchTick          = 500 * time.Millisecond
-	DefaultSnapshotCount      = 1000
-	DefaultRebroadcastTimeout = 3 * time.Minute
+	DefaultBatchTick     = 500 * time.Millisecond
+	DefaultSnapshotCount = 1000
+	DefaultCheckInterval = 3 * time.Minute
 )
 
 func generateRaftPeers(config *order.Config) ([]raft.Peer, error) {
diff --git a/pkg/order/mempool/btree_index.go b/pkg/order/mempool/btree_index.go
index 1618403..79e3059 100644
--- a/pkg/order/mempool/btree_index.go
+++ b/pkg/order/mempool/btree_index.go
@@ -24,17 +24,6 @@ func (oik *orderedIndexKey) Less(than btree.Item) bool {
 	return oik.nonce < other.nonce
 }
 
-type sortedTtlKey struct {
-	account  string
-	nonce    uint64
-	liveTime int64 // the latest live time of tx after it is received or rebroadcast
-}
-
-func (stk *sortedTtlKey) Less(than btree.Item) bool {
-	other := than.(*sortedTtlKey)
-	return stk.liveTime < other.liveTime
-}
-
 type sortedNonceKey struct {
 	nonce uint64
 }
@@ -48,7 +37,7 @@ func (snk *sortedNonceKey) Less(item btree.Item) bool {
 type orderedTimeoutKey struct {
 	account   string
 	nonce     uint64
-	timestamp int64 // the timestamp of tx when it is received
+	timestamp int64 // the timestamp when the key was created, refreshed on rebroadcast
 }
 
 func (otk *orderedTimeoutKey) Less(than btree.Item) bool {
diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go
index dcbeb7c..755c5c2 100644
--- a/pkg/order/mempool/mempool_impl.go
+++ b/pkg/order/mempool/mempool_impl.go
@@ -101,14 +101,10 @@ func (mpi *mempoolImpl) processDirtyAccount(dirtyAccounts map[string]bool) {
 			readyTxs, nonReadyTxs, nextDemandNonce := list.filterReady(pendingNonce)
 			mpi.txStore.nonceCache.setPendingNonce(account, nextDemandNonce)
 
-			// insert ready txs into priorityIndex and set ttlIndex for these txs.
+			// insert ready txs into priorityIndex.
 			for _, tx := range readyTxs {
 				if !mpi.txStore.priorityIndex.data.Has(makeTimeoutKey(account, tx)) {
 					mpi.txStore.priorityIndex.insertByTimeoutKey(account, tx)
-					if list.items[tx.Nonce].local {
-						// no need to rebroadcast tx from other nodes to reduce network overhead
-						mpi.txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp)
-					}
 				}
 			}
 			mpi.txStore.updateEarliestTimestamp()
@@ -281,67 +277,60 @@ func (mpi *mempoolImpl) processCommitTransactions(state *ChainState) {
 
 func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]*pb.Transaction {
 	// all the tx whose live time is less than lowBoundTime should be rebroadcast
-	mpi.logger.Debugf("------- Start gathering timeout txs, ttl index len is %d -----------", mpi.txStore.ttlIndex.index.Len())
+	mpi.logger.Debugf("Start gathering timeout txs, ttl index len is %d", mpi.txStore.ttlIndex.index.Len())
 	currentTime := time.Now().UnixNano()
 	if currentTime < mpi.txStore.earliestTimestamp+rebroadcastDuration.Nanoseconds() {
 		// if the latest incoming tx has not exceeded the timeout limit, then none will be timeout
 		return [][]*pb.Transaction{}
 	}
 
-	txList := make([]*pb.Transaction, 0)
-	timeoutItems := make([]*sortedTtlKey, 0)
+	timeoutItems := make([]*orderedTimeoutKey, 0)
 	mpi.txStore.ttlIndex.index.Ascend(func(i btree.Item) bool {
-		item := i.(*sortedTtlKey)
-		if item.liveTime > math.MaxInt64 {
+		item := i.(*orderedTimeoutKey)
+		if item.timestamp > math.MaxInt64 {
 			// TODO(tyx): if this tx has rebroadcast many times and exceeded a final limit,
 			// it is expired and will be removed from mempool
 			return true
 		}
 		// if this tx has not exceeded the rebroadcast duration, break iteration
-		timeoutTime := item.liveTime + rebroadcastDuration.Nanoseconds()
-		txMap, ok := mpi.txStore.allTxs[item.account]
+		timeoutTime := item.timestamp + rebroadcastDuration.Nanoseconds()
+		_, ok := mpi.txStore.allTxs[item.account]
 		if !ok || currentTime < timeoutTime {
 			return false
 		}
-		tx := txMap.items[item.nonce].tx
-		txList = append(txList, tx)
 		timeoutItems = append(timeoutItems, item)
 		return true
 	})
-	if len(txList) == 0 {
-		return [][]*pb.Transaction{}
-	}
 	for _, item := range timeoutItems {
 		// update the liveTime of timeout txs
-		item.liveTime = currentTime
+		item.timestamp = currentTime
 		mpi.txStore.ttlIndex.items[makeAccountNonceKey(item.account, item.nonce)] = currentTime
 	}
 	// shard txList into fixed size in case txList is too large to broadcast one time
-	return shardTxList(txList, mpi.txSliceSize)
+	return mpi.shardTxList(timeoutItems, mpi.txSliceSize)
 }
 
-func shardTxList(txs []*pb.Transaction, batchLen uint64) [][]*pb.Transaction {
+func (mpi *mempoolImpl) shardTxList(timeoutItems []*orderedTimeoutKey, batchLen uint64) [][]*pb.Transaction {
 	begin := uint64(0)
-	end := uint64(len(txs)) - 1
-	totalLen := uint64(len(txs))
+	end := uint64(len(timeoutItems)) - 1
+	totalLen := uint64(len(timeoutItems))
 
-	// shape txs to fixed size in case totalLen is too large
+	// shape timeout txs to batch size in case totalLen is too large
 	batchNums := totalLen / batchLen
 	if totalLen%batchLen != 0 {
 		batchNums++
 	}
 	shardedLists := make([][]*pb.Transaction, 0, batchNums)
-	for i := uint64(0); i <= batchNums; i++ {
+	for i := uint64(0); i < batchNums; i++ {
 		actualLen := batchLen
 		if end-begin+1 < batchLen {
 			actualLen = end - begin + 1
 		}
-		if actualLen == 0 {
-			continue
-		}
+
 		shardedList := make([]*pb.Transaction, actualLen)
 		for j := uint64(0); j < batchLen && begin <= end; j++ {
-			shardedList[j] = txs[begin]
+			txMap, _ := mpi.txStore.allTxs[timeoutItems[begin].account]
+			shardedList[j] = txMap.items[timeoutItems[begin].nonce].tx
 			begin++
 		}
 		shardedLists = append(shardedLists, shardedList)
diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go
index afcf52a..6dde2bd 100644
--- a/pkg/order/mempool/tx_store.go
+++ b/pkg/order/mempool/tx_store.go
@@ -65,6 +65,10 @@ func (txStore *transactionStore) insertTxs(txs map[string][]*pb.Transaction, isL
 			}
 			txList.items[tx.Nonce] = txItem
 			txList.index.insertBySortedNonceKey(tx)
+			if isLocal {
+				// no need to rebroadcast tx from other nodes to reduce network overhead
+				txStore.ttlIndex.insertByTtlKey(account, tx.Nonce, tx.Timestamp)
+			}
 		}
 		dirtyAccounts[account] = true
 	}
@@ -88,7 +92,7 @@ func (txStore *transactionStore) updateEarliestTimestamp() {
 	earliestTime := int64(math.MaxInt64)
 	latestItem := txStore.ttlIndex.index.Min()
 	if latestItem != nil {
-		earliestTime = latestItem.(*sortedTtlKey).liveTime
+		earliestTime = latestItem.(*orderedTimeoutKey).timestamp
 	}
 	txStore.earliestTimestamp = earliestTime
 }
@@ -205,7 +209,7 @@ func newTxLiveTimeMap() *txLiveTimeMap {
 }
 
 func (tlm *txLiveTimeMap) insertByTtlKey(account string, nonce uint64, liveTime int64) {
-	tlm.index.ReplaceOrInsert(&sortedTtlKey{account, nonce, liveTime})
+	tlm.index.ReplaceOrInsert(&orderedTimeoutKey{account, nonce, liveTime})
 	tlm.items[makeAccountNonceKey(account, nonce)] = liveTime
 }
 
@@ -216,7 +220,7 @@ func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) {
 			if !ok {
 				return
 			}
-			tlm.index.Delete(&sortedTtlKey{account, tx.Nonce, liveTime})
+			tlm.index.Delete(&orderedTimeoutKey{account, tx.Nonce, liveTime})
 			delete(tlm.items, makeAccountNonceKey(account, tx.Nonce))
 		}
 	}
diff --git a/pkg/peermgr/mock_peermgr/mock_peermgr.go b/pkg/peermgr/mock_peermgr/mock_peermgr.go
index ab4ff65..c08f90b 100644
--- a/pkg/peermgr/mock_peermgr/mock_peermgr.go
+++ b/pkg/peermgr/mock_peermgr/mock_peermgr.go
@@ -10,6 +10,7 @@ import (
 	peer "github.com/libp2p/go-libp2p-core/peer"
 	pb "github.com/meshplus/bitxhub-model/pb"
 	events "github.com/meshplus/bitxhub/internal/model/events"
+	peermgr "github.com/meshplus/bitxhub/pkg/peermgr"
 	network "github.com/meshplus/go-lightp2p"
 	reflect "reflect"
 )
@@ -227,3 +228,69 @@ func (mr *MockPeerManagerMockRecorder) Disconnect(vpInfos interface{}) *gomock.C
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockPeerManager)(nil).Disconnect), vpInfos)
 }
+
+// PierManager mocks base method
+func (m *MockPeerManager) PierManager() peermgr.PierManager {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "PierManager")
+	ret0, _ := ret[0].(peermgr.PierManager)
+	return ret0
+}
+
+// PierManager indicates an expected call of PierManager
+func (mr *MockPeerManagerMockRecorder) PierManager() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PierManager", reflect.TypeOf((*MockPeerManager)(nil).PierManager))
+}
+
+// MockPierManager is a mock of PierManager interface
+type MockPierManager struct {
+	ctrl     *gomock.Controller
+	recorder *MockPierManagerMockRecorder
+}
+
+// MockPierManagerMockRecorder is the mock recorder for MockPierManager
+type MockPierManagerMockRecorder struct {
+	mock *MockPierManager
+}
+
+// NewMockPierManager creates a new mock instance
+func NewMockPierManager(ctrl *gomock.Controller) *MockPierManager {
+	mock := &MockPierManager{ctrl: ctrl}
+	mock.recorder = &MockPierManagerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockPierManager) EXPECT() *MockPierManagerMockRecorder {
+	return m.recorder
+}
+
+// Piers mocks base method
+func (m *MockPierManager) Piers() *peermgr.Piers {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Piers")
+	ret0, _ := ret[0].(*peermgr.Piers)
+	return ret0
+}
+
+// Piers indicates an expected call of Piers
+func (mr *MockPierManagerMockRecorder) Piers() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Piers", reflect.TypeOf((*MockPierManager)(nil).Piers))
+}
+
+// AskPierMaster mocks base method
+func (m *MockPierManager) AskPierMaster(arg0 string) (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "AskPierMaster", arg0)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// AskPierMaster indicates an expected call of AskPierMaster
+func (mr *MockPierManagerMockRecorder) AskPierMaster(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AskPierMaster", reflect.TypeOf((*MockPierManager)(nil).AskPierMaster), arg0)
+}

From 1c98689f2d917c2ee2fac65f18bb7cb9e4c8b20d Mon Sep 17 00:00:00 2001
From: Alexader
Date: Tue, 19 Jan 2021 11:24:31 +0800
Subject: [PATCH 5/5] fix(mempool): release old ttl key in txLiveTimeMap on
 rebroadcast

---
 pkg/order/mempool/mempool_impl.go |  7 +------
 pkg/order/mempool/mempool_test.go | 31 ++++++++++++++++++++++++-------
 pkg/order/mempool/tx_store.go     |  8 +++++++-
 3 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go
index 755c5c2..fbbb7bb 100644
--- a/pkg/order/mempool/mempool_impl.go
+++ b/pkg/order/mempool/mempool_impl.go
@@ -146,10 +146,6 @@ func (mpi *mempoolImpl) generateBlock() (*raftproto.RequestBatch, error) {
 		if txSeq >= 1 {
 			_, seenPrevious = mpi.txStore.batchedTxs[orderedIndexKey{account: tx.account, nonce: txSeq - 1}]
 		}
-		if txSeq == 3 {
-			mpi.logger.Infof("seenPrevious %v and commitNonce is %d", seenPrevious, commitNonce)
-			mpi.logger.Infof("batched txs is %v", mpi.txStore.batchedTxs)
-		}
 		// include transaction if it's "next" for given account or
 		// we've already sent its ancestor to Consensus
 		if seenPrevious || (txSeq == commitNonce) {
@@ -303,8 +299,7 @@ func (mpi *mempoolImpl) GetTimeoutTransactions(rebroadcastDuration time.Duration
 	})
 	for _, item := range timeoutItems {
 		// update the liveTime of timeout txs
-		item.timestamp = currentTime
-		mpi.txStore.ttlIndex.items[makeAccountNonceKey(item.account, item.nonce)] = currentTime
+		mpi.txStore.ttlIndex.updateByTtlKey(item, currentTime)
 	}
 	// shard txList into fixed size in case txList is too large to broadcast one time
 	return mpi.shardTxList(timeoutItems, mpi.txSliceSize)
diff --git a/pkg/order/mempool/mempool_test.go b/pkg/order/mempool/mempool_test.go
index b1993fc..ef512bd 100644
--- a/pkg/order/mempool/mempool_test.go
+++ b/pkg/order/mempool/mempool_test.go
@@ -1,7 +1,7 @@
 package mempool
 
 import (
-	"fmt"
+	"math"
 	"testing"
 	"time"
 
@@ -19,12 +19,6 @@ func TestGetBlock(t *testing.T) {
 	tx2 := constructTx(uint64(2), &privKey1)
 	tx3 := constructTx(uint64(2), &privKey2)
 	tx4 := constructTx(uint64(4), &privKey2)
-	from1, err := privKey1.PublicKey().Address()
-	ast.Nil(err)
-	fmt.Printf("from 1 is %s\n", from1)
-	from2, err := privKey2.PublicKey().Address()
-	ast.Nil(err)
-	fmt.Printf("from 2 is %s\n", from2)
 	var txList []*pb.Transaction
 	txList = append(txList, tx1, tx2, tx3, tx4)
 	// mock follower
@@ -290,6 +284,7 @@ func TestGetTimeoutTransaction(t *testing.T) {
 	ast := assert.New(t)
 	mpi, _ := mockMempoolImpl()
 	mpi.txSliceSize = 3
+	allTxHashes := make([]*types.Hash, 0)
 
 	txList := make([]*pb.Transaction, 0)
 	privKey1 := genPrivKey()
@@ -301,6 +296,7 @@ func TestGetTimeoutTransaction(t *testing.T) {
 		tx1 := constructTx(i, &privKey1)
 		tx2 := constructTx(i, &privKey2)
 		txList = append(txList, tx1, tx2)
+		allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash())
 	}
 	batch := mpi.ProcessTransactions(txList, true, true)
 	ast.Nil(batch)
@@ -312,6 +308,7 @@
 		tx1 := constructTx(i, &privKey1)
 		tx2 := constructTx(i, &privKey2)
 		txList = append(txList, tx1, tx2)
+		allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash())
 	}
 	batch = mpi.ProcessTransactions(txList, true, false)
 	ast.NotNil(batch)
@@ -331,6 +328,7 @@ func TestGetTimeoutTransaction(t *testing.T) {
 	tx1 := constructTx(3, &privKey1)
 	tx2 := constructTx(3, &privKey2)
 	txList = append(txList, tx1, tx2)
+	allTxHashes = append(allTxHashes, tx1.Hash(), tx2.Hash())
 	batch = mpi.ProcessTransactions(txList, true, true)
 	ast.NotNil(batch)
@@ -340,6 +338,7 @@
 	ast.Equal(2, len(timeoutList))
 	ast.Equal(3, len(timeoutList[0]))
 	ast.Equal(1, len(timeoutList[1]))
+	ast.Equal(6, mpi.txStore.ttlIndex.index.Len())
 
 	// wait another 150 millisecond, tx3 be timeout too.
 	// though tx1,tx2 has wait a long time, they are not local and they won't be broadcast
@@ -349,4 +348,22 @@
 	ast.Equal(2, len(timeoutList))
 	ast.Equal(3, len(timeoutList[0]))
 	ast.Equal(3, len(timeoutList[1]))
+	ast.Equal(6, mpi.txStore.ttlIndex.index.Len())
+
+	// check that all indices are properly cleaned up after commit
+	ast.Equal(10, len(allTxHashes))
+	state := &ChainState{
+		TxHashList: allTxHashes,
+		Height:     uint64(2),
+	}
+	mpi.CommitTransactions(state)
+	time.Sleep(100 * time.Millisecond)
+	ast.Equal(0, mpi.txStore.ttlIndex.index.Len())
+	ast.Equal(0, len(mpi.txStore.ttlIndex.items))
+	ast.Equal(int64(math.MaxInt64), mpi.txStore.earliestTimestamp)
+	ast.Equal(0, mpi.txStore.priorityIndex.size())
+	ast.Equal(0, mpi.txStore.parkingLotIndex.size())
+	ast.Equal(0, len(mpi.txStore.batchedTxs))
+	ast.Equal(0, len(mpi.txStore.txHashMap))
+	ast.Equal(uint64(0), mpi.txStore.priorityNonBatchSize)
 }
diff --git a/pkg/order/mempool/tx_store.go b/pkg/order/mempool/tx_store.go
index 6dde2bd..e9419b7 100644
--- a/pkg/order/mempool/tx_store.go
+++ b/pkg/order/mempool/tx_store.go
@@ -218,10 +218,16 @@ func (tlm *txLiveTimeMap) removeByTtlKey(txs map[string][]*pb.Transaction) {
 		for _, tx := range list {
 			liveTime, ok := tlm.items[makeAccountNonceKey(account, tx.Nonce)]
 			if !ok {
-				return
+				continue
 			}
 			tlm.index.Delete(&orderedTimeoutKey{account, tx.Nonce, liveTime})
 			delete(tlm.items, makeAccountNonceKey(account, tx.Nonce))
 		}
 	}
 }
+
+func (tlm *txLiveTimeMap) updateByTtlKey(originalKey *orderedTimeoutKey, newTime int64) {
+	tlm.index.Delete(originalKey)
+	delete(tlm.items, makeAccountNonceKey(originalKey.account, originalKey.nonce))
+	tlm.insertByTtlKey(originalKey.account, originalKey.nonce, newTime)
+}
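The updateByTtlKey helper added in this last patch is the heart of the fix: ttl keys are stored by pointer in a btree ordered by timestamp, so mutating item.timestamp in place (as the previous patch did) leaves the tree mis-ordered and the stale key undeletable. Below is a minimal standalone sketch of the failure mode and the delete-then-reinsert remedy, assuming a google/btree-style index like the one the mempool compiles against; the ttlKey type and its ordering are illustrative, not the project's exact Less implementation:

package main

import (
	"fmt"

	"github.com/google/btree"
)

// ttlKey stands in for orderedTimeoutKey: ordered primarily by timestamp.
type ttlKey struct {
	account   string
	nonce     uint64
	timestamp int64
}

func (k *ttlKey) Less(than btree.Item) bool {
	other := than.(*ttlKey)
	if k.timestamp != other.timestamp {
		return k.timestamp < other.timestamp
	}
	if k.account != other.account {
		return k.account < other.account
	}
	return k.nonce < other.nonce
}

func main() {
	tree := btree.New(32)
	k := &ttlKey{account: "acc1", nonce: 1, timestamp: 100}
	tree.ReplaceOrInsert(k)
	tree.ReplaceOrInsert(&ttlKey{account: "acc2", nonce: 1, timestamp: 200})

	// Buggy in-place update: the tree still holds k at the position chosen
	// for timestamp 100, so a lookup that now compares k as 300 misses it.
	k.timestamp = 300
	fmt.Println(tree.Delete(k) != nil) // false: the mutated key is unreachable

	// The fix mirrors updateByTtlKey: delete under the old key value,
	// then reinsert a fresh key carrying the new timestamp.
	k.timestamp = 100 // restore the value the tree was built with
	tree.Delete(k)
	tree.ReplaceOrInsert(&ttlKey{account: "acc1", nonce: 1, timestamp: 300})
	fmt.Println(tree.Len()) // 2: both entries present, ordering intact
}

This is also why GetTimeoutTransactions only hands the collected timeoutItems to updateByTtlKey after the Ascend pass finishes: keys are changed solely by removal and reinsertion, never while the iterator is walking the tree.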