Compare commits

...

17 Commits

Author SHA1 Message Date
Aiden X a5e5f793b6
Merge pull request #246 from meshplus/fix-txpool
fix(txpool): add some logs for txpool and delete the executing check on the proposal goroutine.
2020-11-19 15:53:51 +08:00
Lizen0512 f28230e701 fix(txpool): add some logs for txpool and delete the executing check on the proposal goroutine. 2020-11-05 16:43:14 +08:00
Aiden X 500b92cd6c
Merge pull request #244 from meshplus/fix-raft
1: new leader should not serve requests until it finishes executing the old batch; 2: fix out-of-date cancel message.
2020-11-04 19:12:44 +08:00
Lizen0512 798e4ea7d0 fix(txpool): fix out of date cancel message 2020-11-04 17:14:55 +08:00
Lizen0512 9e15fa2163 fix(raft): new leader should not serve requests until it finishes executing the old batch 2020-11-04 16:03:44 +08:00
Aiden X 6562305531 build: bump 1.0.1 2020-11-03 11:14:04 +08:00
Aiden X b86a6dd1aa
Merge pull request #231 from meshplus/fix/show-key-error-1.0
fix(cmd): fix "bitxhub key show" command with wrong path
2020-10-30 15:13:29 +08:00
Alexader 7e0e7f9f2e fix(cmd): fix "bitxhub key show" command with wrong path
fix "btxhub key show" command should return error but not bug
2020-10-29 20:32:42 +08:00
Aiden X f99efe3884
Merge pull request #206 from meshplus/fix/post-state-after-persist-1.0
fix(executor): post block event after block is persisted
2020-10-20 10:40:58 +08:00
zhourong b3d61faeb9 fix(executor): post block event after block is persisted 2020-10-16 20:10:05 +08:00
Aiden X 0c47d3190e
Merge pull request #162 from meshplus/chore/add-release-ci-1.0
chore(.github): add CI/CD requirement for release branch
2020-09-17 14:25:17 +08:00
Alexader 7669d26732 chore(.github): add CI/CD requirement for release branch 2020-09-17 13:00:23 +08:00
Aiden X aad9ecf522
Merge pull request #146 from meshplus/scripts/add-deploy-script
scripts(*): modify deploy bash
2020-08-25 20:16:21 +08:00
Aiden X 36c853c0d9 scripts(*): modify deploy bash 2020-08-25 20:15:49 +08:00
jzhe 8ec50083c8
Merge pull request #116 from meshplus/fix/fix-parse-block-error-in-restful-service
fix(api): fix parsing of the block struct in the RESTful service
2020-07-10 14:09:32 +08:00
Aiden X e8486dee35
Merge pull request #112 from meshplus/fix/executor-1.0
fix(executor): add normalTxs initialization
2020-07-09 16:49:39 +08:00
zhourong 5b37d77439 fix(executor): add normalTxs initialization 2020-07-09 16:39:29 +08:00
9 changed files with 196 additions and 99 deletions

View File

@@ -2,9 +2,13 @@ name: build
on:
push:
branches: [ master ]
branches:
- master
- release-*
pull_request:
branches: [ master ]
branches:
- master
- release-*
jobs:
lint:

View File

@@ -2,7 +2,7 @@
SHELL := /bin/bash
CURRENT_PATH = $(shell pwd)
APP_NAME = bitxhub
APP_VERSION = 1.0.0-rc1
APP_VERSION = 1.0.1
# build with version info
VERSION_DIR = github.com/meshplus/${APP_NAME}
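
Note: APP_VERSION and VERSION_DIR are typically wired into the binary through Go's -ldflags "-X" mechanism. The snippet below is only a hedged illustration of that pattern; the variable and package names are hypothetical, not bitxhub's actual ones.

package main

import "fmt"

// CurrentVersion is a hypothetical placeholder; a Makefile like the one above
// would normally override it at build time, e.g.:
//   go build -ldflags "-X main.CurrentVersion=1.0.1" .
var CurrentVersion = "dev"

func main() {
	fmt.Println("bitxhub version", CurrentVersion)
}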

View File

@@ -6,11 +6,10 @@ import (
"os"
"path/filepath"
"github.com/meshplus/bitxhub-kit/key"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/meshplus/bitxhub-kit/crypto/asym/ecdsa"
"github.com/meshplus/bitxhub-kit/key"
"github.com/meshplus/bitxhub/internal/repo"
"github.com/meshplus/bitxhub/pkg/cert"
"github.com/urfave/cli"
@@ -41,12 +40,6 @@ func keyCMD() cli.Command {
Name: "show",
Usage: "Show key from cert",
Action: showKey,
Flags: []cli.Flag{
cli.StringFlag{
Name: "path",
Usage: "Node Path",
},
},
},
{
Name: "pid",

View File

@@ -129,6 +129,7 @@ func (exec *BlockExecutor) listenExecuteEvent() {
func (exec *BlockExecutor) persistData() {
for data := range exec.persistC {
exec.ledger.PersistBlockData(data)
exec.postBlockEvent(data.Block, data.InterchainMeta)
}
}

View File

@@ -55,6 +55,7 @@ func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) {
"count": len(block.Transactions),
}).Infof("Execute block")
exec.normalTxs = make([]types.Hash, 0)
validTxs, invalidReceipts := exec.verifySign(block)
receipts := exec.applyTransactions(validTxs)
receipts = append(receipts, invalidReceipts...)
@@ -97,7 +98,6 @@ func (exec *BlockExecutor) processExecuteEvent(block *pb.Block) {
Counter: counter,
L2Roots: l2Roots,
}
exec.postBlockEvent(block, interchainMeta)
exec.clear()
exec.currentHeight = block.BlockHeader.Number
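
Taken together, the two executor hunks move postBlockEvent out of processExecuteEvent and into persistData, so subscribers only ever see blocks that the ledger has already persisted. A minimal sketch of the ordering this enforces (the types and channels below are illustrative stand-ins, not the real executor):

package main

import "fmt"

// Block stands in for the real pb.Block.
type Block struct{ Height uint64 }

type executor struct {
	persistC chan *Block
	eventC   chan *Block
}

// persistData mirrors the fixed flow: persist first, then publish the event,
// so whoever consumes eventC can assume the block is already durable.
func (e *executor) persistData(persist func(*Block)) {
	for blk := range e.persistC {
		persist(blk)    // write the block data to the ledger
		e.eventC <- blk // post the block event only after persistence
	}
}

func main() {
	e := &executor{persistC: make(chan *Block, 1), eventC: make(chan *Block, 1)}
	go e.persistData(func(b *Block) { fmt.Println("persisted block", b.Height) })
	e.persistC <- &Block{Height: 1}
	fmt.Println("got event for block", (<-e.eventC).Height)
}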

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/raft"
@@ -48,10 +49,12 @@ type Node struct {
snapshotIndex uint64 // current snapshot apply index in raft log
lastIndex uint64 // last apply index in raft log
readyPool *sync.Pool // ready pool, avoiding memory growth fast
readyCache sync.Map //ready cache
ctx context.Context // context
haltC chan struct{} // exit signal
readyPool *sync.Pool // ready pool, avoiding memory growth fast
readyCache sync.Map //ready cache
ctx context.Context // context
haltC chan struct{} // exit signal
justElected bool
isRestart bool
}
// NewNode new raft node
@@ -121,6 +124,7 @@ func (n *Node) Start() error {
}
if restart {
n.node = raft.RestartNode(rc)
n.isRestart = true
} else {
n.node = raft.StartNode(rc, n.peers)
}
@@ -141,6 +145,7 @@ func (n *Node) Stop() {
//Add the transaction into txpool and broadcast it to other nodes
func (n *Node) Prepare(tx *pb.Transaction) error {
if !n.Ready() {
n.logger.Warningf("Replica %d is unready, we will drop the transaction")
return nil
}
if err := n.tp.AddPendingTx(tx, false); err != nil {
@@ -171,7 +176,7 @@ func (n *Node) ReportState(height uint64, hash types.Hash) {
}
//block already persisted, record the apply index in db
n.writeAppliedIndex(appliedIndex.(uint64))
n.blockAppliedIndex.Delete(height)
n.blockAppliedIndex.Delete(height - 1)
n.tp.BuildReqLookUp() //store bloom filter
@@ -279,17 +284,14 @@ func (n *Node) run() {
if !ok {
n.proposeC = nil
} else {
if !n.IsLeader() {
n.tp.CheckExecute(false)
continue
}
data, err := ready.Marshal()
if err != nil {
n.logger.Panic(err)
}
n.tp.BatchStore(ready.TxHashes)
n.logger.Debugf("Proposed block %d to raft core consensus", ready.Height)
if err := n.node.Propose(n.ctx, data); err != nil {
n.logger.Panic("Failed to propose block [%d] to raft: %s", ready.Height, err)
n.logger.Errorf("Failed to propose block [%d] to raft: %s", ready.Height, err)
}
}
case cc, ok := <-n.confChangeC:
@@ -299,7 +301,7 @@ func (n *Node) run() {
confChangeCount++
cc.ID = confChangeCount
if err := n.node.ProposeConfChange(n.ctx, cc); err != nil {
n.logger.Panic("Failed to propose configuration update to Raft node: %s", err)
n.logger.Errorf("Failed to propose configuration update to Raft node: %s", err)
}
}
case <-n.ctx.Done():
@@ -320,7 +322,29 @@ func (n *Node) run() {
// 1: Write HardState, Entries, and Snapshot to persistent storage if they
// are not empty.
if err := n.raftStorage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
n.logger.Fatalf("failed to persist etcd/raft data: %s", err)
n.logger.Errorf("Failed to persist etcd/raft data: %s", err)
}
if rd.SoftState != nil {
newLeader := atomic.LoadUint64(&rd.SoftState.Lead)
if newLeader != n.leader {
n.logger.Infof("Raft leader changed: %d -> %d", n.leader, newLeader)
oldLeader := n.leader
n.leader = newLeader
if newLeader == n.id {
// If the cluster is started for the first time, the leader node starts listening for requests directly.
if !n.isRestart && n.getBlockAppliedIndex() == uint64(0) {
n.tp.CheckExecute(true)
} else {
// new leader should not serve requests
n.justElected = true
}
}
// old leader node stop batch block
if oldLeader == n.id {
n.tp.CheckExecute(false)
}
}
}
// 2: Apply Snapshot (if any) and CommittedEntries to the state machine.
@@ -330,10 +354,17 @@ func (n *Node) run() {
return
}
}
if rd.SoftState != nil {
n.leader = rd.SoftState.Lead
if n.justElected {
msgInflight := n.ramLastIndex() > n.appliedIndex+1
if msgInflight {
n.logger.Debugf("There are in flight blocks, new leader should not serve requests")
continue
}
n.justElected = false
n.tp.CheckExecute(n.IsLeader())
}
// 3: AsyncSend all Messages to the nodes named in the To field.
go n.send(rd.Messages)
@@ -348,6 +379,12 @@ func (n *Node) run() {
}
}
func (n *Node) ramLastIndex() uint64 {
i, _ := n.raftStorage.ram.LastIndex()
n.logger.Infof("New Leader's last index is %d, appliedIndex is %d", i, n.appliedIndex)
return i
}
// send raft consensus message
func (n *Node) send(messages []raftpb.Message) {
for _, msg := range messages {
@@ -393,6 +430,9 @@ func (n *Node) publishEntries(ents []raftpb.Entry) bool {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
if ents[i].Index > n.appliedIndex {
n.appliedIndex = ents[i].Index
}
// ignore empty messages
break
}
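
The raft hunks above implement the #244 fix: on a SoftState leader change the newly elected node only marks justElected (unless the cluster is starting for the first time), and it begins batching via tp.CheckExecute(true) only once ramLastIndex() shows no in-flight blocks beyond appliedIndex+1, while a demoted leader stops batching. A condensed, hypothetical sketch of that gate (the first-start special case is omitted and the names below are stand-ins, not the full Node):

package main

import "fmt"

// node is a stripped-down stand-in for the raft Node in the diff above.
type node struct {
	id           uint64
	leader       uint64
	appliedIndex uint64
	lastIndex    func() uint64 // stand-in for ramLastIndex()
	justElected  bool
	serving      bool // stand-in for tp.CheckExecute(true/false)
}

// onLeaderChange mirrors the SoftState branch: a newly elected leader defers
// serving, and the old leader stops batching.
func (n *node) onLeaderChange(newLeader uint64) {
	oldLeader := n.leader
	n.leader = newLeader
	if newLeader == n.id {
		n.justElected = true
	}
	if oldLeader == n.id {
		n.serving = false
	}
}

// onReady mirrors the justElected check in run(): do not serve while blocks
// proposed under the previous leader are still in flight.
func (n *node) onReady() {
	if n.justElected {
		if n.lastIndex() > n.appliedIndex+1 {
			fmt.Println("in-flight blocks, new leader not serving yet")
			return
		}
		n.justElected = false
		n.serving = true
	}
}

func main() {
	n := &node{id: 2, leader: 1, appliedIndex: 10}
	n.lastIndex = func() uint64 { return 13 } // old batches not yet applied
	n.onLeaderChange(2)
	n.onReady() // gated: 13 > 10+1
	n.appliedIndex = 12
	n.onReady() // caught up: 13 <= 12+1, start serving
	fmt.Println("serving:", n.serving)
}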

View File

@@ -4,16 +4,16 @@ import (
"container/list"
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/meshplus/bitxhub/pkg/order"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/meshplus/bitxhub/pkg/peermgr"
"github.com/meshplus/bitxhub/pkg/storage"
@@ -22,6 +22,8 @@ import (
type getTransactionFunc func(hash types.Hash) (*pb.Transaction, error)
var cancelKey string = "cancelKey"
type TxPool struct {
sync.RWMutex //lock for the pendingTxs
nodeId uint64 //node id
@@ -36,9 +38,8 @@ type TxPool struct {
reqLookUp *order.ReqLookUp //bloom filter
storage storage.Storage //storage pending tx
config *Config //tx pool config
ctx context.Context //context
cancel context.CancelFunc //stop Execute
getTransactionFunc getTransactionFunc //get transaction by ledger
poolContext *poolContext
getTransactionFunc getTransactionFunc //get transaction by ledger
}
type Config struct {
@@ -48,6 +49,12 @@ type Config struct {
SetSize int //how many transactions should the node broadcast at once
}
type poolContext struct {
ctx context.Context //context
cancel context.CancelFunc //stop Execute
timestamp string
}
//New txpool
func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*TxPool, chan *raftproto.Ready) {
readyC := make(chan *raftproto.Ready)
@@ -55,8 +62,7 @@ func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*
if err != nil {
return nil, nil
}
ctx, cancel := context.WithCancel(context.Background())
return &TxPool{
txPool := &TxPool{
nodeId: config.ID,
peerMgr: config.PeerMgr,
logger: config.Logger,
@@ -68,15 +74,15 @@ func New(config *order.Config, storage storage.Storage, txPoolConfig *Config) (*
storage: storage,
getTransactionFunc: config.GetTransactionFunc,
config: txPoolConfig,
ctx: ctx,
cancel: cancel,
}, readyC
poolContext: newTxPoolContext(),
}
return txPool, readyC
}
//AddPendingTx add pending transaction into txpool
func (tp *TxPool) AddPendingTx(tx *pb.Transaction, isAckTx bool) error {
if tp.PoolSize() >= tp.config.PoolSize {
tp.logger.Debugf("Tx pool size: %d is full", tp.PoolSize())
tp.logger.Warningf("Tx pool size: %d is full", tp.PoolSize())
return nil
}
hash := tx.TransactionHash
@@ -132,7 +138,7 @@ func (tp *TxPool) CheckExecute(isLeader bool) {
}
} else {
if tp.isExecuting {
tp.cancel()
tp.poolContext.cancel()
}
}
}
@@ -144,12 +150,14 @@ func (tp *TxPool) executeInit() {
tp.isExecuting = true
tp.pendingTxs.Init()
tp.presenceTxs = sync.Map{}
tp.logger.Debugln("start txpool execute")
tp.poolContext = newTxPoolContext()
tp.logger.Debugf("Replica %d start txpool execute", tp.nodeId)
}
//execute schedule to collect txs to the ready channel
func (tp *TxPool) execute() {
tp.executeInit()
timestamp := tp.poolContext.ctx.Value(cancelKey).(string)
ticker := time.NewTicker(tp.config.BlockTick)
defer ticker.Stop()
@@ -161,10 +169,15 @@ func (tp *TxPool) execute() {
continue
}
tp.readyC <- ready
case <-tp.ctx.Done():
tp.isExecuting = false
tp.logger.Infoln("done txpool execute")
return
case <-tp.poolContext.ctx.Done():
value := tp.poolContext.ctx.Value(cancelKey)
newTimestamp := value.(string)
if timestamp == newTimestamp {
tp.isExecuting = false
tp.logger.Info("Stop batching the transactions")
return
}
tp.logger.Warning("Out of date done execute message")
}
}
@@ -196,12 +209,13 @@ func (tp *TxPool) ready() *raftproto.Ready {
continue
}
hashes = append(hashes, hash)
}
if len(hashes) == 0 {
return nil
}
height := tp.UpdateHeight()
tp.logger.Debugf("Leader generate a transaction batch with %d txs, which height is %d, " +
"and now there are %d pending txs in txPool", len(hashes), height, tp.pendingTxs.Len())
return &raftproto.Ready{
TxHashes: hashes,
Height: height,
@@ -263,7 +277,7 @@ func (tp *TxPool) Broadcast(tx *pb.Transaction) error {
continue
}
if err := tp.peerMgr.AsyncSend(id, msg); err != nil {
tp.logger.Debugf("send tx to:%d %s", id, err.Error())
tp.logger.Warningf("Send tx to:%d %s", id, err.Error())
continue
}
}
@@ -389,3 +403,15 @@ func (tp *TxPool) BatchDelete(hashes []types.Hash) {
}
batch.Commit()
}
func newTxPoolContext() *poolContext {
timestamp := time.Now().UnixNano()
key := strconv.FormatInt(timestamp, 10)
newCtx := context.WithValue(context.Background(), cancelKey, key)
ctx, cancel := context.WithCancel(newCtx)
return &poolContext{
ctx: ctx,
cancel: cancel,
timestamp: key,
}
}
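
newTxPoolContext above carries the out-of-date-cancel fix: each execute round gets a context tagged with a nanosecond timestamp, and the execute loop only treats a Done signal as authoritative when the currently installed round still carries the timestamp it captured at startup. A self-contained sketch of that stale-cancel check (simplified and outside the TxPool; the round variables below are illustrative):

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

// cancelKey matches the string key used in the diff above.
const cancelKey = "cancelKey"

type poolContext struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// newPoolContext mimics newTxPoolContext: a cancellable context stamped with a
// unique timestamp so consumers can tell which round a cancel belongs to.
func newPoolContext() *poolContext {
	stamp := strconv.FormatInt(time.Now().UnixNano(), 10)
	ctx, cancel := context.WithCancel(context.WithValue(context.Background(), cancelKey, stamp))
	return &poolContext{ctx: ctx, cancel: cancel}
}

func main() {
	round1 := newPoolContext()
	startStamp := round1.ctx.Value(cancelKey).(string)

	time.Sleep(time.Millisecond) // ensure the second round gets a different stamp
	round2 := newPoolContext()   // a new round replaces the old one
	round1.cancel()              // a cancel aimed at the old round arrives late

	<-round1.ctx.Done()
	// Compare the installed round's stamp with the one captured at startup:
	// equal means a genuine stop, different means an out-of-date cancel.
	if round2.ctx.Value(cancelKey).(string) == startStamp {
		fmt.Println("stop batching")
	} else {
		fmt.Println("out-of-date cancel ignored; keep batching the new round")
	}
}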

View File

@@ -0,0 +1,29 @@
package txpool
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestPoolContext(t *testing.T) {
ast := assert.New(t)
poolContext := newTxPoolContext()
oldSeed := poolContext.ctx.Value(cancelKey)
oldCancel := poolContext
s1 := oldSeed.(string)
var s2 string
go func() {
for {
select {
case <- poolContext.ctx.Done():
s2 = s1
}
}
}()
poolContext = newTxPoolContext()
oldCancel.cancel()
time.Sleep(500 * time.Millisecond)
ast.NotEqual(s1,s2)
}

View File

@@ -6,22 +6,41 @@ CURRENT_PATH=$(pwd)
PROJECT_PATH=$(dirname "${CURRENT_PATH}")
BUILD_PATH=${CURRENT_PATH}/build
APP_VERSION=$(git describe --tag)
N=4
# help prompt message
function printHelp() {
print_blue "Usage: "
echo " deploy.sh [-a <bitxhub_addr>] [-n <node_num>] [-r <if_recompile>] [-u <username>] [-p <build_path>]"
echo " deploy.sh [-a <bitxhub_addr>] [-r <if_recompile>] [-u <username>] [-p <build_path>]"
echo " - 'a' - the ip address of bitxhub node"
echo " - 'n' - node number to be deployed in one server"
echo " - 'r' - if need to recompile locally"
echo " - 'u' - the username of remote linux server"
echo " - 'p' - the deploy path relative to HOME directory in linux server"
echo " deploy.sh -h (print this message)"
}
function splitWindow() {
tmux splitw -v -p 50
tmux splitw -h -p 50
tmux selectp -t 0
tmux splitw -h -p 50
}
function deploy() {
rm -rf "${BUILD_PATH}"
mkdir "${BUILD_PATH}"
print_blue "1. Generate config"
bash config.sh "$NODE_NUM"
for ((i = 1; i < $N + 1; i = i + 1)); do
root=${BUILD_PATH}/node${i}
mkdir -p "${root}"
cp -rf "${CURRENT_PATH}"/certs/node${i}/certs "${root}"
cp -rf "${PROJECT_PATH}"/config/* "${root}"
echo " #!/usr/bin/env bash" >"${root}"/start.sh
echo "./bitxhub --repo \$(pwd)" start >>"${root}"/start.sh
x_replace 1s/1/${i}/ ${root}/network.toml
done
print_blue "2. Compile bitxhub"
if [[ $IF_RECOMPILE == true ]]; then
@@ -30,73 +49,58 @@ function deploy() {
echo "Do not need compile"
fi
ips=$(echo $SERVER_ADDRESSES | tr "," "\n")
## prepare deploy package
cd "${CURRENT_PATH}"
cp ../bin/bitxhub_linux-amd64 "${BUILD_PATH}"/bitxhub
cp ../internal/plugins/build/*.so "${BUILD_PATH}"/
tar cf build${APP_VERSION}.tar.gz build
for ((i = 1; i < $N + 1; i = i + 1)); do
root=${BUILD_PATH}/node${i}
cp ../bin/bitxhub_linux-amd64 ${root}/bitxhub
mkdir -p ${root}/plugins
cp ../internal/plugins/build/*.so ${root}/plugins
j=1
for ip in $ips; do
x_replace s#ip4/127.0.0.1/tcp/400${j}#ip4/"$ip"/tcp/4001#g ${root}/network.toml
((j = j + 1))
done
done
print_blue "3. Deploy bitxhub"
i=1
cd "${CURRENT_PATH}"
scp build${APP_VERSION}.tar.gz ${USERNAME}@"${BITXHUB_ADDR}":${SERVER_BUILD_PATH}
for ip in $ips; do
scp -r build/node${i} ${USERNAME}@$ip:~/
((i = i + 1))
done
ssh -t ${USERNAME}@"${BITXHUB_ADDR}" '
cd '${SERVER_BUILD_PATH}'
CURRENT_PATH=$(pwd)
BUILD_PATH=${CURRENT_PATH}/build
N='$NODE_NUM'
tmux new -d -s deploy || (tmux kill-session -t deploy && tmux new -d -s deploy)
function splitWindow() {
tmux splitw -v -p 50
tmux splitw -h -p 50
tmux selectp -t 0
tmux splitw -h -p 50
}
for ((i = 0; i < $N / 4; i = i + 1)); do
splitWindow
tmux new-window
done
splitWindow
function start() {
cd ${CURRENT_PATH}
rm -rf build
tar -xf build.tar.gz
pkill -9 bitxhub
tmux kill-session -t bitxhub
tmux new -d -s bitxhub
cd build
for ((i=0;i<N/4;i=i+1)); do
splitWindow
tmux new-window
done
splitWindow
for ((i = 0;i < N;i = i + 1)); do
tmux selectw -t $(($i / 4))
tmux selectp -t $(($i % 4))
cp bitxhub ./node$(($i + 1))/
if [ ! -d ./node$(($i + 1))/plugins ]; then
mkdir ./node$(($i + 1))/plugins
fi
cp *.so ./node$(($i + 1))/plugins/
tmux send-keys "cd ${BUILD_PATH} && ./node$(($i + 1))/bitxhub --repo=${BUILD_PATH}/node$(($i + 1)) start" C-m
done
tmux selectw -t 0
}
start
tmux attach-session -t bitxhub
'
i=0
for ip in $ips; do
tmux selectw -t $(($i / 4))
tmux selectp -t $(($i % 4))
((i = i + 1))
tmux send-keys "ssh -t $USERNAME@$ip 'cd node${i} && bash start.sh'" C-m
done
tmux selectw -t 0
tmux attach-session -t deploy
}
while getopts "h?a:n:r:u:p:" opt; do
while getopts "h?a:r:u:p:" opt; do
case "$opt" in
h | \?)
printHelp
exit 0
;;
a)
BITXHUB_ADDR=$OPTARG
;;
n)
NODE_NUM=$OPTARG
SERVER_ADDRESSES=$OPTARG
;;
r)
IF_RECOMPILE=$OPTARG