Skip to content

Commit

Permalink
[FAB-2007] Gossip:Add support of external endpoint
Browse files Browse the repository at this point in the history
Peers in the same organization might need to
communicate by connecting to endpoints that are only route-able
from within the organization, and when communication with peers
in different organizations, they should connect to endpoints
that are route-able from outside the organization.

For example if the peers are behind a NAT with port-forwarding rules
for the peers.

This commit adds an internal endpoint to the Member proto message.
The endpoint is signed separately and when a message would cross
an organization, the inner endpoint would be deleted from the last
peer that was in the source organization.

The access control and filtering will be done in upcoming commits.
This commit only adds the internal endpoint, and changes the gossip
logic to prefer using the internal endpoint over the external one.

Unit tests: I changed the discovery and gossip unit tests accordingly,
and added a check in the discovery unit test that both endpoints are
gossiped to peers.
In the gossip unit-tests, made sure that the external endpoint is
never used, and only the internal one is used.
In upcoming commits, I would add tests that have several organizations.

How was this tested?
This commit doesn't add any new functionality, so I just tested
that the communication and discovery between peers wasn't damaged.

I ran a modified docs/channel-setup.md with an additional peer (peer1)
- Once with CORE_PEER_GOSSIP_BOOTSTRAP=ip-of-peer-0:7051
  and saw that peers were able to connect to one another

- Once without any bootstrap set, but created an anchor peer file
  with the ip address of peer0,
  and made peer0 and peer1 join the channel and saw in the logs
  that channel-based messages were sent between the peers.

Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Change-Id: I243b55fc192c3d7e516598a899cd58039ded7587
  • Loading branch information
yacovm committed Feb 11, 2017
1 parent 7a09dfb commit 40fb3a7
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 211 deletions.
21 changes: 18 additions & 3 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,29 @@ type CommService interface {

// NetworkMember is a peer's representation
type NetworkMember struct {
Endpoint string
Metadata []byte
PKIid common.PKIidType
Endpoint string
Metadata []byte
PKIid common.PKIidType
InternalEndpoint *proto.SignedEndpoint
}

// PreferredEndpoint computes the endpoint to connect to,
// while preferring internal endpoint over the standard
// endpoint
func (nm NetworkMember) PreferredEndpoint() string {
if nm.InternalEndpoint != nil && nm.InternalEndpoint.Endpoint != "" {
return nm.InternalEndpoint.Endpoint
}
return nm.Endpoint
}

// Discovery is the interface that represents a discovery module
type Discovery interface {

// Exists returns whether a peer with given
// PKI-ID is known
Exists(PKIID common.PKIidType) bool

// Self returns this instance's membership information
Self() NetworkMember

Expand Down
124 changes: 71 additions & 53 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ func (ts *timestamp) String() string {
}

type gossipDiscoveryImpl struct {
pkiID common.PKIidType
endpoint string
incTime uint64
metadata []byte

seqNum uint64

incTime uint64
seqNum uint64
self NetworkMember
deadLastTS map[string]*timestamp // H
aliveLastTS map[string]*timestamp // V
id2Member map[string]*NetworkMember // all known members
Expand All @@ -97,10 +93,8 @@ type gossipDiscoveryImpl struct {
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery {
d := &gossipDiscoveryImpl{
endpoint: self.Endpoint,
self: self,
incTime: uint64(time.Now().UnixNano()),
metadata: self.Metadata,
pkiID: self.PKIid,
seqNum: uint64(0),
deadLastTS: make(map[string]*timestamp),
aliveLastTS: make(map[string]*timestamp),
Expand All @@ -109,13 +103,12 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
Alive: make([]*proto.GossipMessage, 0),
Dead: make([]*proto.GossipMessage, 0),
},
crypt: crypt,
bootstrapPeers: bootstrapPeers,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LoggingDiscoveryModule, self.Endpoint),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint.Endpoint),
}

go d.periodicalSendAlive()
Expand All @@ -131,6 +124,15 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
return d
}

// Exists returns whether a peer with given
// PKI-ID is known
func (d *gossipDiscoveryImpl) Exists(PKIID common.PKIidType) bool {
d.lock.RLock()
defer d.lock.RUnlock()
_, exists := d.id2Member[string(PKIID)]
return exists
}

func (d *gossipDiscoveryImpl) Connect(member NetworkMember) {
d.logger.Debug("Entering", member)
defer d.logger.Debug("Exiting")
Expand Down Expand Up @@ -167,6 +169,9 @@ func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
defer wg.Done()
peer := &NetworkMember{
Endpoint: endpoint,
InternalEndpoint: &proto.SignedEndpoint{
Endpoint: endpoint,
},
}
d.comm.SendToPeer(peer, req)
}(endpoint)
Expand All @@ -192,9 +197,10 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
for _, i := range util.GetRandomIndices(k, n-1) {
pulledPeer := d.cachedMembership.Alive[i].GetAliveMsg().Membership
netMember := &NetworkMember{
Endpoint: pulledPeer.Endpoint,
Metadata: pulledPeer.Metadata,
PKIid: pulledPeer.PkiID,
Endpoint: pulledPeer.Endpoint,
Metadata: pulledPeer.Metadata,
PKIid: pulledPeer.PkiID,
InternalEndpoint: pulledPeer.InternalEndpoint,
}
peers2SendTo = append(peers2SendTo, netMember)
}
Expand Down Expand Up @@ -285,7 +291,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.GossipMessage) {

for _, dm := range memResp.Dead {
if !d.crypt.ValidateAliveMsg(m) {
d.logger.Warningf("Alive message isn't authentic, someone spoofed %s's identity", dm.GetAliveMsg().Membership.Endpoint)
d.logger.Warningf("Alive message isn't authentic, someone spoofed %s's identity", dm.GetAliveMsg().Membership)
continue
}

Expand All @@ -308,9 +314,10 @@ func (d *gossipDiscoveryImpl) sendMemResponse(member *proto.Member, known [][]by
defer d.logger.Debug("Exiting, replying with", memResp)

d.comm.SendToPeer(&NetworkMember{
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
InternalEndpoint: member.InternalEndpoint,
}, &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(0),
Expand Down Expand Up @@ -353,12 +360,12 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
defer d.logger.Debug("Exiting")

if !d.crypt.ValidateAliveMsg(m) {
d.logger.Warningf("Alive message isn't authentic, someone must be spoofing %s's identity", m.GetAliveMsg().Membership.Endpoint)
d.logger.Warningf("Alive message isn't authentic, someone must be spoofing %s's identity", m.GetAliveMsg())
return
}

pkiID := m.GetAliveMsg().Membership.PkiID
if equalPKIid(pkiID, d.pkiID) {
if equalPKIid(pkiID, d.self.PKIid) {
d.logger.Debug("Got alive message about ourselves,", m)
return
}
Expand All @@ -385,7 +392,7 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
}

if isAlive && isDead {
d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership.Endpoint)
d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership)
return
}

Expand All @@ -394,7 +401,7 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
// resurrect peer
d.resurrectMember(m, *ts)
} else if !same(lastDeadTS, ts) {
d.logger.Debug(m.GetAliveMsg().Membership.Endpoint, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
d.logger.Debug(m.GetAliveMsg().Membership, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
}
return
}
Expand All @@ -407,7 +414,7 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
if before(lastAliveTS, ts) {
d.learnExistingMembers([]*proto.GossipMessage{m})
} else if !same(lastAliveTS, ts) {
d.logger.Debug(m.GetAliveMsg().Membership.Endpoint, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
d.logger.Debug(m.GetAliveMsg().Membership, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
}

}
Expand All @@ -429,9 +436,10 @@ func (d *gossipDiscoveryImpl) resurrectMember(am *proto.GossipMessage, t proto.P
}

d.id2Member[string(pkiID)] = &NetworkMember{
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
InternalEndpoint: member.InternalEndpoint,
}
delete(d.deadLastTS, string(pkiID))

Expand Down Expand Up @@ -570,7 +578,7 @@ func (d *gossipDiscoveryImpl) expireDeadMembers(dead []common.PKIidType) {
d.lock.Unlock()

for _, member2Expire := range deadMembers2Expire {
d.logger.Warning("Closing connection to", member2Expire.Endpoint)
d.logger.Warning("Closing connection to", member2Expire)
d.comm.CloseConn(member2Expire)
}
}
Expand Down Expand Up @@ -605,9 +613,10 @@ func (d *gossipDiscoveryImpl) createAliveMessage() *proto.GossipMessage {
d.seqNum++
seqNum := d.seqNum

endpoint := d.endpoint
meta := d.metadata
pkiID := d.pkiID
endpoint := d.self.Endpoint
meta := d.self.Metadata
pkiID := d.self.PKIid
internalEndpoint := d.self.InternalEndpoint

d.lock.Unlock()

Expand All @@ -616,9 +625,10 @@ func (d *gossipDiscoveryImpl) createAliveMessage() *proto.GossipMessage {
Content: &proto.GossipMessage_AliveMsg{
AliveMsg: &proto.AliveMessage{
Membership: &proto.Member{
Endpoint: endpoint,
Metadata: meta,
PkiID: pkiID,
Endpoint: endpoint,
Metadata: meta,
PkiID: pkiID,
InternalEndpoint: internalEndpoint,
},
Timestamp: &proto.PeerTime{
IncNumber: uint64(d.incTime),
Expand Down Expand Up @@ -649,14 +659,15 @@ func (d *gossipDiscoveryImpl) learnExistingMembers(aliveArr []*proto.GossipMessa
member := d.id2Member[string(am.Membership.PkiID)]
member.Endpoint = am.Membership.Endpoint
member.Metadata = am.Membership.Metadata
member.InternalEndpoint = am.Membership.InternalEndpoint

if _, isKnownAsDead := d.deadLastTS[string(am.Membership.PkiID)]; isKnownAsDead {
d.logger.Warning(am.Membership.Endpoint, "has already expired")
d.logger.Warning(am.Membership, "has already expired")
continue
}

if _, isKnownAsAlive := d.aliveLastTS[string(am.Membership.PkiID)]; !isKnownAsAlive {
d.logger.Warning(am.Membership.Endpoint, "has already expired")
d.logger.Warning(am.Membership, "has already expired")
continue
} else {
d.logger.Debug("Updating aliveness data:", am)
Expand Down Expand Up @@ -686,7 +697,7 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
defer d.lock.Unlock()

for _, am := range aliveMembers {
if equalPKIid(am.GetAliveMsg().Membership.PkiID, d.pkiID) {
if equalPKIid(am.GetAliveMsg().Membership.PkiID, d.self.PKIid) {
continue
}
d.aliveLastTS[string(am.GetAliveMsg().Membership.PkiID)] = &timestamp{
Expand All @@ -700,7 +711,7 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
}

for _, dm := range deadMembers {
if equalPKIid(dm.GetAliveMsg().Membership.PkiID, d.pkiID) {
if equalPKIid(dm.GetAliveMsg().Membership.PkiID, d.self.PKIid) {
continue
}
d.deadLastTS[string(dm.GetAliveMsg().Membership.PkiID)] = &timestamp{
Expand All @@ -722,9 +733,10 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
return
}
d.id2Member[string(member.Membership.PkiID)] = &NetworkMember{
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
PKIid: member.Membership.PkiID,
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
PKIid: member.Membership.PkiID,
InternalEndpoint: member.Membership.InternalEndpoint,
}
}
}
Expand All @@ -741,9 +753,10 @@ func (d *gossipDiscoveryImpl) GetMembership() []NetworkMember {
for _, m := range d.cachedMembership.Alive {
member := m.GetAliveMsg()
response = append(response, NetworkMember{
PKIid: member.Membership.PkiID,
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
PKIid: member.Membership.PkiID,
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
InternalEndpoint: member.Membership.InternalEndpoint,
})
}
return response
Expand All @@ -757,18 +770,23 @@ func tsToTime(ts uint64) time.Time {
func (d *gossipDiscoveryImpl) UpdateMetadata(md []byte) {
d.lock.Lock()
defer d.lock.Unlock()
d.metadata = md
d.self.Metadata = md
}

func (d *gossipDiscoveryImpl) UpdateEndpoint(endpoint string) {
d.lock.Lock()
defer d.lock.Unlock()

d.endpoint = endpoint
d.self.Endpoint = endpoint
}

func (d *gossipDiscoveryImpl) Self() NetworkMember {
return NetworkMember{Endpoint: d.endpoint, Metadata: d.metadata, PKIid: d.pkiID}
return NetworkMember{
Endpoint: d.self.Endpoint,
Metadata: d.self.Metadata,
PKIid: d.self.PKIid,
InternalEndpoint: d.self.InternalEndpoint,
}
}

func (d *gossipDiscoveryImpl) toDie() bool {
Expand All @@ -788,7 +806,7 @@ func equalPKIid(a, b common.PKIidType) bool {
}

func same(a *timestamp, b *proto.PeerTime) bool {
return (uint64(a.incTime.UnixNano()) == b.IncNumber && a.seqNum == b.SeqNum)
return uint64(a.incTime.UnixNano()) == b.IncNumber && a.seqNum == b.SeqNum
}

func before(a *timestamp, b *proto.PeerTime) bool {
Expand Down
20 changes: 20 additions & 0 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
Metadata: []byte{},
PKIid: []byte(endpoint),
Endpoint: endpoint,
InternalEndpoint: &proto.SignedEndpoint{
Endpoint: endpoint,
Signature: []byte{},
},
}

listenAddress := fmt.Sprintf("%s:%d", "", port)
Expand Down Expand Up @@ -414,6 +418,22 @@ func TestGetFullMembership(t *testing.T) {
}

assertMembership(t, instances, nodeNum-1)

// Ensure that internal endpoint was propagated to everyone
for _, inst := range instances {
for _, member := range inst.GetMembership() {
assert.NotEmpty(t, member.InternalEndpoint.Endpoint)
assert.NotEmpty(t, member.Endpoint)
}
}

// Check that Exists() is valid
for _, inst := range instances {
for _, member := range inst.GetMembership() {
assert.True(t, inst.Exists(member.PKIid))
}
}

stopInstances(t, instances)
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SelectPeers(k int, peerPool []discovery.NetworkMember, filters ...RoutingFi
var filteredPeers []*comm.RemotePeer
for _, peer := range peerPool {
if CombineRoutingFilters(filters...)(peer) {
filteredPeers = append(filteredPeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint})
filteredPeers = append(filteredPeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()})
}
}

Expand Down
4 changes: 3 additions & 1 deletion gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type Gossip interface {
type Config struct {
BindPort int // Port we bind to, used only for tests
ID string // ID of this instance
SelfEndpoint string // Endpoint we publish to remote peers
BootstrapPeers []string // Peers we connect to at startup
PropagateIterations int // Number of times a message is pushed to remote peers
PropagatePeerNum int // Number of peers selected to push messages to
Expand All @@ -89,4 +88,7 @@ type Config struct {
PublishStateInfoInterval time.Duration // Determines frequency of pushing state info messages to peers
RequestStateInfoInterval time.Duration // Determines frequency of pulling state info messages from peers
TLSServerCert *tls.Certificate // TLS certificate of the peer

InternalEndpoint string // Endpoint we publish to peers in our organization
ExternalEndpoint string // Peer publishes this endpoint instead of SelfEndpoint to foreign organizations
}
Loading

0 comments on commit 40fb3a7

Please sign in to comment.