diff --git a/gossip/state/payloads_buffer.go b/gossip/state/payloads_buffer.go index 47c63bdbe86..f8c6c541bb2 100644 --- a/gossip/state/payloads_buffer.go +++ b/gossip/state/payloads_buffer.go @@ -123,8 +123,8 @@ func (b *PayloadsBufferImpl) Pop() *proto.Payload { // Size returns current number of payloads stored within buffer func (b *PayloadsBufferImpl) Size() int { - b.mutex.Lock() - defer b.mutex.Unlock() + b.mutex.RLock() + defer b.mutex.RUnlock() return len(b.buf) } diff --git a/gossip/state/state.go b/gossip/state/state.go index 08eb1264dd1..b7fa5194333 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/fabric/protos/common" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" + "github.com/spf13/viper" ) // GossipStateProvider is the interface to acquire sequences of the ledger blocks @@ -48,6 +49,11 @@ const ( defAntiEntropyMaxRetries = 3 defMaxBlockDistance = 100 + + blocking = true + nonBlocking = false + + enqueueRetryInterval = time.Millisecond * 100 ) // GossipAdapter defines gossip/communication required interface for state provider @@ -445,7 +451,7 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) { dataMsg := msg.GetDataMsg() if dataMsg != nil { - if err := s.AddPayload(dataMsg.GetPayload()); err != nil { + if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil { logger.Warning("Failed adding payload:", err) return } @@ -668,8 +674,20 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block { return nil } -// AddPayload add new payload into state +// AddPayload add new payload into state. func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error { + blockingMode := blocking + if viper.GetBool("peer.gossip.nonBlockingCommitMode") { + blockingMode = false + } + return s.addPayload(payload, blockingMode) +} + +// addPayload add new payload into state. It may (or may not) block according to the +// given parameter. If it gets a block while in blocking mode - it would wait until +// the block is sent into the payloads buffer. +// Else - it may drop the block, if the payload buffer is too full. +func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error { if payload == nil { return errors.New("Given payload is nil") } @@ -679,10 +697,14 @@ func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error { return fmt.Errorf("Failed obtaining ledger height: %v", err) } - if payload.SeqNum-height >= defMaxBlockDistance { + if !blockingMode && payload.SeqNum-height >= defMaxBlockDistance { return fmt.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum) } + for blockingMode && s.payloads.Size() > defMaxBlockDistance*2 { + time.Sleep(enqueueRetryInterval) + } + return s.payloads.Push(payload) } diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index d28497d2134..8ff855aeb9e 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -10,6 +10,7 @@ import ( "bytes" "errors" "fmt" + "math/rand" "strconv" "sync" "testing" @@ -147,7 +148,7 @@ func bootPeers(ids ...int) []string { type peerNode struct { port int g gossip.Gossip - s GossipStateProvider + s *GossipStateProviderImpl cs *cryptoServiceMock commit committer.Committer } @@ -164,17 +165,21 @@ type mockCommitter struct { } func (mc *mockCommitter) Commit(block *pcomm.Block) error { - mc.Called(block) + mc.Lock() + m := mc.Mock + mc.Unlock() + m.Called(block) return nil } func (mc *mockCommitter) LedgerHeight() (uint64, error) { mc.Lock() - defer mc.Unlock() - if mc.Called().Get(1) == nil { - return mc.Called().Get(0).(uint64), nil + m := mc.Mock + mc.Unlock() + if m.Called().Get(1) == nil { + return m.Called().Get(0).(uint64), nil } - return mc.Called().Get(0).(uint64), mc.Called().Get(1).(error) + return m.Called().Get(0).(uint64), m.Called().Get(1).(error) } func (mc *mockCommitter) GetBlocks(blockSeqs []uint64) []*pcomm.Block { @@ -246,7 +251,7 @@ func newPeerNodeWithGossip(config *gossip.Config, committer committer.Committer, return &peerNode{ port: config.BindPort, g: g, - s: sp, + s: sp.(*GossipStateProviderImpl), commit: committer, cs: cs, } @@ -265,13 +270,13 @@ func TestNilDirectMsg(t *testing.T) { g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage)) p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g) defer p.shutdown() - p.s.(*GossipStateProviderImpl).handleStateRequest(nil) - p.s.(*GossipStateProviderImpl).directMessage(nil) - sMsg, _ := p.s.(*GossipStateProviderImpl).stateRequestMessage(uint64(10), uint64(8)).NoopSign() + p.s.handleStateRequest(nil) + p.s.directMessage(nil) + sMsg, _ := p.s.stateRequestMessage(uint64(10), uint64(8)).NoopSign() req := &comm.ReceivedMessageImpl{ SignedGossipMessage: sMsg, } - p.s.(*GossipStateProviderImpl).directMessage(req) + p.s.directMessage(req) } func TestNilAddPayload(t *testing.T) { @@ -335,10 +340,10 @@ func TestOverPopulation(t *testing.T) { for i := 1; i <= 4; i++ { rawblock := pcomm.NewBlock(uint64(i), []byte{}) b, _ := pb.Marshal(rawblock) - assert.NoError(t, p.s.AddPayload(&proto.Payload{ + assert.NoError(t, p.s.addPayload(&proto.Payload{ SeqNum: uint64(i), Data: b, - })) + }, nonBlocking)) } // Add payloads from 10 to defMaxBlockDistance, while we're missing blocks [5,9] @@ -346,10 +351,10 @@ func TestOverPopulation(t *testing.T) { for i := 10; i <= defMaxBlockDistance; i++ { rawblock := pcomm.NewBlock(uint64(i), []byte{}) b, _ := pb.Marshal(rawblock) - assert.NoError(t, p.s.AddPayload(&proto.Payload{ + assert.NoError(t, p.s.addPayload(&proto.Payload{ SeqNum: uint64(i), Data: b, - })) + }, nonBlocking)) } // Add payloads from defMaxBlockDistance + 2 to defMaxBlockDistance * 10 @@ -357,10 +362,10 @@ func TestOverPopulation(t *testing.T) { for i := defMaxBlockDistance + 1; i <= defMaxBlockDistance*10; i++ { rawblock := pcomm.NewBlock(uint64(i), []byte{}) b, _ := pb.Marshal(rawblock) - assert.Error(t, p.s.AddPayload(&proto.Payload{ + assert.Error(t, p.s.addPayload(&proto.Payload{ SeqNum: uint64(i), Data: b, - })) + }, nonBlocking)) } // Ensure only blocks 1-4 were passed to the ledger @@ -373,9 +378,76 @@ func TestOverPopulation(t *testing.T) { assert.Equal(t, 5, i) // Ensure we don't store too many blocks in memory - sp := p.s.(*GossipStateProviderImpl) + sp := p.s assert.True(t, sp.payloads.Size() < defMaxBlockDistance) +} +func TestBlockingEnqueue(t *testing.T) { + // Scenario: In parallel, get blocks from gossip and from the orderer. + // The blocks from the orderer we get are X2 times the amount of blocks from gossip. + // The blocks we get from gossip are random indices, to maximize disruption. + mc := &mockCommitter{} + blocksPassedToLedger := make(chan uint64, 10) + mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) { + blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number + }) + mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil) + g := &mocks.GossipMock{} + g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil) + g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage)) + p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor) + defer p.shutdown() + + numBlocksReceived := 500 + receivedBlockCount := 0 + // Get a block from the orderer every 1ms + go func() { + for i := 1; i <= numBlocksReceived; i++ { + rawblock := pcomm.NewBlock(uint64(i), []byte{}) + b, _ := pb.Marshal(rawblock) + block := &proto.Payload{ + SeqNum: uint64(i), + Data: b, + } + p.s.AddPayload(block) + time.Sleep(time.Millisecond) + } + }() + + // Get a block from gossip every 1ms too + go func() { + rand.Seed(time.Now().UnixNano()) + for i := 1; i <= numBlocksReceived/2; i++ { + blockSeq := rand.Intn(numBlocksReceived) + rawblock := pcomm.NewBlock(uint64(blockSeq), []byte{}) + b, _ := pb.Marshal(rawblock) + block := &proto.Payload{ + SeqNum: uint64(blockSeq), + Data: b, + } + p.s.addPayload(block, nonBlocking) + time.Sleep(time.Millisecond) + } + }() + + for { + receivedBlock := <-blocksPassedToLedger + receivedBlockCount++ + m := mock.Mock{} + m.On("LedgerHeight", mock.Anything).Return(receivedBlock, nil) + m.On("Commit", mock.Anything).Run(func(arg mock.Arguments) { + blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number + }) + mc.Lock() + mc.Mock = m + mc.Unlock() + assert.Equal(t, receivedBlock, uint64(receivedBlockCount)) + if int(receivedBlockCount) == numBlocksReceived { + break + } + time.Sleep(time.Millisecond * 10) + t.Log("got block", receivedBlock) + } } func TestFailures(t *testing.T) {