From 56596566332f8cc1026e051a52b8aec09064d41a Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Wed, 28 Sep 2016 13:55:58 -0400 Subject: [PATCH] Apply QueueSize on a per-client basis. FAB-507 https://jira.hyperledger.org/browse/FAB-507 The implementation of the solo orderer has a send queue of size QueueSize that is shared amoungst all clients. This changeset provides each client with it's own send queue of size QueueSize. Change-Id: I2ecb690f484ba1e531c99c3f5e6ff046239cb0a0 Signed-off-by: Luis Sanchez --- orderer/solo/broadcast.go | 50 +++++++++++++++++++++++++++++++--- orderer/solo/broadcast_test.go | 38 ++++++++++++++++++++++++-- 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/orderer/solo/broadcast.go b/orderer/solo/broadcast.go index 2212e30e0df..45f7a16f87c 100644 --- a/orderer/solo/broadcast.go +++ b/orderer/solo/broadcast.go @@ -24,10 +24,11 @@ import ( ) type broadcastServer struct { - queue chan *ab.BroadcastMessage + queueSize int batchSize int batchTimeout time.Duration rl rawledger.Writer + sendChan chan *ab.BroadcastMessage exitChan chan struct{} } @@ -39,10 +40,11 @@ func newBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.Writer) *broadcastServer { bs := &broadcastServer{ - queue: make(chan *ab.BroadcastMessage, queueSize), + queueSize: queueSize, batchSize: batchSize, batchTimeout: batchTimeout, rl: rl, + sendChan: make(chan *ab.BroadcastMessage), exitChan: make(chan struct{}), } return bs @@ -59,7 +61,7 @@ outer: timer := time.After(bs.batchTimeout) for { select { - case msg := <-bs.queue: + case msg := <-bs.sendChan: curBatch = append(curBatch, msg) if len(curBatch) < bs.batchSize { continue @@ -83,6 +85,38 @@ outer: } func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServer) error { + b := newBroadcaster(bs) + defer close(b.queue) + go b.drainQueue() + return b.queueBroadcastMessages(srv) +} + +type broadcaster struct { + bs *broadcastServer + queue chan *ab.BroadcastMessage +} + +func (b *broadcaster) drainQueue() { + for { + select { + case msg, ok := <-b.queue: + if ok { + select { + case b.bs.sendChan <- msg: + case <-b.bs.exitChan: + return + } + } else { + return + } + case <-b.bs.exitChan: + return + } + } +} + +func (b *broadcaster) queueBroadcastMessages(srv ab.AtomicBroadcast_BroadcastServer) error { + for { msg, err := srv.Recv() if err != nil { @@ -97,7 +131,7 @@ func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServe } select { - case bs.queue <- msg: + case b.queue <- msg: err = srv.Send(&ab.BroadcastResponse{ab.Status_SUCCESS}) default: err = srv.Send(&ab.BroadcastResponse{ab.Status_SERVICE_UNAVAILABLE}) @@ -108,3 +142,11 @@ func (bs *broadcastServer) handleBroadcast(srv ab.AtomicBroadcast_BroadcastServe } } } + +func newBroadcaster(bs *broadcastServer) *broadcaster { + b := &broadcaster{ + bs: bs, + queue: make(chan *ab.BroadcastMessage, bs.queueSize), + } + return b +} diff --git a/orderer/solo/broadcast_test.go b/orderer/solo/broadcast_test.go index 796fdb71615..bb1e8932180 100644 --- a/orderer/solo/broadcast_test.go +++ b/orderer/solo/broadcast_test.go @@ -57,9 +57,12 @@ func (m *mockB) Recv() (*ab.BroadcastMessage, error) { func TestQueueOverflow(t *testing.T) { bs := newPlainBroadcastServer(2, 1, time.Second, nil) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused) m := newMockB() - go bs.handleBroadcast(m) + b := newBroadcaster(bs) + go b.queueBroadcastMessages(m) defer close(m.recvChan) + bs.halt() + for i := 0; i < 2; i++ { m.recvChan <- &ab.BroadcastMessage{[]byte("Some bytes")} reply := <-m.sendChan @@ -73,6 +76,37 @@ func TestQueueOverflow(t *testing.T) { if reply.Status != ab.Status_SERVICE_UNAVAILABLE { t.Fatalf("Should not have successfully queued the message") } + +} + +func TestMultiQueueOverflow(t *testing.T) { + bs := newPlainBroadcastServer(2, 1, time.Second, nil) // queueSize, batchSize (unused), batchTimeout (unused), ramLedger (unused) + // m := newMockB() + ms := []*mockB{newMockB(), newMockB(), newMockB()} + + for _, m := range ms { + b := newBroadcaster(bs) + go b.queueBroadcastMessages(m) + defer close(m.recvChan) + } + + for _, m := range ms { + for i := 0; i < 2; i++ { + m.recvChan <- &ab.BroadcastMessage{[]byte("Some bytes")} + reply := <-m.sendChan + if reply.Status != ab.Status_SUCCESS { + t.Fatalf("Should have successfully queued the message") + } + } + } + + for _, m := range ms { + m.recvChan <- &ab.BroadcastMessage{[]byte("Some bytes")} + reply := <-m.sendChan + if reply.Status != ab.Status_SERVICE_UNAVAILABLE { + t.Fatalf("Should not have successfully queued the message") + } + } } func TestEmptyBroadcastMessage(t *testing.T) { @@ -103,7 +137,7 @@ func TestFilledBatch(t *testing.T) { defer bs.halt() messages := 11 // Sending 11 messages, with a batch size of 2, ensures the 10th message is processed before we proceed for 5 blocks for i := 0; i < messages; i++ { - bs.queue <- &ab.BroadcastMessage{[]byte("Some bytes")} + bs.sendChan <- &ab.BroadcastMessage{[]byte("Some bytes")} } expected := uint64(1 + messages/batchSize) if bs.rl.(rawledger.Reader).Height() != expected {