From fa7488cbb01833568f35d42567be9e4f2b8e9951 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Tue, 28 Feb 2017 01:04:39 +0200 Subject: [PATCH] [FAB-2424] Gossip state transfer: channel validation In this commit, we enforce that only peers that are eligible of receiving blocks for a specific channel are allowed to do so, by filtering out peers that do not conform to the policy set by the MSP. The policy is checked by invoking VerifyByChannel on the certificate, message and signature that are obtained from the connection information - leveraging the fact that the state transfer of gossip is point-to-point and peers have to authenticate before sending subsequent messages. Change-Id: Ibdad5cd838489abdd1e97bfb0663f6946020f7fb Signed-off-by: Yacov Manevich --- gossip/service/gossip_service.go | 8 +- gossip/state/state.go | 34 ++++++- gossip/state/state_test.go | 168 +++++++++++++++++++++++++------ 3 files changed, 172 insertions(+), 38 deletions(-) diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 922ac7614d8..8cbbae2ffa4 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -77,7 +77,8 @@ type gossipServiceImpl struct { deliveryService deliverclient.DeliverService deliveryFactory DeliveryServiceFactory lock sync.RWMutex - msgCrypto identity.Mapper + idMapper identity.Mapper + mcs api.MessageCryptoService peerIdentity []byte secAdv api.SecurityAdvisor } @@ -132,10 +133,11 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string gossip := integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv, mcs, idMapper, dialOpts, bootPeers...) gossipServiceInstance = &gossipServiceImpl{ + mcs: mcs, gossipSvc: gossip, chains: make(map[string]state.GossipStateProvider), deliveryFactory: factory, - msgCrypto: idMapper, + idMapper: idMapper, peerIdentity: peerIdentity, secAdv: secAdv, } @@ -158,7 +160,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe defer g.lock.Unlock() // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", chainID) - g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer) + g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs) if g.deliveryService == nil { var err error g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance) diff --git a/gossip/state/state.go b/gossip/state/state.go index e6789154f3c..40c7b5aee9f 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -25,6 +25,7 @@ import ( pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" common2 "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" @@ -47,10 +48,6 @@ type GossipStateProvider interface { Stop() } -var remoteStateMsgFilter = func(message interface{}) bool { - return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() -} - const ( defPollingPeriod = 200 * time.Millisecond defAntiEntropyInterval = 10 * time.Second @@ -60,6 +57,9 @@ const ( // the struct to handle in memory sliding window of // new ledger block to be acquired by hyper ledger type GossipStateProviderImpl struct { + // MessageCryptoService + mcs api.MessageCryptoService + // Chain id chainID string @@ -87,7 +87,7 @@ type GossipStateProviderImpl struct { } // NewGossipStateProvider creates initialized instance of gossip state provider -func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider { +func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider { logger := util.GetLogger(util.LoggingStateModule, "") gossipChan, _ := g.Accept(func(message interface{}) bool { @@ -96,6 +96,26 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID)) }, false) + remoteStateMsgFilter := func(message interface{}) bool { + receivedMsg := message.(proto.ReceivedMessage) + msg := receivedMsg.GetGossipMessage() + if !msg.IsRemoteStateMessage() { + return false + } + // If we're not running with authentication, no point + // in enforcing access control + if !receivedMsg.GetConnectionInfo().IsAuthenticated() { + return true + } + connInfo := receivedMsg.GetConnectionInfo() + authErr := mcs.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData) + if authErr != nil { + logger.Warning("Got unauthorized state transfer request from", string(connInfo.Identity)) + return false + } + return true + } + // Filter message which are only relevant for state transfer _, commChan := g.Accept(remoteStateMsgFilter, true) @@ -109,6 +129,10 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer } s := &GossipStateProviderImpl{ + // MessageCryptoService + mcs: mcs, + + // Chain ID chainID: chainID, // Instance of the gossip diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index de2de563ba7..dac5d2f9eed 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "errors" + pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/common/configtx/test" "github.com/hyperledger/fabric/common/util" @@ -49,6 +51,12 @@ var ( var orgID = []byte("ORG1") +type peerIdentityAcceptor func(identity api.PeerIdentityType) error + +var noopPeerIdentityAcceptor = func(identity api.PeerIdentityType) error { + return nil +} + type joinChanMsg struct { } @@ -78,30 +86,33 @@ func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { return nil } -type naiveCryptoService struct { +type cryptoServiceMock struct { + acceptor peerIdentityAcceptor } // GetPKIidOfCert returns the PKI-ID of a peer's identity -func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { +func (*cryptoServiceMock) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { return common.PKIidType(peerIdentity) } // VerifyBlock returns nil if the block is properly signed, // else returns error -func (*naiveCryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error { +func (*cryptoServiceMock) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error { return nil } // Sign signs msg with this peer's signing key and outputs // the signature if no error occurred. -func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { - return msg, nil +func (*cryptoServiceMock) Sign(msg []byte) ([]byte, error) { + clone := make([]byte, len(msg)) + copy(clone, msg) + return clone, nil } // Verify checks that signature is a valid signature of message under a peer's verification key. // If the verification succeeded, Verify returns nil meaning no error occurred. // If peerCert is nil, then the signature is verified against this peer's verification key. -func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { +func (*cryptoServiceMock) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { equal := bytes.Equal(signature, message) if !equal { return fmt.Errorf("Wrong signature:%v, %v", signature, message) @@ -113,11 +124,11 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, // under a peer's verification key, but also in the context of a specific channel. // If the verification succeeded, Verify returns nil meaning no error occurred. // If peerIdentity is nil, then the signature is verified against this peer's verification key. -func (*naiveCryptoService) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error { - return nil +func (cs *cryptoServiceMock) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error { + return cs.acceptor(peerIdentity) } -func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { +func (*cryptoServiceMock) ValidateIdentity(peerIdentity api.PeerIdentityType) error { return nil } @@ -132,9 +143,10 @@ func bootPeers(ids ...int) []string { // Simple presentation of peer which includes only // communication module, gossip and state transfer type peerNode struct { - g gossip.Gossip - s GossipStateProvider - + port int + g gossip.Gossip + s GossipStateProvider + cs *cryptoServiceMock commit committer.Committer } @@ -145,13 +157,13 @@ func (node *peerNode) shutdown() { } // Default configuration to be used for gossip and communication modules -func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { +func newGossipConfig(id int, boot ...int) *gossip.Config { port := id + portPrefix return &gossip.Config{ BindPort: port, BootstrapPeers: bootPeers(boot...), ID: fmt.Sprintf("p%d", id), - MaxBlockCountToStore: maxMsgCount, + MaxBlockCountToStore: 0, MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, MaxPropagationBurstSize: 10, PropagateIterations: 1, @@ -166,11 +178,10 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { } // Create gossip instance -func newGossipInstance(config *gossip.Config) gossip.Gossip { - cryptoService := &naiveCryptoService{} - idMapper := identity.NewIdentityMapper(cryptoService) +func newGossipInstance(config *gossip.Config, mcs api.MessageCryptoService) gossip.Gossip { + idMapper := identity.NewIdentityMapper(mcs) - return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, cryptoService, idMapper, []byte(config.InternalEndpoint)) + return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, mcs, idMapper, []byte(config.InternalEndpoint)) } // Create new instance of KVLedger to be used for testing @@ -182,24 +193,117 @@ func newCommitter(id int) committer.Committer { } // Constructing pseudo peer node, simulating only gossip and state transfer part -func newPeerNode(config *gossip.Config, committer committer.Committer) *peerNode { - +func newPeerNode(config *gossip.Config, committer committer.Committer, acceptor peerIdentityAcceptor) *peerNode { + cs := &cryptoServiceMock{acceptor: acceptor} // Gossip component based on configuration provided and communication module - gossip := newGossipInstance(config) + gossip := newGossipInstance(config, &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor}) logger.Debug("Joinning channel", util.GetTestChainID()) gossip.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID())) // Initialize pseudo peer simulator, which has only three // basic parts - return &peerNode{ - g: gossip, - s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer), + return &peerNode{ + port: config.BindPort, + g: gossip, + s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer, cs), commit: committer, + cs: cs, } } +func TestAccessControl(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + bootstrapSetSize := 5 + bootstrapSet := make([]*peerNode, 0) + + authorizedPeers := map[string]struct{}{ + "localhost:5610": {}, + "localhost:5615": {}, + "localhost:5618": {}, + "localhost:5621": {}, + } + + blockPullPolicy := func(identity api.PeerIdentityType) error { + if _, isAuthorized := authorizedPeers[string(identity)]; isAuthorized { + return nil + } + return errors.New("Not authorized") + } + + for i := 0; i < bootstrapSetSize; i++ { + committer := newCommitter(i) + bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, blockPullPolicy)) + } + + defer func() { + for _, p := range bootstrapSet { + p.shutdown() + } + }() + + msgCount := 5 + + for i := 1; i <= msgCount; i++ { + rawblock := pcomm.NewBlock(uint64(i), []byte{}) + if bytes, err := pb.Marshal(rawblock); err == nil { + payload := &proto.Payload{uint64(i), "", bytes} + bootstrapSet[0].s.AddPayload(payload) + } else { + t.Fail() + } + } + + standardPeerSetSize := 10 + peersSet := make([]*peerNode, 0) + + for i := 0; i < standardPeerSetSize; i++ { + committer := newCommitter(bootstrapSetSize + i) + peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, blockPullPolicy)) + } + + defer func() { + for _, p := range peersSet { + p.shutdown() + } + }() + + waitUntilTrueOrTimeout(t, func() bool { + for _, p := range peersSet { + if len(p.g.PeersOfChannel(common.ChainID(util.GetTestChainID()))) != bootstrapSetSize+standardPeerSetSize-1 { + logger.Debug("Peer discovery has not finished yet") + return false + } + } + logger.Debug("All peer discovered each other!!!") + return true + }, 30*time.Second) + + logger.Debug("Waiting for all blocks to arrive.") + waitUntilTrueOrTimeout(t, func() bool { + logger.Debug("Trying to see all authorized peers get all blocks, and all non-authorized didn't") + for _, p := range peersSet { + height, err := p.commit.LedgerHeight() + id := fmt.Sprintf("localhost:%d", p.port) + if _, isAuthorized := authorizedPeers[id]; isAuthorized { + if height != uint64(msgCount+1) || err != nil { + return false + } + } else { + if err == nil && height > 1 { + assert.Fail(t, "Peer", id, "got message but isn't authorized! Height:", height) + } + } + } + logger.Debug("All peers have same ledger height!!!") + return true + }, 60*time.Second) +} + /*// Simple scenario to start first booting node, gossip a message // then start second node and verify second node also receives it func TestNewGossipStateProvider_GossipingOneMessage(t *testing.T) { @@ -289,7 +393,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { for i := 0; i < bootstrapSetSize; i++ { committer := newCommitter(i) - bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i, 100), committer)) + bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, noopPeerIdentityAcceptor)) } defer func() { @@ -315,7 +419,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { for i := 0; i < standartPeersSize; i++ { committer := newCommitter(bootstrapSetSize + i) - peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 100, 0, 1, 2, 3, 4), committer)) + peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, noopPeerIdentityAcceptor)) } defer func() { @@ -354,14 +458,18 @@ func TestGossipStateProvider_TestStateMessages(t *testing.T) { ledgermgmt.InitializeTestEnv() defer ledgermgmt.CleanupTestEnv() - bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0)) + bootPeer := newPeerNode(newGossipConfig(0), newCommitter(0), noopPeerIdentityAcceptor) defer bootPeer.shutdown() - peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1)) + peer := newPeerNode(newGossipConfig(1, 0), newCommitter(1), noopPeerIdentityAcceptor) defer peer.shutdown() - _, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true) - _, peerCh := peer.g.Accept(remoteStateMsgFilter, true) + naiveStateMsgPredicate := func(message interface{}) bool { + return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() + } + + _, bootCh := bootPeer.g.Accept(naiveStateMsgPredicate, true) + _, peerCh := peer.g.Accept(naiveStateMsgPredicate, true) wg := sync.WaitGroup{} wg.Add(2)