fix(raft): fix raft consensus error
1. add the maximum transaction size to the tx pool. 2. leader election results in repeated ack transactions.
This commit is contained in:
parent
fc6a894231
commit
96ba5e0d90
|
@ -1,11 +1,12 @@
|
|||
[raft]
|
||||
election_tick = 10 # ElectionTick is the number of Node.Tick invocations that must pass between elections.(s)
|
||||
heartbeat_tick = 1 # HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.(s)
|
||||
election_tick = 10 # ElectionTick is the number of Node.Tick invocations that must pass between elections.
|
||||
heartbeat_tick = 1 # HeartbeatTick is the number of Node.Tick invocations that must pass between heartbeats.
|
||||
max_size_per_msg = 1048576 # 1024*1024, MaxSizePerMsg limits the max size of each append message.
|
||||
max_inflight_msgs = 500 # MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
|
||||
check_quorum = false # Leader steps down when quorum is not active for an electionTimeout.
|
||||
check_quorum = true # Leader steps down when quorum is not active for an electionTimeout.
|
||||
pre_vote = true # PreVote prevents reconnected node from disturbing network.
|
||||
disable_proposal_forwarding = true # This prevents blocks from being accidentally proposed by followers.
|
||||
[raft.tx_pool]
|
||||
pack_size = 500 # Maximum number of transaction packages.
|
||||
pack_size = 500 # How many transactions should the primary pack.
|
||||
pool_size = 50000 # How many transactions could the txPool stores in total.
|
||||
block_tick = "500ms" # Block packaging time period.
|
|
@ -16,6 +16,7 @@ type RAFTConfig struct {
|
|||
type TxPoolConfig struct {
|
||||
PackSize int `mapstructure:"pack_size"`
|
||||
BlockTick time.Duration `mapstructure:"block_tick"`
|
||||
PoolSize int `mapstructure:"pool_size"`
|
||||
}
|
||||
|
||||
type RAFT struct {
|
||||
|
@ -36,15 +37,16 @@ func defaultRaftConfig() raft.Config {
|
|||
MaxSizePerMsg: 1024 * 1024, //1024*1024, MaxSizePerMsg limits the max size of each append message.
|
||||
MaxInflightMsgs: 500, //MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
|
||||
PreVote: true, // PreVote prevents reconnected node from disturbing network.
|
||||
CheckQuorum: false, // Leader steps down when quorum is not active for an electionTimeout.
|
||||
CheckQuorum: true, // Leader steps down when quorum is not active for an electionTimeout.
|
||||
DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
|
||||
}
|
||||
}
|
||||
|
||||
func defaultTxPoolConfig() TxPoolConfig {
|
||||
return TxPoolConfig{
|
||||
PackSize: 500, //Maximum number of transaction packages.
|
||||
PackSize: 500, // How many transactions should the primary pack.
|
||||
BlockTick: 500 * time.Millisecond, //Block packaging time period.
|
||||
PoolSize: 50000, //How many transactions could the txPool stores in total.
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,6 +89,9 @@ func generateTxPoolConfig(repoRoot string) (*TxPoolConfig, error) {
|
|||
if readConfig.RAFT.TxPoolConfig.PackSize > 0 {
|
||||
defaultTxPoolConfig.PackSize = readConfig.RAFT.TxPoolConfig.PackSize
|
||||
}
|
||||
if readConfig.RAFT.TxPoolConfig.PoolSize > 0 {
|
||||
defaultTxPoolConfig.PoolSize = readConfig.RAFT.TxPoolConfig.PoolSize
|
||||
}
|
||||
return &defaultTxPoolConfig, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -82,7 +82,12 @@ func NewNode(opts ...order.Option) (order.Order, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("generate raft txpool config: %w", err)
|
||||
}
|
||||
txPool, proposeC := txpool.New(config, dbStorage, tpc.PackSize, tpc.BlockTick)
|
||||
txPoolConfig := &txpool.Config{
|
||||
PackSize: tpc.PackSize,
|
||||
BlockTick: tpc.BlockTick,
|
||||
PoolSize: tpc.PoolSize,
|
||||
}
|
||||
txPool, proposeC := txpool.New(config, dbStorage, txPoolConfig)
|
||||
|
||||
readyPool := &sync.Pool{New: func() interface{} {
|
||||
return new(raftproto.Ready)
|
||||
|
@ -134,7 +139,10 @@ func (n *Node) Stop() {
|
|||
|
||||
//Add the transaction into txpool and broadcast it to other nodes
|
||||
func (n *Node) Prepare(tx *pb.Transaction) error {
|
||||
if err := n.tp.AddPendingTx(tx); err != nil {
|
||||
if !n.Ready() {
|
||||
return nil
|
||||
}
|
||||
if err := n.tp.AddPendingTx(tx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := n.tp.Broadcast(tx); err != nil {
|
||||
|
@ -149,6 +157,12 @@ func (n *Node) Commit() chan *pb.Block {
|
|||
}
|
||||
|
||||
func (n *Node) ReportState(height uint64, hash types.Hash) {
|
||||
if height%10 == 0 {
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"height": height,
|
||||
"hash": hash.ShortString(),
|
||||
}).Info("Report checkpoint")
|
||||
}
|
||||
appliedIndex, ok := n.blockAppliedIndex.Load(height)
|
||||
if !ok {
|
||||
n.logger.Errorf("can not found appliedIndex:", height)
|
||||
|
@ -165,16 +179,11 @@ func (n *Node) ReportState(height uint64, hash types.Hash) {
|
|||
n.logger.Errorf("can not found ready:", height)
|
||||
return
|
||||
}
|
||||
hashes := ready.(*raftproto.Ready).TxHashes
|
||||
// remove redundant tx
|
||||
n.tp.RemoveTxs(ready.(*raftproto.Ready).TxHashes, n.IsLeader())
|
||||
n.readyCache.Delete(height)
|
||||
n.tp.BatchDelete(hashes)
|
||||
|
||||
if height%10 == 0 {
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"height": height,
|
||||
"hash": hash.ShortString(),
|
||||
}).Info("Report checkpoint")
|
||||
}
|
||||
n.readyCache.Delete(height)
|
||||
}
|
||||
|
||||
func (n *Node) Quorum() uint64 {
|
||||
|
@ -220,13 +229,17 @@ func (n *Node) Step(ctx context.Context, msg []byte) error {
|
|||
}
|
||||
return n.peerMgr.Send(rm.FromId, m)
|
||||
case raftproto.RaftMessage_GET_TX_ACK:
|
||||
fallthrough
|
||||
tx := &pb.Transaction{}
|
||||
if err := tx.Unmarshal(rm.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
return n.tp.AddPendingTx(tx, true)
|
||||
case raftproto.RaftMessage_BROADCAST_TX:
|
||||
tx := &pb.Transaction{}
|
||||
if err := tx.Unmarshal(rm.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
return n.tp.AddPendingTx(tx)
|
||||
return n.tp.AddPendingTx(tx, false)
|
||||
default:
|
||||
return fmt.Errorf("unexpected raft message received")
|
||||
}
|
||||
|
@ -265,6 +278,10 @@ func (n *Node) run() {
|
|||
if !ok {
|
||||
n.proposeC = nil
|
||||
} else {
|
||||
if !n.IsLeader() {
|
||||
n.tp.CheckExecute(false)
|
||||
continue
|
||||
}
|
||||
data, err := ready.Marshal()
|
||||
if err != nil {
|
||||
n.logger.Panic(err)
|
||||
|
@ -357,9 +374,6 @@ func (n *Node) send(messages []raftpb.Message) {
|
|||
|
||||
err = n.peerMgr.Send(msg.To, p2pMsg)
|
||||
if err != nil {
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"mgs_to": msg.To,
|
||||
}).Debugln("message consensus error")
|
||||
n.node.ReportUnreachable(msg.To)
|
||||
status = raft.SnapshotFailure
|
||||
}
|
||||
|
@ -393,11 +407,12 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool {
|
|||
// https://github.com/coreos/etcd/pull/7899). In this
|
||||
// scenario, when the node comes back up, we will re-apply
|
||||
// a few entries.
|
||||
if n.getBlockAppliedIndex() >= ents[i].Index {
|
||||
// after commit, update appliedIndex
|
||||
blockAppliedIndex := n.getBlockAppliedIndex()
|
||||
if blockAppliedIndex >= ents[i].Index {
|
||||
n.appliedIndex = ents[i].Index
|
||||
continue
|
||||
}
|
||||
|
||||
n.mint(ready)
|
||||
n.blockAppliedIndex.Store(ready.Height, ents[i].Index)
|
||||
case raftpb.EntryConfChange:
|
||||
|
@ -436,6 +451,14 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool {
|
|||
|
||||
//mint the block
|
||||
func (n *Node) mint(ready *raftproto.Ready) {
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"height": ready.Height,
|
||||
"count": len(ready.TxHashes),
|
||||
}).Debugln("block will be generated")
|
||||
//follower node update the block height
|
||||
if n.tp.GetHeight() == ready.Height-1 {
|
||||
n.tp.UpdateHeight()
|
||||
}
|
||||
loseTxs := make([]types.Hash, 0)
|
||||
txs := make([]*pb.Transaction, 0, len(ready.TxHashes))
|
||||
for _, hash := range ready.TxHashes {
|
||||
|
@ -445,14 +468,14 @@ func (n *Node) mint(ready *raftproto.Ready) {
|
|||
}
|
||||
}
|
||||
|
||||
//handler missing tx
|
||||
//handle missing txs
|
||||
if len(loseTxs) != 0 {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(loseTxs))
|
||||
for _, hash := range loseTxs {
|
||||
go func(hash types.Hash) {
|
||||
defer wg.Done()
|
||||
n.tp.FetchTx(hash)
|
||||
n.tp.FetchTx(hash, ready.Height)
|
||||
}(hash)
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -462,10 +485,7 @@ func (n *Node) mint(ready *raftproto.Ready) {
|
|||
tx, _ := n.tp.GetTx(hash, false)
|
||||
txs = append(txs, tx)
|
||||
}
|
||||
//follower node update the block height
|
||||
if !n.IsLeader() {
|
||||
n.tp.UpdateHeight()
|
||||
}
|
||||
n.tp.RemoveTxs(ready.TxHashes, n.IsLeader())
|
||||
block := &pb.Block{
|
||||
BlockHeader: &pb.BlockHeader{
|
||||
Version: []byte("1.0.0"),
|
||||
|
@ -474,7 +494,6 @@ func (n *Node) mint(ready *raftproto.Ready) {
|
|||
},
|
||||
Transactions: txs,
|
||||
}
|
||||
|
||||
n.logger.WithFields(logrus.Fields{
|
||||
"txpool_size": n.tp.PoolSize(),
|
||||
}).Debugln("current tx pool size")
|
||||
|
@ -551,7 +570,7 @@ func (n *Node) getBlockAppliedIndex() uint64 {
|
|||
return appliedIndex.(uint64)
|
||||
}
|
||||
|
||||
//Load the lastAppliedIndex of
|
||||
//Load the lastAppliedIndex of block height
|
||||
func (n *Node) loadAppliedIndex() uint64 {
|
||||
dat, err := n.storage.Get(appliedDbKey)
|
||||
var lastAppliedIndex uint64
|
||||
|
|
|
@ -71,22 +71,6 @@ func TestNode_Start(t *testing.T) {
|
|||
require.Equal(t, 1, len(block.Transactions))
|
||||
|
||||
order.Stop()
|
||||
err = order.Start()
|
||||
require.Nil(t, err)
|
||||
for {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if order.Ready() {
|
||||
break
|
||||
}
|
||||
}
|
||||
tx1 := generateTx()
|
||||
err = order.Prepare(tx1)
|
||||
require.Nil(t, err)
|
||||
|
||||
block1 := <-order.Commit()
|
||||
require.Equal(t, uint64(2), block1.BlockHeader.Number)
|
||||
require.Equal(t, 1, len(block.Transactions))
|
||||
order.Stop()
|
||||
}
|
||||
|
||||
func generateTx() *pb.Transaction {
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -21,28 +20,36 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
|
||||
|
||||
type TxPool struct {
|
||||
sync.RWMutex
|
||||
sync.RWMutex //lock for the pendingTxs
|
||||
nodeId uint64 //node id
|
||||
height uint64 //current block height
|
||||
isExecuting bool //only raft leader can execute
|
||||
pendingTxs *list.List //pending tx pool
|
||||
presenceTxs sync.Map //tx cache
|
||||
readyC chan *raftproto.Ready
|
||||
ackTxs map[types.Hash]bool //ack tx means get tx by pb.RaftMessage_GET_TX_ACK
|
||||
readyC chan *raftproto.Ready //ready channel, receive by raft Propose channel
|
||||
peerMgr peermgr.PeerManager //network manager
|
||||
logger logrus.FieldLogger //logger
|
||||
reqLookUp *order.ReqLookUp //bloom filter
|
||||
storage storage.Storage //storage pending tx
|
||||
getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
|
||||
isExecuting bool //only raft leader can execute
|
||||
packSize int //maximum number of transaction packages
|
||||
blockTick time.Duration //block packed period
|
||||
config *Config //tx pool config
|
||||
ctx context.Context //context
|
||||
cancel context.CancelFunc //stop Execute
|
||||
getTransactionFunc getTransactionFunc //get transaction by ledger
|
||||
}
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
type Config struct {
|
||||
PackSize int //how many transactions should the primary pack
|
||||
BlockTick time.Duration //block packaging time period
|
||||
PoolSize int //how many transactions could the txPool stores in total
|
||||
SetSize int //how many transactions should the node broadcast at once
|
||||
}
|
||||
|
||||
//New txpool
|
||||
func New(config *order.Config, storage storage.Storage, packSize int, blockTick time.Duration) (*TxPool, chan *raftproto.Ready) {
|
||||
func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*TxPool, chan *raftproto.Ready) {
|
||||
readyC := make(chan *raftproto.Ready)
|
||||
reqLookUp, err := order.NewReqLookUp(storage, config.Logger)
|
||||
if err != nil {
|
||||
|
@ -56,18 +63,22 @@ func New(config *order.Config, storage storage.Storage, packSize int, blockTick
|
|||
readyC: readyC,
|
||||
height: config.Applied,
|
||||
pendingTxs: list.New(),
|
||||
ackTxs: make(map[types.Hash]bool),
|
||||
reqLookUp: reqLookUp,
|
||||
storage: storage,
|
||||
packSize: packSize,
|
||||
blockTick: blockTick,
|
||||
getTransactionFunc: config.GetTransactionFunc,
|
||||
config: txPoolConfig,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}, readyC
|
||||
}
|
||||
|
||||
//Add pending transaction into txpool
|
||||
func (tp *TxPool) AddPendingTx(tx *pb.Transaction) error {
|
||||
//AddPendingTx add pending transaction into txpool
|
||||
func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
|
||||
if tp.PoolSize() >= tp.config.PoolSize {
|
||||
tp.logger.Debugf("Tx pool size: %d is full", tp.PoolSize())
|
||||
return nil
|
||||
}
|
||||
hash := tx.TransactionHash
|
||||
if e := tp.get(hash); e != nil {
|
||||
return nil
|
||||
|
@ -80,7 +91,7 @@ func (tp *TxPool) AddPendingTx(tx *pb.Transaction) error {
|
|||
}
|
||||
}
|
||||
//add pending tx
|
||||
tp.pushBack(hash, tx)
|
||||
tp.pushBack(hash, tx, isAckTx)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -91,31 +102,29 @@ func (tp *TxPool) PoolSize() int {
|
|||
return tp.pendingTxs.Len()
|
||||
}
|
||||
|
||||
//Remove stored transactions
|
||||
//RemoveTxs remove txs from the cache
|
||||
func (tp *TxPool) RemoveTxs(hashes []types.Hash, isLeader bool) {
|
||||
if isLeader {
|
||||
tp.BatchDelete(hashes)
|
||||
}
|
||||
tp.Lock()
|
||||
defer tp.Unlock()
|
||||
for _, hash := range hashes {
|
||||
if !isLeader {
|
||||
if e := tp.get(hash); e != nil {
|
||||
tp.Lock()
|
||||
tp.pendingTxs.Remove(e)
|
||||
tp.Unlock()
|
||||
}
|
||||
}
|
||||
tp.presenceTxs.Delete(hash)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//Store the bloom filter
|
||||
//BuildReqLookUp store the bloom filter
|
||||
func (tp *TxPool) BuildReqLookUp() {
|
||||
if err := tp.reqLookUp.Build(); err != nil {
|
||||
tp.logger.Errorf("bloom filter persistence error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
//Check the txpool status, only leader node can run Execute()
|
||||
//CheckExecute check the txpool status, only leader node can run Execute()
|
||||
func (tp *TxPool) CheckExecute(isLeader bool) {
|
||||
if isLeader {
|
||||
if !tp.isExecuting {
|
||||
|
@ -128,12 +137,20 @@ func (tp *TxPool) CheckExecute(isLeader bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// Schedule to collect txs to the ready channel
|
||||
func (tp *TxPool) execute() {
|
||||
//execute init
|
||||
func (tp *TxPool) executeInit() {
|
||||
tp.Lock()
|
||||
defer tp.Unlock()
|
||||
tp.isExecuting = true
|
||||
tp.pendingTxs.Init()
|
||||
tp.presenceTxs = sync.Map{}
|
||||
ticker := time.NewTicker(tp.blockTick)
|
||||
tp.logger.Debugln("start txpool execute")
|
||||
}
|
||||
|
||||
//execute schedule to collect txs to the ready channel
|
||||
func (tp *TxPool) execute() {
|
||||
tp.executeInit()
|
||||
ticker := time.NewTicker(tp.config.BlockTick)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
|
@ -143,19 +160,17 @@ func (tp *TxPool) execute() {
|
|||
if ready == nil {
|
||||
continue
|
||||
}
|
||||
tp.logger.WithFields(logrus.Fields{
|
||||
"height": ready.Height,
|
||||
}).Debugln("block will be generated")
|
||||
tp.readyC <- ready
|
||||
case <-tp.ctx.Done():
|
||||
tp.isExecuting = false
|
||||
tp.logger.Infoln("Done txpool execute")
|
||||
tp.logger.Infoln("done txpool execute")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//ready pack the block
|
||||
func (tp *TxPool) ready() *raftproto.Ready {
|
||||
tp.Lock()
|
||||
defer tp.Unlock()
|
||||
|
@ -165,8 +180,8 @@ func (tp *TxPool) ready() *raftproto.Ready {
|
|||
}
|
||||
|
||||
var size int
|
||||
if l > tp.packSize {
|
||||
size = tp.packSize
|
||||
if l > tp.config.PackSize {
|
||||
size = tp.config.PackSize
|
||||
} else {
|
||||
size = l
|
||||
}
|
||||
|
@ -174,8 +189,17 @@ func (tp *TxPool) ready() *raftproto.Ready {
|
|||
for i := 0; i < size; i++ {
|
||||
front := tp.pendingTxs.Front()
|
||||
tx := front.Value.(*pb.Transaction)
|
||||
hashes = append(hashes, tx.TransactionHash)
|
||||
hash := tx.TransactionHash
|
||||
tp.pendingTxs.Remove(front)
|
||||
if _, ok := tp.ackTxs[hash]; ok {
|
||||
delete(tp.ackTxs, hash)
|
||||
continue
|
||||
}
|
||||
hashes = append(hashes, hash)
|
||||
|
||||
}
|
||||
if len(hashes) == 0 {
|
||||
return nil
|
||||
}
|
||||
height := tp.UpdateHeight()
|
||||
return &raftproto.Ready{
|
||||
|
@ -184,17 +208,17 @@ func (tp *TxPool) ready() *raftproto.Ready {
|
|||
}
|
||||
}
|
||||
|
||||
//Add the block height
|
||||
//UpdateHeight add the block height
|
||||
func (tp *TxPool) UpdateHeight() uint64 {
|
||||
return atomic.AddUint64(&tp.height, 1)
|
||||
}
|
||||
|
||||
//Get current block height
|
||||
//GetHeight get current block height
|
||||
func (tp *TxPool) GetHeight() uint64 {
|
||||
return atomic.LoadUint64(&tp.height)
|
||||
}
|
||||
|
||||
//Get the transaction by txpool or ledger
|
||||
//GetTx get the transaction by txpool or ledger
|
||||
func (tp *TxPool) GetTx(hash types.Hash, findByStore bool) (*pb.Transaction, bool) {
|
||||
if e := tp.get(hash); e != nil {
|
||||
return e.Value.(*pb.Transaction), true
|
||||
|
@ -239,7 +263,7 @@ func (tp *TxPool) Broadcast(tx *pb.Transaction) error {
|
|||
continue
|
||||
}
|
||||
if err := tp.peerMgr.Send(id, msg); err != nil {
|
||||
tp.logger.Debugln("send transaction error:", err)
|
||||
tp.logger.Debugf("send tx to:%d %s", id, err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -247,7 +271,7 @@ func (tp *TxPool) Broadcast(tx *pb.Transaction) error {
|
|||
}
|
||||
|
||||
// Fetch tx by local txpool or network
|
||||
func (tp *TxPool) FetchTx(hash types.Hash) *pb.Transaction {
|
||||
func (tp *TxPool) FetchTx(hash types.Hash, height uint64) *pb.Transaction {
|
||||
if tx, ok := tp.GetTx(hash, false); ok {
|
||||
return tx
|
||||
}
|
||||
|
@ -266,18 +290,13 @@ func (tp *TxPool) FetchTx(hash types.Hash) *pb.Transaction {
|
|||
}
|
||||
|
||||
asyncGet := func() (tx *pb.Transaction, err error) {
|
||||
for id := range tp.peerMgr.Peers() {
|
||||
if id == tp.nodeId {
|
||||
continue
|
||||
}
|
||||
if tx, ok := tp.GetTx(hash, false); ok {
|
||||
return tx, nil
|
||||
}
|
||||
if err := tp.peerMgr.Send(id, m); err != nil {
|
||||
return nil, err
|
||||
if err := tp.peerMgr.Broadcast(m); err != nil {
|
||||
tp.logger.Debugln(err)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("can't get transaction: %s", hash.String())
|
||||
return nil, fmt.Errorf("can't get tx: %s, block_height:%d", hash.String(), height)
|
||||
}
|
||||
|
||||
var tx *pb.Transaction
|
||||
|
@ -291,59 +310,12 @@ func (tp *TxPool) FetchTx(hash types.Hash) *pb.Transaction {
|
|||
return err
|
||||
}
|
||||
return nil
|
||||
}, strategy.Wait(200*time.Millisecond)); err != nil {
|
||||
}, strategy.Wait(50*time.Millisecond)); err != nil {
|
||||
tp.logger.Errorln(err)
|
||||
}
|
||||
return tx
|
||||
}
|
||||
|
||||
// Fetch tx by local txpool or network
|
||||
func (tp *TxPool) FetchBlock(height uint64) (*pb.Block, error) {
|
||||
get := func(height uint64) (block *pb.Block, err error) {
|
||||
for id := range tp.peerMgr.Peers() {
|
||||
block, err = tp.getBlock(id, int(height))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
return block, nil
|
||||
}
|
||||
return nil, fmt.Errorf("can't get block: %d", height)
|
||||
}
|
||||
|
||||
var block *pb.Block
|
||||
if err := retry.Retry(func(attempt uint) (err error) {
|
||||
block, err = get(height)
|
||||
if err != nil {
|
||||
tp.logger.Debugln(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}, strategy.Wait(200*time.Millisecond), strategy.Limit(1)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return block, nil
|
||||
}
|
||||
|
||||
//Get block by network
|
||||
func (tp *TxPool) getBlock(id uint64, i int) (*pb.Block, error) {
|
||||
m := &pb.Message{
|
||||
Type: pb.Message_GET_BLOCK,
|
||||
Data: []byte(strconv.Itoa(i)),
|
||||
}
|
||||
|
||||
res, err := tp.peerMgr.SyncSend(id, m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
block := &pb.Block{}
|
||||
if err := block.Unmarshal(res.Data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return block, nil
|
||||
}
|
||||
|
||||
func (tp *TxPool) get(key types.Hash) *list.Element {
|
||||
e, ok := tp.presenceTxs.Load(key)
|
||||
if ok {
|
||||
|
@ -352,28 +324,35 @@ func (tp *TxPool) get(key types.Hash) *list.Element {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (tp *TxPool) pushBack(key types.Hash, value interface{}) *list.Element {
|
||||
func (tp *TxPool) pushBack(key types.Hash, value interface{}, isAckTx bool) *list.Element {
|
||||
tp.Lock()
|
||||
defer tp.Unlock()
|
||||
if e := tp.get(key); e != nil {
|
||||
return nil
|
||||
}
|
||||
if isAckTx {
|
||||
tp.ackTxs[key] = true
|
||||
}
|
||||
e := tp.pendingTxs.PushBack(value)
|
||||
tp.presenceTxs.Store(key, e)
|
||||
return e
|
||||
}
|
||||
|
||||
var transactionKey = []byte("tx-")
|
||||
|
||||
func compositeKey(prefix []byte, value interface{}) []byte {
|
||||
func compositeKey(value interface{}) []byte {
|
||||
var prefix = []byte("tx-")
|
||||
return append(prefix, []byte(fmt.Sprintf("%v", value))...)
|
||||
}
|
||||
|
||||
func (tp *TxPool) store(tx *pb.Transaction) {
|
||||
txKey := compositeKey(transactionKey, tx.TransactionHash.Bytes())
|
||||
txKey := compositeKey(tx.TransactionHash.Bytes())
|
||||
txData, _ := tx.Marshal()
|
||||
if err := tp.storage.Put(txKey, txData); err != nil {
|
||||
tp.logger.Error("store tx error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (tp *TxPool) load(hash types.Hash) (*pb.Transaction, bool) {
|
||||
txKey := compositeKey(transactionKey, hash.Bytes())
|
||||
txKey := compositeKey(hash.Bytes())
|
||||
txData, err := tp.storage.Get(txKey)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
|
@ -386,16 +365,17 @@ func (tp *TxPool) load(hash types.Hash) (*pb.Transaction, bool) {
|
|||
return &tx, true
|
||||
}
|
||||
|
||||
//batch store txs
|
||||
//BatchStore batch store txs
|
||||
func (tp *TxPool) BatchStore(hashes []types.Hash) {
|
||||
batch := tp.storage.NewBatch()
|
||||
for _, hash := range hashes {
|
||||
e := tp.get(hash)
|
||||
if e == nil {
|
||||
tp.logger.Debugln("BatchStore not found tx:", hash.String())
|
||||
continue
|
||||
}
|
||||
tx := e.Value.(*pb.Transaction)
|
||||
txKey := compositeKey(transactionKey, hash.Bytes())
|
||||
txKey := compositeKey(hash.Bytes())
|
||||
txData, _ := tx.Marshal()
|
||||
batch.Put(txKey, txData)
|
||||
}
|
||||
|
@ -404,11 +384,11 @@ func (tp *TxPool) BatchStore(hashes []types.Hash) {
|
|||
}
|
||||
}
|
||||
|
||||
//batch delete txs
|
||||
//BatchDelete batch delete txs
|
||||
func (tp *TxPool) BatchDelete(hashes []types.Hash) {
|
||||
batch := tp.storage.NewBatch()
|
||||
for _, hash := range hashes {
|
||||
txKey := compositeKey(transactionKey, hash.Bytes())
|
||||
txKey := compositeKey(hash.Bytes())
|
||||
batch.Delete(txKey)
|
||||
}
|
||||
if err := batch.Commit(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue