Merge pull request #58 from meshplus/refactor/extract-order-module

refactor(order): extract order module to pkg module
This commit is contained in:
Aiden X 2020-04-27 18:52:46 +08:00 committed by GitHub
commit d0511641cd
11 changed files with 772 additions and 751 deletions


@@ -1,597 +1,10 @@
package main
import (
"context"
"encoding/binary"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/internal/plugins/order/etcdraft/proto"
"github.com/meshplus/bitxhub/internal/plugins/order/etcdraft/txpool"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
"github.com/sirupsen/logrus"
"github.com/meshplus/bitxhub/pkg/order/etcdraft"
)
var defaultSnapshotCount uint64 = 10000
type Node struct {
id uint64 // raft id
leader uint64 // leader id
node raft.Node // raft node
peerMgr peermgr.PeerManager // network manager
peers []raft.Peer // raft peers
proposeC chan *raftproto.Ready // proposed ready, input channel
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot
commitC chan *pb.Block // the block commit channel
errorC chan<- error // errors from raft session
raftStorage *RaftStorage // the raft backend storage system
tp *txpool.TxPool // transaction pool
storage storage.Storage // db
repoRoot string //project path
logger logrus.FieldLogger //logger
blockAppliedIndex sync.Map // mapping of block height and apply index in raft log
appliedIndex uint64 // current apply index in raft log
snapCount uint64 // snapshot count
snapshotIndex uint64 // current snapshot apply index in raft log
lastIndex uint64 // last apply index in raft log
readyPool *sync.Pool // pool of reusable Ready objects to avoid rapid memory growth
readyCache sync.Map //ready cache
ctx context.Context // context
haltC chan struct{} // exit signal
}
// NewNode creates a new raft node
func NewNode(opts ...order.Option) (order.Order, error) {
config, err := order.GenerateConfig(opts...)
if err != nil {
return nil, fmt.Errorf("generate config: %w", err)
}
repoRoot := config.RepoRoot
// raft storage directory
walDir := filepath.Join(config.StoragePath, "wal")
snapDir := filepath.Join(config.StoragePath, "snap")
dbDir := filepath.Join(config.StoragePath, "state")
raftStorage, dbStorage, err := CreateStorage(config.Logger, walDir, snapDir, dbDir, raft.NewMemoryStorage())
if err != nil {
return nil, err
}
//generate raft peers
peers, err := GenerateRaftPeers(config)
if err != nil {
return nil, fmt.Errorf("generate raft peers: %w", err)
}
//generate txpool config
tpc, err := generateTxPoolConfig(repoRoot)
if err != nil {
return nil, fmt.Errorf("generate raft txpool config: %w", err)
}
txPoolConfig := &txpool.Config{
PackSize: tpc.PackSize,
BlockTick: tpc.BlockTick,
PoolSize: tpc.PoolSize,
}
txPool, proposeC := txpool.New(config, dbStorage, txPoolConfig)
readyPool := &sync.Pool{New: func() interface{} {
return new(raftproto.Ready)
}}
return &Node{
id: config.ID,
proposeC: proposeC,
confChangeC: make(chan raftpb.ConfChange),
commitC: make(chan *pb.Block, 1024),
errorC: make(chan<- error),
tp: txPool,
repoRoot: repoRoot,
snapCount: defaultSnapshotCount,
peerMgr: config.PeerMgr,
peers: peers,
logger: config.Logger,
storage: dbStorage,
raftStorage: raftStorage,
readyPool: readyPool,
ctx: context.Background(),
}, nil
}
//Start or restart raft node
func (n *Node) Start() error {
n.blockAppliedIndex.Store(n.tp.GetHeight(), n.loadAppliedIndex())
rc, err := generateRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
if err != nil {
return fmt.Errorf("generate raft config: %w", err)
}
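// NOTE: restart is a package-level flag defined outside this snippet; presumably it is set during storage setup when existing WAL/snapshot data is found.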
if restart {
n.node = raft.RestartNode(rc)
} else {
n.node = raft.StartNode(rc, n.peers)
}
go n.run()
n.logger.Info("Consensus module started")
return nil
}
//Stop the raft node
func (n *Node) Stop() {
n.tp.CheckExecute(false)
n.node.Stop()
n.logger.Infof("Consensus stopped")
}
//Add the transaction into txpool and broadcast it to other nodes
func (n *Node) Prepare(tx *pb.Transaction) error {
if !n.Ready() {
return nil
}
if err := n.tp.AddPendingTx(tx, false); err != nil {
return err
}
if err := n.tp.Broadcast(tx); err != nil {
return err
}
return nil
}
func (n *Node) Commit() chan *pb.Block {
return n.commitC
}
func (n *Node) ReportState(height uint64, hash types.Hash) {
if height%10 == 0 {
n.logger.WithFields(logrus.Fields{
"height": height,
"hash": hash.ShortString(),
}).Info("Report checkpoint")
}
appliedIndex, ok := n.blockAppliedIndex.Load(height)
if !ok {
n.logger.Errorf("can not found appliedIndex:", height)
return
}
//block already persisted, record the apply index in db
n.writeAppliedIndex(appliedIndex.(uint64))
n.blockAppliedIndex.Delete(height)
n.tp.BuildReqLookUp() //store bloom filter
ready, ok := n.readyCache.Load(height)
if !ok {
n.logger.Errorf("can not found ready:", height)
return
}
hashes := ready.(*raftproto.Ready).TxHashes
// remove redundant tx
n.tp.BatchDelete(hashes)
n.readyCache.Delete(height)
}
func (n *Node) Quorum() uint64 {
return uint64(len(n.peers)/2 + 1)
}
func (n *Node) Step(ctx context.Context, msg []byte) error {
rm := &raftproto.RaftMessage{}
if err := rm.Unmarshal(msg); err != nil {
return err
}
switch rm.Type {
case raftproto.RaftMessage_CONSENSUS:
msg := &raftpb.Message{}
if err := msg.Unmarshal(rm.Data); err != nil {
return err
}
return n.node.Step(ctx, *msg)
case raftproto.RaftMessage_GET_TX:
hash := types.Hash{}
if err := hash.Unmarshal(rm.Data); err != nil {
return err
}
tx, ok := n.tp.GetTx(hash, true)
if !ok {
return nil
}
v, err := tx.Marshal()
if err != nil {
return err
}
txAck := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_GET_TX_ACK,
Data: v,
}
txAckData, err := txAck.Marshal()
if err != nil {
return err
}
m := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: txAckData,
}
return n.peerMgr.AsyncSend(rm.FromId, m)
case raftproto.RaftMessage_GET_TX_ACK:
tx := &pb.Transaction{}
if err := tx.Unmarshal(rm.Data); err != nil {
return err
}
return n.tp.AddPendingTx(tx, true)
case raftproto.RaftMessage_BROADCAST_TX:
tx := &pb.Transaction{}
if err := tx.Unmarshal(rm.Data); err != nil {
return err
}
return n.tp.AddPendingTx(tx, false)
default:
return fmt.Errorf("unexpected raft message received")
}
}
func (n *Node) IsLeader() bool {
return n.leader == n.id
}
func (n *Node) Ready() bool {
return n.leader != 0
}
// main work loop
func (n *Node) run() {
snap, err := n.raftStorage.ram.Snapshot()
if err != nil {
n.logger.Panic(err)
}
n.confState = snap.Metadata.ConfState
n.snapshotIndex = snap.Metadata.Index
n.appliedIndex = snap.Metadata.Index
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// handle input request
go func() {
//
// TODO: does it matter that this will restart from 0 whenever we restart a cluster?
//
confChangeCount := uint64(0)
for n.proposeC != nil && n.confChangeC != nil {
select {
case ready, ok := <-n.proposeC:
if !ok {
n.proposeC = nil
} else {
if !n.IsLeader() {
n.tp.CheckExecute(false)
continue
}
data, err := ready.Marshal()
if err != nil {
n.logger.Panic(err)
}
n.tp.BatchStore(ready.TxHashes)
if err := n.node.Propose(n.ctx, data); err != nil {
n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err)
}
}
case cc, ok := <-n.confChangeC:
if !ok {
n.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
if err := n.node.ProposeConfChange(n.ctx, cc); err != nil {
n.logger.Panic("Failed to propose configuration update to Raft node: %s", err)
}
}
case <-n.ctx.Done():
return
}
}
}()
// handle messages from raft state machine
for {
select {
case <-ticker.C:
n.node.Tick()
// when the node is first ready it gives us entries to commit and messages
// to immediately publish
case rd := <-n.node.Ready():
// 1: Write HardState, Entries, and Snapshot to persistent storage if they
// are not empty.
if err := n.raftStorage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
n.logger.Fatalf("failed to persist etcd/raft data: %s", err)
}
// 2: Apply Snapshot (if any) and CommittedEntries to the state machine.
if len(rd.CommittedEntries) != 0 {
if ok := n.publishEntries(n.entriesToApply(rd.CommittedEntries)); !ok {
n.Stop()
return
}
}
if rd.SoftState != nil {
n.leader = rd.SoftState.Lead
n.tp.CheckExecute(n.IsLeader())
}
// 3: AsyncSend all Messages to the nodes named in the To field.
go n.send(rd.Messages)
n.maybeTriggerSnapshot()
// 4: Call Node.Advance() to signal readiness for the next batch of
// updates.
n.node.Advance()
case <-n.ctx.Done():
n.Stop()
}
}
}
// send raft consensus message
func (n *Node) send(messages []raftpb.Message) {
for _, msg := range messages {
if msg.To == 0 {
continue
}
status := raft.SnapshotFinish
data, err := (&msg).Marshal()
if err != nil {
n.logger.Error(err)
continue
}
rm := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_CONSENSUS,
Data: data,
}
rmData, err := rm.Marshal()
if err != nil {
n.logger.Error(err)
continue
}
p2pMsg := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: rmData,
}
err = n.peerMgr.AsyncSend(msg.To, p2pMsg)
if err != nil {
n.node.ReportUnreachable(msg.To)
status = raft.SnapshotFailure
}
if msg.Type == raftpb.MsgSnap {
n.node.ReportSnapshot(msg.To, status)
}
}
}
func (n *Node) publishEntries(ents []raftpb.Entry) bool {
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
ready := n.readyPool.Get().(*raftproto.Ready)
if err := ready.Unmarshal(ents[i].Data); err != nil {
n.logger.Error(err)
continue
}
// This can happen:
//
// if (1) we crashed after applying this block to the chain, but
// before writing appliedIndex to LDB.
// or (2) we crashed in a scenario where we applied further than
// raft *durably persisted* its committed index (see
// https://github.com/coreos/etcd/pull/7899). In this
// scenario, when the node comes back up, we will re-apply
// a few entries.
blockAppliedIndex := n.getBlockAppliedIndex()
if blockAppliedIndex >= ents[i].Index {
n.appliedIndex = ents[i].Index
continue
}
n.mint(ready)
n.blockAppliedIndex.Store(ready.Height, ents[i].Index)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(ents[i].Data); err != nil {
continue
}
n.confState = *n.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
//if len(cc.Context) > 0 {
// _ := types.Bytes2Address(cc.Context)
//}
case raftpb.ConfChangeRemoveNode:
//if cc.NodeID == n.id {
// n.logger.Infoln("I've been removed from the cluster! Shutting down.")
// continue
//}
}
}
// after commit, update appliedIndex
n.appliedIndex = ents[i].Index
// special nil commit to signal replay has finished
if ents[i].Index == n.lastIndex {
select {
case n.commitC <- nil:
case <-n.haltC:
return false
}
}
}
return true
}
//mint the block
func (n *Node) mint(ready *raftproto.Ready) {
n.logger.WithFields(logrus.Fields{
"height": ready.Height,
"count": len(ready.TxHashes),
}).Debugln("block will be generated")
//follower nodes update the block height
if n.tp.GetHeight() == ready.Height-1 {
n.tp.UpdateHeight()
}
loseTxs := make([]types.Hash, 0)
txs := make([]*pb.Transaction, 0, len(ready.TxHashes))
for _, hash := range ready.TxHashes {
_, ok := n.tp.GetTx(hash, false)
if !ok {
loseTxs = append(loseTxs, hash)
}
}
//handle missing txs
if len(loseTxs) != 0 {
var wg sync.WaitGroup
wg.Add(len(loseTxs))
for _, hash := range loseTxs {
go func(hash types.Hash) {
defer wg.Done()
n.tp.FetchTx(hash, ready.Height)
}(hash)
}
wg.Wait()
}
for _, hash := range ready.TxHashes {
tx, _ := n.tp.GetTx(hash, false)
txs = append(txs, tx)
}
n.tp.RemoveTxs(ready.TxHashes, n.IsLeader())
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: ready.Height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.logger.WithFields(logrus.Fields{
"txpool_size": n.tp.PoolSize(),
}).Debugln("current tx pool size")
n.readyCache.Store(ready.Height, ready)
n.commitC <- block
}
//Validate the apply index and select the committed entries that have not yet been applied
func (n *Node) entriesToApply(allEntries []raftpb.Entry) (entriesToApply []raftpb.Entry) {
if len(allEntries) == 0 {
return
}
firstIdx := allEntries[0].Index
if firstIdx > n.appliedIndex+1 {
n.logger.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, n.appliedIndex)
}
if n.appliedIndex-firstIdx+1 < uint64(len(allEntries)) {
entriesToApply = allEntries[n.appliedIndex-firstIdx+1:]
}
return entriesToApply
}
//Determines whether the current apply index triggers a snapshot
func (n *Node) maybeTriggerSnapshot() {
if n.appliedIndex-n.snapshotIndex <= n.snapCount {
return
}
data := n.raftStorage.Snapshot().Data
n.logger.Infof("Start snapshot [applied index: %d | last snapshot index: %d]", n.appliedIndex, n.snapshotIndex)
snap, err := n.raftStorage.ram.CreateSnapshot(n.appliedIndex, &n.confState, data)
if err != nil {
panic(err)
}
if err := n.raftStorage.saveSnap(snap); err != nil {
panic(err)
}
compactIndex := uint64(1)
if n.appliedIndex > n.raftStorage.SnapshotCatchUpEntries {
compactIndex = n.appliedIndex - n.raftStorage.SnapshotCatchUpEntries
}
if err := n.raftStorage.ram.Compact(compactIndex); err != nil {
panic(err)
}
n.logger.Infof("compacted log at index %d", compactIndex)
n.snapshotIndex = n.appliedIndex
}
func GenerateRaftPeers(config *order.Config) ([]raft.Peer, error) {
nodes := config.Nodes
peers := make([]raft.Peer, 0, len(nodes))
for id, node := range nodes {
peers = append(peers, raft.Peer{ID: id, Context: node.Bytes()})
}
return peers, nil
}
//Get the raft apply index of the highest block
func (n *Node) getBlockAppliedIndex() uint64 {
height := uint64(0)
n.blockAppliedIndex.Range(
func(key, value interface{}) bool {
k := key.(uint64)
if k > height {
height = k
}
return true
})
appliedIndex, ok := n.blockAppliedIndex.Load(height)
if !ok {
return 0
}
return appliedIndex.(uint64)
}
//Load the last applied index from storage
func (n *Node) loadAppliedIndex() uint64 {
dat, err := n.storage.Get(appliedDbKey)
var lastAppliedIndex uint64
if err != nil {
lastAppliedIndex = 0
} else {
lastAppliedIndex = binary.LittleEndian.Uint64(dat)
}
return lastAppliedIndex
}
//Write the lastAppliedIndex
func (n *Node) writeAppliedIndex(index uint64) {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, index)
if err := n.storage.Put(appliedDbKey, buf); err != nil {
n.logger.Errorf("persisted the latest applied index: %s", err)
}
return etcdraft.NewNode(opts...)
}
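
After this change, the old etcdraft plugin entry point keeps only a thin wrapper that delegates to the extracted package. Reconstructed from the remaining added lines in the hunk above (the pkg/order/etcdraft import and the delegating return), it presumably looks roughly like this (a sketch, not the verbatim new file):

package main

import (
	"github.com/meshplus/bitxhub/pkg/order"
	"github.com/meshplus/bitxhub/pkg/order/etcdraft"
)

// NewNode delegates to the extracted pkg/order/etcdraft implementation.
func NewNode(opts ...order.Option) (order.Order, error) {
	return etcdraft.NewNode(opts...)
}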


@@ -1,161 +1,10 @@
package main
import (
"container/list"
"context"
"fmt"
"sync"
"time"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/meshplus/bitxhub/pkg/storage/leveldb"
"github.com/sirupsen/logrus"
"github.com/meshplus/bitxhub/pkg/order/solo"
)
type Node struct {
sync.RWMutex
height uint64 // current block height
pendingTxs *list.List //pending tx pool
commitC chan *pb.Block //block channel
logger logrus.FieldLogger //logger
reqLookUp *order.ReqLookUp //bloom filter
getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
packSize int //maximum number of transactions packed into one block
blockTick time.Duration //block packing interval
ctx context.Context
cancel context.CancelFunc
}
func (n *Node) Start() error {
go n.execute()
return nil
}
func (n *Node) Stop() {
n.cancel()
}
func (n *Node) Prepare(tx *pb.Transaction) error {
hash := tx.TransactionHash
if ok := n.reqLookUp.LookUp(hash.Bytes()); ok {
if tx, _ := n.getTransactionFunc(hash); tx != nil {
return nil
}
}
n.pushBack(tx)
return nil
}
func (n *Node) Commit() chan *pb.Block {
return n.commitC
}
func (n *Node) Step(ctx context.Context, msg []byte) error {
return nil
}
func (n *Node) Ready() bool {
return true
}
func (n *Node) ReportState(height uint64, hash types.Hash) {
if err := n.reqLookUp.Build(); err != nil {
n.logger.Errorf("bloom filter persistence error", err)
}
if height%10 == 0 {
n.logger.WithFields(logrus.Fields{
"height": height,
"hash": hash.ShortString(),
}).Info("Report checkpoint")
}
}
func (n *Node) Quorum() uint64 {
return 1
}
func NewNode(opts ...order.Option) (order.Order, error) {
config, err := order.GenerateConfig(opts...)
if err != nil {
return nil, fmt.Errorf("generate config: %w", err)
}
storage, err := leveldb.New(config.StoragePath)
if err != nil {
return nil, fmt.Errorf("new leveldb: %w", err)
}
reqLookUp, err := order.NewReqLookUp(storage, config.Logger)
if err != nil {
return nil, fmt.Errorf("new bloom filter: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
height: config.Applied,
pendingTxs: list.New(),
commitC: make(chan *pb.Block, 1024),
packSize: 500,
blockTick: 500 * time.Millisecond,
reqLookUp: reqLookUp,
getTransactionFunc: config.GetTransactionFunc,
logger: config.Logger,
ctx: ctx,
cancel: cancel,
}, nil
}
// Periodically pack pending txs into blocks and send them to the commit channel
func (n *Node) execute() {
ticker := time.NewTicker(n.blockTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.Lock()
l := n.pendingTxs.Len()
if l == 0 {
n.Unlock()
continue
}
var size int
if l > n.packSize {
size = n.packSize
} else {
size = l
}
txs := make([]*pb.Transaction, 0, size)
for i := 0; i < size; i++ {
front := n.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
txs = append(txs, tx)
n.pendingTxs.Remove(front)
}
n.height++
n.Unlock()
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: n.height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.commitC <- block
case <-n.ctx.Done():
n.logger.Infoln("Done txpool execute")
return
}
}
}
func (n *Node) pushBack(value interface{}) *list.Element {
n.Lock()
defer n.Unlock()
return n.pendingTxs.PushBack(value)
return solo.NewNode(opts...)
}
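
Likewise, the solo plugin entry point keeps only a delegating wrapper; based on the added lines in the hunk above, it presumably reduces to something like this (sketch):

package main

import (
	"github.com/meshplus/bitxhub/pkg/order"
	"github.com/meshplus/bitxhub/pkg/order/solo"
)

// NewNode delegates to the extracted pkg/order/solo implementation.
func NewNode(opts ...order.Option) (order.Order, error) {
	return solo.NewNode(opts...)
}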


@@ -1,4 +1,4 @@
package main
package etcdraft
import (
"path/filepath"

pkg/order/etcdraft/node.go (new file, 597 lines)

@@ -0,0 +1,597 @@
package etcdraft
import (
"context"
"encoding/binary"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/order/etcdraft/txpool"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
"github.com/sirupsen/logrus"
)
var defaultSnapshotCount uint64 = 10000
type Node struct {
id uint64 // raft id
leader uint64 // leader id
node raft.Node // raft node
peerMgr peermgr.PeerManager // network manager
peers []raft.Peer // raft peers
proposeC chan *raftproto.Ready // proposed ready, input channel
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
confState raftpb.ConfState // raft requires ConfState to be persisted within snapshot
commitC chan *pb.Block // the block commit channel
errorC chan<- error // errors from raft session
raftStorage *RaftStorage // the raft backend storage system
tp *txpool.TxPool // transaction pool
storage storage.Storage // db
repoRoot string //project path
logger logrus.FieldLogger //logger
blockAppliedIndex sync.Map // mapping of block height and apply index in raft log
appliedIndex uint64 // current apply index in raft log
snapCount uint64 // snapshot count
snapshotIndex uint64 // current snapshot apply index in raft log
lastIndex uint64 // last apply index in raft log
readyPool *sync.Pool // pool of reusable Ready objects to avoid rapid memory growth
readyCache sync.Map //ready cache
ctx context.Context // context
haltC chan struct{} // exit signal
}
// NewNode creates a new raft node
func NewNode(opts ...order.Option) (order.Order, error) {
config, err := order.GenerateConfig(opts...)
if err != nil {
return nil, fmt.Errorf("generate config: %w", err)
}
repoRoot := config.RepoRoot
// raft storage directory
walDir := filepath.Join(config.StoragePath, "wal")
snapDir := filepath.Join(config.StoragePath, "snap")
dbDir := filepath.Join(config.StoragePath, "state")
raftStorage, dbStorage, err := CreateStorage(config.Logger, walDir, snapDir, dbDir, raft.NewMemoryStorage())
if err != nil {
return nil, err
}
//generate raft peers
peers, err := GenerateRaftPeers(config)
if err != nil {
return nil, fmt.Errorf("generate raft peers: %w", err)
}
//generate txpool config
tpc, err := generateTxPoolConfig(repoRoot)
if err != nil {
return nil, fmt.Errorf("generate raft txpool config: %w", err)
}
txPoolConfig := &txpool.Config{
PackSize: tpc.PackSize,
BlockTick: tpc.BlockTick,
PoolSize: tpc.PoolSize,
}
txPool, proposeC := txpool.New(config, dbStorage, txPoolConfig)
readyPool := &sync.Pool{New: func() interface{} {
return new(raftproto.Ready)
}}
return &Node{
id: config.ID,
proposeC: proposeC,
confChangeC: make(chan raftpb.ConfChange),
commitC: make(chan *pb.Block, 1024),
errorC: make(chan<- error),
tp: txPool,
repoRoot: repoRoot,
snapCount: defaultSnapshotCount,
peerMgr: config.PeerMgr,
peers: peers,
logger: config.Logger,
storage: dbStorage,
raftStorage: raftStorage,
readyPool: readyPool,
ctx: context.Background(),
}, nil
}
//Start or restart raft node
func (n *Node) Start() error {
n.blockAppliedIndex.Store(n.tp.GetHeight(), n.loadAppliedIndex())
rc, err := generateRaftConfig(n.id, n.repoRoot, n.logger, n.raftStorage.ram)
if err != nil {
return fmt.Errorf("generate raft config: %w", err)
}
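// NOTE: restart is a package-level flag defined outside this snippet; presumably it is set during storage setup when existing WAL/snapshot data is found.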
if restart {
n.node = raft.RestartNode(rc)
} else {
n.node = raft.StartNode(rc, n.peers)
}
go n.run()
n.logger.Info("Consensus module started")
return nil
}
//Stop the raft node
func (n *Node) Stop() {
n.tp.CheckExecute(false)
n.node.Stop()
n.logger.Infof("Consensus stopped")
}
//Add the transaction into txpool and broadcast it to other nodes
func (n *Node) Prepare(tx *pb.Transaction) error {
if !n.Ready() {
return nil
}
if err := n.tp.AddPendingTx(tx, false); err != nil {
return err
}
if err := n.tp.Broadcast(tx); err != nil {
return err
}
return nil
}
func (n *Node) Commit() chan *pb.Block {
return n.commitC
}
func (n *Node) ReportState(height uint64, hash types.Hash) {
if height%10 == 0 {
n.logger.WithFields(logrus.Fields{
"height": height,
"hash": hash.ShortString(),
}).Info("Report checkpoint")
}
appliedIndex, ok := n.blockAppliedIndex.Load(height)
if !ok {
n.logger.Errorf("can not found appliedIndex:", height)
return
}
//block already persisted, record the apply index in db
n.writeAppliedIndex(appliedIndex.(uint64))
n.blockAppliedIndex.Delete(height)
n.tp.BuildReqLookUp() //store bloom filter
ready, ok := n.readyCache.Load(height)
if !ok {
n.logger.Errorf("can not found ready:", height)
return
}
hashes := ready.(*raftproto.Ready).TxHashes
// remove redundant tx
n.tp.BatchDelete(hashes)
n.readyCache.Delete(height)
}
func (n *Node) Quorum() uint64 {
return uint64(len(n.peers)/2 + 1)
}
func (n *Node) Step(ctx context.Context, msg []byte) error {
rm := &raftproto.RaftMessage{}
if err := rm.Unmarshal(msg); err != nil {
return err
}
switch rm.Type {
case raftproto.RaftMessage_CONSENSUS:
msg := &raftpb.Message{}
if err := msg.Unmarshal(rm.Data); err != nil {
return err
}
return n.node.Step(ctx, *msg)
case raftproto.RaftMessage_GET_TX:
hash := types.Hash{}
if err := hash.Unmarshal(rm.Data); err != nil {
return err
}
tx, ok := n.tp.GetTx(hash, true)
if !ok {
return nil
}
v, err := tx.Marshal()
if err != nil {
return err
}
txAck := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_GET_TX_ACK,
Data: v,
}
txAckData, err := txAck.Marshal()
if err != nil {
return err
}
m := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: txAckData,
}
return n.peerMgr.AsyncSend(rm.FromId, m)
case raftproto.RaftMessage_GET_TX_ACK:
tx := &pb.Transaction{}
if err := tx.Unmarshal(rm.Data); err != nil {
return err
}
return n.tp.AddPendingTx(tx, true)
case raftproto.RaftMessage_BROADCAST_TX:
tx := &pb.Transaction{}
if err := tx.Unmarshal(rm.Data); err != nil {
return err
}
return n.tp.AddPendingTx(tx, false)
default:
return fmt.Errorf("unexpected raft message received")
}
}
func (n *Node) IsLeader() bool {
return n.leader == n.id
}
func (n *Node) Ready() bool {
return n.leader != 0
}
// main work loop
func (n *Node) run() {
snap, err := n.raftStorage.ram.Snapshot()
if err != nil {
n.logger.Panic(err)
}
n.confState = snap.Metadata.ConfState
n.snapshotIndex = snap.Metadata.Index
n.appliedIndex = snap.Metadata.Index
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// handle input request
go func() {
//
// TODO: does it matter that this will restart from 0 whenever we restart a cluster?
//
confChangeCount := uint64(0)
for n.proposeC != nil && n.confChangeC != nil {
select {
case ready, ok := <-n.proposeC:
if !ok {
n.proposeC = nil
} else {
if !n.IsLeader() {
n.tp.CheckExecute(false)
continue
}
data, err := ready.Marshal()
if err != nil {
n.logger.Panic(err)
}
n.tp.BatchStore(ready.TxHashes)
if err := n.node.Propose(n.ctx, data); err != nil {
n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err)
}
}
case cc, ok := <-n.confChangeC:
if !ok {
n.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
if err := n.node.ProposeConfChange(n.ctx, cc); err != nil {
n.logger.Panic("Failed to propose configuration update to Raft node: %s", err)
}
}
case <-n.ctx.Done():
return
}
}
}()
// handle messages from raft state machine
for {
select {
case <-ticker.C:
n.node.Tick()
// when the node is first ready it gives us entries to commit and messages
// to immediately publish
case rd := <-n.node.Ready():
// 1: Write HardState, Entries, and Snapshot to persistent storage if they
// are not empty.
if err := n.raftStorage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
n.logger.Fatalf("failed to persist etcd/raft data: %s", err)
}
// 2: Apply Snapshot (if any) and CommittedEntries to the state machine.
if len(rd.CommittedEntries) != 0 {
if ok := n.publishEntries(n.entriesToApply(rd.CommittedEntries)); !ok {
n.Stop()
return
}
}
if rd.SoftState != nil {
n.leader = rd.SoftState.Lead
n.tp.CheckExecute(n.IsLeader())
}
// 3: AsyncSend all Messages to the nodes named in the To field.
go n.send(rd.Messages)
n.maybeTriggerSnapshot()
// 4: Call Node.Advance() to signal readiness for the next batch of
// updates.
n.node.Advance()
case <-n.ctx.Done():
n.Stop()
}
}
}
// send raft consensus message
func (n *Node) send(messages []raftpb.Message) {
for _, msg := range messages {
if msg.To == 0 {
continue
}
status := raft.SnapshotFinish
data, err := (&msg).Marshal()
if err != nil {
n.logger.Error(err)
continue
}
rm := &raftproto.RaftMessage{
Type: raftproto.RaftMessage_CONSENSUS,
Data: data,
}
rmData, err := rm.Marshal()
if err != nil {
n.logger.Error(err)
continue
}
p2pMsg := &pb.Message{
Type: pb.Message_CONSENSUS,
Data: rmData,
}
err = n.peerMgr.AsyncSend(msg.To, p2pMsg)
if err != nil {
n.node.ReportUnreachable(msg.To)
status = raft.SnapshotFailure
}
if msg.Type == raftpb.MsgSnap {
n.node.ReportSnapshot(msg.To, status)
}
}
}
func (n *Node) publishEntries(ents []raftpb.Entry) bool {
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
ready := n.readyPool.Get().(*raftproto.Ready)
if err := ready.Unmarshal(ents[i].Data); err != nil {
n.logger.Error(err)
continue
}
// This can happen:
//
// if (1) we crashed after applying this block to the chain, but
// before writing appliedIndex to LDB.
// or (2) we crashed in a scenario where we applied further than
// raft *durably persisted* its committed index (see
// https://github.com/coreos/etcd/pull/7899). In this
// scenario, when the node comes back up, we will re-apply
// a few entries.
blockAppliedIndex := n.getBlockAppliedIndex()
if blockAppliedIndex >= ents[i].Index {
n.appliedIndex = ents[i].Index
continue
}
n.mint(ready)
n.blockAppliedIndex.Store(ready.Height, ents[i].Index)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(ents[i].Data); err != nil {
continue
}
n.confState = *n.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
//if len(cc.Context) > 0 {
// _ := types.Bytes2Address(cc.Context)
//}
case raftpb.ConfChangeRemoveNode:
//if cc.NodeID == n.id {
// n.logger.Infoln("I've been removed from the cluster! Shutting down.")
// continue
//}
}
}
// after commit, update appliedIndex
n.appliedIndex = ents[i].Index
// special nil commit to signal replay has finished
if ents[i].Index == n.lastIndex {
select {
case n.commitC <- nil:
case <-n.haltC:
return false
}
}
}
return true
}
//mint the block
func (n *Node) mint(ready *raftproto.Ready) {
n.logger.WithFields(logrus.Fields{
"height": ready.Height,
"count": len(ready.TxHashes),
}).Debugln("block will be generated")
//follower nodes update the block height
if n.tp.GetHeight() == ready.Height-1 {
n.tp.UpdateHeight()
}
loseTxs := make([]types.Hash, 0)
txs := make([]*pb.Transaction, 0, len(ready.TxHashes))
for _, hash := range ready.TxHashes {
_, ok := n.tp.GetTx(hash, false)
if !ok {
loseTxs = append(loseTxs, hash)
}
}
//handle missing txs
if len(loseTxs) != 0 {
var wg sync.WaitGroup
wg.Add(len(loseTxs))
for _, hash := range loseTxs {
go func(hash types.Hash) {
defer wg.Done()
n.tp.FetchTx(hash, ready.Height)
}(hash)
}
wg.Wait()
}
for _, hash := range ready.TxHashes {
tx, _ := n.tp.GetTx(hash, false)
txs = append(txs, tx)
}
n.tp.RemoveTxs(ready.TxHashes, n.IsLeader())
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: ready.Height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.logger.WithFields(logrus.Fields{
"txpool_size": n.tp.PoolSize(),
}).Debugln("current tx pool size")
n.readyCache.Store(ready.Height, ready)
n.commitC <- block
}
//Validate the apply index and select the committed entries that have not yet been applied
func (n *Node) entriesToApply(allEntries []raftpb.Entry) (entriesToApply []raftpb.Entry) {
if len(allEntries) == 0 {
return
}
firstIdx := allEntries[0].Index
if firstIdx > n.appliedIndex+1 {
n.logger.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, n.appliedIndex)
}
if n.appliedIndex-firstIdx+1 < uint64(len(allEntries)) {
entriesToApply = allEntries[n.appliedIndex-firstIdx+1:]
}
return entriesToApply
}
//Determines whether the current apply index triggers a snapshot
func (n *Node) maybeTriggerSnapshot() {
if n.appliedIndex-n.snapshotIndex <= n.snapCount {
return
}
data := n.raftStorage.Snapshot().Data
n.logger.Infof("Start snapshot [applied index: %d | last snapshot index: %d]", n.appliedIndex, n.snapshotIndex)
snap, err := n.raftStorage.ram.CreateSnapshot(n.appliedIndex, &n.confState, data)
if err != nil {
panic(err)
}
if err := n.raftStorage.saveSnap(snap); err != nil {
panic(err)
}
compactIndex := uint64(1)
if n.appliedIndex > n.raftStorage.SnapshotCatchUpEntries {
compactIndex = n.appliedIndex - n.raftStorage.SnapshotCatchUpEntries
}
if err := n.raftStorage.ram.Compact(compactIndex); err != nil {
panic(err)
}
n.logger.Infof("compacted log at index %d", compactIndex)
n.snapshotIndex = n.appliedIndex
}
func GenerateRaftPeers(config *order.Config) ([]raft.Peer, error) {
nodes := config.Nodes
peers := make([]raft.Peer, 0, len(nodes))
for id, node := range nodes {
peers = append(peers, raft.Peer{ID: id, Context: node.Bytes()})
}
return peers, nil
}
//Get the raft apply index of the highest block
func (n *Node) getBlockAppliedIndex() uint64 {
height := uint64(0)
n.blockAppliedIndex.Range(
func(key, value interface{}) bool {
k := key.(uint64)
if k > height {
height = k
}
return true
})
appliedIndex, ok := n.blockAppliedIndex.Load(height)
if !ok {
return 0
}
return appliedIndex.(uint64)
}
//Load the last applied index from storage
func (n *Node) loadAppliedIndex() uint64 {
dat, err := n.storage.Get(appliedDbKey)
var lastAppliedIndex uint64
if err != nil {
lastAppliedIndex = 0
} else {
lastAppliedIndex = binary.LittleEndian.Uint64(dat)
}
return lastAppliedIndex
}
//Write the lastAppliedIndex
func (n *Node) writeAppliedIndex(index uint64) {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, index)
if err := n.storage.Put(appliedDbKey, buf); err != nil {
n.logger.Errorf("persisted the latest applied index: %s", err)
}
}
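
Both backends implement the same order.Order interface shown above (Start, Stop, Prepare, Commit, Step, Ready, ReportState, Quorum). A minimal consumer loop, sketched here with a hypothetical runOrder helper and an executeBlock callback that are not part of this diff, would drive either implementation like so:

package orderdemo // hypothetical package name for this sketch

import (
	"log"

	"github.com/meshplus/bitxhub-kit/types"
	"github.com/meshplus/bitxhub-model/pb"
	"github.com/meshplus/bitxhub/pkg/order"
)

// runOrder is a hypothetical consumer; node construction (via order options)
// and block execution (executeBlock) are outside the scope of this diff.
func runOrder(node order.Order, executeBlock func(*pb.Block) types.Hash) error {
	if err := node.Start(); err != nil {
		return err
	}
	defer node.Stop()
	for block := range node.Commit() {
		if block == nil {
			// etcdraft emits a nil block to signal that log replay has finished
			continue
		}
		hash := executeBlock(block)
		// report back so the order module can persist its applied index and prune caches
		node.ReportState(block.BlockHeader.Number, hash)
	}
	log.Println("commit channel closed")
	return nil
}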


@@ -1,4 +1,4 @@
package main
package etcdraft
import (
"io/ioutil"
@@ -32,7 +32,7 @@ func TestNode_Start(t *testing.T) {
nodes := make(map[uint64]types.Address)
nodes[ID] = types.String2Address("")
fileData, err := ioutil.ReadFile("../../../../config/order.toml")
fileData, err := ioutil.ReadFile("../../../config/order.toml")
require.Nil(t, err)
err = ioutil.WriteFile(filepath.Join(repoRoot, "order.toml"), fileData, 0644)
require.Nil(t, err)


@@ -5,12 +5,13 @@ package proto
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
github_com_meshplus_bitxhub_kit_types "github.com/meshplus/bitxhub-kit/types"
io "io"
math "math"
math_bits "math/bits"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
github_com_meshplus_bitxhub_kit_types "github.com/meshplus/bitxhub-kit/types"
)
// Reference imports to suppress errors if they are not otherwise used.


@@ -1,4 +1,4 @@
package main
package etcdraft
import (
"fmt"


@@ -14,7 +14,7 @@ import (
"github.com/Rican7/retry/strategy"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
raftproto "github.com/meshplus/bitxhub/internal/plugins/order/etcdraft/proto"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
"github.com/sirupsen/logrus"

pkg/order/solo/node.go (new file, 161 lines)

@@ -0,0 +1,161 @@
package solo
import (
"container/list"
"context"
"fmt"
"sync"
"time"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/meshplus/bitxhub/pkg/storage/leveldb"
"github.com/sirupsen/logrus"
)
type Node struct {
sync.RWMutex
height uint64 // current block height
pendingTxs *list.List //pending tx pool
commitC chan *pb.Block //block channel
logger logrus.FieldLogger //logger
reqLookUp *order.ReqLookUp //bloom filter
getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
packSize int //maximum number of transactions packed into one block
blockTick time.Duration //block packing interval
ctx context.Context
cancel context.CancelFunc
}
func (n *Node) Start() error {
go n.execute()
return nil
}
func (n *Node) Stop() {
n.cancel()
}
func (n *Node) Prepare(tx *pb.Transaction) error {
hash := tx.TransactionHash
if ok := n.reqLookUp.LookUp(hash.Bytes()); ok {
if tx, _ := n.getTransactionFunc(hash); tx != nil {
return nil
}
}
n.pushBack(tx)
return nil
}
func (n *Node) Commit() chan *pb.Block {
return n.commitC
}
func (n *Node) Step(ctx context.Context, msg []byte) error {
return nil
}
func (n *Node) Ready() bool {
return true
}
func (n *Node) ReportState(height uint64, hash types.Hash) {
if err := n.reqLookUp.Build(); err != nil {
n.logger.Errorf("bloom filter persistence error", err)
}
if height%10 == 0 {
n.logger.WithFields(logrus.Fields{
"height": height,
"hash": hash.ShortString(),
}).Info("Report checkpoint")
}
}
func (n *Node) Quorum() uint64 {
return 1
}
func NewNode(opts ...order.Option) (order.Order, error) {
config, err := order.GenerateConfig(opts...)
if err != nil {
return nil, fmt.Errorf("generate config: %w", err)
}
storage, err := leveldb.New(config.StoragePath)
if err != nil {
return nil, fmt.Errorf("new leveldb: %w", err)
}
reqLookUp, err := order.NewReqLookUp(storage, config.Logger)
if err != nil {
return nil, fmt.Errorf("new bloom filter: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
height: config.Applied,
pendingTxs: list.New(),
commitC: make(chan *pb.Block, 1024),
packSize: 500,
blockTick: 500 * time.Millisecond,
reqLookUp: reqLookUp,
getTransactionFunc: config.GetTransactionFunc,
logger: config.Logger,
ctx: ctx,
cancel: cancel,
}, nil
}
// Periodically pack pending txs into blocks and send them to the commit channel
func (n *Node) execute() {
ticker := time.NewTicker(n.blockTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.Lock()
l := n.pendingTxs.Len()
if l == 0 {
n.Unlock()
continue
}
var size int
if l > n.packSize {
size = n.packSize
} else {
size = l
}
txs := make([]*pb.Transaction, 0, size)
for i := 0; i < size; i++ {
front := n.pendingTxs.Front()
tx := front.Value.(*pb.Transaction)
txs = append(txs, tx)
n.pendingTxs.Remove(front)
}
n.height++
n.Unlock()
block := &pb.Block{
BlockHeader: &pb.BlockHeader{
Version: []byte("1.0.0"),
Number: n.height,
Timestamp: time.Now().UnixNano(),
},
Transactions: txs,
}
n.commitC <- block
case <-n.ctx.Done():
n.logger.Infoln("Done txpool execute")
return
}
}
}
func (n *Node) pushBack(value interface{}) *list.Element {
n.Lock()
defer n.Unlock()
return n.pendingTxs.PushBack(value)
}


@@ -1,4 +1,4 @@
package main
package solo
import (
"io/ioutil"