diff --git a/api/jsonrpc/namespaces/eth/filters/filter_system.go b/api/jsonrpc/namespaces/eth/filters/filter_system.go index 552ef3b..4449aff 100644 --- a/api/jsonrpc/namespaces/eth/filters/filter_system.go +++ b/api/jsonrpc/namespaces/eth/filters/filter_system.go @@ -101,10 +101,10 @@ type EventSystem struct { chainSub event.Subscription // Subscription for new chain event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan events2.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*pb.EvmLog // Channel to receive new log event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan pb.Transactions // Channel to receive new transactions event + logsCh chan []*pb.EvmLog // Channel to receive new log event //pendingLogsCh chan []*pb.EvmLog // Channel to receive new log event chainCh chan events2.ExecutedEvent // Channel to receive new chain event } @@ -121,7 +121,7 @@ func NewEventSystem(api api.CoreAPI, lightMode bool) *EventSystem { lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), - txsCh: make(chan events2.NewTxsEvent, txChanSize), + txsCh: make(chan pb.Transactions, txChanSize), logsCh: make(chan []*pb.EvmLog, logsChanSize), //pendingLogsCh: make(chan []*pb.EvmLog, logsChanSize), chainCh: make(chan events2.ExecutedEvent, chainEvChanSize), @@ -335,9 +335,9 @@ func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*pb.EvmLog) { } } -func (es *EventSystem) handleTxsEvent(filters filterIndex, ev events2.NewTxsEvent) { - hashes := make([]*types2.Hash, 0, len(ev.Txs)) - for _, tx := range ev.Txs { +func (es *EventSystem) handleTxsEvent(filters filterIndex, ev pb.Transactions) { + hashes := make([]*types2.Hash, 0, len(ev.Transactions)) + for _, tx := range ev.Transactions { hashes = append(hashes, tx.GetHash()) } for _, f := range filters[PendingTransactionsSubscription] { diff --git a/internal/coreapi/api/api.go b/internal/coreapi/api/api.go index 181fbba..b3e402e 100644 --- a/internal/coreapi/api/api.go +++ b/internal/coreapi/api/api.go @@ -65,7 +65,7 @@ type ChainAPI interface { type FeedAPI interface { SubscribeLogsEvent(chan<- []*pb.EvmLog) event.Subscription - SubscribeNewTxEvent(chan<- events.NewTxsEvent) event.Subscription + SubscribeNewTxEvent(chan<- pb.Transactions) event.Subscription SubscribeNewBlockEvent(chan<- events.ExecutedEvent) event.Subscription BloomStatus() (uint64, uint64) } diff --git a/internal/coreapi/broker.go b/internal/coreapi/broker.go index 41add4c..831a162 100644 --- a/internal/coreapi/broker.go +++ b/internal/coreapi/broker.go @@ -312,9 +312,12 @@ func (b BrokerAPI) DelVPNode(delID uint64) error { } func (b BrokerAPI) GetPendingTransactions(max int) []pb.Transaction { - return b.bxh.Order.GetPool().GetPendingTransactions(max) + // TODO + return nil } func (b BrokerAPI) GetPoolTransaction(hash *types.Hash) pb.Transaction { - return b.bxh.Order.GetPool().GetTransaction(hash) + // TODO + + return nil } diff --git a/internal/coreapi/feed.go b/internal/coreapi/feed.go index 050c847..214ff4e 100644 --- a/internal/coreapi/feed.go +++ b/internal/coreapi/feed.go @@ -11,8 +11,8 @@ type FeedAPI CoreAPI var _ api.FeedAPI = (*FeedAPI)(nil) -func (api *FeedAPI) SubscribeNewTxEvent(ch chan<- events.NewTxsEvent) event.Subscription { - return api.bxh.Order.GetPool().SubscribeTxEvent(ch) +func (api *FeedAPI) SubscribeNewTxEvent(ch chan<- pb.Transactions) event.Subscription { + return api.bxh.Order.SubscribeTxEvent(ch) } func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription { diff --git a/pkg/order/etcdraft/node.go b/pkg/order/etcdraft/node.go index b488dbf..29acde3 100644 --- a/pkg/order/etcdraft/node.go +++ b/pkg/order/etcdraft/node.go @@ -11,6 +11,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/ethereum/go-ethereum/event" "github.com/meshplus/bitxhub-kit/log" "github.com/meshplus/bitxhub-kit/storage" "github.com/meshplus/bitxhub-kit/types" @@ -255,9 +256,9 @@ func (n *Node) DelNode(delID uint64) error { return nil } -// GetPool returns memory pool. -func (n *Node) GetPool() mempool.MemPool { - return n.mempool +// SubscribeTxEvent subscribes tx event +func (n *Node) SubscribeTxEvent(events chan<- pb.Transactions) event.Subscription { + return n.mempool.SubscribeTxEvent(events) } // main work loop diff --git a/pkg/order/filter_test.go b/pkg/order/filter_test.go index af460ff..9c3ecd8 100644 --- a/pkg/order/filter_test.go +++ b/pkg/order/filter_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/meshplus/bitxhub-kit/log" - "github.com/meshplus/bitxhub-kit/storage/leveldb" "github.com/stretchr/testify/require" ) diff --git a/pkg/order/mempool/mempool.go b/pkg/order/mempool/mempool.go index c127419..2307a2e 100644 --- a/pkg/order/mempool/mempool.go +++ b/pkg/order/mempool/mempool.go @@ -6,7 +6,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" - "github.com/meshplus/bitxhub/internal/model/events" raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" ) @@ -29,7 +28,7 @@ type MemPool interface { GetTimeoutTransactions(rebroadcastDuration time.Duration) [][]pb.Transaction - SubscribeTxEvent(chan<- events.NewTxsEvent) event.Subscription + SubscribeTxEvent(chan<- pb.Transactions) event.Subscription External } diff --git a/pkg/order/mempool/mempool_impl.go b/pkg/order/mempool/mempool_impl.go index 7d63d4f..820e992 100644 --- a/pkg/order/mempool/mempool_impl.go +++ b/pkg/order/mempool/mempool_impl.go @@ -13,7 +13,6 @@ import ( "github.com/meshplus/bitxhub-kit/storage/leveldb" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" - "github.com/meshplus/bitxhub/internal/model/events" raftproto "github.com/meshplus/bitxhub/pkg/order/etcdraft/proto" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -384,10 +383,10 @@ func loadOrCreateStorage(memPoolDir string) (storage.Storage, error) { return leveldb.New(memPoolDir) } -func (mpi *mempoolImpl) SubscribeTxEvent(ch chan<- events.NewTxsEvent) event.Subscription { +func (mpi *mempoolImpl) SubscribeTxEvent(ch chan<- pb.Transactions) event.Subscription { return mpi.txFeed.Subscribe(ch) } func (mpi *mempoolImpl) postTxsEvent(txList []pb.Transaction) { - go mpi.txFeed.Send(events.NewTxsEvent{Txs: txList}) + go mpi.txFeed.Send(pb.Transactions{Transactions: txList}) } diff --git a/pkg/order/order.go b/pkg/order/order.go index 0a6f3d1..d484963 100644 --- a/pkg/order/order.go +++ b/pkg/order/order.go @@ -1,9 +1,9 @@ package order import ( + "github.com/ethereum/go-ethereum/event" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" - "github.com/meshplus/bitxhub/pkg/order/mempool" ) //go:generate mockgen -destination mock_order/mock_order.go -package mock_order -source order.go @@ -38,5 +38,5 @@ type Order interface { // DelNode sends a delete vp request by given id. DelNode(delID uint64) error - GetPool() mempool.MemPool + SubscribeTxEvent(events chan<- pb.Transactions) event.Subscription } diff --git a/pkg/order/solo/node.go b/pkg/order/solo/node.go index 1274570..b3f967a 100644 --- a/pkg/order/solo/node.go +++ b/pkg/order/solo/node.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/event" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/pb" "github.com/meshplus/bitxhub/pkg/order" @@ -86,8 +87,8 @@ func (n *Node) Quorum() uint64 { return 1 } -func (n *Node) GetPool() mempool.MemPool { - return n.mempool +func (n *Node) SubscribeTxEvent(ch chan<- pb.Transactions) event.Subscription { + return n.mempool.SubscribeTxEvent(ch) } func NewNode(opts ...order.Option) (order.Order, error) {