feat(solo): add mempool for solo plugin

1. add mempool module to solo plugin and its unit test
2. add solo related starting scripts and config files
This commit is contained in:
Alexader 2020-11-18 22:31:51 +08:00
parent 4c961bf3b2
commit 05f184d112
4 changed files with 157 additions and 75 deletions

View File

@ -79,4 +79,8 @@ linter:
cluster:install${TAGS}
@cd scripts && bash cluster.sh
## make solo: Run one node in solo mode
solo:install${TAGS}
@cd scripts && bash solo.sh
.PHONY: tester build

31
pkg/order/solo/config.go Normal file
View File

@ -0,0 +1,31 @@
package solo
import (
"time"
)
type RAFTConfig struct {
RAFT RAFT
}
type MempoolConfig struct {
BatchSize uint64 `mapstructure:"batch_size"`
PoolSize uint64 `mapstructure:"pool_size"`
TxSliceSize uint64 `mapstructure:"tx_slice_size"`
BatchTick time.Duration `mapstructure:"batch_tick"`
FetchTimeout time.Duration `mapstructure:"fetch_timeout"`
TxSliceTimeout time.Duration `mapstructure:"tx_slice_timeout"`
}
type RAFT struct {
TickTimeout time.Duration `mapstructure:"tick_timeout"`
ElectionTick int `mapstructure:"election_tick"`
HeartbeatTick int `mapstructure:"heartbeat_tick"`
MaxSizePerMsg uint64 `mapstructure:"max_size_per_msg"`
MaxInflightMsgs int `mapstructure:"max_inflight_msgs"`
CheckQuorum bool `mapstructure:"check_quorum"`
PreVote bool `mapstructure:"pre_vote"`
DisableProposalForwarding bool `mapstructure:"disable_proposal_forwarding"`
MempoolConfig MempoolConfig `mapstructure:"mempool"`
}

View File

@ -1,27 +1,36 @@
package solo
import (
"container/list"
"context"
"fmt"
"path/filepath"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/meshplus/bitxhub-kit/storage/leveldb"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/order/mempool"
"github.com/meshplus/bitxhub/pkg/peermgr/mock_peermgr"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
type Node struct {
ID uint64
sync.RWMutex
height uint64 // current block height
pendingTxs *list.List // pending tx pool
commitC chan *pb.Block // block channel
logger logrus.FieldLogger // logger
reqLookUp *order.ReqLookUp // bloom filter
getTransactionFunc func(hash *types.Hash) (*pb.Transaction, error)
mempool mempool.MemPool // transaction pool
proposeC chan *raftproto.Ready // proposed listenReadyBlock, input channel
packSize int // maximum number of transaction packages
blockTick time.Duration // block packed period
@ -31,40 +40,28 @@ type Node struct {
}
func (n *Node) Start() error {
go n.execute()
go n.listenReadyBlock()
if err := n.mempool.Start(); err != nil {
return err
}
n.mempool.UpdateLeader(n.ID)
return nil
}
func (n *Node) Stop() {
n.mempool.Stop()
n.cancel()
}
func (n *Node) GetPendingNonceByAccount(account string) uint64 {
// TODO: implement me
return 1
return n.mempool.GetPendingNonceByAccount(account)
}
func (n *Node) Prepare(tx *pb.Transaction) error {
hash := tx.TransactionHash
if ok := n.reqLookUp.LookUp(hash.Bytes()); ok {
if tx, _ := n.getTransactionFunc(hash); tx != nil {
return nil
}
if !n.Ready() {
return nil
}
n.pushBack(tx)
if n.PoolSize() >= n.packSize {
if r := n.ready(); r != nil {
n.commitC <- r
}
}
return nil
}
//Current txpool's size
func (n *Node) PoolSize() int {
n.RLock()
defer n.RUnlock()
return n.pendingTxs.Len()
return n.mempool.RecvTransaction(tx)
}
func (n *Node) Commit() chan *pb.Block {
@ -110,73 +107,100 @@ func NewNode(opts ...order.Option) (order.Order, error) {
return nil, fmt.Errorf("new bloom filter: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
mockCtl := gomock.NewController(&testing.T{})
peerMgr := mock_peermgr.NewMockPeerManager(mockCtl)
peerMgr.EXPECT().Peers().Return(map[uint64]*peer.AddrInfo{}).AnyTimes()
memConfig, err := generateMempoolConfig(config.RepoRoot)
mempoolConf := &mempool.Config{
ID: config.ID,
ChainHeight: config.Applied,
GetTransactionFunc: config.GetTransactionFunc,
PeerMgr: peerMgr,
Logger: config.Logger,
BatchSize: memConfig.BatchSize,
BatchTick: memConfig.BatchTick,
PoolSize: memConfig.PoolSize,
TxSliceSize: memConfig.TxSliceSize,
FetchTimeout: memConfig.FetchTimeout,
TxSliceTimeout: memConfig.TxSliceTimeout,
}
batchC := make(chan *raftproto.Ready, 10)
mempoolInst := mempool.NewMempool(mempoolConf, storage, batchC)
return &Node{
ID: config.ID,
height: config.Applied,
pendingTxs: list.New(),
commitC: make(chan *pb.Block, 1024),
packSize: 500,
blockTick: 500 * time.Millisecond,
reqLookUp: reqLookUp,
getTransactionFunc: config.GetTransactionFunc,
mempool: mempoolInst,
proposeC: batchC,
logger: config.Logger,
ctx: ctx,
cancel: cancel,
}, nil
}
// Schedule to collect txs to the ready channel
func (n *Node) execute() {
ticker := time.NewTicker(n.blockTick)
defer ticker.Stop()
// Schedule to collect txs to the listenReadyBlock channel
func (n *Node) listenReadyBlock() {
for {
select {
case <-ticker.C:
if r := n.ready(); r != nil {
n.commitC <- r
}
case <-n.ctx.Done():
n.logger.Infoln("Done txpool execute")
n.logger.Info("----- Exit listen ready block loop -----")
return
case proposal := <-n.proposeC:
n.logger.WithFields(logrus.Fields{
"proposal_height": proposal.Height,
"tx_count": len(proposal.TxHashes),
}).Debugf("Receive proposal from mempool")
// collect txs from proposalC
_, txs := n.mempool.GetBlockByHashList(proposal)
n.height++
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: n.height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.mempool.CommitTransactions(proposal)
n.mempool.IncreaseChainHeight()
n.commitC <- block
}
}
}
func (n *Node) ready() *pb.Block {
n.Lock()
defer n.Unlock()
l := n.pendingTxs.Len()
if l == 0 {
return nil
func generateMempoolConfig(repoRoot string) (*MempoolConfig, error) {
readConfig, err := readConfig(repoRoot)
if err != nil {
return nil, err
}
var size int
if l > n.packSize {
size = n.packSize
} else {
size = l
}
txs := make([]*pb.Transaction, 0, size)
for i := 0; i < size; i++ {
front := n.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
txs = append(txs, tx)
n.pendingTxs.Remove(front)
}
n.height++
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: n.height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
return block
mempoolConf := &MempoolConfig{}
mempoolConf.BatchSize = readConfig.RAFT.MempoolConfig.BatchSize
mempoolConf.PoolSize = readConfig.RAFT.MempoolConfig.PoolSize
mempoolConf.TxSliceSize = readConfig.RAFT.MempoolConfig.TxSliceSize
mempoolConf.BatchTick = readConfig.RAFT.MempoolConfig.BatchTick
mempoolConf.FetchTimeout = readConfig.RAFT.MempoolConfig.FetchTimeout
mempoolConf.TxSliceTimeout = readConfig.RAFT.MempoolConfig.TxSliceTimeout
return mempoolConf, nil
}
func (n *Node) pushBack(value interface{}) *list.Element {
n.Lock()
defer n.Unlock()
return n.pendingTxs.PushBack(value)
func readConfig(repoRoot string) (*RAFTConfig, error) {
v := viper.New()
v.SetConfigFile(filepath.Join(repoRoot, "order.toml"))
v.SetConfigType("toml")
if err := v.ReadInConfig(); err != nil {
return nil, err
}
config := &RAFTConfig{}
if err := v.Unmarshal(config); err != nil {
return nil, err
}
return config, nil
}

View File

@ -2,11 +2,13 @@ package solo
import (
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/log"
@ -14,6 +16,7 @@ import (
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/repo"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/meshplus/bitxhub/pkg/peermgr/mock_peermgr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -24,11 +27,31 @@ func TestNode_Start(t *testing.T) {
repoRoot, err := ioutil.TempDir("", "node")
defer os.RemoveAll(repoRoot)
assert.Nil(t, err)
// write config file for order module
fileData, err := ioutil.ReadFile("../../../config/order.toml")
require.Nil(t, err)
err = ioutil.WriteFile(filepath.Join(repoRoot, "order.toml"), fileData, 0644)
require.Nil(t, err)
mockCtl := gomock.NewController(t)
mockPeermgr := mock_peermgr.NewMockPeerManager(mockCtl)
peers := make(map[uint64]*peer.AddrInfo)
mockPeermgr.EXPECT().Peers().Return(peers).AnyTimes()
nodes := make(map[uint64]types.Address)
hash := types.NewAddressByStr("000000000000000000000000000000000000000a")
nodes[1] = *hash
order, err := NewNode(
order.WithRepoRoot(repoRoot),
order.WithStoragePath(repo.GetStoragePath(repoRoot, "order")),
order.WithLogger(log.NewWithModule("consensus")),
order.WithApplied(1),
order.WithPeerManager(mockPeermgr),
order.WithID(1),
order.WithNodes(nodes),
order.WithApplied(1),
)
require.Nil(t, err)
@ -42,10 +65,10 @@ func TestNode_Start(t *testing.T) {
require.Nil(t, err)
tx := &pb.Transaction{
From: from,
To: types.NewAddressByStr(to),
From: from,
To: types.NewAddressByStr(to),
Timestamp: time.Now().UnixNano(),
Nonce: uint64(rand.Int63()),
Nonce: 1,
}
tx.TransactionHash = tx.Hash()
err = tx.Sign(privKey)