package etcdraft

import (
	"context"
	"errors"
	"fmt"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/meshplus/bitxhub-kit/log"
	"github.com/meshplus/bitxhub-kit/storage"
	"github.com/meshplus/bitxhub-kit/types"
	"github.com/meshplus/bitxhub-model/pb"
	"github.com/meshplus/bitxhub/pkg/order"
	raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
	"github.com/meshplus/bitxhub/pkg/order/mempool"
	"github.com/meshplus/bitxhub/pkg/order/syncer"
	"github.com/meshplus/bitxhub/pkg/peermgr"
	"github.com/sirupsen/logrus"
)

type Node struct {
	id       uint64             // raft id
	leader   uint64             // leader id
	repoRoot string             // project path
	logger   logrus.FieldLogger // logger

	node          raft.Node           // raft node
	peerMgr       peermgr.PeerManager // network manager
	peers         []raft.Peer         // raft peers
	syncer        syncer.Syncer       // state syncer
	raftStorage   *RaftStorage        // the raft backend storage system
	storage       storage.Storage     // db
	mempool       mempool.MemPool     // transaction pool
	txCache       *mempool.TxCache    // cache the transactions received from the api
	batchTimerMgr *BatchTimer         // batch timer manager

	proposeC          chan *raftproto.RequestBatch // proposed batches, input channel
	confChangeC       chan raftpb.ConfChange       // proposed cluster config changes
	commitC           chan *pb.CommitEvent         // the hash commit channel
	errorC            chan<- error                 // errors from raft session
	tickTimeout       time.Duration                // tick timeout
	checkInterval     time.Duration                // interval for rebroadcast
	msgC              chan []byte                  // receive messages from remote peers
	stateC            chan *mempool.ChainState     // receive the executed block state
	rebroadcastTicker chan *raftproto.TxSlice      // transactions to be rebroadcast

	confState         raftpb.ConfState // raft requires ConfState to be persisted within snapshot
	blockAppliedIndex sync.Map         // mapping of block height and apply index in raft log
	appliedIndex      uint64           // current apply index in raft log
	snapCount         uint64           // snapshot count
	snapshotIndex     uint64           // current snapshot apply index in raft log
	lastIndex         uint64           // last apply index in raft log
	lastExec          uint64           // the index of the last-applied block

	readyPool        *sync.Pool           // ready pool, to avoid fast memory growth
	justElected      bool                 // track new leader status
	getChainMetaFunc func() *pb.ChainMeta // current chain meta
	ctx              context.Context      // context
	haltC            chan struct{}        // exit signal
}
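// How the channels above fit together, read off the run loop further down
// (a schematic summary, not an additional API):
//
//	api tx ──> txCache.RecvTxC ──> txCache.TxSetC ──> broadcast + mempool ──> proposeC
//	proposeC ──> raft propose ──> Ready().CommittedEntries ──> commitC ──> block executor
//	executor ──> ReportState() ──> stateC ──> mempool.CommitTransactions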
// NewNode creates a new raft node.
func NewNode(opts ...order.Option) (order.Order, error) {
	config, err := order.GenerateConfig(opts...)
	if err != nil {
		return nil, fmt.Errorf("generate config: %w", err)
	}

	repoRoot := config.RepoRoot

	// raft storage directory
	walDir := filepath.Join(config.StoragePath, "wal")
	snapDir := filepath.Join(config.StoragePath, "snap")
	dbDir := filepath.Join(config.StoragePath, "state")
	raftStorage, dbStorage, err := CreateStorage(config.Logger, walDir, snapDir, dbDir, raft.NewMemoryStorage())
	if err != nil {
		return nil, err
	}

	// generate raft peers
	peers, err := generateRaftPeers(config)
	if err != nil {
		return nil, fmt.Errorf("generate raft peers: %w", err)
	}

	raftConfig, err := generateRaftConfig(repoRoot)
	if err != nil {
		return nil, fmt.Errorf("generate raft txpool config: %w", err)
	}
	mempoolConf := &mempool.Config{
		ID:              config.ID,
		ChainHeight:     config.Applied,
		Logger:          config.Logger,
		StoragePath:     config.StoragePath,
		GetAccountNonce: config.GetAccountNonce,

		BatchSize:      raftConfig.RAFT.MempoolConfig.BatchSize,
		PoolSize:       raftConfig.RAFT.MempoolConfig.PoolSize,
		TxSliceSize:    raftConfig.RAFT.MempoolConfig.TxSliceSize,
		TxSliceTimeout: raftConfig.RAFT.MempoolConfig.TxSliceTimeout,
	}
	mempoolInst, err := mempool.NewMempool(mempoolConf)
	if err != nil {
		return nil, fmt.Errorf("create mempool instance: %w", err)
	}

	var batchTimeout time.Duration
	if raftConfig.RAFT.BatchTimeout == 0 {
		batchTimeout = DefaultBatchTick
	} else {
		batchTimeout = raftConfig.RAFT.BatchTimeout
	}
	batchTimerMgr := NewTimer(batchTimeout, config.Logger)
	txCache := mempool.NewTxCache(mempoolConf.TxSliceTimeout, mempoolConf.TxSliceSize, config.Logger)

	readyPool := &sync.Pool{New: func() interface{} {
		return new(raftproto.RequestBatch)
	}}

	snapCount := raftConfig.RAFT.SyncerConfig.SnapshotCount
	if snapCount == 0 {
		snapCount = DefaultSnapshotCount
	}

	var checkInterval time.Duration
	if raftConfig.RAFT.CheckInterval == 0 {
		checkInterval = DefaultCheckInterval
	} else {
		checkInterval = raftConfig.RAFT.CheckInterval
	}

	node := &Node{
		id:               config.ID,
		lastExec:         config.Applied,
		confChangeC:      make(chan raftpb.ConfChange),
		commitC:          make(chan *pb.CommitEvent, 1024),
		errorC:           make(chan<- error),
		msgC:             make(chan []byte),
		stateC:           make(chan *mempool.ChainState),
		proposeC:         make(chan *raftproto.RequestBatch),
		snapCount:        snapCount,
		repoRoot:         repoRoot,
		peerMgr:          config.PeerMgr,
		txCache:          txCache,
		batchTimerMgr:    batchTimerMgr,
		peers:            peers,
		logger:           config.Logger,
		getChainMetaFunc: config.GetChainMetaFunc,
		storage:          dbStorage,
		raftStorage:      raftStorage,
		readyPool:        readyPool,
		ctx:              context.Background(),
		mempool:          mempoolInst,
		checkInterval:    checkInterval,
	}
	node.raftStorage.SnapshotCatchUpEntries = node.snapCount

	otherPeers := node.peerMgr.OtherPeers()
	peerIds := make([]uint64, 0, len(otherPeers))
	for id := range otherPeers {
		peerIds = append(peerIds, id)
	}
	stateSyncer, err := syncer.New(raftConfig.RAFT.SyncerConfig.SyncBlocks, config.PeerMgr, node.Quorum(), peerIds, log.NewWithModule("syncer"))
	if err != nil {
		return nil, fmt.Errorf("new state syncer: %w", err)
	}
	node.syncer = stateSyncer

	node.logger.Infof("Raft localID = %d", node.id)
	node.logger.Infof("Raft lastExec = %d", node.lastExec)
	node.logger.Infof("Raft snapshotCount = %d", node.snapCount)
	return node, nil
}
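// A minimal usage sketch for the constructor above; the surrounding wiring
// (option assembly, block executor) is assumed rather than taken from this file:
//
//	ord, err := NewNode(opts...) // opts assembled from pkg/order options
//	if err != nil {
//		panic(err)
//	}
//	if err := ord.Start(); err != nil {
//		panic(err)
//	}
//	for ev := range ord.Commit() {
//		_ = ev // hand each commit event to the block executor
//	}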
// Start or restart the raft node
func (n *Node) Start() error {
	n.blockAppliedIndex.Store(n.lastExec, n.loadAppliedIndex())
	rc, tickTimeout, err := generateEtcdRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
	if err != nil {
		return fmt.Errorf("generate raft config: %w", err)
	}
	// restart is set elsewhere in this package when an existing WAL is detected
	if restart {
		n.node = raft.RestartNode(rc)
	} else {
		n.node = raft.StartNode(rc, n.peers)
	}
	n.tickTimeout = tickTimeout

	go n.run()
	go n.txCache.ListenEvent()

	n.logger.Info("Consensus module started")

	return nil
}

// Stop the raft node
func (n *Node) Stop() {
	n.node.Stop()
	n.logger.Infof("Consensus stopped")
}

// Prepare adds the transaction into the txpool and broadcasts it to other nodes
func (n *Node) Prepare(tx pb.Transaction) error {
	if err := n.Ready(); err != nil {
		return err
	}
	if n.txCache.IsFull() && n.mempool.IsPoolFull() {
		return errors.New("transaction cache and mempool are full, we will drop this transaction")
	}
	n.txCache.RecvTxC <- tx
	return nil
}

func (n *Node) Commit() chan *pb.CommitEvent {
	return n.commitC
}

func (n *Node) ReportState(height uint64, blockHash *types.Hash, txHashList []*types.Hash) {
	state := &mempool.ChainState{
		Height:     height,
		BlockHash:  blockHash,
		TxHashList: txHashList,
	}
	n.stateC <- state
}

func (n *Node) Quorum() uint64 {
	return uint64(len(n.peers)/2 + 1)
}
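// For example, with 4 raft peers Quorum() returns 4/2+1 = 3, and with 5 peers
// it returns 5/2+1 = 3 (integer division): a simple majority of the static
// peer set recorded at startup.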
func (n *Node) Step(msg []byte) error {
	n.msgC <- msg
	return nil
}

func (n *Node) Ready() error {
	hasLeader := n.leader != 0
	if !hasLeader {
		return errors.New("in leader election status")
	}
	return nil
}

func (n *Node) GetPendingNonceByAccount(account string) uint64 {
	return n.mempool.GetPendingNonceByAccount(account)
}

// DelNode sends a delete vp request by given id.
func (n *Node) DelNode(delID uint64) error {
	return nil
}

// SubscribeTxEvent subscribes tx event
func (n *Node) SubscribeTxEvent(events chan<- pb.Transactions) event.Subscription {
	return n.mempool.SubscribeTxEvent(events)
}
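// The run loop below follows the processing contract documented by etcd/raft:
// drive logical time with Tick(), and for every Ready() batch persist state
// before sending messages and applying committed entries, then call Advance().
// Schematically (a sketch of the contract, not the exact loop below):
//
//	for {
//		select {
//		case <-ticker.C:
//			node.Tick()
//		case rd := <-node.Ready():
//			persist(rd.HardState, rd.Entries, rd.Snapshot) // 1. durable storage first
//			send(rd.Messages)                              // 2. then ship messages to peers
//			apply(rd.CommittedEntries)                     // 3. then apply to the state machine
//			node.Advance()                                 // 4. signal readiness for the next batch
//		}
//	}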
// main work loop
func (n *Node) run() {
	snap, err := n.raftStorage.ram.Snapshot()
	if err != nil {
		n.logger.Panic(err)
	}
	n.confState = snap.Metadata.ConfState
	n.snapshotIndex = snap.Metadata.Index
	n.appliedIndex = snap.Metadata.Index
	ticker := time.NewTicker(n.tickTimeout)
	rebroadcastTicker := time.NewTicker(n.checkInterval)
	defer ticker.Stop()
	defer rebroadcastTicker.Stop()

	// handle input requests
	go func() {
		// TODO: does it matter that this will restart from 0 whenever we restart a cluster?
		confChangeCount := uint64(0)
		for n.proposeC != nil && n.confChangeC != nil {
			select {
			case batch := <-n.proposeC:
				data, err := batch.Marshal()
				if err != nil {
					n.logger.Errorf("Marshal batch failed: %s", err.Error())
					continue
				}
				n.logger.Debugf("Proposed block %d to raft core consensus", batch.Height)
				if err := n.node.Propose(n.ctx, data); err != nil {
					n.logger.Errorf("Failed to propose block [%d] to raft: %s", batch.Height, err)
				}
			case cc, ok := <-n.confChangeC:
				if !ok {
					n.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					if err := n.node.ProposeConfChange(n.ctx, cc); err != nil {
						n.logger.Errorf("Failed to propose configuration update to Raft node: %s", err)
					}
				}
			case <-n.ctx.Done():
				return
			}
		}
	}()

	// handle messages from raft state machine
	for {
		select {
		case <-ticker.C:
			n.node.Tick()

		case msg := <-n.msgC:
			if err := n.processRaftCoreMsg(msg); err != nil {
				n.logger.Errorf("Process consensus message failed, err: %s", err.Error())
			}

		case txSet := <-n.txCache.TxSetC:
			// 1. send transactions to other peers
			data, err := txSet.Marshal()
			if err != nil {
				n.logger.Errorf("Marshal failed, err: %s", err.Error())
				return
			}
			pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id)
			_ = n.peerMgr.Broadcast(pbMsg)

			// 2. process transactions
			n.processTransactions(txSet.Transactions, true)

		case state := <-n.stateC:
			n.reportState(state)

		case <-rebroadcastTicker.C:
			// check periodically if there are long-pending txs in mempool
			rebroadcastTxs := n.mempool.GetTimeoutTransactions(n.checkInterval)
			for _, txSlice := range rebroadcastTxs {
				txSet := &pb.Transactions{Transactions: txSlice}
				data, err := txSet.Marshal()
				if err != nil {
					n.logger.Errorf("Marshal failed, err: %s", err.Error())
					return
				}
				pbMsg := msgToConsensusPbMsg(data, raftproto.RaftMessage_BROADCAST_TX, n.id)
				_ = n.peerMgr.Broadcast(pbMsg)
			}

		case <-n.batchTimerMgr.BatchTimeoutEvent():
			n.batchTimerMgr.StopBatchTimer()
			// call the txPool module to generate a tx batch
			if n.isLeader() {
				n.logger.Debug("Leader batch timer expired, try to create a batch")
				if n.mempool.HasPendingRequest() {
					if batch := n.mempool.GenerateBlock(); batch != nil {
						n.postProposal(batch)
					}
				} else {
					n.logger.Debug("The length of priorityIndex is 0, skip the batch timer")
				}
			} else {
				n.logger.Warningf("Replica %d tries to generate a batch, but the leader is %d", n.id, n.leader)
			}

		// when the node is first ready it gives us entries to commit and messages
		// to immediately publish
		case rd := <-n.node.Ready():
			// 1: Write HardState, Entries, and Snapshot to persistent storage if they
			// are not empty.
			if err := n.raftStorage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
				n.logger.Errorf("Failed to persist etcd/raft data: %s", err)
			}

			if !raft.IsEmptySnap(rd.Snapshot) {
				n.recoverFromSnapshot()
			}

			if rd.SoftState != nil {
				newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
				if newLeader != n.leader {
					// a newly elected leader should not serve requests directly.
					if newLeader == n.id {
						n.justElected = true
					}
					// notify the old leader to stop batching
					if n.leader == n.id {
						n.becomeFollower()
					}
					n.logger.Infof("Raft leader changed: %d -> %d", n.leader, newLeader)
					n.leader = newLeader
				}
			}

			// 2: Apply Snapshot (if any) and CommittedEntries to the state machine.
			if len(rd.CommittedEntries) != 0 {
				if ok := n.publishEntries(n.entriesToApply(rd.CommittedEntries)); !ok {
					n.Stop()
					return
				}
			}
			if n.justElected {
				msgInflight := n.ramLastIndex() > n.appliedIndex+1
				if msgInflight {
					n.logger.Debug("There are in-flight blocks, the new leader should not generate new batches")
				} else {
					n.justElected = false
				}
			}

			// 3: AsyncSend all Messages to the nodes named in the To field.
			n.send(rd.Messages)

			n.maybeTriggerSnapshot()

			// 4: Call Node.Advance() to signal readiness for the next batch of updates.
			n.node.Advance()

		case <-n.ctx.Done():
			n.Stop()
		}
	}
}

func (n *Node) processTransactions(txList []pb.Transaction, isLocal bool) {
	// the leader node checks whether these transactions trigger generating a batch
	if n.isLeader() {
		// start the batch timer when this node receives the first transaction
		if !n.batchTimerMgr.IsBatchTimerActive() {
			n.batchTimerMgr.StartBatchTimer()
		}
		// if these transactions trigger generating a batch, stop the batch timer
		if batch := n.mempool.ProcessTransactions(txList, true, isLocal); batch != nil {
			n.batchTimerMgr.StopBatchTimer()
			n.postProposal(batch)
		}
	} else {
		n.mempool.ProcessTransactions(txList, false, isLocal)
	}
}

func (n *Node) publishEntries(ents []raftpb.Entry) bool {
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			if len(ents[i].Data) == 0 {
				// ignore empty messages
				break
			}

			// This can happen:
			//
			// if (1) we crashed after applying this block to the chain, but
			//        before writing appliedIndex to LDB,
			// or (2) we crashed in a scenario where we applied further than
			//        raft *durably persisted* its committed index (see
			//        https://github.com/coreos/etcd/pull/7899). In this
			//        scenario, when the node comes back up, we will re-apply
			//        a few entries.
			blockAppliedIndex := n.getBlockAppliedIndex()
			if blockAppliedIndex >= ents[i].Index {
				n.appliedIndex = ents[i].Index
				continue
			}

			requestBatch := n.readyPool.Get().(*raftproto.RequestBatch)
			if err := requestBatch.Unmarshal(ents[i].Data); err != nil {
				n.logger.Error(err)
				continue
			}
			// strictly avoid writing the same block
			if requestBatch.Height != n.lastExec+1 {
				n.logger.Warningf("Replica %d expects to execute seq=%d, but gets seq=%d, ignore it",
					n.id, n.lastExec+1, requestBatch.Height)
				continue
			}
			n.mint(requestBatch)
			n.blockAppliedIndex.Store(requestBatch.Height, ents[i].Index)
			n.setLastExec(requestBatch.Height)
			// update followers' batch sequence number
			if !n.isLeader() {
				n.mempool.SetBatchSeqNo(requestBatch.Height)
			}

		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ents[i].Data); err != nil {
				continue
			}
			n.confState = *n.node.ApplyConfChange(cc)
			switch cc.Type {
			case raftpb.ConfChangeAddNode:
				//if len(cc.Context) > 0 {
				//	_ := types.Bytes2Address(cc.Context)
				//}
			case raftpb.ConfChangeRemoveNode:
				//if cc.NodeID == n.id {
				//	n.logger.Infoln("I've been removed from the cluster! Shutting down.")
				//	continue
				//}
			}
		}

		// after commit, update appliedIndex
		n.appliedIndex = ents[i].Index

		// special nil commit to signal replay has finished
		if ents[i].Index == n.lastIndex {
			select {
			case n.commitC <- nil:
			case <-n.haltC:
				return false
			}
		}
	}
	return true
}

// mint the block
func (n *Node) mint(requestBatch *raftproto.RequestBatch) {
	n.logger.WithFields(logrus.Fields{
		"height": requestBatch.Height,
		"count":  len(requestBatch.TxList.Transactions),
	}).Debugln("block will be minted")
	n.logger.Infof("======== Replica %d call execute, height=%d", n.id, requestBatch.Height)
	block := &pb.Block{
		BlockHeader: &pb.BlockHeader{
			Version:   []byte("1.0.0"),
			Number:    requestBatch.Height,
			Timestamp: time.Now().UnixNano(),
		},
		Transactions: requestBatch.TxList,
	}
	// TODO (YH): refactor localLost
	localList := make([]bool, len(requestBatch.TxList.Transactions))
	for i := 0; i < len(requestBatch.TxList.Transactions); i++ {
		localList[i] = false
	}
	executeEvent := &pb.CommitEvent{
		Block:     block,
		LocalList: localList,
	}
	n.commitC <- executeEvent
}

// maybeTriggerSnapshot determines whether the current apply index triggers a snapshot
func (n *Node) maybeTriggerSnapshot() {
	if n.appliedIndex-n.snapshotIndex < n.snapCount {
		return
	}
	data, err := n.getSnapshot()
	if err != nil {
		n.logger.Error(err)
		return
	}
	err = n.raftStorage.TakeSnapshot(n.appliedIndex, n.confState, data)
	if err != nil {
		n.logger.Error(err)
		return
	}
	n.snapshotIndex = n.appliedIndex
}
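// For illustration: if snapCount were 1000 (the actual value comes from the
// syncer config, falling back to DefaultSnapshotCount), maybeTriggerSnapshot
// takes a snapshot each time appliedIndex advances 1000 entries past
// snapshotIndex. SnapshotCatchUpEntries is set to snapCount in NewNode, which
// presumably controls how much raft log the storage retains after compaction
// so that slow followers can still catch up without a full snapshot transfer.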
func (n *Node) reportState(state *mempool.ChainState) {
	height := state.Height
	if height%10 == 0 {
		n.logger.WithFields(logrus.Fields{
			"height": height,
		}).Info("Report checkpoint")
	}
	appliedIndex, ok := n.blockAppliedIndex.Load(height)
	if !ok {
		n.logger.Debugf("can not find appliedIndex for block height %d", height)
		return
	}
	// block already persisted, record the apply index in db
	n.writeAppliedIndex(appliedIndex.(uint64))
	n.blockAppliedIndex.Delete(height - 1)
	n.mempool.CommitTransactions(state)
}

func (n *Node) processRaftCoreMsg(msg []byte) error {
	rm := &raftproto.RaftMessage{}
	if err := rm.Unmarshal(msg); err != nil {
		return err
	}
	switch rm.Type {
	case raftproto.RaftMessage_CONSENSUS:
		raftMsg := &raftpb.Message{}
		if err := raftMsg.Unmarshal(rm.Data); err != nil {
			return err
		}
		return n.node.Step(context.Background(), *raftMsg)

	case raftproto.RaftMessage_BROADCAST_TX:
		txSlice := &pb.Transactions{}
		if err := txSlice.Unmarshal(rm.Data); err != nil {
			return err
		}
		n.processTransactions(txSlice.Transactions, false)

	default:
		return fmt.Errorf("unexpected raft message received")
	}
	return nil
}

// send raft consensus messages
func (n *Node) send(messages []raftpb.Message) {
	for _, msg := range messages {
		go func(msg raftpb.Message) {
			if msg.To == 0 {
				return
			}
			status := raft.SnapshotFinish

			data, err := msg.Marshal()
			if err != nil {
				n.logger.Error(err)
				return
			}

			rm := &raftproto.RaftMessage{
				Type: raftproto.RaftMessage_CONSENSUS,
				Data: data,
			}
			rmData, err := rm.Marshal()
			if err != nil {
				n.logger.Error(err)
				return
			}
			p2pMsg := &pb.Message{
				Type: pb.Message_CONSENSUS,
				Data: rmData,
			}

			err = n.peerMgr.AsyncSend(msg.To, p2pMsg)
			if err != nil {
				n.logger.WithFields(logrus.Fields{
					"from":     n.id,
					"to":       msg.To,
					"msg_type": msg.Type,
					"err":      err.Error(),
				}).Debug("async send msg failed")
				n.node.ReportUnreachable(msg.To)
				status = raft.SnapshotFailure
			}

			if msg.Type == raftpb.MsgSnap {
				n.node.ReportSnapshot(msg.To, status)
			}
		}(msg)
	}
}

func (n *Node) postProposal(batch *raftproto.RequestBatch) {
	n.proposeC <- batch
	n.batchTimerMgr.StartBatchTimer()
}

func (n *Node) becomeFollower() {
	n.logger.Debugf("Replica %d became follower", n.id)
	n.batchTimerMgr.StopBatchTimer()
}

func (n *Node) setLastExec(height uint64) {
	n.lastExec = height
}