Skip to content

Commit

Permalink
Merge "[FAB-2424] Gossip state transfer: channel validation"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Mar 1, 2017
2 parents 9b4c8ed + fa7488c commit 6325834
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 38 deletions.
8 changes: 5 additions & 3 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type gossipServiceImpl struct {
deliveryService deliverclient.DeliverService
deliveryFactory DeliveryServiceFactory
lock sync.RWMutex
msgCrypto identity.Mapper
idMapper identity.Mapper
mcs api.MessageCryptoService
peerIdentity []byte
secAdv api.SecurityAdvisor
}
Expand Down Expand Up @@ -132,10 +133,11 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string

gossip := integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv, mcs, idMapper, dialOpts, bootPeers...)
gossipServiceInstance = &gossipServiceImpl{
mcs: mcs,
gossipSvc: gossip,
chains: make(map[string]state.GossipStateProvider),
deliveryFactory: factory,
msgCrypto: idMapper,
idMapper: idMapper,
peerIdentity: peerIdentity,
secAdv: secAdv,
}
Expand All @@ -158,7 +160,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
defer g.lock.Unlock()
// Initialize new state provider for given committer
logger.Debug("Creating state provider for chainID", chainID)
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer)
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)
if g.deliveryService == nil {
var err error
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance)
Expand Down
34 changes: 29 additions & 5 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

pb "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
common2 "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip"
Expand All @@ -47,10 +48,6 @@ type GossipStateProvider interface {
Stop()
}

var remoteStateMsgFilter = func(message interface{}) bool {
return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}

const (
defPollingPeriod = 200 * time.Millisecond
defAntiEntropyInterval = 10 * time.Second
Expand All @@ -60,6 +57,9 @@ const (
// the struct to handle in memory sliding window of
// new ledger block to be acquired by hyper ledger
type GossipStateProviderImpl struct {
// MessageCryptoService
mcs api.MessageCryptoService

// Chain id
chainID string

Expand Down Expand Up @@ -87,7 +87,7 @@ type GossipStateProviderImpl struct {
}

// NewGossipStateProvider creates initialized instance of gossip state provider
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider {
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider {
logger := util.GetLogger(util.LoggingStateModule, "")

gossipChan, _ := g.Accept(func(message interface{}) bool {
Expand All @@ -96,6 +96,26 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
}, false)

remoteStateMsgFilter := func(message interface{}) bool {
receivedMsg := message.(proto.ReceivedMessage)
msg := receivedMsg.GetGossipMessage()
if !msg.IsRemoteStateMessage() {
return false
}
// If we're not running with authentication, no point
// in enforcing access control
if !receivedMsg.GetConnectionInfo().IsAuthenticated() {
return true
}
connInfo := receivedMsg.GetConnectionInfo()
authErr := mcs.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
if authErr != nil {
logger.Warning("Got unauthorized state transfer request from", string(connInfo.Identity))
return false
}
return true
}

// Filter message which are only relevant for state transfer
_, commChan := g.Accept(remoteStateMsgFilter, true)

Expand All @@ -109,6 +129,10 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
}

s := &GossipStateProviderImpl{
// MessageCryptoService
mcs: mcs,

// Chain ID
chainID: chainID,

// Instance of the gossip
Expand Down
168 changes: 138 additions & 30 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"errors"

pb "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/util"
Expand All @@ -49,6 +51,12 @@ var (

var orgID = []byte("ORG1")

type peerIdentityAcceptor func(identity api.PeerIdentityType) error

var noopPeerIdentityAcceptor = func(identity api.PeerIdentityType) error {
return nil
}

type joinChanMsg struct {
}

Expand Down Expand Up @@ -78,30 +86,33 @@ func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error {
return nil
}

type naiveCryptoService struct {
type cryptoServiceMock struct {
acceptor peerIdentityAcceptor
}

// GetPKIidOfCert returns the PKI-ID of a peer's identity
func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
func (*cryptoServiceMock) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
return common.PKIidType(peerIdentity)
}

// VerifyBlock returns nil if the block is properly signed,
// else returns error
func (*naiveCryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error {
func (*cryptoServiceMock) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error {
return nil
}

// Sign signs msg with this peer's signing key and outputs
// the signature if no error occurred.
func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) {
return msg, nil
func (*cryptoServiceMock) Sign(msg []byte) ([]byte, error) {
clone := make([]byte, len(msg))
copy(clone, msg)
return clone, nil
}

// Verify checks that signature is a valid signature of message under a peer's verification key.
// If the verification succeeded, Verify returns nil meaning no error occurred.
// If peerCert is nil, then the signature is verified against this peer's verification key.
func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
func (*cryptoServiceMock) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
equal := bytes.Equal(signature, message)
if !equal {
return fmt.Errorf("Wrong signature:%v, %v", signature, message)
Expand All @@ -113,11 +124,11 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature,
// under a peer's verification key, but also in the context of a specific channel.
// If the verification succeeded, Verify returns nil meaning no error occurred.
// If peerIdentity is nil, then the signature is verified against this peer's verification key.
func (*naiveCryptoService) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
return nil
func (cs *cryptoServiceMock) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
return cs.acceptor(peerIdentity)
}

func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
func (*cryptoServiceMock) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
return nil
}

Expand All @@ -132,9 +143,10 @@ func bootPeers(ids ...int) []string {
// Simple presentation of peer which includes only
// communication module, gossip and state transfer
type peerNode struct {
g gossip.Gossip
s GossipStateProvider

port int
g gossip.Gossip
s GossipStateProvider
cs *cryptoServiceMock
commit committer.Committer
}

Expand All @@ -145,13 +157,13 @@ func (node *peerNode) shutdown() {
}

// Default configuration to be used for gossip and communication modules
func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config {
func newGossipConfig(id int, boot ...int) *gossip.Config {
port := id + portPrefix
return &gossip.Config{
BindPort: port,
BootstrapPeers: bootPeers(boot...),
ID: fmt.Sprintf("p%d", id),
MaxBlockCountToStore: maxMsgCount,
MaxBlockCountToStore: 0,
MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond,
MaxPropagationBurstSize: 10,
PropagateIterations: 1,
Expand All @@ -166,11 +178,10 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config {
}

// Create gossip instance
func newGossipInstance(config *gossip.Config) gossip.Gossip {
cryptoService := &naiveCryptoService{}
idMapper := identity.NewIdentityMapper(cryptoService)
func newGossipInstance(config *gossip.Config, mcs api.MessageCryptoService) gossip.Gossip {
idMapper := identity.NewIdentityMapper(mcs)

return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, cryptoService, idMapper, []byte(config.InternalEndpoint))
return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, mcs, idMapper, []byte(config.InternalEndpoint))
}

// Create new instance of KVLedger to be used for testing
Expand All @@ -182,24 +193,117 @@ func newCommitter(id int) committer.Committer {
}

// Constructing pseudo peer node, simulating only gossip and state transfer part
func newPeerNode(config *gossip.Config, committer committer.Committer) *peerNode {

func newPeerNode(config *gossip.Config, committer committer.Committer, acceptor peerIdentityAcceptor) *peerNode {
cs := &cryptoServiceMock{acceptor: acceptor}
// Gossip component based on configuration provided and communication module
gossip := newGossipInstance(config)
gossip := newGossipInstance(config, &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor})

logger.Debug("Joinning channel", util.GetTestChainID())
gossip.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID()))

// Initialize pseudo peer simulator, which has only three
// basic parts
return &peerNode{
g: gossip,
s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer),

return &peerNode{
port: config.BindPort,
g: gossip,
s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer, cs),
commit: committer,
cs: cs,
}
}

func TestAccessControl(t *testing.T) {
viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node")
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()

bootstrapSetSize := 5
bootstrapSet := make([]*peerNode, 0)

authorizedPeers := map[string]struct{}{
"localhost:5610": {},
"localhost:5615": {},
"localhost:5618": {},
"localhost:5621": {},
}

blockPullPolicy := func(identity api.PeerIdentityType) error {
if _, isAuthorized := authorizedPeers[string(identity)]; isAuthorized {
return nil
}
return errors.New("Not authorized")
}

for i := 0; i < bootstrapSetSize; i++ {
committer := newCommitter(i)
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, blockPullPolicy))
}

defer func() {
for _, p := range bootstrapSet {
p.shutdown()
}
}()

msgCount := 5

for i := 1; i <= msgCount; i++ {
rawblock := pcomm.NewBlock(uint64(i), []byte{})
if bytes, err := pb.Marshal(rawblock); err == nil {
payload := &proto.Payload{uint64(i), "", bytes}
bootstrapSet[0].s.AddPayload(payload)
} else {
t.Fail()
}
}

standardPeerSetSize := 10
peersSet := make([]*peerNode, 0)

for i := 0; i < standardPeerSetSize; i++ {
committer := newCommitter(bootstrapSetSize + i)
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, blockPullPolicy))
}

defer func() {
for _, p := range peersSet {
p.shutdown()
}
}()

waitUntilTrueOrTimeout(t, func() bool {
for _, p := range peersSet {
if len(p.g.PeersOfChannel(common.ChainID(util.GetTestChainID()))) != bootstrapSetSize+standardPeerSetSize-1 {
logger.Debug("Peer discovery has not finished yet")
return false
}
}
logger.Debug("All peer discovered each other!!!")
return true
}, 30*time.Second)

logger.Debug("Waiting for all blocks to arrive.")
waitUntilTrueOrTimeout(t, func() bool {
logger.Debug("Trying to see all authorized peers get all blocks, and all non-authorized didn't")
for _, p := range peersSet {
height, err := p.commit.LedgerHeight()
id := fmt.Sprintf("localhost:%d", p.port)
if _, isAuthorized := authorizedPeers[id]; isAuthorized {
if height != uint64(msgCount+1) || err != nil {
return false
}
} else {
if err == nil && height > 1 {
assert.Fail(t, "Peer", id, "got message but isn't authorized! Height:", height)
}
}
}
logger.Debug("All peers have same ledger height!!!")
return true
}, 60*time.Second)
}

/*// Simple scenario to start first booting node, gossip a message
// then start second node and verify second node also receives it
func TestNewGossipStateProvider_GossipingOneMessage(t *testing.T) {
Expand Down Expand Up @@ -289,7 +393,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {

for i := 0; i < bootstrapSetSize; i++ {
committer := newCommitter(i)
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i, 100), committer))
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, noopPeerIdentityAcceptor))
}

defer func() {
Expand All @@ -315,7 +419,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {

for i := 0; i < standartPeersSize; i++ {
committer := newCommitter(bootstrapSetSize + i)
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 100, 0, 1, 2, 3, 4), committer))
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, noopPeerIdentityAcceptor))
}

defer func() {
Expand Down Expand Up @@ -354,14 +458,18 @@ func TestGossipStateProvider_TestStateMessages(t *testing.T) {
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()

bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0))
bootPeer := newPeerNode(newGossipConfig(0), newCommitter(0), noopPeerIdentityAcceptor)
defer bootPeer.shutdown()

peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1))
peer := newPeerNode(newGossipConfig(1, 0), newCommitter(1), noopPeerIdentityAcceptor)
defer peer.shutdown()

_, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true)
_, peerCh := peer.g.Accept(remoteStateMsgFilter, true)
naiveStateMsgPredicate := func(message interface{}) bool {
return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}

_, bootCh := bootPeer.g.Accept(naiveStateMsgPredicate, true)
_, peerCh := peer.g.Accept(naiveStateMsgPredicate, true)

wg := sync.WaitGroup{}
wg.Add(2)
Expand Down

0 comments on commit 6325834

Please sign in to comment.