Merge pull request #313 from meshplus/feat/ha-module

feat: add ha module
This commit is contained in:
jzhe 2021-01-11 14:51:40 +08:00 committed by GitHub
commit 16cf3eb102
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 400 additions and 2 deletions

88
api/grpc/pier_monitor.go Normal file
View File

@ -0,0 +1,88 @@
package grpc
import (
"context"
"fmt"
"github.com/meshplus/bitxhub-model/pb"
)
func (cbs *ChainBrokerService) CheckMasterPier(ctx context.Context, req *pb.Address) (*pb.Response, error) {
resp := &pb.CheckPierResponse{}
ret, err := cbs.checkMasterPier(req.Address)
if err != nil {
return nil, err
}
if ret {
resp.Status = pb.CheckPierResponse_HAS_MASTER
} else {
resp.Status = pb.CheckPierResponse_NO_MASTER
}
resp.Address = req.Address
data, err := resp.Marshal()
if err != nil {
return nil, err
}
return &pb.Response{
Data: data,
}, nil
}
func (cbs *ChainBrokerService) SetMasterPier(ctx context.Context, req *pb.PierInfo) (*pb.Response, error) {
ret, err := cbs.checkMasterPier(req.Address)
if err != nil {
return nil, err
}
if ret {
return nil, fmt.Errorf("master pier already exist")
}
err = cbs.api.Network().PierManager().Piers().SetMaster(req.Address, req.Index, req.Timeout)
if err != nil {
return nil, err
}
resp := &pb.CheckPierResponse{
Address: req.Address,
}
data, err := resp.Marshal()
if err != nil {
return nil, err
}
return &pb.Response{
Data: data,
}, nil
}
func (cbs *ChainBrokerService) HeartBeat(ctx context.Context, req *pb.PierInfo) (*pb.Response, error) {
err := cbs.api.Network().PierManager().Piers().HeartBeat(req.Address, req.Index)
if err != nil {
return nil, err
}
resp := &pb.CheckPierResponse{
Address: req.Address,
}
data, err := resp.Marshal()
if err != nil {
return nil, err
}
return &pb.Response{
Data: data,
}, nil
}
func (cbs *ChainBrokerService) checkMasterPier(address string) (bool, error) {
pmgr := cbs.api.Network().PierManager()
if pmgr.Piers().HasPier(address) {
// cbs.logger.Infoln("native master")
return pmgr.Piers().CheckMaster(address), nil
} else {
// cbs.logger.Infoln("remote master")
return pmgr.AskPierMaster(address)
}
}

2
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/magiconair/properties v1.8.4
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201125025329-ac1187099a88
github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d
github.com/meshplus/bitxhub-model v1.1.2-0.20201229110212-37dd343b4c76
github.com/meshplus/bitxhub-model v1.1.2-0.20210107045700-cee670a2e117
github.com/meshplus/go-lightp2p v0.0.0-20210105060927-1c7850047415
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.2.2

10
go.sum
View File

@ -594,6 +594,7 @@ github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/meshplus/bitxhub-core v0.1.0-rc1 h1:i8/8Ay7McOdgAdeZIMeXRzMSzM0usN+hnkGVyIfrOtU=
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201125025329-ac1187099a88 h1:2EZDE1IuN0R6FQ3XiB26wxV5LG0XuBV6LegOon1Rm2M=
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201125025329-ac1187099a88/go.mod h1:MHf0waxqnW4Qwfpq66jqvJP+FritN5OTs/8wlQcNlJY=
github.com/meshplus/bitxhub-kit v1.0.0-rc1/go.mod h1:ra/AhOkPvpElI+wXrB9G6DjdcrdxFU3vMwA5MYKr9D0=
@ -603,9 +604,18 @@ github.com/meshplus/bitxhub-kit v1.1.2-0.20201023030558-9f36554d5d5d/go.mod h1:r
github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d h1:J9tzTNf29mR0r97An3KoAtZQYlwpNhlMItWKyzKJLHU=
github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d/go.mod h1:KR7ZlXhII9n0Bu8viaZTScvXCYn0MCQnYlsTvHPp0XA=
github.com/meshplus/bitxhub-model v1.0.0-rc3/go.mod h1:ZCctQIYTlE3vJ8Lhkrgs9bWwNA+Dw4JzojOSIzLVU6E=
github.com/meshplus/bitxhub-model v1.1.1 h1:/0Si29e14YW1GUbkJbCL8A70yXzxyiV/u36kxFC+gqI=
github.com/meshplus/bitxhub-model v1.1.2-0.20201021152621-0b3c17c54b23/go.mod h1:4qWBZx5wv7WZzUqiuBsbkQqQ2Ju8aOFpsoNpBBNy8Us=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229110212-37dd343b4c76 h1:3EndfTR7Zb1RbqjLNavrQ9BzjNz2WFWak4l+muq3klA=
github.com/meshplus/bitxhub-model v1.1.2-0.20201229110212-37dd343b4c76/go.mod h1:x3H+TL24wcByzHegenLfs+5PQkQGNsk8eCm31QJMa+Q=
github.com/meshplus/bitxhub-model v1.1.2-0.20210104075011-9507270d5dd6 h1:tuASbdN84CMSyyaVEG5u7hKVJrGn42qhfaG+cDCtSRQ=
github.com/meshplus/bitxhub-model v1.1.2-0.20210104075011-9507270d5dd6/go.mod h1:x3H+TL24wcByzHegenLfs+5PQkQGNsk8eCm31QJMa+Q=
github.com/meshplus/bitxhub-model v1.1.2-0.20210104075736-467a8cb65719 h1:YtRTHvyIBfL9pBx7Kg3dOkOY11+FhB9Dmc1Yv4AR/BQ=
github.com/meshplus/bitxhub-model v1.1.2-0.20210104075736-467a8cb65719/go.mod h1:x3H+TL24wcByzHegenLfs+5PQkQGNsk8eCm31QJMa+Q=
github.com/meshplus/bitxhub-model v1.1.2-0.20210107045700-cee670a2e117 h1:q1FT1DYWg2Ntf6A6wWiEx2WIEaEGBVgaIE1gAIVJKeI=
github.com/meshplus/bitxhub-model v1.1.2-0.20210107045700-cee670a2e117/go.mod h1:x3H+TL24wcByzHegenLfs+5PQkQGNsk8eCm31QJMa+Q=
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab h1:JclTakVV0dcXxl/dScmN77htnYe3n19hh7m2eMk9Abs=
github.com/meshplus/go-lightp2p v0.0.0-20201203044909-e09b34cd93ab/go.mod h1:L3pEzDMouz+xcIVwG2fj+mAsM95GAkzoo7cEd2CzmCQ=
github.com/meshplus/go-lightp2p v0.0.0-20210105060927-1c7850047415 h1:LgKHkjzq+Vlf37gkQmw7qK89kJLohSSfl4nSpI8tM4A=
github.com/meshplus/go-lightp2p v0.0.0-20210105060927-1c7850047415/go.mod h1:L3pEzDMouz+xcIVwG2fj+mAsM95GAkzoo7cEd2CzmCQ=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=

View File

@ -6,6 +6,7 @@ import (
"github.com/meshplus/bitxhub-model/pb"
"github.com/meshplus/bitxhub/internal/ledger"
"github.com/meshplus/bitxhub/internal/model/events"
"github.com/meshplus/bitxhub/pkg/peermgr"
)
//go:generate mockgen -destination mock_api/mock_api.go -package mock_api -source api.go
@ -50,6 +51,7 @@ type BrokerAPI interface {
type NetworkAPI interface {
PeerInfo() ([]byte, error)
PierManager() peermgr.PierManager
}
type ChainAPI interface {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/meshplus/bitxhub/internal/coreapi/api"
"github.com/meshplus/bitxhub/pkg/peermgr"
)
type NetworkAPI CoreAPI
@ -29,3 +30,7 @@ func (network *NetworkAPI) DelVPNode(pid string) ([]byte, error) {
}
return nil, nil
}
func (network *NetworkAPI) PierManager() peermgr.PierManager {
return network.bxh.PeerMgr.PierManager()
}

View File

@ -42,6 +42,10 @@ func (swarm *Swarm) handleMessage(s network.Stream, data []byte) {
swarm.handleFetchAssetExchangeSignMessage(s, m.Data)
case pb.Message_FETCH_IBTP_SIGN:
swarm.handleFetchIBTPSignMessage(s, m.Data)
case pb.Message_CHECK_MASTER_PIER:
swarm.handleAskPierMaster(s, m.Data)
case pb.Message_CHECK_MASTER_PIER_ACK:
swarm.handleReplyPierMaster(s, m.Data)
default:
swarm.logger.WithField("module", "p2p").Errorf("can't handle msg[type: %v]", m.Type)
return nil
@ -288,7 +292,6 @@ func (swarm *Swarm) handleFetchIBTPSignMessage(s network.Stream, data []byte) {
}
}
func (swarm *Swarm) handleGetBlocksPack(s network.Stream, msg *pb.Message) error {
req := &pb.GetBlocksRequest{}
if err := req.Unmarshal(msg.Data); err != nil {
@ -320,3 +323,50 @@ func (swarm *Swarm) handleGetBlocksPack(s network.Stream, msg *pb.Message) error
return nil
}
func (swarm *Swarm) handleAskPierMaster(s network.Stream, data []byte) {
address := string(data)
if !swarm.piers.pierMap.hasPier(address) {
return
}
resp := &pb.CheckPierResponse{}
if swarm.piers.pierChan.checkAddress(address) {
resp.Status = pb.CheckPierResponse_HAS_MASTER
} else {
if swarm.piers.pierMap.checkMaster(address) {
resp.Status = pb.CheckPierResponse_HAS_MASTER
} else {
resp.Status = pb.CheckPierResponse_NO_MASTER
}
}
resp.Address = address
msgData, err := resp.Marshal()
if err != nil {
swarm.logger.Errorf("marshal ask pier master response: %s", err)
return
}
message := &pb.Message{
Data: msgData,
Type: pb.Message_CHECK_MASTER_PIER_ACK,
}
msg, err := message.Marshal()
if err != nil {
swarm.logger.Errorf("marshal response message: %s", err)
return
}
if err := swarm.p2p.AsyncSend(s.RemotePeerID(), msg); err != nil {
swarm.logger.Errorf("send response: %s", err)
return
}
}
func (swarm *Swarm) handleReplyPierMaster(s network.Stream, data []byte) {
resp := &pb.CheckPierResponse{}
err := resp.Unmarshal(data)
if err != nil {
swarm.logger.Errorf("unmarshal response: %s", err)
return
}
swarm.piers.pierChan.writeChan(resp)
}

View File

@ -51,4 +51,13 @@ type PeerManager interface {
// Disconnect disconnect with all vp peers.
Disconnect(vpInfos map[uint64]*pb.VpInfo)
// PierManager
PierManager() PierManager
}
type PierManager interface {
Piers() *Piers
AskPierMaster(string) (bool, error)
}

228
pkg/peermgr/piermgr.go Normal file
View File

@ -0,0 +1,228 @@
package peermgr
import (
"context"
"fmt"
"sync"
"time"
"github.com/meshplus/bitxhub-model/pb"
)
const (
askTimeout = 10 * time.Second
)
func (swarm *Swarm) Piers() *Piers {
return swarm.piers
}
func (swarm *Swarm) AskPierMaster(address string) (bool, error) {
message := &pb.Message{
Data: []byte(address),
Type: pb.Message_CHECK_MASTER_PIER,
}
if swarm.piers.pierChan.checkAddress(address) {
return false, fmt.Errorf("is checking pier master")
}
ch := swarm.piers.pierChan.newChan(address)
for id := range swarm.Peers() {
if err := swarm.AsyncSend(id, message); err != nil {
swarm.logger.Debugf("send tx to:%d %s", id, err.Error())
continue
}
}
ctx, cancel := context.WithTimeout(context.Background(), askTimeout)
for {
select {
case resp, ok := <-ch:
if !ok {
cancel()
return false, fmt.Errorf("channel closed unexpectedly")
}
if resp.Status == pb.CheckPierResponse_HAS_MASTER {
swarm.logger.Infoln("get p2p response")
swarm.piers.pierChan.closeChan(address)
cancel()
return true, nil
}
if resp.Status == pb.CheckPierResponse_NO_MASTER {
swarm.piers.pierChan.closeChan(address)
cancel()
return false, nil
}
case <-ctx.Done():
// swarm.logger.Infoln("timeout!")
swarm.piers.pierChan.closeChan(address)
cancel()
return false, nil
}
}
}
type Piers struct {
pierMap *pierMap
pierChan *pierChan
}
type pierMap struct {
statusMap map[string]*pierStatus
sync.RWMutex
}
type pierStatus struct {
index string
lastActive time.Time
timeout int64
}
func newPiers() *Piers {
return &Piers{
pierMap: newPierMap(),
pierChan: newPierChan(),
}
}
func (p *Piers) HasPier(address string) bool {
return p.pierMap.hasPier(address)
}
func (p *Piers) CheckMaster(address string) bool {
res := p.pierMap.checkMaster(address)
if !res {
p.pierMap.rmPier(address)
}
return res
}
func (p *Piers) SetMaster(address string, index string, timeout int64) error {
return p.pierMap.setMaster(address, index, timeout)
}
func (p *Piers) HeartBeat(address string, index string) error {
return p.pierMap.heartBeat(address, index)
}
func newPierMap() *pierMap {
return &pierMap{
statusMap: make(map[string]*pierStatus),
}
}
func (pm *pierMap) hasPier(address string) bool {
pm.RLock()
defer pm.RUnlock()
_, ok := pm.statusMap[address]
return ok
}
func (pm *pierMap) rmPier(address string) {
pm.Lock()
defer pm.Unlock()
delete(pm.statusMap, address)
}
func (pm *pierMap) checkMaster(address string) bool {
pm.RLock()
defer pm.RUnlock()
return pm.cmpOffset(address)
}
func (pm *pierMap) setMaster(address string, index string, timeout int64) error {
pm.Lock()
defer pm.Unlock()
if pm.cmpOffset(address) {
if pm.statusMap[address].index != index {
return fmt.Errorf("already has master pier")
}
}
pm.statusMap[address] = &pierStatus{
index: index,
lastActive: time.Now(),
timeout: timeout,
}
return nil
}
func (pm *pierMap) heartBeat(address string, index string) error {
pm.RLock()
defer pm.RUnlock()
p, ok := pm.statusMap[address]
if !ok {
return fmt.Errorf("no master pier")
}
if p.index != index {
return fmt.Errorf("wrong pier heart beat")
}
p.lastActive = time.Now()
return nil
}
func (pm *pierMap) cmpOffset(address string) bool {
p, ok := pm.statusMap[address]
if !ok {
return false
}
offset := time.Now().Unix() - p.lastActive.Unix()
return p.timeout > offset
}
type pierChan struct {
chanMap map[string]chan *pb.CheckPierResponse
sync.RWMutex
}
func newPierChan() *pierChan {
return &pierChan{
chanMap: make(map[string]chan *pb.CheckPierResponse),
}
}
func (pc *pierChan) checkAddress(address string) bool {
pc.RLock()
defer pc.RUnlock()
_, ok := pc.chanMap[address]
return ok
}
func (pc *pierChan) newChan(address string) chan *pb.CheckPierResponse {
pc.Lock()
defer pc.Unlock()
ch := make(chan *pb.CheckPierResponse, 1)
pc.chanMap[address] = ch
return ch
}
func (pc *pierChan) writeChan(resp *pb.CheckPierResponse) {
pc.RLock()
defer pc.RUnlock()
ch, ok := pc.chanMap[resp.Address]
if !ok {
return
}
ch <- resp
}
func (pc *pierChan) closeChan(address string) {
pc.Lock()
defer pc.Unlock()
close(pc.chanMap[address])
delete(pc.chanMap, address)
}

View File

@ -36,6 +36,7 @@ type Swarm struct {
multiAddrs map[uint64]*peer.AddrInfo
connectedPeers sync.Map
notifiee *notifiee
piers *Piers
ledger ledger.Ledger
orderMessageFeed event.Feed
@ -94,6 +95,7 @@ func New(repoConfig *repo.Repo, logger logrus.FieldLogger, ledger ledger.Ledger)
pingTimeout: repoConfig.Config.Ping.Duration,
routers: routers,
multiAddrs: multiAddrs,
piers: newPiers(),
connectedPeers: sync.Map{},
notifiee: notifiee,
ctx: ctx,
@ -502,3 +504,7 @@ func constructMultiaddr(vpInfo *pb.VpInfo) (*peer.AddrInfo, error) {
}
return addrInfo, nil
}
func (swarm *Swarm) PierManager() PierManager {
return swarm
}