Skip to content

Commit

Permalink
[FAB-2198] Gossip envelope refactoring
Browse files Browse the repository at this point in the history
In the previous episode of [FAB-2198]:
	https://gerrit.hyperledger.org/r/#/c/5907/
	Adjust gossip membership layer
We adjusted the discovery layer and got rid of the usage of the protos
there.

Now, I'm gradually integrating the envelope with the gossip message and actually
making the signing and verification work on the raw payload instead of
on a computed payload that is non deterministic.

Also, the SignedEndpoint which was a part of the membership entity "Member"
is no more, and its functionality was moved outside of the GossipMessage,
to an external "Secret" message type.
This type will be used to hold parts of GossipMessage that the peers may want to
omit as they forward messages to peers that shouldn't get this information.

The current use-case for this, is FAB-2007 that enforces peers to not expose
the internal endpoints of peers in their own organization.
This data can't reside inside the GossipMessage anymore, because it is marshalled
into a payload and signed.
Therefore, we need to extract it into a side entity that will be
part of the Envelope that will be sent in gossip. T
hen, the peers can easily omit this envelope while
preserving the signature on the payload that the source peer produced.

In the (very-soon) future, I'll get rid of the coupling between the GossipMessage
and the Envelope reference inside of it.

Change-Id: Ib910cba1f69bd356174ceb64ee22e2a1d9d15cf5
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Feb 25, 2017
1 parent ea7015e commit b7b5c4e
Show file tree
Hide file tree
Showing 19 changed files with 610 additions and 398 deletions.
43 changes: 27 additions & 16 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,12 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
cMsg = c.createConnectionMsg(c.PKIID, c.selfCertHash, c.peerIdentity, signer)

c.logger.Debug("Sending", cMsg, "to", remoteAddress)
stream.Send(cMsg)
m := readWithTimeout(stream, util.GetDurationOrDefault("peer.gossip.connTimeout", defConnTimeout))
if m == nil {
c.logger.Warning("Timed out waiting for connection message from", remoteAddress)
return nil, errors.New("Timed out")
stream.Send(cMsg.Envelope)
m, err := readWithTimeout(stream, util.GetDurationOrDefault("peer.gossip.connTimeout", defConnTimeout), remoteAddress)
if err != nil {
err := fmt.Errorf("Failed reading messge from %s, reason: %v", remoteAddress, err)
c.logger.Warning(err)
return nil, err
}
receivedMsg := m.GetConn()
if receivedMsg == nil {
Expand Down Expand Up @@ -505,25 +506,38 @@ func (c *commImpl) disconnect(pkiID common.PKIidType) {
c.connStore.closeByPKIid(pkiID)
}

func readWithTimeout(stream interface{}, timeout time.Duration) *proto.GossipMessage {
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.GossipMessage, error) {
incChan := make(chan *proto.GossipMessage, 1)
errChan := make(chan error, 1)
go func() {
if srvStr, isServerStr := stream.(proto.Gossip_GossipStreamServer); isServerStr {
if m, err := srvStr.Recv(); err == nil {
incChan <- m
msg, err := m.ToGossipMessage()
if err != nil {
errChan <- err
return
}
incChan <- msg
}
}
if clStr, isClientStr := stream.(proto.Gossip_GossipStreamClient); isClientStr {
if m, err := clStr.Recv(); err == nil {
incChan <- m
msg, err := m.ToGossipMessage()
if err != nil {
errChan <- err
return
}
incChan <- msg
}
}
}()
select {
case <-time.NewTicker(timeout).C:
return nil
return nil, fmt.Errorf("Timed out waiting for connection message from %s", address)
case m := <-incChan:
return m
return m, nil
case err := <-errChan:
return nil, err
}
}

Expand All @@ -539,16 +553,13 @@ func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, hash []byte, cert
},
},
}
if err := m.Sign(signer); err != nil {
c.logger.Panicf("Gossip failed to sign a message using the peer identity.\n Halting execution.\nActual error: %v", err)
}

m.Sign(signer)
return m
}

type stream interface {
Send(*proto.GossipMessage) error
Recv() (*proto.GossipMessage, error)
Send(envelope *proto.Envelope) error
Recv() (*proto.Envelope, error)
grpc.Stream
}

Expand Down
22 changes: 10 additions & 12 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,26 @@ func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte
})

if sigMutator != nil {
msg.Signature = sigMutator(msg.Signature)
msg.Envelope.Signature = sigMutator(msg.Envelope.Signature)
}

stream.Send(msg)
msg, err = stream.Recv()
stream.Send(msg.Envelope)
envelope, err := stream.Recv()
assert.NoError(t, err, "%v", err)
msg, err = envelope.ToGossipMessage()
assert.NoError(t, err, "%v", err)
if sigMutator == nil {
hash := extractCertificateHashFromContext(stream.Context())
expectedMsg := c.createConnectionMsg(common.PKIidType("localhost:9611"), hash, []byte("localhost:9611"), func(msg []byte) ([]byte, error) {
return msg, nil
})
assert.Equal(t, expectedMsg.Signature, msg.Signature)
assert.Equal(t, expectedMsg.Envelope.Signature, msg.Envelope.Signature)
}
assert.Equal(t, []byte("localhost:9611"), msg.GetConn().PkiID)
msg2Send := createGossipMsg()
nonce := uint64(rand.Int())
msg2Send.Nonce = nonce
go stream.Send(msg2Send)
go stream.Send(msg2Send.NoopSign())
return acceptChan
}

Expand Down Expand Up @@ -347,17 +349,13 @@ func TestResponses(t *testing.T) {
defer comm1.Stop()
defer comm2.Stop()

nonceIncrememter := func(msg proto.ReceivedMessage) proto.ReceivedMessage {
msg.GetGossipMessage().Nonce++
return msg
}

msg := createGossipMsg()
go func() {
inChan := comm1.Accept(acceptAll)
for m := range inChan {
m = nonceIncrememter(m)
m.Respond(m.GetGossipMessage())
reply := createGossipMsg()
reply.Nonce = m.GetGossipMessage().Nonce + 1
m.Respond(reply)
}
}()
expectedNOnce := uint64(msg.Nonce + 1)
Expand Down
21 changes: 15 additions & 6 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,13 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {
return
}

if msg.Envelope == nil {
msg.NoopSign()
}

m := &msgSending{
msg: msg,
onErr: onErr,
envelope: msg.Envelope,
onErr: onErr,
}

conn.outBuff <- m
Expand Down Expand Up @@ -294,7 +298,7 @@ func (conn *connection) writeToStream() {
}
select {
case m := <-conn.outBuff:
err := stream.Send(m.msg)
err := stream.Send(m.envelope)
if err != nil {
go m.onErr(err)
return
Expand All @@ -319,7 +323,7 @@ func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.G
errChan <- errors.New("Stream is nil")
return
}
msg, err := stream.Recv()
envelope, err := stream.Recv()
if conn.toDie() {
conn.logger.Debug(conn.pkiID, "canceling read because closing")
return
Expand All @@ -329,6 +333,11 @@ func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.G
conn.logger.Debug(conn.pkiID, "Got error, aborting:", err)
return
}
msg, err := envelope.ToGossipMessage()
if err != nil {
errChan <- err
conn.logger.Warning(conn.pkiID, "Got error, aborting:", err)
}
msgChan <- msg
}
}
Expand All @@ -354,6 +363,6 @@ func (conn *connection) getStream() stream {
}

type msgSending struct {
msg *proto.GossipMessage
onErr func(error)
envelope *proto.Envelope
onErr func(error)
}
4 changes: 2 additions & 2 deletions gossip/comm/mock/mock_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (packet *packetMock) Respond(msg *proto.GossipMessage) {
}
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// GetSourceEnvelope Returns the Envelope the ReceivedMessage was
// constructed with
func (packet *packetMock) GetSourceMessage() *proto.SignedGossipMessage {
func (packet *packetMock) GetSourceEnvelope() *proto.Envelope {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type ReceivedMessageImpl struct {
conn *connection
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// GetSourceEnvelope Returns the Envelope the ReceivedMessage was
// constructed with
func (m *ReceivedMessageImpl) GetSourceMessage() *proto.SignedGossipMessage {
return nil
func (m *ReceivedMessageImpl) GetSourceEnvelope() *proto.Envelope {
return m.Envelope
}

// Respond sends a msg to the source that sent the ReceivedMessageImpl
Expand Down
8 changes: 4 additions & 4 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type CryptoService interface {
ValidateAliveMsg(*proto.GossipMessage) bool

// SignMessage signs a message
SignMessage(m *proto.GossipMessage) *proto.GossipMessage
SignMessage(m *proto.GossipMessage, internalEndpoint string) *proto.Envelope
}

// CommService is an interface that the discovery expects to be implemented and passed on creation
Expand Down Expand Up @@ -57,15 +57,15 @@ type NetworkMember struct {
Endpoint string
Metadata []byte
PKIid common.PKIidType
InternalEndpoint *proto.SignedEndpoint
InternalEndpoint string
}

// PreferredEndpoint computes the endpoint to connect to,
// while preferring internal endpoint over the standard
// endpoint
func (nm NetworkMember) PreferredEndpoint() string {
if nm.InternalEndpoint != nil && nm.InternalEndpoint.Endpoint != "" {
return nm.InternalEndpoint.Endpoint
if nm.InternalEndpoint != "" {
return nm.InternalEndpoint
}
return nm.Endpoint
}
Expand Down
Loading

0 comments on commit b7b5c4e

Please sign in to comment.