Merge pull request #412 from meshplus/refactor/adapter-rbft

refactor(order): adapter rbft
This commit is contained in:
dawn-to-dusk 2021-04-30 18:28:00 +08:00 committed by GitHub
commit 8736e13208
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 28 additions and 26 deletions

View File

@ -103,7 +103,7 @@ type EventSystem struct {
// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan events2.NewTxsEvent // Channel to receive new transactions event
txsCh chan pb.Transactions // Channel to receive new transactions event
logsCh chan []*pb.EvmLog // Channel to receive new log event
//pendingLogsCh chan []*pb.EvmLog // Channel to receive new log event
chainCh chan events2.ExecutedEvent // Channel to receive new chain event
@ -121,7 +121,7 @@ func NewEventSystem(api api.CoreAPI, lightMode bool) *EventSystem {
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan events2.NewTxsEvent, txChanSize),
txsCh: make(chan pb.Transactions, txChanSize),
logsCh: make(chan []*pb.EvmLog, logsChanSize),
//pendingLogsCh: make(chan []*pb.EvmLog, logsChanSize),
chainCh: make(chan events2.ExecutedEvent, chainEvChanSize),
@ -335,9 +335,9 @@ func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*pb.EvmLog) {
}
}
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev events2.NewTxsEvent) {
hashes := make([]*types2.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev pb.Transactions) {
hashes := make([]*types2.Hash, 0, len(ev.Transactions))
for _, tx := range ev.Transactions {
hashes = append(hashes, tx.GetHash())
}
for _, f := range filters[PendingTransactionsSubscription] {

View File

@ -65,7 +65,7 @@ type ChainAPI interface {
type FeedAPI interface {
SubscribeLogsEvent(chan<- []*pb.EvmLog) event.Subscription
SubscribeNewTxEvent(chan<- events.NewTxsEvent) event.Subscription
SubscribeNewTxEvent(chan<- pb.Transactions) event.Subscription
SubscribeNewBlockEvent(chan<- events.ExecutedEvent) event.Subscription
BloomStatus() (uint64, uint64)
}

View File

@ -312,9 +312,12 @@ func (b BrokerAPI) DelVPNode(delID uint64) error {
}
func (b BrokerAPI) GetPendingTransactions(max int) []pb.Transaction {
return b.bxh.Order.GetPool().GetPendingTransactions(max)
// TODO
return nil
}
func (b BrokerAPI) GetPoolTransaction(hash *types.Hash) pb.Transaction {
return b.bxh.Order.GetPool().GetTransaction(hash)
// TODO
return nil
}

View File

@ -11,8 +11,8 @@ type FeedAPI CoreAPI
var _ api.FeedAPI = (*FeedAPI)(nil)
func (api *FeedAPI) SubscribeNewTxEvent(ch chan<- events.NewTxsEvent) event.Subscription {
return api.bxh.Order.GetPool().SubscribeTxEvent(ch)
func (api *FeedAPI) SubscribeNewTxEvent(ch chan<- pb.Transactions) event.Subscription {
return api.bxh.Order.SubscribeTxEvent(ch)
}
func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription {

View File

@ -11,6 +11,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/meshplus/bitxhub-kit/types"
@ -255,9 +256,9 @@ func (n *Node) DelNode(delID uint64) error {
return nil
}
// GetPool returns memory pool.
func (n *Node) GetPool() mempool.MemPool {
return n.mempool
// SubscribeTxEvent subscribes tx event
func (n *Node) SubscribeTxEvent(events chan<- pb.Transactions) event.Subscription {
return n.mempool.SubscribeTxEvent(events)
}
// main work loop

View File

@ -4,7 +4,6 @@ import (
"testing"
"github.com/meshplus/bitxhub-kit/log"
"github.com/meshplus/bitxhub-kit/storage/leveldb"
"github.com/stretchr/testify/require"
)

View File

@ -6,7 +6,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/model/events"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
)
@ -29,7 +28,7 @@ type MemPool interface {
GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]pb.Transaction
SubscribeTxEvent(chan<- events.NewTxsEvent) event.Subscription
SubscribeTxEvent(chan<- pb.Transactions) event.Subscription
External
}

View File

@ -13,7 +13,6 @@ import (
"github.com/meshplus/bitxhub-kit/storage/leveldb"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/model/events"
raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -384,10 +383,10 @@ func loadOrCreateStorage(memPoolDir string) (storage.Storage, error) {
return leveldb.New(memPoolDir)
}
func (mpi *mempoolImpl) SubscribeTxEvent(ch chan<- events.NewTxsEvent) event.Subscription {
func (mpi *mempoolImpl) SubscribeTxEvent(ch chan<- pb.Transactions) event.Subscription {
return mpi.txFeed.Subscribe(ch)
}
func (mpi *mempoolImpl) postTxsEvent(txList []pb.Transaction) {
go mpi.txFeed.Send(events.NewTxsEvent{Txs: txList})
go mpi.txFeed.Send(pb.Transactions{Transactions: txList})
}

View File

@ -1,9 +1,9 @@
package order
import (
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order/mempool"
)
//go:generate mockgen -destination mock_order/mock_order.go -package mock_order -source order.go
@ -38,5 +38,5 @@ type Order interface {
// DelNode sends a delete vp request by given id.
DelNode(delID uint64) error
GetPool() mempool.MemPool
SubscribeTxEvent(events chan<- pb.Transactions) event.Subscription
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/pkg/order"
@ -86,8 +87,8 @@ func (n *Node) Quorum() uint64 {
return 1
}
func (n *Node) GetPool() mempool.MemPool {
return n.mempool
func (n *Node) SubscribeTxEvent(ch chan<- pb.Transactions) event.Subscription {
return n.mempool.SubscribeTxEvent(ch)
}
func NewNode(opts ...order.Option) (order.Order, error) {