From a8af1e93b7d851a19826c59976ba6956959e28a6 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Tue, 22 Nov 2016 13:35:32 -0500 Subject: [PATCH] Hook multichain manager into main path The previous changeset created the multichain manager but left it entirely out of the execution path. This changeset hooks the multichain manager into the primary path and removes much of the redundant code it replaces. This changeset only supports solo as a consumer of the multichain manager, but kafka will follow next (as the multichain manager gives multichain support for free to implementers of its Consenter interface). Change-Id: I4e5b19971afe5b434843f3e0011fd18df098aa0f Signed-off-by: Jason Yellick --- orderer/common/blockcutter/blockcutter.go | 2 +- orderer/common/broadcast/broadcast.go | 57 ++++--- orderer/common/broadcast/broadcast_test.go | 182 +++++++++++++++------ orderer/common/deliver/deliver.go | 18 +- orderer/common/deliver/deliver_test.go | 110 +++++++++---- orderer/main.go | 110 ++----------- orderer/sbft/backend/backendab.go | 73 ++++++++- orderer/sbft/main.go | 1 + orderer/sbft/sbft_test.go | 3 +- orderer/server.go | 12 +- orderer/solo/consensus.go | 58 ++++--- orderer/solo/consensus_test.go | 42 ++--- 12 files changed, 401 insertions(+), 267 deletions(-) diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go index 310a9c0b064..e377c277d79 100644 --- a/orderer/common/blockcutter/blockcutter.go +++ b/orderer/common/blockcutter/blockcutter.go @@ -31,7 +31,7 @@ func init() { logging.SetLevel(logging.DEBUG, "") } -// Target defines a sink for the ordered broadcast messages +// Receiver defines a sink for the ordered broadcast messages type Receiver interface { // Ordered should be invoked sequentially as messages are ordered // If the message is a valid normal message and does not fill the batch, nil, true is returned diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index 57244b618d2..82f4f35b00f 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -18,10 +18,11 @@ package broadcast import ( "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/multichain" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/golang/protobuf/proto" "github.com/op/go-logging" ) @@ -31,12 +32,6 @@ func init() { logging.SetLevel(logging.DEBUG, "") } -// Target defines an interface which the broadcast handler will direct broadcasts to -type Target interface { - // Enqueue accepts a message and returns true on acceptance, or false on shutdown - Enqueue(env *cb.Envelope) bool -} - // Handler defines an interface which handles broadcasts type Handler interface { // Handle starts a service thread for a given gRPC connection and services the broadcast connection @@ -44,21 +39,17 @@ type Handler interface { } type handlerImpl struct { - queueSize int - target Target - filters *broadcastfilter.RuleSet - configManager configtx.Manager - exitChan chan struct{} + queueSize int + ml multichain.Manager + exitChan chan struct{} } // NewHandlerImpl constructs a new implementation of the Handler interface -func NewHandlerImpl(queueSize int, target Target, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Handler { +func NewHandlerImpl(queueSize int, ml multichain.Manager) Handler { return &handlerImpl{ - queueSize: queueSize, - filters: filters, - configManager: configManager, - target: target, - exitChan: make(chan struct{}), + queueSize: queueSize, + ml: ml, + exitChan: make(chan struct{}), } } @@ -70,15 +61,20 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { return b.queueEnvelopes(srv) } +type msgAndChainSupport struct { + msg *cb.Envelope + chainSupport multichain.ChainSupport +} + type broadcaster struct { bs *handlerImpl - queue chan *cb.Envelope + queue chan *msgAndChainSupport } func newBroadcaster(bs *handlerImpl) *broadcaster { b := &broadcaster{ bs: bs, - queue: make(chan *cb.Envelope, bs.queueSize), + queue: make(chan *msgAndChainSupport, bs.queueSize), } return b } @@ -86,9 +82,9 @@ func newBroadcaster(bs *handlerImpl) *broadcaster { func (b *broadcaster) drainQueue() { for { select { - case msg, ok := <-b.queue: + case msgAndChainSupport, ok := <-b.queue: if ok { - if !b.bs.target.Enqueue(msg) { + if !msgAndChainSupport.chainSupport.Chain().Enqueue(msgAndChainSupport.msg) { return } } else { @@ -108,14 +104,27 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err return err } - action, _ := b.bs.filters.Apply(msg) + payload := &cb.Payload{} + err = proto.Unmarshal(msg.Payload, payload) + if payload.Header == nil || payload.Header.ChainHeader == nil || payload.Header.ChainHeader.ChainID == nil { + logger.Debugf("Received malformed message, dropping connection") + return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + } + + chainSupport, ok := b.bs.ml.GetChain(payload.Header.ChainHeader.ChainID) + if !ok { + // XXX Hook in chain creation logic here + panic("Unimplemented") + } + + action, _ := chainSupport.Filters().Apply(msg) switch action { case broadcastfilter.Reconfigure: fallthrough case broadcastfilter.Accept: select { - case b.queue <- msg: + case b.queue <- &msgAndChainSupport{msg: msg, chainSupport: chainSupport}: err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) default: err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index a827e986ce0..88265214e39 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -21,15 +21,20 @@ import ( "fmt" "testing" - "google.golang.org/grpc" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/multichain" + "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "google.golang.org/grpc" ) +var systemChain = []byte("systemChain") + var configTx []byte func init() { @@ -42,9 +47,7 @@ func init() { type mockConfigManager struct { validated bool - applied bool validateErr error - applyErr error } func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { @@ -53,8 +56,7 @@ func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error } func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error { - mcm.applied = true - return mcm.applyErr + panic("Unimplemented") } func (mcm *mockConfigManager) ChainID() []byte { @@ -66,7 +68,12 @@ type mockConfigFilter struct { } func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { - if bytes.Equal(msg.Payload, configTx) { + payload := &cb.Payload{} + err := proto.Unmarshal(msg.Payload, payload) + if err != nil { + panic(err) + } + if bytes.Equal(payload.Data, configTx) { if mcf.manager == nil || mcf.manager.Validate(nil) != nil { return broadcastfilter.Reject } @@ -75,24 +82,6 @@ func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { return broadcastfilter.Forward } -type mockTarget struct { - queue chan *cb.Envelope - done bool -} - -func (mt *mockTarget) Enqueue(env *cb.Envelope) bool { - mt.queue <- env - return !mt.done -} - -func (mt *mockTarget) halt() { - mt.done = true - select { - case <-mt.queue: - default: - } -} - type mockB struct { grpc.ServerStream recvChan chan *cb.Envelope @@ -119,36 +108,125 @@ func (m *mockB) Recv() (*cb.Envelope, error) { return msg, nil } -func getFiltersConfigMockTarget() (*broadcastfilter.RuleSet, *mockConfigManager, *mockTarget) { +type mockMultichainManager struct { + chains map[string]*mockChainSupport +} + +func (mm *mockMultichainManager) GetChain(chainID []byte) (multichain.ChainSupport, bool) { + chain, ok := mm.chains[string(chainID)] + return chain, ok +} + +func (mm *mockMultichainManager) halt() { + for _, chain := range mm.chains { + chain.mockChain.Halt() + } +} + +type mockChainSupport struct { + configManager *mockConfigManager + filters *broadcastfilter.RuleSet + mockChain *mockChain +} + +func (mcs *mockChainSupport) ConfigManager() configtx.Manager { + return mcs.configManager +} + +func (mcs *mockChainSupport) PolicyManager() policies.Manager { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Filters() *broadcastfilter.RuleSet { + return mcs.filters +} + +func (mcs *mockChainSupport) Reader() rawledger.Reader { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Chain() multichain.Chain { + return mcs.mockChain +} + +type mockChain struct { + queue chan *cb.Envelope + done bool +} + +func (mc *mockChain) Enqueue(env *cb.Envelope) bool { + mc.queue <- env + return !mc.done +} + +func (mc *mockChain) Start() { + panic("Unimplemented") +} + +func (mc *mockChain) Halt() { + mc.done = true + select { + case <-mc.queue: + default: + } +} + +func makeMessage(chainID []byte, data []byte) *cb.Envelope { + payload := &cb.Payload{ + Data: data, + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + ChainID: chainID, + }, + }, + } + data, err := proto.Marshal(payload) + if err != nil { + panic(err) + } + env := &cb.Envelope{ + Payload: data, + } + return env +} + +func getMultichainManager() *mockMultichainManager { cm := &mockConfigManager{} filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ broadcastfilter.EmptyRejectRule, &mockConfigFilter{cm}, broadcastfilter.AcceptRule, }) - mt := &mockTarget{queue: make(chan *cb.Envelope)} - return filters, cm, mt - + mc := &mockChain{queue: make(chan *cb.Envelope)} + mm := &mockMultichainManager{ + chains: make(map[string]*mockChainSupport), + } + mm.chains[string(systemChain)] = &mockChainSupport{ + filters: filters, + configManager: cm, + mockChain: mc, + } + return mm } func TestQueueOverflow(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) b := newBroadcaster(bh.(*handlerImpl)) go b.queueEnvelopes(m) for i := 0; i < 2; i++ { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SUCCESS { t.Fatalf("Should have successfully queued the message") } } - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SERVICE_UNAVAILABLE { t.Fatalf("Should not have successfully queued the message") @@ -157,9 +235,9 @@ func TestQueueOverflow(t *testing.T) { } func TestMultiQueueOverflow(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) ms := []*mockB{newMockB(), newMockB(), newMockB()} for _, m := range ms { @@ -170,7 +248,7 @@ func TestMultiQueueOverflow(t *testing.T) { for _, m := range ms { for i := 0; i < 2; i++ { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SUCCESS { t.Fatalf("Should have successfully queued the message") @@ -179,7 +257,7 @@ func TestMultiQueueOverflow(t *testing.T) { } for _, m := range ms { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SERVICE_UNAVAILABLE { t.Fatalf("Should not have successfully queued the message") @@ -188,9 +266,9 @@ func TestMultiQueueOverflow(t *testing.T) { } func TestEmptyEnvelope(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) go bh.Handle(m) @@ -204,35 +282,35 @@ func TestEmptyEnvelope(t *testing.T) { } func TestReconfigureAccept(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) go bh.Handle(m) - m.recvChan <- &cb.Envelope{Payload: configTx} + m.recvChan <- makeMessage(systemChain, configTx) reply := <-m.sendChan if reply.Status != cb.Status_SUCCESS { t.Fatalf("Should have successfully queued the message") } - if !cm.validated { + if !mm.chains[string(systemChain)].configManager.validated { t.Errorf("ConfigTx should have been validated before processing") } } func TestReconfigureReject(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - cm.validateErr = fmt.Errorf("Fail to validate") - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + mm.chains[string(systemChain)].configManager.validateErr = fmt.Errorf("Fail to validate") + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) go bh.Handle(m) - m.recvChan <- &cb.Envelope{Payload: configTx} + m.recvChan <- makeMessage(systemChain, configTx) reply := <-m.sendChan if reply.Status != cb.Status_BAD_REQUEST { diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index 7eb87511f5f..faf660560a0 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -17,6 +17,7 @@ limitations under the License. package deliver import ( + "github.com/hyperledger/fabric/orderer/multichain" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -35,13 +36,13 @@ type Handler interface { } type DeliverServer struct { - rl rawledger.Reader + ml multichain.Manager maxWindow int } -func NewHandlerImpl(rl rawledger.Reader, maxWindow int) Handler { +func NewHandlerImpl(ml multichain.Manager, maxWindow int) Handler { return &DeliverServer{ - rl: rl, + ml: ml, maxWindow: maxWindow, } } @@ -185,14 +186,21 @@ func (d *deliverer) processUpdate(update *ab.SeekInfo) bool { } logger.Debugf("Updating properties for client") - if update == nil || update.WindowSize == 0 || update.WindowSize > uint64(d.ds.maxWindow) { + if update == nil || update.WindowSize == 0 || update.WindowSize > uint64(d.ds.maxWindow) || update.ChainID == nil { close(d.exitChan) return d.sendErrorReply(cb.Status_BAD_REQUEST) } + chain, ok := d.ds.ml.GetChain(update.ChainID) + if !ok { + return d.sendErrorReply(cb.Status_NOT_FOUND) + } + + // XXX add deliver authorization checking + d.windowSize = update.WindowSize - d.cursor, d.nextBlockNumber = d.ds.rl.Iterator(update.Start, update.SpecifiedNumber) + d.cursor, d.nextBlockNumber = chain.Reader().Iterator(update.Start, update.SpecifiedNumber) d.lastAck = d.nextBlockNumber - 1 return true diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index 991fbaf9347..0fd1d00944a 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -22,6 +22,11 @@ import ( "time" "github.com/hyperledger/fabric/orderer/common/bootstrap/static" + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/multichain" + "github.com/hyperledger/fabric/orderer/rawledger" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -31,6 +36,10 @@ import ( var genesisBlock *cb.Block +var systemChainID = []byte("systemChain") + +const ledgerSize = 10 + func init() { bootstrapper := static.New() var err error @@ -69,20 +78,63 @@ func (m *mockD) Recv() (*ab.DeliverUpdate, error) { return msg, nil } -func TestOldestSeek(t *testing.T) { - ledgerSize := 5 +type mockMultichainManager struct { + chains map[string]*mockChainSupport +} + +func (mm *mockMultichainManager) GetChain(chainID []byte) (multichain.ChainSupport, bool) { + cs, ok := mm.chains[string(chainID)] + return cs, ok +} + +type mockChainSupport struct { + ledger rawledger.ReadWriter +} + +func (mcs *mockChainSupport) ConfigManager() configtx.Manager { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) PolicyManager() policies.Manager { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Filters() *broadcastfilter.RuleSet { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Reader() rawledger.Reader { + return mcs.ledger +} + +func (mcs *mockChainSupport) Chain() multichain.Chain { + panic("Unimplemented") +} + +func newMockMultichainManager() *mockMultichainManager { _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := &mockMultichainManager{ + chains: make(map[string]*mockChainSupport), + } + mm.chains[string(systemChainID)] = &mockChainSupport{ + ledger: rl, + } + return mm +} + +func TestOldestSeek(t *testing.T) { + mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} count := 0 for { @@ -102,19 +154,18 @@ func TestOldestSeek(t *testing.T) { } func TestNewestSeek(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST, ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -131,18 +182,18 @@ func TestNewestSeek(t *testing.T) { } func TestSpecificSeek(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() - ds := NewHandlerImpl(rl, MagicLargestWindow) + defer close(m.recvChan) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST, ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -151,7 +202,7 @@ func TestSpecificSeek(t *testing.T) { } if blockReply.GetBlock().Header.Number != uint64(ledgerSize-1) { - t.Fatalf("Expected only to get block 4") + t.Fatalf("Expected only the most recent block") } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") @@ -159,19 +210,18 @@ func TestSpecificSeek(t *testing.T) { } func TestBadSeek(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() for i := 1; i < 2*ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1), ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -182,7 +232,7 @@ func TestBadSeek(t *testing.T) { t.Fatalf("Timed out waiting to get all blocks") } - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize), ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -195,16 +245,15 @@ func TestBadSeek(t *testing.T) { } func TestBadWindow(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -217,20 +266,19 @@ func TestBadWindow(t *testing.T) { } func TestAck(t *testing.T) { - ledgerSize := 10 + mm := newMockMultichainManager() windowSize := uint64(2) - _, rl := ramledger.New(ledgerSize, genesisBlock) for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} count := uint64(0) for { diff --git a/orderer/main.go b/orderer/main.go index 7be69ed7f23..e234cadf1b3 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -29,22 +29,16 @@ import ( "github.com/hyperledger/fabric/orderer/common/bootstrap" "github.com/hyperledger/fabric/orderer/common/bootstrap/static" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - // "github.com/hyperledger/fabric/orderer/common/broadcastfilter/configfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" - "github.com/hyperledger/fabric/orderer/common/policies" - "github.com/hyperledger/fabric/orderer/common/util" "github.com/hyperledger/fabric/orderer/config" "github.com/hyperledger/fabric/orderer/kafka" + "github.com/hyperledger/fabric/orderer/multichain" "github.com/hyperledger/fabric/orderer/rawledger" "github.com/hyperledger/fabric/orderer/rawledger/fileledger" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" "github.com/hyperledger/fabric/orderer/solo" - cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/Shopify/sarama" - "github.com/golang/protobuf/proto" "github.com/op/go-logging" "google.golang.org/grpc" ) @@ -65,7 +59,7 @@ func main() { switch conf.General.OrdererType { case "solo": - launchSolo(conf) + launchGeneric(conf) case "kafka": launchKafka(conf) default: @@ -73,81 +67,11 @@ func main() { } } -// XXX This crypto helper is a stand in until we have a real crypto handler -// it considers all signatures to be valid -type xxxCryptoHelper struct{} - -func (xxx xxxCryptoHelper) VerifySignature(msg []byte, ids []byte, sigs []byte) bool { - return true -} - func init() { logging.SetLevel(logging.DEBUG, "") } -func retrieveConfiguration(rl rawledger.Reader) *cb.ConfigurationEnvelope { - var lastConfigTx *cb.ConfigurationEnvelope - - envelope := new(cb.Envelope) - payload := new(cb.Payload) - configurationEnvelope := new(cb.ConfigurationEnvelope) - - it, _ := rl.Iterator(ab.SeekInfo_OLDEST, 0) - // Iterate over the blockchain, looking for config transactions, track the most recent one encountered - // This will be the transaction which is returned - for { - select { - case <-it.ReadyChan(): - block, status := it.Next() - if status != cb.Status_SUCCESS { - panic(fmt.Errorf("Error parsing blockchain at startup: %v", status)) - } - if len(block.Data.Data) != 1 { - continue - } - envelope = util.ExtractEnvelopeOrPanic(block, 0) - payload = util.ExtractPayloadOrPanic(envelope) - if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { - continue - } - if err := proto.Unmarshal(payload.Data, configurationEnvelope); err == nil { - lastConfigTx = configurationEnvelope - } - default: - return lastConfigTx - } - } -} - -func bootstrapConfigManager(lastConfigTx *cb.ConfigurationEnvelope) configtx.Manager { - policyManager := policies.NewManagerImpl(xxxCryptoHelper{}) - configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler) - for ctype := range cb.ConfigurationItem_ConfigurationType_name { - rtype := cb.ConfigurationItem_ConfigurationType(ctype) - switch rtype { - case cb.ConfigurationItem_Policy: - configHandlerMap[rtype] = policyManager - default: - configHandlerMap[rtype] = configtx.NewBytesHandler() - } - } - - configManager, err := configtx.NewConfigurationManager(lastConfigTx, policyManager, configHandlerMap) - if err != nil { - panic(err) - } - return configManager -} - -func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet { - return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ - broadcastfilter.EmptyRejectRule, - // configfilter.New(configManager), - broadcastfilter.AcceptRule, - }) -} - -func launchSolo(conf *config.TopLevel) { +func launchGeneric(conf *config.TopLevel) { grpcServer := grpc.NewServer() lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort)) @@ -174,7 +98,7 @@ func launchSolo(conf *config.TopLevel) { // Stand in until real config ledgerType := os.Getenv("ORDERER_LEDGER_TYPE") - var rawledger rawledger.ReadWriter + var lf rawledger.Factory switch ledgerType { case "file": location := conf.FileLedger.Location @@ -186,36 +110,22 @@ func launchSolo(conf *config.TopLevel) { } } - _, rawledger = fileledger.New(location, genesisBlock) + lf, _ = fileledger.New(location, genesisBlock) case "ram": fallthrough default: - _, rawledger = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock) + lf, _ = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock) } - lastConfigTx := retrieveConfiguration(rawledger) - if lastConfigTx == nil { - panic("No chain configuration found") - } - - configManager := bootstrapConfigManager(lastConfigTx) - filters := createBroadcastRuleset(configManager) + consenters := make(map[string]multichain.Consenter) + consenters["solo"] = solo.New(conf.General.BatchTimeout) - soloConsenter := solo.NewConsenter( - int(conf.General.BatchSize), - conf.General.BatchTimeout, - rawledger, - filters, - configManager, - ) + manager := multichain.NewManagerImpl(lf, consenters) server := NewServer( - soloConsenter, - rawledger, + manager, int(conf.General.QueueSize), int(conf.General.MaxWindowSize), - filters, - configManager, ) ab.RegisterAtomicBroadcastServer(grpcServer, server) diff --git a/orderer/sbft/backend/backendab.go b/orderer/sbft/backend/backendab.go index 7c918453fa3..f824e69142f 100644 --- a/orderer/sbft/backend/backendab.go +++ b/orderer/sbft/backend/backendab.go @@ -17,21 +17,90 @@ limitations under the License. package backend import ( - "github.com/golang/protobuf/proto" + "bytes" + + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/common/deliver" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/multichain" + "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/golang/protobuf/proto" ) +type xxxMultichain struct { + chainID []byte + chainSupport *xxxChainSupport +} + +func (xxx *xxxMultichain) GetChain(id []byte) (multichain.ChainSupport, bool) { + if !bytes.Equal(id, xxx.chainID) { + return nil, false + } + return xxx.chainSupport, true +} + +type xxxChainSupport struct { + reader rawledger.Reader +} + +func (xxx *xxxChainSupport) ConfigManager() configtx.Manager { + panic("Unimplemented") +} + +func (xxx *xxxChainSupport) PolicyManager() policies.Manager { + panic("Unimplemented") +} + +func (xxx *xxxChainSupport) Filters() *broadcastfilter.RuleSet { + panic("Unimplemented") +} + +func (xxx *xxxChainSupport) Reader() rawledger.Reader { + return xxx.reader +} + +func (xxx *xxxChainSupport) Chain() multichain.Chain { + panic("Unimplemented") +} + type BackendAB struct { backend *Backend deliverserver deliver.Handler } func NewBackendAB(backend *Backend) *BackendAB { + + // XXX All the code below is a hacky shim until sbft can be adapter to the new multichain interface + it, _ := backend.ledger.Iterator(ab.SeekInfo_OLDEST, 0) + block, status := it.Next() + if status != cb.Status_SUCCESS { + panic("Error getting a block from the ledger") + } + env := &cb.Envelope{} + err := proto.Unmarshal(block.Data.Data[0], env) + if err != nil { + panic(err) + } + + payload := &cb.Payload{} + err = proto.Unmarshal(env.Payload, payload) + if err != nil { + panic(err) + } + + manager := &xxxMultichain{ + chainID: payload.Header.ChainHeader.ChainID, + chainSupport: &xxxChainSupport{reader: backend.ledger}, + } + // XXX End hackiness + bab := &BackendAB{ backend: backend, - deliverserver: deliver.NewHandlerImpl(backend.ledger, 1000), + deliverserver: deliver.NewHandlerImpl(manager, 1000), } return bab } diff --git a/orderer/sbft/main.go b/orderer/sbft/main.go index 4ec5dbe9e73..5a3e7a5a74b 100644 --- a/orderer/sbft/main.go +++ b/orderer/sbft/main.go @@ -126,6 +126,7 @@ func serve(c flags) { if err != nil { panic(err) } + _, ledger := fileledger.New(c.dataDir, genesisBlock) s.backend, err = backend.NewBackend(config.Peers, conn, ledger, persist) if err != nil { diff --git a/orderer/sbft/sbft_test.go b/orderer/sbft/sbft_test.go index f986c3f72ac..38971c6079b 100644 --- a/orderer/sbft/sbft_test.go +++ b/orderer/sbft/sbft_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" @@ -129,7 +130,7 @@ func updateReceiver(t *testing.T, resultch chan byte, errorch chan error, client errorch <- fmt.Errorf("Failed to get Deliver stream: %s", err) return } - dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10}}}) + dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10, ChainID: static.TestChainID}}}) logger.Info("{Update Receiver} Listening to ledger updates.") for i := 0; i < 2; i++ { m, inerr := dstream.Recv() diff --git a/orderer/server.go b/orderer/server.go index c52996fea10..9740dcefc58 100644 --- a/orderer/server.go +++ b/orderer/server.go @@ -18,10 +18,8 @@ package main import ( "github.com/hyperledger/fabric/orderer/common/broadcast" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/common/deliver" - "github.com/hyperledger/fabric/orderer/rawledger" + "github.com/hyperledger/fabric/orderer/multichain" ab "github.com/hyperledger/fabric/protos/orderer" ) @@ -31,12 +29,12 @@ type server struct { } // NewServer creates a ab.AtomicBroadcastServer based on the broadcast target and ledger Reader -func NewServer(consenter broadcast.Target, rl rawledger.Reader, queueSize, maxWindowSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer { - logger.Infof("Starting orderer with consenter=%T, and ledger=%T", consenter, rl) +func NewServer(ml multichain.Manager, queueSize, maxWindowSize int) ab.AtomicBroadcastServer { + logger.Infof("Starting orderer") s := &server{ - dh: deliver.NewHandlerImpl(rl, maxWindowSize), - bh: broadcast.NewHandlerImpl(queueSize, consenter, filters, configManager), + dh: deliver.NewHandlerImpl(ml, maxWindowSize), + bh: broadcast.NewHandlerImpl(queueSize, ml), } return s } diff --git a/orderer/solo/consensus.go b/orderer/solo/consensus.go index 81a31aa5df3..ba114a47351 100644 --- a/orderer/solo/consensus.go +++ b/orderer/solo/consensus.go @@ -20,8 +20,8 @@ import ( "time" "github.com/hyperledger/fabric/orderer/common/blockcutter" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/multichain" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" "github.com/op/go-logging" @@ -35,56 +35,68 @@ func init() { type consenter struct { batchTimeout time.Duration - cutter blockcutter.Receiver +} + +type chain struct { + batchTimeout time.Duration rl rawledger.Writer + cutter blockcutter.Receiver sendChan chan *cb.Envelope exitChan chan struct{} } -func NewConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter { - bs := newPlainConsenter(batchSize, batchTimeout, rl, filters, configManager) - go bs.main() - return bs +func New(batchTimeout time.Duration) multichain.Consenter { + return &consenter{ + // TODO, ultimately this should come from the configManager at HandleChain + batchTimeout: batchTimeout, + } +} + +func (solo *consenter) HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) multichain.Chain { + return newChain(solo.batchTimeout, configManager, cutter, rl, metadata) } -func newPlainConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter { - bs := &consenter{ - cutter: blockcutter.NewReceiverImpl(batchSize, filters, configManager), +func newChain(batchTimeout time.Duration, configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) *chain { + return &chain{ batchTimeout: batchTimeout, rl: rl, + cutter: cutter, sendChan: make(chan *cb.Envelope), exitChan: make(chan struct{}), } - return bs } -func (bs *consenter) halt() { - close(bs.exitChan) +func (ch *chain) Start() { + go ch.main() +} + +func (ch *chain) Halt() { + close(ch.exitChan) } // Enqueue accepts a message and returns true on acceptance, or false on shutdown -func (bs *consenter) Enqueue(env *cb.Envelope) bool { +func (ch *chain) Enqueue(env *cb.Envelope) bool { select { - case bs.sendChan <- env: + case ch.sendChan <- env: return true - case <-bs.exitChan: + case <-ch.exitChan: return false } } -func (bs *consenter) main() { +func (ch *chain) main() { var timer <-chan time.Time for { select { - case msg := <-bs.sendChan: - batches, ok := bs.cutter.Ordered(msg) + case msg := <-ch.sendChan: + batches, ok := ch.cutter.Ordered(msg) if ok && len(batches) == 0 && timer == nil { - timer = time.After(bs.batchTimeout) + timer = time.After(ch.batchTimeout) continue } for _, batch := range batches { - bs.rl.Append(batch, nil) + ch.rl.Append(batch, nil) } if len(batches) > 0 { timer = nil @@ -93,14 +105,14 @@ func (bs *consenter) main() { //clear the timer timer = nil - batch := bs.cutter.Cut() + batch := ch.cutter.Cut() if len(batch) == 0 { logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug") continue } logger.Debugf("Batch timer expired, creating block") - bs.rl.Append(batch, nil) - case <-bs.exitChan: + ch.rl.Append(batch, nil) + case <-ch.exitChan: logger.Debugf("Exiting") return } diff --git a/orderer/solo/consensus_test.go b/orderer/solo/consensus_test.go index 0d48ad483a2..260daf78b0a 100644 --- a/orderer/solo/consensus_test.go +++ b/orderer/solo/consensus_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" + "github.com/hyperledger/fabric/orderer/common/blockcutter" "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" @@ -70,14 +71,16 @@ func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { return broadcastfilter.Forward } -func getFiltersAndConfig() (*broadcastfilter.RuleSet, *mockConfigManager) { +func getEverything(batchSize int) (*mockConfigManager, blockcutter.Receiver, rawledger.ReadWriter) { cm := &mockConfigManager{} filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ broadcastfilter.EmptyRejectRule, &mockConfigFilter{cm}, broadcastfilter.AcceptRule, }) - return filters, cm + cutter := blockcutter.NewReceiverImpl(batchSize, filters, cm) + _, rl := ramledger.New(10, genesisBlock) + return cm, cutter, rl } @@ -126,20 +129,19 @@ func (m *mockB) Recv() (*cb.Envelope, error) { } func TestEmptyBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() - _, rl := ramledger.New(10, genesisBlock) - bs := newPlainConsenter(1, time.Millisecond, rl, filters, cm) + cm, cutter, rl := getEverything(1) + bs := newChain(time.Millisecond, cm, cutter, rl, nil) if bs.rl.(rawledger.Reader).Height() != 1 { t.Fatalf("Expected no new blocks created") } } func TestBatchTimer(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 - _, rl := ramledger.New(10, genesisBlock) - bs := NewConsenter(batchSize, time.Millisecond, rl, filters, cm) - defer bs.halt() + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Millisecond, cm, cutter, rl, nil) + bs.Start() + defer bs.Halt() it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) bs.sendChan <- &cb.Envelope{Payload: []byte("Some bytes")} @@ -160,11 +162,11 @@ func TestBatchTimer(t *testing.T) { } func TestBatchTimerHaltOnFilledBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 - _, rl := ramledger.New(10, genesisBlock) - bs := NewConsenter(batchSize, time.Hour, rl, filters, cm) - defer bs.halt() + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Millisecond, cm, cutter, rl, nil) + bs.Start() + defer bs.Halt() it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) bs.sendChan <- &cb.Envelope{Payload: []byte("Some bytes")} @@ -189,11 +191,10 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) { } func TestFilledBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Hour, cm, cutter, rl, nil) messages := 10 - _, rl := ramledger.New(10, genesisBlock) - bs := newPlainConsenter(batchSize, time.Hour, rl, filters, cm) done := make(chan struct{}) go func() { bs.main() @@ -202,7 +203,7 @@ func TestFilledBatch(t *testing.T) { for i := 0; i < messages; i++ { bs.sendChan <- &cb.Envelope{Payload: []byte("Some bytes")} } - bs.halt() + bs.Halt() <-done expected := uint64(1 + messages/batchSize) if bs.rl.(rawledger.Reader).Height() != expected { @@ -211,10 +212,9 @@ func TestFilledBatch(t *testing.T) { } func TestReconfigureGoodPath(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 - _, rl := ramledger.New(10, genesisBlock) - bs := newPlainConsenter(batchSize, time.Hour, rl, filters, cm) + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Hour, cm, cutter, rl, nil) done := make(chan struct{}) go func() { bs.main() @@ -226,7 +226,7 @@ func TestReconfigureGoodPath(t *testing.T) { bs.sendChan <- &cb.Envelope{Payload: []byte("Msg2")} bs.sendChan <- &cb.Envelope{Payload: []byte("Msg3")} - bs.halt() + bs.Halt() <-done expected := uint64(4) if bs.rl.(rawledger.Reader).Height() != expected {