diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go index 310a9c0b064..e377c277d79 100644 --- a/orderer/common/blockcutter/blockcutter.go +++ b/orderer/common/blockcutter/blockcutter.go @@ -31,7 +31,7 @@ func init() { logging.SetLevel(logging.DEBUG, "") } -// Target defines a sink for the ordered broadcast messages +// Receiver defines a sink for the ordered broadcast messages type Receiver interface { // Ordered should be invoked sequentially as messages are ordered // If the message is a valid normal message and does not fill the batch, nil, true is returned diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index 57244b618d2..82f4f35b00f 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -18,10 +18,11 @@ package broadcast import ( "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/multichain" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/golang/protobuf/proto" "github.com/op/go-logging" ) @@ -31,12 +32,6 @@ func init() { logging.SetLevel(logging.DEBUG, "") } -// Target defines an interface which the broadcast handler will direct broadcasts to -type Target interface { - // Enqueue accepts a message and returns true on acceptance, or false on shutdown - Enqueue(env *cb.Envelope) bool -} - // Handler defines an interface which handles broadcasts type Handler interface { // Handle starts a service thread for a given gRPC connection and services the broadcast connection @@ -44,21 +39,17 @@ type Handler interface { } type handlerImpl struct { - queueSize int - target Target - filters *broadcastfilter.RuleSet - configManager configtx.Manager - exitChan chan struct{} + queueSize int + ml multichain.Manager + exitChan chan struct{} } // NewHandlerImpl constructs a new implementation of the Handler interface -func NewHandlerImpl(queueSize int, target Target, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Handler { +func NewHandlerImpl(queueSize int, ml multichain.Manager) Handler { return &handlerImpl{ - queueSize: queueSize, - filters: filters, - configManager: configManager, - target: target, - exitChan: make(chan struct{}), + queueSize: queueSize, + ml: ml, + exitChan: make(chan struct{}), } } @@ -70,15 +61,20 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { return b.queueEnvelopes(srv) } +type msgAndChainSupport struct { + msg *cb.Envelope + chainSupport multichain.ChainSupport +} + type broadcaster struct { bs *handlerImpl - queue chan *cb.Envelope + queue chan *msgAndChainSupport } func newBroadcaster(bs *handlerImpl) *broadcaster { b := &broadcaster{ bs: bs, - queue: make(chan *cb.Envelope, bs.queueSize), + queue: make(chan *msgAndChainSupport, bs.queueSize), } return b } @@ -86,9 +82,9 @@ func newBroadcaster(bs *handlerImpl) *broadcaster { func (b *broadcaster) drainQueue() { for { select { - case msg, ok := <-b.queue: + case msgAndChainSupport, ok := <-b.queue: if ok { - if !b.bs.target.Enqueue(msg) { + if !msgAndChainSupport.chainSupport.Chain().Enqueue(msgAndChainSupport.msg) { return } } else { @@ -108,14 +104,27 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err return err } - action, _ := b.bs.filters.Apply(msg) + payload := &cb.Payload{} + err = proto.Unmarshal(msg.Payload, payload) + if payload.Header == nil || payload.Header.ChainHeader == nil || payload.Header.ChainHeader.ChainID == nil { + logger.Debugf("Received malformed message, dropping connection") + return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) + } + + chainSupport, ok := b.bs.ml.GetChain(payload.Header.ChainHeader.ChainID) + if !ok { + // XXX Hook in chain creation logic here + panic("Unimplemented") + } + + action, _ := chainSupport.Filters().Apply(msg) switch action { case broadcastfilter.Reconfigure: fallthrough case broadcastfilter.Accept: select { - case b.queue <- msg: + case b.queue <- &msgAndChainSupport{msg: msg, chainSupport: chainSupport}: err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) default: err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index a827e986ce0..88265214e39 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -21,15 +21,20 @@ import ( "fmt" "testing" - "google.golang.org/grpc" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/multichain" + "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "google.golang.org/grpc" ) +var systemChain = []byte("systemChain") + var configTx []byte func init() { @@ -42,9 +47,7 @@ func init() { type mockConfigManager struct { validated bool - applied bool validateErr error - applyErr error } func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { @@ -53,8 +56,7 @@ func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error } func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error { - mcm.applied = true - return mcm.applyErr + panic("Unimplemented") } func (mcm *mockConfigManager) ChainID() []byte { @@ -66,7 +68,12 @@ type mockConfigFilter struct { } func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { - if bytes.Equal(msg.Payload, configTx) { + payload := &cb.Payload{} + err := proto.Unmarshal(msg.Payload, payload) + if err != nil { + panic(err) + } + if bytes.Equal(payload.Data, configTx) { if mcf.manager == nil || mcf.manager.Validate(nil) != nil { return broadcastfilter.Reject } @@ -75,24 +82,6 @@ func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { return broadcastfilter.Forward } -type mockTarget struct { - queue chan *cb.Envelope - done bool -} - -func (mt *mockTarget) Enqueue(env *cb.Envelope) bool { - mt.queue <- env - return !mt.done -} - -func (mt *mockTarget) halt() { - mt.done = true - select { - case <-mt.queue: - default: - } -} - type mockB struct { grpc.ServerStream recvChan chan *cb.Envelope @@ -119,36 +108,125 @@ func (m *mockB) Recv() (*cb.Envelope, error) { return msg, nil } -func getFiltersConfigMockTarget() (*broadcastfilter.RuleSet, *mockConfigManager, *mockTarget) { +type mockMultichainManager struct { + chains map[string]*mockChainSupport +} + +func (mm *mockMultichainManager) GetChain(chainID []byte) (multichain.ChainSupport, bool) { + chain, ok := mm.chains[string(chainID)] + return chain, ok +} + +func (mm *mockMultichainManager) halt() { + for _, chain := range mm.chains { + chain.mockChain.Halt() + } +} + +type mockChainSupport struct { + configManager *mockConfigManager + filters *broadcastfilter.RuleSet + mockChain *mockChain +} + +func (mcs *mockChainSupport) ConfigManager() configtx.Manager { + return mcs.configManager +} + +func (mcs *mockChainSupport) PolicyManager() policies.Manager { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Filters() *broadcastfilter.RuleSet { + return mcs.filters +} + +func (mcs *mockChainSupport) Reader() rawledger.Reader { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Chain() multichain.Chain { + return mcs.mockChain +} + +type mockChain struct { + queue chan *cb.Envelope + done bool +} + +func (mc *mockChain) Enqueue(env *cb.Envelope) bool { + mc.queue <- env + return !mc.done +} + +func (mc *mockChain) Start() { + panic("Unimplemented") +} + +func (mc *mockChain) Halt() { + mc.done = true + select { + case <-mc.queue: + default: + } +} + +func makeMessage(chainID []byte, data []byte) *cb.Envelope { + payload := &cb.Payload{ + Data: data, + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + ChainID: chainID, + }, + }, + } + data, err := proto.Marshal(payload) + if err != nil { + panic(err) + } + env := &cb.Envelope{ + Payload: data, + } + return env +} + +func getMultichainManager() *mockMultichainManager { cm := &mockConfigManager{} filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ broadcastfilter.EmptyRejectRule, &mockConfigFilter{cm}, broadcastfilter.AcceptRule, }) - mt := &mockTarget{queue: make(chan *cb.Envelope)} - return filters, cm, mt - + mc := &mockChain{queue: make(chan *cb.Envelope)} + mm := &mockMultichainManager{ + chains: make(map[string]*mockChainSupport), + } + mm.chains[string(systemChain)] = &mockChainSupport{ + filters: filters, + configManager: cm, + mockChain: mc, + } + return mm } func TestQueueOverflow(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) b := newBroadcaster(bh.(*handlerImpl)) go b.queueEnvelopes(m) for i := 0; i < 2; i++ { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SUCCESS { t.Fatalf("Should have successfully queued the message") } } - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SERVICE_UNAVAILABLE { t.Fatalf("Should not have successfully queued the message") @@ -157,9 +235,9 @@ func TestQueueOverflow(t *testing.T) { } func TestMultiQueueOverflow(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) ms := []*mockB{newMockB(), newMockB(), newMockB()} for _, m := range ms { @@ -170,7 +248,7 @@ func TestMultiQueueOverflow(t *testing.T) { for _, m := range ms { for i := 0; i < 2; i++ { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SUCCESS { t.Fatalf("Should have successfully queued the message") @@ -179,7 +257,7 @@ func TestMultiQueueOverflow(t *testing.T) { } for _, m := range ms { - m.recvChan <- &cb.Envelope{Payload: []byte("Some bytes")} + m.recvChan <- makeMessage(systemChain, []byte("Some bytes")) reply := <-m.sendChan if reply.Status != cb.Status_SERVICE_UNAVAILABLE { t.Fatalf("Should not have successfully queued the message") @@ -188,9 +266,9 @@ func TestMultiQueueOverflow(t *testing.T) { } func TestEmptyEnvelope(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) go bh.Handle(m) @@ -204,35 +282,35 @@ func TestEmptyEnvelope(t *testing.T) { } func TestReconfigureAccept(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) go bh.Handle(m) - m.recvChan <- &cb.Envelope{Payload: configTx} + m.recvChan <- makeMessage(systemChain, configTx) reply := <-m.sendChan if reply.Status != cb.Status_SUCCESS { t.Fatalf("Should have successfully queued the message") } - if !cm.validated { + if !mm.chains[string(systemChain)].configManager.validated { t.Errorf("ConfigTx should have been validated before processing") } } func TestReconfigureReject(t *testing.T) { - filters, cm, mt := getFiltersConfigMockTarget() - cm.validateErr = fmt.Errorf("Fail to validate") - defer mt.halt() - bh := NewHandlerImpl(2, mt, filters, cm) + mm := getMultichainManager() + mm.chains[string(systemChain)].configManager.validateErr = fmt.Errorf("Fail to validate") + defer mm.halt() + bh := NewHandlerImpl(2, mm) m := newMockB() defer close(m.recvChan) go bh.Handle(m) - m.recvChan <- &cb.Envelope{Payload: configTx} + m.recvChan <- makeMessage(systemChain, configTx) reply := <-m.sendChan if reply.Status != cb.Status_BAD_REQUEST { diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index 7eb87511f5f..faf660560a0 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -17,6 +17,7 @@ limitations under the License. package deliver import ( + "github.com/hyperledger/fabric/orderer/multichain" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -35,13 +36,13 @@ type Handler interface { } type DeliverServer struct { - rl rawledger.Reader + ml multichain.Manager maxWindow int } -func NewHandlerImpl(rl rawledger.Reader, maxWindow int) Handler { +func NewHandlerImpl(ml multichain.Manager, maxWindow int) Handler { return &DeliverServer{ - rl: rl, + ml: ml, maxWindow: maxWindow, } } @@ -185,14 +186,21 @@ func (d *deliverer) processUpdate(update *ab.SeekInfo) bool { } logger.Debugf("Updating properties for client") - if update == nil || update.WindowSize == 0 || update.WindowSize > uint64(d.ds.maxWindow) { + if update == nil || update.WindowSize == 0 || update.WindowSize > uint64(d.ds.maxWindow) || update.ChainID == nil { close(d.exitChan) return d.sendErrorReply(cb.Status_BAD_REQUEST) } + chain, ok := d.ds.ml.GetChain(update.ChainID) + if !ok { + return d.sendErrorReply(cb.Status_NOT_FOUND) + } + + // XXX add deliver authorization checking + d.windowSize = update.WindowSize - d.cursor, d.nextBlockNumber = d.ds.rl.Iterator(update.Start, update.SpecifiedNumber) + d.cursor, d.nextBlockNumber = chain.Reader().Iterator(update.Start, update.SpecifiedNumber) d.lastAck = d.nextBlockNumber - 1 return true diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index 991fbaf9347..0fd1d00944a 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -22,6 +22,11 @@ import ( "time" "github.com/hyperledger/fabric/orderer/common/bootstrap/static" + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/multichain" + "github.com/hyperledger/fabric/orderer/rawledger" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -31,6 +36,10 @@ import ( var genesisBlock *cb.Block +var systemChainID = []byte("systemChain") + +const ledgerSize = 10 + func init() { bootstrapper := static.New() var err error @@ -69,20 +78,63 @@ func (m *mockD) Recv() (*ab.DeliverUpdate, error) { return msg, nil } -func TestOldestSeek(t *testing.T) { - ledgerSize := 5 +type mockMultichainManager struct { + chains map[string]*mockChainSupport +} + +func (mm *mockMultichainManager) GetChain(chainID []byte) (multichain.ChainSupport, bool) { + cs, ok := mm.chains[string(chainID)] + return cs, ok +} + +type mockChainSupport struct { + ledger rawledger.ReadWriter +} + +func (mcs *mockChainSupport) ConfigManager() configtx.Manager { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) PolicyManager() policies.Manager { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Filters() *broadcastfilter.RuleSet { + panic("Unimplemented") +} + +func (mcs *mockChainSupport) Reader() rawledger.Reader { + return mcs.ledger +} + +func (mcs *mockChainSupport) Chain() multichain.Chain { + panic("Unimplemented") +} + +func newMockMultichainManager() *mockMultichainManager { _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := &mockMultichainManager{ + chains: make(map[string]*mockChainSupport), + } + mm.chains[string(systemChainID)] = &mockChainSupport{ + ledger: rl, + } + return mm +} + +func TestOldestSeek(t *testing.T) { + mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} count := 0 for { @@ -102,19 +154,18 @@ func TestOldestSeek(t *testing.T) { } func TestNewestSeek(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST, ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -131,18 +182,18 @@ func TestNewestSeek(t *testing.T) { } func TestSpecificSeek(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() - ds := NewHandlerImpl(rl, MagicLargestWindow) + defer close(m.recvChan) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST, ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -151,7 +202,7 @@ func TestSpecificSeek(t *testing.T) { } if blockReply.GetBlock().Header.Number != uint64(ledgerSize-1) { - t.Fatalf("Expected only to get block 4") + t.Fatalf("Expected only the most recent block") } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") @@ -159,19 +210,18 @@ func TestSpecificSeek(t *testing.T) { } func TestBadSeek(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() for i := 1; i < 2*ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1), ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -182,7 +232,7 @@ func TestBadSeek(t *testing.T) { t.Fatalf("Timed out waiting to get all blocks") } - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize), ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -195,16 +245,15 @@ func TestBadSeek(t *testing.T) { } func TestBadWindow(t *testing.T) { - ledgerSize := 5 - _, rl := ramledger.New(ledgerSize, genesisBlock) + mm := newMockMultichainManager() m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} select { case blockReply := <-m.sendChan: @@ -217,20 +266,19 @@ func TestBadWindow(t *testing.T) { } func TestAck(t *testing.T) { - ledgerSize := 10 + mm := newMockMultichainManager() windowSize := uint64(2) - _, rl := ramledger.New(ledgerSize, genesisBlock) for i := 1; i < ledgerSize; i++ { - rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) + mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil) } m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(rl, MagicLargestWindow) + ds := NewHandlerImpl(mm, MagicLargestWindow) go ds.Handle(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST, ChainID: systemChainID}}} count := uint64(0) for { diff --git a/orderer/main.go b/orderer/main.go index 7be69ed7f23..e234cadf1b3 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -29,22 +29,16 @@ import ( "github.com/hyperledger/fabric/orderer/common/bootstrap" "github.com/hyperledger/fabric/orderer/common/bootstrap/static" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - // "github.com/hyperledger/fabric/orderer/common/broadcastfilter/configfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" - "github.com/hyperledger/fabric/orderer/common/policies" - "github.com/hyperledger/fabric/orderer/common/util" "github.com/hyperledger/fabric/orderer/config" "github.com/hyperledger/fabric/orderer/kafka" + "github.com/hyperledger/fabric/orderer/multichain" "github.com/hyperledger/fabric/orderer/rawledger" "github.com/hyperledger/fabric/orderer/rawledger/fileledger" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" "github.com/hyperledger/fabric/orderer/solo" - cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/Shopify/sarama" - "github.com/golang/protobuf/proto" "github.com/op/go-logging" "google.golang.org/grpc" ) @@ -65,7 +59,7 @@ func main() { switch conf.General.OrdererType { case "solo": - launchSolo(conf) + launchGeneric(conf) case "kafka": launchKafka(conf) default: @@ -73,81 +67,11 @@ func main() { } } -// XXX This crypto helper is a stand in until we have a real crypto handler -// it considers all signatures to be valid -type xxxCryptoHelper struct{} - -func (xxx xxxCryptoHelper) VerifySignature(msg []byte, ids []byte, sigs []byte) bool { - return true -} - func init() { logging.SetLevel(logging.DEBUG, "") } -func retrieveConfiguration(rl rawledger.Reader) *cb.ConfigurationEnvelope { - var lastConfigTx *cb.ConfigurationEnvelope - - envelope := new(cb.Envelope) - payload := new(cb.Payload) - configurationEnvelope := new(cb.ConfigurationEnvelope) - - it, _ := rl.Iterator(ab.SeekInfo_OLDEST, 0) - // Iterate over the blockchain, looking for config transactions, track the most recent one encountered - // This will be the transaction which is returned - for { - select { - case <-it.ReadyChan(): - block, status := it.Next() - if status != cb.Status_SUCCESS { - panic(fmt.Errorf("Error parsing blockchain at startup: %v", status)) - } - if len(block.Data.Data) != 1 { - continue - } - envelope = util.ExtractEnvelopeOrPanic(block, 0) - payload = util.ExtractPayloadOrPanic(envelope) - if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { - continue - } - if err := proto.Unmarshal(payload.Data, configurationEnvelope); err == nil { - lastConfigTx = configurationEnvelope - } - default: - return lastConfigTx - } - } -} - -func bootstrapConfigManager(lastConfigTx *cb.ConfigurationEnvelope) configtx.Manager { - policyManager := policies.NewManagerImpl(xxxCryptoHelper{}) - configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler) - for ctype := range cb.ConfigurationItem_ConfigurationType_name { - rtype := cb.ConfigurationItem_ConfigurationType(ctype) - switch rtype { - case cb.ConfigurationItem_Policy: - configHandlerMap[rtype] = policyManager - default: - configHandlerMap[rtype] = configtx.NewBytesHandler() - } - } - - configManager, err := configtx.NewConfigurationManager(lastConfigTx, policyManager, configHandlerMap) - if err != nil { - panic(err) - } - return configManager -} - -func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet { - return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ - broadcastfilter.EmptyRejectRule, - // configfilter.New(configManager), - broadcastfilter.AcceptRule, - }) -} - -func launchSolo(conf *config.TopLevel) { +func launchGeneric(conf *config.TopLevel) { grpcServer := grpc.NewServer() lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort)) @@ -174,7 +98,7 @@ func launchSolo(conf *config.TopLevel) { // Stand in until real config ledgerType := os.Getenv("ORDERER_LEDGER_TYPE") - var rawledger rawledger.ReadWriter + var lf rawledger.Factory switch ledgerType { case "file": location := conf.FileLedger.Location @@ -186,36 +110,22 @@ func launchSolo(conf *config.TopLevel) { } } - _, rawledger = fileledger.New(location, genesisBlock) + lf, _ = fileledger.New(location, genesisBlock) case "ram": fallthrough default: - _, rawledger = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock) + lf, _ = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock) } - lastConfigTx := retrieveConfiguration(rawledger) - if lastConfigTx == nil { - panic("No chain configuration found") - } - - configManager := bootstrapConfigManager(lastConfigTx) - filters := createBroadcastRuleset(configManager) + consenters := make(map[string]multichain.Consenter) + consenters["solo"] = solo.New(conf.General.BatchTimeout) - soloConsenter := solo.NewConsenter( - int(conf.General.BatchSize), - conf.General.BatchTimeout, - rawledger, - filters, - configManager, - ) + manager := multichain.NewManagerImpl(lf, consenters) server := NewServer( - soloConsenter, - rawledger, + manager, int(conf.General.QueueSize), int(conf.General.MaxWindowSize), - filters, - configManager, ) ab.RegisterAtomicBroadcastServer(grpcServer, server) diff --git a/orderer/sbft/backend/backendab.go b/orderer/sbft/backend/backendab.go index 7c918453fa3..f824e69142f 100644 --- a/orderer/sbft/backend/backendab.go +++ b/orderer/sbft/backend/backendab.go @@ -17,21 +17,90 @@ limitations under the License. package backend import ( - "github.com/golang/protobuf/proto" + "bytes" + + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/common/deliver" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/multichain" + "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/golang/protobuf/proto" ) +type xxxMultichain struct { + chainID []byte + chainSupport *xxxChainSupport +} + +func (xxx *xxxMultichain) GetChain(id []byte) (multichain.ChainSupport, bool) { + if !bytes.Equal(id, xxx.chainID) { + return nil, false + } + return xxx.chainSupport, true +} + +type xxxChainSupport struct { + reader rawledger.Reader +} + +func (xxx *xxxChainSupport) ConfigManager() configtx.Manager { + panic("Unimplemented") +} + +func (xxx *xxxChainSupport) PolicyManager() policies.Manager { + panic("Unimplemented") +} + +func (xxx *xxxChainSupport) Filters() *broadcastfilter.RuleSet { + panic("Unimplemented") +} + +func (xxx *xxxChainSupport) Reader() rawledger.Reader { + return xxx.reader +} + +func (xxx *xxxChainSupport) Chain() multichain.Chain { + panic("Unimplemented") +} + type BackendAB struct { backend *Backend deliverserver deliver.Handler } func NewBackendAB(backend *Backend) *BackendAB { + + // XXX All the code below is a hacky shim until sbft can be adapter to the new multichain interface + it, _ := backend.ledger.Iterator(ab.SeekInfo_OLDEST, 0) + block, status := it.Next() + if status != cb.Status_SUCCESS { + panic("Error getting a block from the ledger") + } + env := &cb.Envelope{} + err := proto.Unmarshal(block.Data.Data[0], env) + if err != nil { + panic(err) + } + + payload := &cb.Payload{} + err = proto.Unmarshal(env.Payload, payload) + if err != nil { + panic(err) + } + + manager := &xxxMultichain{ + chainID: payload.Header.ChainHeader.ChainID, + chainSupport: &xxxChainSupport{reader: backend.ledger}, + } + // XXX End hackiness + bab := &BackendAB{ backend: backend, - deliverserver: deliver.NewHandlerImpl(backend.ledger, 1000), + deliverserver: deliver.NewHandlerImpl(manager, 1000), } return bab } diff --git a/orderer/sbft/main.go b/orderer/sbft/main.go index 4ec5dbe9e73..5a3e7a5a74b 100644 --- a/orderer/sbft/main.go +++ b/orderer/sbft/main.go @@ -126,6 +126,7 @@ func serve(c flags) { if err != nil { panic(err) } + _, ledger := fileledger.New(c.dataDir, genesisBlock) s.backend, err = backend.NewBackend(config.Peers, conn, ledger, persist) if err != nil { diff --git a/orderer/sbft/sbft_test.go b/orderer/sbft/sbft_test.go index f986c3f72ac..38971c6079b 100644 --- a/orderer/sbft/sbft_test.go +++ b/orderer/sbft/sbft_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" @@ -129,7 +130,7 @@ func updateReceiver(t *testing.T, resultch chan byte, errorch chan error, client errorch <- fmt.Errorf("Failed to get Deliver stream: %s", err) return } - dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10}}}) + dstream.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{Start: ab.SeekInfo_NEWEST, WindowSize: 10, ChainID: static.TestChainID}}}) logger.Info("{Update Receiver} Listening to ledger updates.") for i := 0; i < 2; i++ { m, inerr := dstream.Recv() diff --git a/orderer/server.go b/orderer/server.go index c52996fea10..9740dcefc58 100644 --- a/orderer/server.go +++ b/orderer/server.go @@ -18,10 +18,8 @@ package main import ( "github.com/hyperledger/fabric/orderer/common/broadcast" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" - "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/common/deliver" - "github.com/hyperledger/fabric/orderer/rawledger" + "github.com/hyperledger/fabric/orderer/multichain" ab "github.com/hyperledger/fabric/protos/orderer" ) @@ -31,12 +29,12 @@ type server struct { } // NewServer creates a ab.AtomicBroadcastServer based on the broadcast target and ledger Reader -func NewServer(consenter broadcast.Target, rl rawledger.Reader, queueSize, maxWindowSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer { - logger.Infof("Starting orderer with consenter=%T, and ledger=%T", consenter, rl) +func NewServer(ml multichain.Manager, queueSize, maxWindowSize int) ab.AtomicBroadcastServer { + logger.Infof("Starting orderer") s := &server{ - dh: deliver.NewHandlerImpl(rl, maxWindowSize), - bh: broadcast.NewHandlerImpl(queueSize, consenter, filters, configManager), + dh: deliver.NewHandlerImpl(ml, maxWindowSize), + bh: broadcast.NewHandlerImpl(queueSize, ml), } return s } diff --git a/orderer/solo/consensus.go b/orderer/solo/consensus.go index 81a31aa5df3..ba114a47351 100644 --- a/orderer/solo/consensus.go +++ b/orderer/solo/consensus.go @@ -20,8 +20,8 @@ import ( "time" "github.com/hyperledger/fabric/orderer/common/blockcutter" - "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/multichain" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" "github.com/op/go-logging" @@ -35,56 +35,68 @@ func init() { type consenter struct { batchTimeout time.Duration - cutter blockcutter.Receiver +} + +type chain struct { + batchTimeout time.Duration rl rawledger.Writer + cutter blockcutter.Receiver sendChan chan *cb.Envelope exitChan chan struct{} } -func NewConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter { - bs := newPlainConsenter(batchSize, batchTimeout, rl, filters, configManager) - go bs.main() - return bs +func New(batchTimeout time.Duration) multichain.Consenter { + return &consenter{ + // TODO, ultimately this should come from the configManager at HandleChain + batchTimeout: batchTimeout, + } +} + +func (solo *consenter) HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) multichain.Chain { + return newChain(solo.batchTimeout, configManager, cutter, rl, metadata) } -func newPlainConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter { - bs := &consenter{ - cutter: blockcutter.NewReceiverImpl(batchSize, filters, configManager), +func newChain(batchTimeout time.Duration, configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) *chain { + return &chain{ batchTimeout: batchTimeout, rl: rl, + cutter: cutter, sendChan: make(chan *cb.Envelope), exitChan: make(chan struct{}), } - return bs } -func (bs *consenter) halt() { - close(bs.exitChan) +func (ch *chain) Start() { + go ch.main() +} + +func (ch *chain) Halt() { + close(ch.exitChan) } // Enqueue accepts a message and returns true on acceptance, or false on shutdown -func (bs *consenter) Enqueue(env *cb.Envelope) bool { +func (ch *chain) Enqueue(env *cb.Envelope) bool { select { - case bs.sendChan <- env: + case ch.sendChan <- env: return true - case <-bs.exitChan: + case <-ch.exitChan: return false } } -func (bs *consenter) main() { +func (ch *chain) main() { var timer <-chan time.Time for { select { - case msg := <-bs.sendChan: - batches, ok := bs.cutter.Ordered(msg) + case msg := <-ch.sendChan: + batches, ok := ch.cutter.Ordered(msg) if ok && len(batches) == 0 && timer == nil { - timer = time.After(bs.batchTimeout) + timer = time.After(ch.batchTimeout) continue } for _, batch := range batches { - bs.rl.Append(batch, nil) + ch.rl.Append(batch, nil) } if len(batches) > 0 { timer = nil @@ -93,14 +105,14 @@ func (bs *consenter) main() { //clear the timer timer = nil - batch := bs.cutter.Cut() + batch := ch.cutter.Cut() if len(batch) == 0 { logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug") continue } logger.Debugf("Batch timer expired, creating block") - bs.rl.Append(batch, nil) - case <-bs.exitChan: + ch.rl.Append(batch, nil) + case <-ch.exitChan: logger.Debugf("Exiting") return } diff --git a/orderer/solo/consensus_test.go b/orderer/solo/consensus_test.go index 0d48ad483a2..260daf78b0a 100644 --- a/orderer/solo/consensus_test.go +++ b/orderer/solo/consensus_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" + "github.com/hyperledger/fabric/orderer/common/blockcutter" "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" @@ -70,14 +71,16 @@ func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { return broadcastfilter.Forward } -func getFiltersAndConfig() (*broadcastfilter.RuleSet, *mockConfigManager) { +func getEverything(batchSize int) (*mockConfigManager, blockcutter.Receiver, rawledger.ReadWriter) { cm := &mockConfigManager{} filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ broadcastfilter.EmptyRejectRule, &mockConfigFilter{cm}, broadcastfilter.AcceptRule, }) - return filters, cm + cutter := blockcutter.NewReceiverImpl(batchSize, filters, cm) + _, rl := ramledger.New(10, genesisBlock) + return cm, cutter, rl } @@ -126,20 +129,19 @@ func (m *mockB) Recv() (*cb.Envelope, error) { } func TestEmptyBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() - _, rl := ramledger.New(10, genesisBlock) - bs := newPlainConsenter(1, time.Millisecond, rl, filters, cm) + cm, cutter, rl := getEverything(1) + bs := newChain(time.Millisecond, cm, cutter, rl, nil) if bs.rl.(rawledger.Reader).Height() != 1 { t.Fatalf("Expected no new blocks created") } } func TestBatchTimer(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 - _, rl := ramledger.New(10, genesisBlock) - bs := NewConsenter(batchSize, time.Millisecond, rl, filters, cm) - defer bs.halt() + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Millisecond, cm, cutter, rl, nil) + bs.Start() + defer bs.Halt() it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) bs.sendChan <- &cb.Envelope{Payload: []byte("Some bytes")} @@ -160,11 +162,11 @@ func TestBatchTimer(t *testing.T) { } func TestBatchTimerHaltOnFilledBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 - _, rl := ramledger.New(10, genesisBlock) - bs := NewConsenter(batchSize, time.Hour, rl, filters, cm) - defer bs.halt() + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Millisecond, cm, cutter, rl, nil) + bs.Start() + defer bs.Halt() it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) bs.sendChan <- &cb.Envelope{Payload: []byte("Some bytes")} @@ -189,11 +191,10 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) { } func TestFilledBatch(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Hour, cm, cutter, rl, nil) messages := 10 - _, rl := ramledger.New(10, genesisBlock) - bs := newPlainConsenter(batchSize, time.Hour, rl, filters, cm) done := make(chan struct{}) go func() { bs.main() @@ -202,7 +203,7 @@ func TestFilledBatch(t *testing.T) { for i := 0; i < messages; i++ { bs.sendChan <- &cb.Envelope{Payload: []byte("Some bytes")} } - bs.halt() + bs.Halt() <-done expected := uint64(1 + messages/batchSize) if bs.rl.(rawledger.Reader).Height() != expected { @@ -211,10 +212,9 @@ func TestFilledBatch(t *testing.T) { } func TestReconfigureGoodPath(t *testing.T) { - filters, cm := getFiltersAndConfig() batchSize := 2 - _, rl := ramledger.New(10, genesisBlock) - bs := newPlainConsenter(batchSize, time.Hour, rl, filters, cm) + cm, cutter, rl := getEverything(batchSize) + bs := newChain(time.Hour, cm, cutter, rl, nil) done := make(chan struct{}) go func() { bs.main() @@ -226,7 +226,7 @@ func TestReconfigureGoodPath(t *testing.T) { bs.sendChan <- &cb.Envelope{Payload: []byte("Msg2")} bs.sendChan <- &cb.Envelope{Payload: []byte("Msg3")} - bs.halt() + bs.Halt() <-done expected := uint64(4) if bs.rl.(rawledger.Reader).Height() != expected {