feat: add raft state update

jzhe 2020-12-23 16:29:57 +08:00
parent 65a421c770
commit 93d86402d7
8 changed files with 157 additions and 59 deletions

View File

@@ -15,6 +15,9 @@ disable_proposal_forwarding = true # This prevents blocks from being accidentall
tx_slice_size = 10 # How many transactions should the node broadcast at once
tx_slice_timeout = "0.1s" # Node broadcasts transactions if there are cached transactions, although set_size isn't reached yet
[raft.syncer]
sync_blocks = 1 # How many blocks a lagging node fetches per sync request
snapshot_count = 1000 # Take a snapshot every snapshot_count applied blocks
[rbft] #RBFT configurations
set_size = 25 # How many transactions should the node broadcast at once
@@ -40,6 +43,8 @@ batch_max_mem = 10000 # The max memory size of one batch
update = "4s" # How long may a update-n take
set = "0.1s" # Node broadcasts transactions if there are cached transactions, although set_size isn't reached yet
[rbft.syncer]
sync_blocks = 1 # How many blocks a lagging node fetches per sync request
[solo]
batch_timeout = "0.3s" # Block packaging time period.
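The new [raft.syncer] keys map onto the SyncerConfig struct introduced in the next file via their mapstructure tags. A minimal loading sketch, assuming the repo's usual viper-based config reading (the helper name loadRaftConfig and the order.toml file name are illustrative, not taken from this commit):

import (
	"path/filepath"

	"github.com/spf13/viper"
)

// loadRaftConfig is a hypothetical helper: it decodes the order config into
// RAFTConfig and applies the same snapshot_count fallback that NewNode performs below.
func loadRaftConfig(repoRoot string) (*RAFTConfig, error) {
	v := viper.New()
	v.SetConfigFile(filepath.Join(repoRoot, "order.toml")) // assumed path
	if err := v.ReadInConfig(); err != nil {
		return nil, err
	}
	config := &RAFTConfig{}
	if err := v.Unmarshal(config); err != nil {
		return nil, err
	}
	// an omitted or zero snapshot_count falls back to the package default
	if config.RAFT.SyncerConfig.SnapshotCount == 0 {
		config.RAFT.SyncerConfig.SnapshotCount = DefaultSnapshotCount
	}
	return config, nil
}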

View File

@@ -14,12 +14,17 @@ type RAFTConfig struct {
}
type MempoolConfig struct {
BatchSize uint64 `mapstructure:"batch_size"`
PoolSize uint64 `mapstructure:"pool_size"`
TxSliceSize uint64 `mapstructure:"tx_slice_size"`
BatchSize uint64 `mapstructure:"batch_size"`
PoolSize uint64 `mapstructure:"pool_size"`
TxSliceSize uint64 `mapstructure:"tx_slice_size"`
TxSliceTimeout time.Duration `mapstructure:"tx_slice_timeout"`
}
type SyncerConfig struct {
SyncBlocks uint64 `mapstructure:"sync_blocks"`
SnapshotCount uint64 `mapstructure:"snapshot_count"`
}
type RAFT struct {
BatchTimeout time.Duration `mapstructure:"batch_timeout"`
TickTimeout time.Duration `mapstructure:"tick_timeout"`
@@ -31,6 +36,7 @@ type RAFT struct {
PreVote bool `mapstructure:"pre_vote"`
DisableProposalForwarding bool `mapstructure:"disable_proposal_forwarding"`
MempoolConfig MempoolConfig `mapstructure:"mempool"`
SyncerConfig SyncerConfig `mapstructure:"syncer"`
}
func defaultRaftConfig() raft.Config {

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/meshplus/bitxhub-kit/log"
"path/filepath"
"sync"
"sync/atomic"
@@ -17,12 +18,11 @@ import (
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/order/mempool"
"github.com/meshplus/bitxhub/pkg/order/syncer"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/sirupsen/logrus"
)
var defaultSnapshotCount uint64 = 10000
type Node struct {
id uint64 // raft id
leader uint64 // leader id
@@ -32,6 +32,7 @@ type Node struct {
node raft.Node // raft node
peerMgr peermgr.PeerManager // network manager
peers []raft.Peer // raft peers
syncer syncer.Syncer // state syncer
raftStorage *RaftStorage // the raft backend storage system
storage storage.Storage // db
mempool mempool.MemPool // transaction pool
@@ -55,6 +56,7 @@ type Node struct {
lastExec uint64 // the index of the last-applied block
readyPool *sync.Pool // ready pool, avoiding memory growth fast
justElected bool // track new leader status
getChainMetaFunc func() *pb.ChainMeta // current chain meta
ctx context.Context // context
haltC chan struct{} // exit signal
}
@@ -112,31 +114,51 @@ func NewNode(opts ...order.Option) (order.Order, error) {
readyPool := &sync.Pool{New: func() interface{} {
return new(raftproto.RequestBatch)
}}
node := &Node{
id: config.ID,
lastExec: config.Applied,
confChangeC: make(chan raftpb.ConfChange),
commitC: make(chan *pb.CommitEvent, 1024),
errorC: make(chan<- error),
msgC: make(chan []byte),
stateC: make(chan *mempool.ChainState),
proposeC: make(chan *raftproto.RequestBatch),
repoRoot: repoRoot,
snapCount: defaultSnapshotCount,
peerMgr: config.PeerMgr,
txCache: txCache,
batchTimerMgr: batchTimerMgr,
peers: peers,
logger: config.Logger,
storage: dbStorage,
raftStorage: raftStorage,
readyPool: readyPool,
ctx: context.Background(),
mempool: mempoolInst,
snapCount := raftConfig.RAFT.SyncerConfig.SnapshotCount
if snapCount == 0 {
snapCount = DefaultSnapshotCount
}
node := &Node{
id: config.ID,
lastExec: config.Applied,
confChangeC: make(chan raftpb.ConfChange),
commitC: make(chan *pb.CommitEvent, 1024),
errorC: make(chan<- error),
msgC: make(chan []byte),
stateC: make(chan *mempool.ChainState),
proposeC: make(chan *raftproto.RequestBatch),
snapCount: snapCount,
repoRoot: repoRoot,
peerMgr: config.PeerMgr,
txCache: txCache,
batchTimerMgr: batchTimerMgr,
peers: peers,
logger: config.Logger,
getChainMetaFunc: config.GetChainMetaFunc,
storage: dbStorage,
raftStorage: raftStorage,
readyPool: readyPool,
ctx: context.Background(),
mempool: mempoolInst,
}
node.raftStorage.SnapshotCatchUpEntries = node.snapCount
otherPeers := node.peerMgr.OtherPeers()
peerIds := make([]uint64, 0, len(otherPeers))
for id := range otherPeers {
peerIds = append(peerIds, id)
}
stateSyncer, err := syncer.New(raftConfig.RAFT.SyncerConfig.SyncBlocks, config.PeerMgr, node.Quorum(), peerIds, log.NewWithModule("syncer"))
if err != nil {
return nil, fmt.Errorf("new state syncer error:%s", err.Error())
}
node.syncer = stateSyncer
node.logger.Infof("Raft localID = %d", node.id)
node.logger.Infof("Raft lastExec = %d ", node.lastExec)
node.logger.Infof("Raft defaultSnapshotCount = %d", node.snapCount)
node.logger.Infof("Raft snapshotCount = %d", node.snapCount)
return node, nil
}
@@ -153,7 +175,6 @@ func (n *Node) Start() error {
n.node = raft.StartNode(rc, n.peers)
}
n.tickTimeout = tickTimeout
go n.run()
go n.txCache.ListenEvent()
n.logger.Info("Consensus module started")
@@ -317,6 +338,9 @@ func (n *Node) run() {
n.logger.Errorf("failed to persist etcd/raft data: %s", err)
}
if !raft.IsEmptySnap(rd.Snapshot) {
n.recoverFromSnapshot()
}
if rd.SoftState != nil {
newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
if newLeader != n.leader {
@@ -484,28 +508,19 @@ func (n *Node) mint(requestBatch *raftproto.RequestBatch) {
// Determines whether the current applied index should trigger a snapshot
func (n *Node) maybeTriggerSnapshot() {
if n.appliedIndex-n.snapshotIndex <= n.snapCount {
if n.appliedIndex-n.snapshotIndex < n.snapCount {
return
}
data := n.raftStorage.Snapshot().Data
n.logger.Infof("Start snapshot [applied index: %d | last snapshot index: %d]", n.appliedIndex, n.snapshotIndex)
snap, err := n.raftStorage.ram.CreateSnapshot(n.appliedIndex, &n.confState, data)
data, err := n.getSnapshot()
if err != nil {
panic(err)
n.logger.Error(err)
return
}
if err := n.raftStorage.saveSnap(snap); err != nil {
panic(err)
err = n.raftStorage.TakeSnapshot(n.appliedIndex, n.confState, data)
if err != nil {
n.logger.Error(err)
return
}
compactIndex := uint64(1)
if n.appliedIndex > n.raftStorage.SnapshotCatchUpEntries {
compactIndex = n.appliedIndex - n.raftStorage.SnapshotCatchUpEntries
}
if err := n.raftStorage.ram.Compact(compactIndex); err != nil {
panic(err)
}
n.logger.Infof("compacted log at index %d", compactIndex)
n.snapshotIndex = n.appliedIndex
}
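The inlined CreateSnapshot/saveSnap/Compact sequence deleted above is evidently consolidated behind RaftStorage.TakeSnapshot. A sketch of that assumed behavior, reconstructed from the deleted lines (the actual method body is not part of this diff and may differ):

// Assumed consolidation of the snapshot logic removed from maybeTriggerSnapshot.
func (rs *RaftStorage) TakeSnapshot(index uint64, cs raftpb.ConfState, data []byte) error {
	// create the snapshot at the applied index in the in-memory raft storage
	snap, err := rs.ram.CreateSnapshot(index, &cs, data)
	if err != nil {
		return err
	}
	// persist the snapshot to disk
	if err := rs.saveSnap(snap); err != nil {
		return err
	}
	// compact the raft log, keeping SnapshotCatchUpEntries entries so that
	// slightly slow followers can still catch up without a snapshot transfer
	compactIndex := uint64(1)
	if index > rs.SnapshotCatchUpEntries {
		compactIndex = index - rs.SnapshotCatchUpEntries
	}
	return rs.ram.Compact(compactIndex)
}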

View File

@@ -52,7 +52,9 @@ func TestNode_Start(t *testing.T) {
mockPeermgr := mock_peermgr.NewMockPeerManager(mockCtl)
peers := make(map[uint64]*pb.VpInfo)
otherPeers := make(map[uint64]*peer.AddrInfo, 5)
mockPeermgr.EXPECT().Peers().Return(peers).AnyTimes()
mockPeermgr.EXPECT().OtherPeers().Return(otherPeers).AnyTimes()
mockPeermgr.EXPECT().Broadcast(gomock.Any()).AnyTimes()
order, err := NewNode(

View File

@@ -123,7 +123,6 @@ func CreateStorage(
walDir: walDir,
snapDir: snapDir,
db: db,
SnapshotCatchUpEntries: 4,
snapshotIndex: ListSnapshots(lg, snapDir),
}, db, nil
}

View File

@@ -10,10 +10,12 @@ import (
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/sirupsen/logrus"
)
const (
DefaultBatchTick = 500 * time.Millisecond
DefaultSnapshotCount = 1000
)
func generateRaftPeers(config *order.Config) ([]raft.Peer, error) {
@@ -109,4 +111,62 @@ func msgToConsensusPbMsg(data []byte, tyr raftproto.RaftMessage_Type, replicaID
Data: cmData,
}
return msg
}
}
func (n *Node) getSnapshot() ([]byte, error) {
cm := pb.ChainMeta{
Height: n.lastExec,
}
return cm.Marshal()
}
func (n *Node) recoverFromSnapshot() {
snapshot, err := n.raftStorage.snap.Load()
if err != nil {
n.logger.Error(err)
return
}
targetChainMeta := &pb.ChainMeta{}
err = targetChainMeta.Unmarshal(snapshot.Data)
if err != nil {
n.logger.Error(err)
return
}
syncBlocks := func() {
chainMeta := n.getChainMetaFunc()
n.logger.WithFields(logrus.Fields{
"target": targetChainMeta.Height,
"current": chainMeta.Height,
"current_hash": chainMeta.BlockHash.String(),
}).Info("State Update")
blockCh := make(chan *pb.Block, 1024)
go n.syncer.SyncCFTBlocks(chainMeta.Height+1, targetChainMeta.Height, blockCh)
for block := range blockCh {
// a nil block indicates that block synchronization has completed
if block == nil {
break
}
if block.Height() == n.lastExec+1 {
localList := make([]bool, len(block.Transactions))
for i := 0; i < len(block.Transactions); i++ {
localList[i] = false
}
executeEvent := &pb.CommitEvent{
Block: block,
LocalList: localList,
}
n.commitC <- executeEvent
n.lastExec = block.Height()
}
}
}
syncBlocks()
if n.lastExec != targetChainMeta.Height {
n.logger.Warnf("The lastExec is %d, but not equal the target block height %d", n.lastExec, targetChainMeta.Height)
syncBlocks()
}
n.appliedIndex = snapshot.Metadata.Index
n.snapshotIndex = snapshot.Metadata.Index
}
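recoverFromSnapshot relies on the state syncer wired up in NewNode. The Syncer contract, as inferred from its call sites in this diff (the real interface in pkg/order/syncer may declare additional methods):

// Inferred from usage only: syncer.New(...) returns an implementation, and
// SyncCFTBlocks streams the requested block range into blockCh, ending with a nil block.
type Syncer interface {
	SyncCFTBlocks(begin, end uint64, blockCh chan *pb.Block) error
}

recoverFromSnapshot runs SyncCFTBlocks in its own goroutine and drains blockCh until the nil sentinel, committing each block whose height is exactly lastExec+1.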

View File

@@ -16,8 +16,10 @@ import (
var _ Syncer = (*StateSyncer)(nil)
const defaultBlockFetch = 5
type StateSyncer struct {
checkpoint uint64 // check point
blockFetch uint64 // number of blocks to fetch per retrieval request
peerMgr peermgr.PeerManager // network manager
badPeers *sync.Map // peer node set who return bad block
quorum uint64 // quorum node numbers
@@ -30,18 +32,15 @@ type rangeHeight struct {
end uint64
}
func New(checkpoint uint64, peerMgr peermgr.PeerManager, quorum uint64, peerIds []uint64, logger logrus.FieldLogger) (*StateSyncer, error) {
if checkpoint == 0 {
return nil, fmt.Errorf("checkpoint not be 0")
func New(blockFetch uint64, peerMgr peermgr.PeerManager, quorum uint64, peerIds []uint64, logger logrus.FieldLogger) (*StateSyncer, error) {
if blockFetch == 0 {
blockFetch = defaultBlockFetch
}
if quorum <= 0 {
return nil, fmt.Errorf("the vp nodes' quorum must be positive")
}
if len(peerIds) < int(quorum) {
return nil, fmt.Errorf("the peers num must be gather than quorum")
}
return &StateSyncer{
checkpoint: checkpoint,
blockFetch: blockFetch,
peerMgr: peerMgr,
logger: logger,
quorum: quorum,
@@ -65,7 +64,6 @@ func (s *StateSyncer) SyncCFTBlocks(begin, end uint64, blockCh chan *pb.Block) e
return err
}
s.logger.WithFields(logrus.Fields{
"begin": rangeTmp.begin,
"end": rangeTmp.end,
@@ -150,7 +148,6 @@ func (s *StateSyncer) syncQuorumRangeBlockHeaders(rangeHeight *rangeHeight, pare
blockHeadersM[blockHash.String()] = headers
}
for _, id := range s.peerIds {
fetchAndVerifyBlockHeaders(id)
@@ -229,11 +226,11 @@ func (s *StateSyncer) calcRangeHeight(begin, end uint64) ([]*rangeHeight, error)
if begin > end {
return nil, fmt.Errorf("the end height:%d is less than the start height:%d", end, begin)
}
startNo := begin / s.checkpoint
startNo := begin / s.blockFetch
rangeHeights := make([]*rangeHeight, 0)
for begin <= end {
rangeBegin := begin
rangeEnd := (startNo + 1) * s.checkpoint
rangeEnd := (startNo + 1) * s.blockFetch
if rangeEnd > end {
rangeEnd = end
}
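
With checkpoint renamed to blockFetch, calcRangeHeight still slices [begin, end] into blockFetch-aligned windows. A standalone sketch of the visible logic for intuition (the loop tail that appends the range and advances begin/startNo is cut off by the hunk, so it is assumed here):

// Illustrative only: mirrors the visible part of calcRangeHeight and assumes the
// loop appends each window and then advances begin past rangeEnd.
func splitRanges(begin, end, blockFetch uint64) [][2]uint64 {
	ranges := make([][2]uint64, 0)
	startNo := begin / blockFetch
	for begin <= end {
		rangeBegin := begin
		rangeEnd := (startNo + 1) * blockFetch
		if rangeEnd > end {
			rangeEnd = end
		}
		ranges = append(ranges, [2]uint64{rangeBegin, rangeEnd})
		begin = rangeEnd + 1
		startNo++
	}
	return ranges
}

// e.g. splitRanges(3, 12, 5) yields [3,5], [6,10], [11,12]: requests stay aligned
// to blockFetch-sized windows regardless of where the sync starts.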

View File

@@ -126,6 +126,20 @@ func (mr *MockPeerManagerMockRecorder) Broadcast(arg0 interface{}) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0)
}
// CountConnectedPeers mocks base method
func (m *MockPeerManager) CountConnectedPeers() uint64 {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CountConnectedPeers")
ret0, _ := ret[0].(uint64)
return ret0
}
// CountConnectedPeers indicates an expected call of CountConnectedPeers
func (mr *MockPeerManagerMockRecorder) CountConnectedPeers() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountConnectedPeers", reflect.TypeOf((*MockPeerManager)(nil).CountConnectedPeers))
}
// Peers mocks base method
func (m *MockPeerManager) Peers() map[uint64]*pb.VpInfo {
m.ctrl.T.Helper()