Merge pull request #197 from meshplus/pprof-perf
feat(pprof): add runtime type for pprof
This commit is contained in:
commit
9a6019d57e
|
@ -7,6 +7,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"runtime/pprof"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"github.com/meshplus/bitxhub/internal/coreapi"
|
||||
"github.com/meshplus/bitxhub/internal/loggers"
|
||||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
@ -60,7 +62,12 @@ func start(ctx *cli.Context) error {
|
|||
loggers.Initialize(repo.Config)
|
||||
|
||||
if repo.Config.PProf.Enable {
|
||||
runPProf(repo.Config.Port.PProf)
|
||||
switch repo.Config.PProf.PType {
|
||||
case "runtime":
|
||||
go runtimePProf(repo.Config.PProf.Mode, repo.NetworkConfig.ID, repo.Config.PProf.Duration)
|
||||
case "http":
|
||||
httpPProf(repo.Config.Port.PProf)
|
||||
}
|
||||
}
|
||||
|
||||
if repo.Config.Monitor.Enable {
|
||||
|
@ -135,7 +142,45 @@ func handleShutdown(node *app.BitXHub, wg *sync.WaitGroup) {
|
|||
}()
|
||||
}
|
||||
|
||||
func runPProf(port int64) {
|
||||
// runtimePProf will record the cpu or memory profiles every 5 second.
|
||||
func runtimePProf(mode string, id uint64, duration time.Duration) {
|
||||
tick := time.NewTicker(duration)
|
||||
rootPath := fmt.Sprint("./scripts/build/", "node", id, "/pprof/")
|
||||
exist := fileExist(rootPath)
|
||||
if !exist {
|
||||
err := os.Mkdir(rootPath, os.ModePerm)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var cpuFile *os.File
|
||||
if mode == "cpu" {
|
||||
cpuPath := fmt.Sprint(rootPath,"cpu-", time.Now().Format("20060102-15:04:05"))
|
||||
cpuFile, _ = os.Create(cpuPath)
|
||||
_ = pprof.StartCPUProfile(cpuFile)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <- tick.C:
|
||||
switch mode {
|
||||
case "cpu":
|
||||
pprof.StopCPUProfile()
|
||||
_ = cpuFile.Close()
|
||||
cpuPath := fmt.Sprint(rootPath,"cpu-", time.Now().Format("20060102-15:04:05"))
|
||||
cpuFile, _ := os.Create(cpuPath)
|
||||
_ = pprof.StartCPUProfile(cpuFile)
|
||||
case "memory":
|
||||
memPath := fmt.Sprint(rootPath,"mem-", time.Now().Format("2006-01-02-15:04:05"))
|
||||
memFile, _ := os.Create(memPath)
|
||||
_ = pprof.WriteHeapProfile(memFile)
|
||||
_ = memFile.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func httpPProf(port int64) {
|
||||
go func() {
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
logger.WithField("port", port).Info("Start pprof")
|
||||
|
@ -163,3 +208,14 @@ func runMonitor(port int64) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func fileExist(path string) bool {
|
||||
_, err := os.Stat(path)
|
||||
if err != nil {
|
||||
if os.IsExist(err) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -10,6 +10,9 @@ solo = false
|
|||
|
||||
[pprof]
|
||||
enable = true
|
||||
ptype = "runtime" # including two types: runtime (recommended for use during the development phase) and http
|
||||
mode = "cpu" # only required for runtime ptype, including two modes: cpu、memory
|
||||
duration = "5s" # only required for runtime ptype, every duration will generation a profile.
|
||||
|
||||
[monitor]
|
||||
enable = true
|
||||
|
|
|
@ -53,7 +53,10 @@ type Monitor struct {
|
|||
}
|
||||
|
||||
type PProf struct {
|
||||
Enable bool
|
||||
Enable bool `toml:"enbale" json:"enable"`
|
||||
PType string `toml:"ptype" json:"ptype"`
|
||||
Mode string `toml:"mode" json:"mode"`
|
||||
Duration time.Duration `toml:"duration" json:"duration"`
|
||||
}
|
||||
|
||||
type Gateway struct {
|
||||
|
|
|
@ -410,9 +410,9 @@ func (mpi *mempoolImpl) processCommitTransactions(ready *raftproto.Ready) {
|
|||
for account := range dirtyAccounts {
|
||||
commitNonce := mpi.txStore.nonceCache.getCommitNonce(account)
|
||||
if list, ok := mpi.txStore.allTxs[account]; ok {
|
||||
// removeBySortedNonceKey all previous seq number txs for this account.
|
||||
// remove all previous seq number txs for this account.
|
||||
removedTxs := list.forward(commitNonce)
|
||||
// removeBySortedNonceKey index smaller than commitNonce delete index.
|
||||
// remove index smaller than commitNonce delete index.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
go func(ready map[string][]*pb.Transaction) {
|
||||
|
|
|
@ -13,84 +13,84 @@ import (
|
|||
|
||||
func TestRecvTransaction(t *testing.T) {
|
||||
ast := assert.New(t)
|
||||
mempool, _ := mockMempoolImpl()
|
||||
mpi, _ := mockMempoolImpl()
|
||||
defer cleanTestData()
|
||||
|
||||
privKey1 := genPrivKey()
|
||||
tx1 := constructTx(uint64(1), &privKey1)
|
||||
go mempool.txCache.listenEvent()
|
||||
go mpi.txCache.listenEvent()
|
||||
go func() {
|
||||
_ = mempool.RecvTransaction(tx1)
|
||||
_ = mpi.RecvTransaction(tx1)
|
||||
}()
|
||||
select {
|
||||
case txSet := <-mempool.txCache.txSetC:
|
||||
case txSet := <-mpi.txCache.txSetC:
|
||||
ast.Equal(1, len(txSet.TxList))
|
||||
}
|
||||
|
||||
err := mempool.Start()
|
||||
err := mpi.Start()
|
||||
ast.Nil(err)
|
||||
privKey2 := genPrivKey()
|
||||
go func() {
|
||||
_ = mempool.RecvTransaction(tx1)
|
||||
_ = mpi.RecvTransaction(tx1)
|
||||
}()
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
ast.Equal(1, mempool.txStore.priorityIndex.size())
|
||||
ast.Equal(0, mempool.txStore.parkingLotIndex.size())
|
||||
ast.Equal(1, mpi.txStore.priorityIndex.size())
|
||||
ast.Equal(0, mpi.txStore.parkingLotIndex.size())
|
||||
|
||||
tx2 := constructTx(uint64(2), &privKey1)
|
||||
tx3 := constructTx(uint64(1), &privKey2)
|
||||
tx4 := constructTx(uint64(2), &privKey2)
|
||||
go func() {
|
||||
_ = mempool.RecvTransaction(tx4)
|
||||
_ = mpi.RecvTransaction(tx4)
|
||||
}()
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
ast.Equal(1, mempool.txStore.priorityIndex.size())
|
||||
ast.Equal(1, mempool.txStore.parkingLotIndex.size())
|
||||
ast.Equal(1, mpi.txStore.priorityIndex.size())
|
||||
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
|
||||
go func() {
|
||||
_ = mempool.RecvTransaction(tx2)
|
||||
_ = mpi.RecvTransaction(tx2)
|
||||
}()
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
ast.Equal(2, mempool.txStore.priorityIndex.size())
|
||||
ast.Equal(1, mempool.txStore.parkingLotIndex.size())
|
||||
ast.Equal(2, mpi.txStore.priorityIndex.size())
|
||||
ast.Equal(1, mpi.txStore.parkingLotIndex.size())
|
||||
go func() {
|
||||
_ = mempool.RecvTransaction(tx3)
|
||||
_ = mpi.RecvTransaction(tx3)
|
||||
}()
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
ast.Equal(4, mempool.txStore.priorityIndex.size())
|
||||
ast.Equal(1, mempool.txStore.parkingLotIndex.size(), "delete tx4 until finishing executor")
|
||||
mempool.Stop()
|
||||
ast.Equal(4, mpi.txStore.priorityIndex.size())
|
||||
ast.Equal(1, mpi.txStore.parkingLotIndex.size(), "delete tx4 until finishing executor")
|
||||
mpi.Stop()
|
||||
}
|
||||
|
||||
func TestRecvForwardTxs(t *testing.T) {
|
||||
ast := assert.New(t)
|
||||
mempool, _ := mockMempoolImpl()
|
||||
mpi, _ := mockMempoolImpl()
|
||||
defer cleanTestData()
|
||||
|
||||
privKey1 := genPrivKey()
|
||||
tx := constructTx(uint64(1), &privKey1)
|
||||
txList := []*pb.Transaction{tx}
|
||||
txSlice := &TxSlice{TxList: txList}
|
||||
go mempool.RecvForwardTxs(txSlice)
|
||||
go mpi.RecvForwardTxs(txSlice)
|
||||
select {
|
||||
case txSet := <-mempool.subscribe.txForwardC:
|
||||
case txSet := <-mpi.subscribe.txForwardC:
|
||||
ast.Equal(1, len(txSet.TxList))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateLeader(t *testing.T) {
|
||||
ast := assert.New(t)
|
||||
mempool, _ := mockMempoolImpl()
|
||||
mempool.Start()
|
||||
mpi, _ := mockMempoolImpl()
|
||||
mpi.Start()
|
||||
defer cleanTestData()
|
||||
go mempool.UpdateLeader(uint64(2))
|
||||
go mpi.UpdateLeader(uint64(2))
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
ast.Equal(uint64(2), mempool.leader)
|
||||
ast.Equal(uint64(2), mpi.leader)
|
||||
}
|
||||
|
||||
func TestGetBlock(t *testing.T) {
|
||||
ast := assert.New(t)
|
||||
mempool, _ := mockMempoolImpl()
|
||||
err := mempool.Start()
|
||||
mpi, _ := mockMempoolImpl()
|
||||
err := mpi.Start()
|
||||
ast.Nil(err)
|
||||
defer cleanTestData()
|
||||
|
||||
|
@ -105,20 +105,34 @@ func TestGetBlock(t *testing.T) {
|
|||
var txHashList []types.Hash
|
||||
txList = append(txList, tx1, tx2, tx3, tx4)
|
||||
txHashList = append(txHashList, tx1.TransactionHash, tx2.TransactionHash, tx3.TransactionHash, tx5.TransactionHash)
|
||||
err = mempool.processTransactions(txList)
|
||||
err = mpi.processTransactions(txList)
|
||||
ast.Nil(err)
|
||||
ready := &raftproto.Ready{
|
||||
Height: uint64(2),
|
||||
TxHashes: txHashList,
|
||||
}
|
||||
missingTxnHashList, txList := mempool.GetBlock(ready)
|
||||
missingTxnHashList, txList := mpi.GetBlock(ready)
|
||||
ast.Equal(1, len(missingTxnHashList), "missing tx5")
|
||||
ast.Equal(3, len(txList))
|
||||
|
||||
// mock leader to getBlock
|
||||
mpi.leader = uint64(1)
|
||||
missingTxnHashList, txList = mpi.GetBlock(ready)
|
||||
ast.Equal(4, len(missingTxnHashList))
|
||||
ast.Equal(0, len(txList))
|
||||
|
||||
// mock follower
|
||||
mpi.leader = uint64(2)
|
||||
txList = []*pb.Transaction{}
|
||||
txList = append(txList, tx5)
|
||||
err = mempool.processTransactions(txList)
|
||||
missingTxnHashList, txList = mempool.GetBlock(ready)
|
||||
err = mpi.processTransactions(txList)
|
||||
missingTxnHashList, txList = mpi.GetBlock(ready)
|
||||
ast.Equal(0, len(missingTxnHashList))
|
||||
ast.Equal(4, len(txList))
|
||||
|
||||
// mock leader to getBlock
|
||||
mpi.leader = uint64(1)
|
||||
missingTxnHashList, txList = mpi.GetBlock(ready)
|
||||
ast.Equal(0, len(missingTxnHashList))
|
||||
ast.Equal(4, len(txList))
|
||||
}
|
||||
|
|
|
@ -2,9 +2,11 @@ package mempool
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGetAccount(t *testing.T) {
|
||||
|
@ -27,3 +29,15 @@ func TestGetAccount(t *testing.T) {
|
|||
_, err = getAccount(tx)
|
||||
ast.NotNil(err.Error(), "unmarshal invoke payload faile")
|
||||
}
|
||||
|
||||
func TestPoolIsFull(t *testing.T) {
|
||||
ast := assert.New(t)
|
||||
mpi, _ := mockMempoolImpl()
|
||||
defer cleanTestData()
|
||||
|
||||
isFull := mpi.poolIsFull()
|
||||
ast.Equal(false, isFull)
|
||||
mpi.txStore.poolSize = DefaultPoolSize
|
||||
isFull = mpi.poolIsFull()
|
||||
ast.Equal(true, isFull)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue