bitxhub/pkg/order/etcdraft/node.go

669 lines
19 KiB
Go

package etcdraft
import (
"context"
"errors"
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/order/mempool"
"github.com/meshplus/bitxhub/pkg/order/syncer"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/sirupsen/logrus"
)
// Node is the etcd/raft based ordering node. It ties together the raft state
// machine, its storage, the mempool and the p2p layer; NewNode returns it as
// an order.Order.
type Node struct {
id uint64 // raft id of this node (taken from config.ID in NewNode)
leader uint64 // id of the current raft leader; 0 means no leader is known yet (see Ready)
repoRoot string // project path
logger logrus.FieldLogger // logger
node raft.Node // raft node, created in Start
peerMgr peermgr.PeerManager // network manager
peers []raft.Peer // raft peers
syncer syncer.Syncer // state syncer
raftStorage *RaftStorage // the raft backend storage system
storage storage.Storage // db
mempool mempool.MemPool // transaction pool
txCache *mempool.TxCache // cache the transactions received from api
batchTimerMgr *BatchTimer // batch timer; its timeout event makes the leader try to generate a batch (see run)
proposeC chan *raftproto.RequestBatch // batches queued by postProposal and proposed to raft by run
confChangeC chan raftpb.ConfChange // proposed cluster config changes
commitC chan *pb.CommitEvent // commit events produced by mint and exposed through Commit
errorC chan<- error // errors from raft session
tickTimeout time.Duration // interval between raft ticks (see run)
checkInterval time.Duration // interval for rebroadcast
msgC chan []byte // receive messages from remote peer (fed by Step)
stateC chan *mempool.ChainState // receive the executed block state (fed by ReportState)
rebroadcastTicker chan *raftproto.TxSlice // NOTE(review): not referenced by the code visible here; run uses a local time.Ticker with the same name
confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot
blockAppliedIndex sync.Map // mapping of block height and apply index in raft log
appliedIndex uint64 // current apply index in raft log
snapCount uint64 // number of applied entries after which a new snapshot is taken (see maybeTriggerSnapshot)
snapshotIndex uint64 // apply index in raft log covered by the latest snapshot
lastIndex uint64 // raft log index at which publishEntries emits the nil "replay finished" commit
lastExec uint64 // height of the last block handed to execution
readyPool *sync.Pool // pool of *raftproto.RequestBatch, avoiding memory growth fast
justElected bool // track new leader status
getChainMetaFunc func() *pb.ChainMeta // current chain meta
ctx context.Context // context (context.Background() as set by NewNode)
haltC chan struct{} // exit signal; NOTE(review): not initialized in NewNode, so a receive on it blocks forever unless it is set elsewhere
}
// NewNode builds a raft ordering node from the given options.
//
// It generates the order config, opens the raft/WAL/snapshot/state storage
// under config.StoragePath, derives the raft peers, creates the mempool, the
// transaction cache, the batch timer and the state syncer, and wires them into
// a Node. The raft state machine itself is not started here; call Start.
//
// Zero values in the raft config for the batch timeout, snapshot count and
// check interval fall back to DefaultBatchTick, DefaultSnapshotCount and
// DefaultCheckInterval respectively.
//
// Every failure is returned wrapped with %w so callers can inspect the cause
// with errors.Is / errors.As.
func NewNode(opts ...order.Option) (order.Order, error) {
	config, err := order.GenerateConfig(opts...)
	if err != nil {
		return nil, fmt.Errorf("generate config: %w", err)
	}
	repoRoot := config.RepoRoot

	// raft storage directory
	walDir := filepath.Join(config.StoragePath, "wal")
	snapDir := filepath.Join(config.StoragePath, "snap")
	dbDir := filepath.Join(config.StoragePath, "state")
	raftStorage, dbStorage, err := CreateStorage(config.Logger, walDir, snapDir, dbDir, raft.NewMemoryStorage())
	if err != nil {
		return nil, fmt.Errorf("create storage: %w", err)
	}

	// generate raft peers
	peers, err := generateRaftPeers(config)
	if err != nil {
		return nil, fmt.Errorf("generate raft peers: %w", err)
	}

	raftConfig, err := generateRaftConfig(repoRoot)
	if err != nil {
		return nil, fmt.Errorf("generate raft txpool config: %w", err)
	}

	mempoolConf := &mempool.Config{
		ID:              config.ID,
		ChainHeight:     config.Applied,
		Logger:          config.Logger,
		StoragePath:     config.StoragePath,
		GetAccountNonce: config.GetAccountNonce,
		BatchSize:       raftConfig.RAFT.MempoolConfig.BatchSize,
		PoolSize:        raftConfig.RAFT.MempoolConfig.PoolSize,
		TxSliceSize:     raftConfig.RAFT.MempoolConfig.TxSliceSize,
		TxSliceTimeout:  raftConfig.RAFT.MempoolConfig.TxSliceTimeout,
	}
	mempoolInst, err := mempool.NewMempool(mempoolConf)
	if err != nil {
		return nil, fmt.Errorf("create mempool instance: %w", err)
	}

	// Fall back to the package defaults for every tunable left at zero.
	batchTimeout := raftConfig.RAFT.BatchTimeout
	if batchTimeout == 0 {
		batchTimeout = DefaultBatchTick
	}
	snapCount := raftConfig.RAFT.SyncerConfig.SnapshotCount
	if snapCount == 0 {
		snapCount = DefaultSnapshotCount
	}
	checkInterval := raftConfig.RAFT.CheckInterval
	if checkInterval == 0 {
		checkInterval = DefaultCheckInterval
	}

	batchTimerMgr := NewTimer(batchTimeout, config.Logger)
	txCache := mempool.NewTxCache(mempoolConf.TxSliceTimeout, mempoolConf.TxSliceSize, config.Logger)
	readyPool := &sync.Pool{New: func() interface{} {
		return new(raftproto.RequestBatch)
	}}

	node := &Node{
		id:               config.ID,
		lastExec:         config.Applied,
		confChangeC:      make(chan raftpb.ConfChange),
		commitC:          make(chan *pb.CommitEvent, 1024),
		errorC:           make(chan<- error),
		msgC:             make(chan []byte),
		stateC:           make(chan *mempool.ChainState),
		proposeC:         make(chan *raftproto.RequestBatch),
		snapCount:        snapCount,
		repoRoot:         repoRoot,
		peerMgr:          config.PeerMgr,
		txCache:          txCache,
		batchTimerMgr:    batchTimerMgr,
		peers:            peers,
		logger:           config.Logger,
		getChainMetaFunc: config.GetChainMetaFunc,
		storage:          dbStorage,
		raftStorage:      raftStorage,
		readyPool:        readyPool,
		ctx:              context.Background(),
		mempool:          mempoolInst,
		checkInterval:    checkInterval,
	}
	node.raftStorage.SnapshotCatchUpEntries = node.snapCount

	// The syncer needs the ids of every peer except this node.
	otherPeers := node.peerMgr.OtherPeers()
	peerIds := make([]uint64, 0, len(otherPeers))
	for id := range otherPeers {
		peerIds = append(peerIds, id)
	}
	stateSyncer, err := syncer.New(raftConfig.RAFT.SyncerConfig.SyncBlocks, config.PeerMgr, node.Quorum(), peerIds, log.NewWithModule("syncer"))
	if err != nil {
		// Same rendered text as before, but the cause stays unwrappable.
		return nil, fmt.Errorf("new state syncer error:%w", err)
	}
	node.syncer = stateSyncer

	node.logger.Infof("Raft localID = %d", node.id)
	node.logger.Infof("Raft lastExec = %d ", node.lastExec)
	node.logger.Infof("Raft snapshotCount = %d", node.snapCount)
	return node, nil
}
// Start boots the raft state machine and the background workers.
//
// It records the persisted applied index for the last executed height, builds
// the etcd/raft config, then either restarts the raft node (when restart is
// set) or starts a fresh one with the configured peers. Finally it launches the
// main loop and the transaction-cache listener in their own goroutines.
// An error is returned only when the raft config cannot be generated.
func (n *Node) Start() error {
	n.blockAppliedIndex.Store(n.lastExec, n.loadAppliedIndex())

	raftCfg, tick, err := generateEtcdRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
	if err != nil {
		return fmt.Errorf("generate raft config: %w", err)
	}

	switch {
	case restart:
		n.node = raft.RestartNode(raftCfg)
	default:
		n.node = raft.StartNode(raftCfg, n.peers)
	}
	n.tickTimeout = tick

	go n.run()
	go n.txCache.ListenEvent()

	n.logger.Info("Consensus module started")
	return nil
}
// Stop stops the underlying raft node and logs that consensus has stopped.
// NOTE(review): nothing here signals haltC or the goroutines launched by
// Start/run; confirm whether they are expected to be torn down elsewhere.
func (n *Node) Stop() {
n.node.Stop()
n.logger.Infof("Consensus stopped")
}
// Prepare accepts a transaction coming from the API and queues it on the
// transaction cache, from where the main loop broadcasts and processes it.
//
// It fails when no leader is known yet (see Ready) or when both the
// transaction cache and the mempool report being full.
func (n *Node) Prepare(tx pb.Transaction) error {
	if err := n.Ready(); err != nil {
		return err
	}

	// Accept the transaction as long as at least one of the two buffers has room.
	if !n.txCache.IsFull() || !n.mempool.IsPoolFull() {
		n.txCache.RecvTxC <- tx
		return nil
	}
	return errors.New("transaction cache are full, we will drop this transaction")
}
// Commit returns the channel on which commit events are delivered. mint sends
// one event per minted block on it, and publishEntries sends a nil event to
// signal that replay has finished.
func (n *Node) Commit() chan *pb.CommitEvent {
return n.commitC
}
// ReportState notifies the main loop that the block at the given height has
// been executed. The height, block hash and transaction hashes are packed into
// a mempool.ChainState and sent on stateC; the send blocks until the main loop
// picks it up.
func (n *Node) ReportState(height uint64, blockHash *types.Hash, txHashList []*types.Hash) {
	n.stateC <- &mempool.ChainState{
		Height:     height,
		BlockHash:  blockHash,
		TxHashList: txHashList,
	}
}
// Quorum returns the majority size for the configured raft peers, i.e.
// floor(len(peers)/2) + 1.
func (n *Node) Quorum() uint64 {
	half := len(n.peers) / 2
	return uint64(half + 1)
}
// Step hands a serialized message received from a remote peer to the main
// loop through msgC. msgC is created unbuffered in NewNode, so the call blocks
// until run receives the message. It always returns nil; processing errors are
// logged by run.
func (n *Node) Step(msg []byte) error {
n.msgC <- msg
return nil
}
// Ready reports whether the node can serve requests: it returns an error while
// no raft leader is known (leader id 0) and nil otherwise.
// NOTE(review): leader is written by the run goroutine without
// synchronization; confirm whether this read needs to be atomic.
func (n *Node) Ready() error {
	if n.leader == 0 {
		return errors.New("in leader election status")
	}
	return nil
}
// GetPendingNonceByAccount returns the pending nonce of the given account as
// reported by the mempool.
func (n *Node) GetPendingNonceByAccount(account string) uint64 {
return n.mempool.GetPendingNonceByAccount(account)
}
// DelNode is meant to send a delete vp request for the given id.
// It is currently a no-op: delID is ignored and nil is always returned.
func (n *Node) DelNode(delID uint64) error {
return nil
}
// SubscribeTxEvent subscribes the given channel to the mempool's transaction
// events and returns the resulting subscription.
func (n *Node) SubscribeTxEvent(events chan<- pb.Transactions) event.Subscription {
return n.mempool.SubscribeTxEvent(events)
}
// run is the main work loop of the node. It is started by Start in its own
// goroutine and does not return until the node stops.
//
// It first restores confState, snapshotIndex and appliedIndex from the latest
// in-memory raft snapshot, then:
//   - spawns a proposer goroutine that forwards batches from proposeC and
//     configuration changes from confChangeC to the raft node;
//   - drives the raft clock, incoming peer messages, local transaction sets,
//     executed-block reports, periodic rebroadcast of long-pending
//     transactions, batch timeouts and raft Ready events from one select loop.
func (n *Node) run() {
	snap, err := n.raftStorage.ram.Snapshot()
	if err != nil {
		n.logger.Panic(err)
	}
	n.confState = snap.Metadata.ConfState
	n.snapshotIndex = snap.Metadata.Index
	n.appliedIndex = snap.Metadata.Index

	ticker := time.NewTicker(n.tickTimeout)
	rebroadcastTicker := time.NewTicker(n.checkInterval)
	defer ticker.Stop()
	defer rebroadcastTicker.Stop()

	// handle input request
	go func() {
		//
		// TODO: does it matter that this will restart from 0 whenever we restart a cluster?
		//
		confChangeCount := uint64(0)
		for n.proposeC != nil && n.confChangeC != nil {
			select {
			case batch := <-n.proposeC:
				data, err := batch.Marshal()
				if err != nil {
					// Never hand an unserializable batch to raft: skip it
					// instead of proposing invalid data.
					n.logger.Errorf("Marshal batch %d failed: %s", batch.Height, err)
					continue
				}
				n.logger.Debugf("Proposed block %d to raft core consensus", batch.Height)
				if err := n.node.Propose(n.ctx, data); err != nil {
					n.logger.Errorf("Failed to propose block [%d] to raft: %s", batch.Height, err)
				}
			case cc, ok := <-n.confChangeC:
				if !ok {
					// A closed channel ends the proposer loop via the loop condition.
					n.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					if err := n.node.ProposeConfChange(n.ctx, cc); err != nil {
						n.logger.Errorf("Failed to propose configuration update to Raft node: %s", err)
					}
				}
			case <-n.ctx.Done():
				return
			}
		}
	}()

	// handle messages from raft state machine
	for {
		select {
		case <-ticker.C:
			n.node.Tick()

		case msg := <-n.msgC:
			if err := n.processRaftCoreMsg(msg); err != nil {
				n.logger.Errorf("Process consensus message failed, err: %s", err.Error())
			}

		case txSet := <-n.txCache.TxSetC:
			// 1. send transactions to other peer
			data, err := txSet.Marshal()
			if err != nil {
				// NOTE(review): this return ends the whole main loop on a
				// single marshal failure; confirm that is intended.
				n.logger.Errorf("Marshal failed, err: %s", err.Error())
				return
			}
			pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id)
			// Broadcast is best effort; its error is deliberately ignored.
			_ = n.peerMgr.Broadcast(pbMsg)
			// 2. process transactions
			n.processTransactions(txSet.Transactions, true)

		case state := <-n.stateC:
			n.reportState(state)

		case <-rebroadcastTicker.C:
			// check periodically if there are long-pending txs in mempool
			rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.checkInterval)
			for _, txSlice := range rebroadcastTxs {
				txSet := &pb.Transactions{Transactions: txSlice}
				data, err := txSet.Marshal()
				if err != nil {
					// NOTE(review): as above, this return ends the main loop.
					n.logger.Errorf("Marshal failed, err: %s", err.Error())
					return
				}
				pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id)
				_ = n.peerMgr.Broadcast(pbMsg)
			}

		case <-n.batchTimerMgr.BatchTimeoutEvent():
			n.batchTimerMgr.StopBatchTimer()
			// call txPool module to generate a tx batch
			if n.isLeader() {
				n.logger.Debug("Leader batch timer expired, try to create a batch")
				if n.mempool.HasPendingRequest() {
					if batch := n.mempool.GenerateBlock(); batch != nil {
						n.postProposal(batch)
					}
				} else {
					n.logger.Debug("The length of priorityIndex is 0, skip the batch timer")
				}
			} else {
				n.logger.Warningf("Replica %d try to generate batch, but the leader is %d", n.id, n.leader)
			}

		// when the node is first ready it gives us entries to commit and messages
		// to immediately publish
		case rd := <-n.node.Ready():
			// 1: Write HardState, Entries, and Snapshot to persistent storage if they
			// are not empty.
			if err := n.raftStorage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
				n.logger.Errorf("Failed to persist etcd/raft data: %s", err)
			}
			if !raft.IsEmptySnap(rd.Snapshot) {
				n.recoverFromSnapshot()
			}
			if rd.SoftState != nil {
				newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
				if newLeader != n.leader {
					// new leader should not serve requests directly.
					if newLeader == n.id {
						n.justElected = true
					}
					// notify old leader to stop batching
					if n.leader == n.id {
						n.becomeFollower()
					}
					n.logger.Infof("Raft leader changed: %d -> %d", n.leader, newLeader)
					n.leader = newLeader
				}
			}
			// 2: Apply Snapshot (if any) and CommittedEntries to the state machine.
			if len(rd.CommittedEntries) != 0 {
				if ok := n.publishEntries(n.entriesToApply(rd.CommittedEntries)); !ok {
					n.Stop()
					return
				}
			}
			if n.justElected {
				msgInflight := n.ramLastIndex() > n.appliedIndex+1
				if msgInflight {
					n.logger.Debug("There are in flight blocks, new leader should not generate new batches")
				} else {
					n.justElected = false
				}
			}
			// 3: AsyncSend all Messages to the nodes named in the To field.
			n.send(rd.Messages)
			n.maybeTriggerSnapshot()
			// 4: Call Node.Advance() to signal readiness for the next batch of updates.
			n.node.Advance()

		case <-n.ctx.Done():
			// A cancelled context stays done forever: leave the loop after
			// stopping, otherwise this case would fire on every iteration.
			n.Stop()
			return
		}
	}
}
// processTransactions feeds a list of transactions into the mempool.
//
// On a follower the transactions are only stored. On the leader the batch
// timer is started if it is not already running, and when the mempool returns
// a batch for these transactions the timer is stopped and the batch is posted
// for proposal. isLocal is passed through to the mempool unchanged.
func (n *Node) processTransactions(txList []pb.Transaction, isLocal bool) {
	if !n.isLeader() {
		n.mempool.ProcessTransactions(txList, false, isLocal)
		return
	}

	// Leader: make sure the batch timer runs from the first received transaction.
	if !n.batchTimerMgr.IsBatchTimerActive() {
		n.batchTimerMgr.StartBatchTimer()
	}

	batch := n.mempool.ProcessTransactions(txList, true, isLocal)
	if batch == nil {
		return
	}

	// These transactions completed a batch: stop the timer and propose it.
	n.batchTimerMgr.StopBatchTimer()
	n.postProposal(batch)
}
// publishEntries applies committed raft entries in order.
//
// Normal entries carry a serialized raftproto.RequestBatch: entries already
// covered by the persisted block applied index are skipped, batches whose
// height is not lastExec+1 are ignored, and every other batch is minted into a
// block and recorded in blockAppliedIndex. ConfChange entries are applied to
// the raft node and the resulting ConfState is remembered.
//
// After an entry has been handled appliedIndex is advanced; when the entry at
// lastIndex is reached a nil commit event is sent to signal that replay has
// finished. The function returns false only if haltC fires while that nil
// event is being sent, true otherwise.
func (n *Node) publishEntries(ents []raftpb.Entry) bool {
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			if len(ents[i].Data) == 0 {
				// ignore empty messages; break leaves the switch only, so
				// appliedIndex is still advanced below
				break
			}
			// This can happen:
			//
			// if (1) we crashed after applying this block to the chain, but
			//        before writing appliedIndex to LDB.
			// or (2) we crashed in a scenario where we applied further than
			//        raft *durably persisted* its committed index (see
			//        https://github.com/coreos/etcd/pull/7899). In this
			//        scenario, when the node comes back up, we will re-apply
			//        a few entries.
			blockAppliedIndex := n.getBlockAppliedIndex()
			if blockAppliedIndex >= ents[i].Index {
				n.appliedIndex = ents[i].Index
				continue
			}
			requestBatch := n.readyPool.Get().(*raftproto.RequestBatch)
			if err := requestBatch.Unmarshal(ents[i].Data); err != nil {
				// NOTE(review): this continue (and the one below) skips the
				// appliedIndex update at the end of the loop body.
				n.logger.Error(err)
				continue
			}
			// strictly avoid writing the same block
			if requestBatch.Height != n.lastExec+1 {
				n.logger.Warningf("Replica %d expects to execute seq=%d, but get seq=%d, ignore it",
					n.id, n.lastExec+1, requestBatch.Height)
				continue
			}
			n.mint(requestBatch)
			n.blockAppliedIndex.Store(requestBatch.Height, ents[i].Index)
			n.setLastExec(requestBatch.Height)
			// update followers' batch sequence number
			if !n.isLeader() {
				n.mempool.SetBatchSeqNo(requestBatch.Height)
			}
		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ents[i].Data); err != nil {
				// Surface the failure instead of dropping the entry silently.
				n.logger.Errorf("Unmarshal conf change of entry %d failed: %s", ents[i].Index, err)
				continue
			}
			n.confState = *n.node.ApplyConfChange(cc)
			switch cc.Type {
			case raftpb.ConfChangeAddNode:
				//if len(cc.Context) > 0 {
				//	_ := types.Bytes2Address(cc.Context)
				//}
			case raftpb.ConfChangeRemoveNode:
				//if cc.NodeID == n.id {
				//	n.logger.Infoln("I've been removed from the cluster! Shutting down.")
				//	continue
				//}
			}
		}
		// after commit, update appliedIndex
		n.appliedIndex = ents[i].Index
		// special nil commit to signal replay has finished
		if ents[i].Index == n.lastIndex {
			select {
			case n.commitC <- nil:
			case <-n.haltC:
				return false
			}
		}
	}
	return true
}
// mint turns a committed request batch into a block and hands it to the
// executor through commitC.
//
// The block header carries version "1.0.0", the batch height as block number
// and the current wall-clock time in nanoseconds. Every transaction is marked
// as non-local in the accompanying LocalList. The send on commitC blocks when
// the channel buffer is full.
func (n *Node) mint(requestBatch *raftproto.RequestBatch) {
	n.logger.WithFields(logrus.Fields{
		"height": requestBatch.Height,
		"count":  len(requestBatch.TxList.Transactions),
	}).Debugln("block will be mint")
	n.logger.Infof("======== Replica %d call execute, height=%d", n.id, requestBatch.Height)

	block := &pb.Block{
		BlockHeader: &pb.BlockHeader{
			Version:   []byte("1.0.0"),
			Number:    requestBatch.Height,
			Timestamp: time.Now().UnixNano(),
		},
		Transactions: requestBatch.TxList,
	}

	// TODO (YH): refactor localLost
	// make zero-initializes the slice, so every entry is already false.
	localList := make([]bool, len(requestBatch.TxList.Transactions))

	executeEvent := &pb.CommitEvent{
		Block:     block,
		LocalList: localList,
	}
	n.commitC <- executeEvent
}
// maybeTriggerSnapshot takes a raft snapshot once at least snapCount entries
// have been applied since the previous snapshot. On success snapshotIndex is
// moved up to the current appliedIndex; failures are logged and leave
// snapshotIndex untouched.
func (n *Node) maybeTriggerSnapshot() {
	if sinceSnapshot := n.appliedIndex - n.snapshotIndex; sinceSnapshot < n.snapCount {
		return
	}

	snapData, err := n.getSnapshot()
	if err != nil {
		n.logger.Error(err)
		return
	}

	if err := n.raftStorage.TakeSnapshot(n.appliedIndex, n.confState, snapData); err != nil {
		n.logger.Error(err)
		return
	}
	n.snapshotIndex = n.appliedIndex
}
// reportState handles the notification that the block described by state has
// been executed.
//
// Every tenth height is logged as a checkpoint. If the raft apply index for
// the height is known, it is persisted, the mapping for the previous height is
// dropped and the mempool is told to commit the block's transactions. If the
// height is unknown the report is logged at debug level and ignored.
func (n *Node) reportState(state *mempool.ChainState) {
	height := state.Height
	if height%10 == 0 {
		n.logger.WithFields(logrus.Fields{
			"height": height,
		}).Info("Report checkpoint")
	}

	appliedIndex, ok := n.blockAppliedIndex.Load(height)
	if !ok {
		// The format string needs a verb for height; without it the log
		// line ends in "%!(EXTRA uint64=...)".
		n.logger.Debugf("can not found appliedIndex: %d", height)
		return
	}

	// block already persisted, record the apply index in db
	n.writeAppliedIndex(appliedIndex.(uint64))
	n.blockAppliedIndex.Delete(height - 1)
	n.mempool.CommitTransactions(state)
}
// processRaftCoreMsg decodes a message received from a remote peer and
// dispatches it by type: consensus messages are stepped into the raft node,
// broadcast transactions are processed as non-local transactions. Any decode
// error, raft Step error or unknown message type is returned.
func (n *Node) processRaftCoreMsg(msg []byte) error {
	envelope := &raftproto.RaftMessage{}
	if err := envelope.Unmarshal(msg); err != nil {
		return err
	}

	switch envelope.Type {
	case raftproto.RaftMessage_CONSENSUS:
		raftMsg := &raftpb.Message{}
		if err := raftMsg.Unmarshal(envelope.Data); err != nil {
			return err
		}
		return n.node.Step(context.Background(), *raftMsg)

	case raftproto.RaftMessage_BROADCAST_TX:
		txs := &pb.Transactions{}
		if err := txs.Unmarshal(envelope.Data); err != nil {
			return err
		}
		n.processTransactions(txs.Transactions, false)
		return nil

	default:
		return fmt.Errorf("unexpected raft message received")
	}
}
// send delivers raft messages to their target peers, one goroutine per
// message. Messages addressed to id 0 are dropped. Each message is wrapped in
// a RaftMessage of type CONSENSUS and then in a p2p consensus message before
// being sent asynchronously. A failed send is reported to raft as an
// unreachable peer, and for snapshot messages the outcome (finish or failure)
// is reported as well.
func (n *Node) send(messages []raftpb.Message) {
	deliver := func(m raftpb.Message) {
		if m.To == 0 {
			return
		}

		payload, err := m.Marshal()
		if err != nil {
			n.logger.Error(err)
			return
		}

		envelope := &raftproto.RaftMessage{
			Type: raftproto.RaftMessage_CONSENSUS,
			Data: payload,
		}
		wire, err := envelope.Marshal()
		if err != nil {
			n.logger.Error(err)
			return
		}

		// Assume success until the send proves otherwise.
		snapStatus := raft.SnapshotFinish
		sendErr := n.peerMgr.AsyncSend(m.To, &pb.Message{
			Type: pb.Message_CONSENSUS,
			Data: wire,
		})
		if sendErr != nil {
			n.logger.WithFields(logrus.Fields{
				"from":     n.id,
				"to":       m.To,
				"msg_type": m.Type,
				"err":      sendErr.Error(),
			}).Debugf("async send msg")
			n.node.ReportUnreachable(m.To)
			snapStatus = raft.SnapshotFailure
		}

		if m.Type == raftpb.MsgSnap {
			n.node.ReportSnapshot(m.To, snapStatus)
		}
	}

	for _, m := range messages {
		go deliver(m)
	}
}
// postProposal queues the batch on proposeC, where the proposer goroutine
// started by run picks it up, and then starts the batch timer again.
// proposeC is created unbuffered in NewNode, so the send blocks until the
// proposer goroutine receives the batch.
func (n *Node) postProposal(batch *raftproto.RequestBatch) {
n.proposeC <- batch
n.batchTimerMgr.StartBatchTimer()
}
// becomeFollower is called by run when this node was the leader and a
// different leader is reported; it logs the transition and stops the batch
// timer so the node stops batching.
func (n *Node) becomeFollower() {
n.logger.Debugf("Replica %d became follower", n.id)
n.batchTimerMgr.StopBatchTimer()
}
// setLastExec records height as the last block height handed to execution.
func (n *Node) setLastExec(height uint64) {
n.lastExec = height
}