Skip to content

Commit

Permalink
Hook multichain manager into main path
Browse files Browse the repository at this point in the history
The previous changeset created the multichain manager but left it
entirely out of the execution path.  This changeset hooks the multichain
manager into the primary path and removes much of the redundant code it
replaces.

This changeset only supports solo as a consumer of the multichain
manager, but kafka will follow next (as the multichain manager gives
multichain support for free to implementers of its Consenter interface).

Change-Id: I4e5b19971afe5b434843f3e0011fd18df098aa0f
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 28, 2016
1 parent 157479b commit a8af1e9
Show file tree
Hide file tree
Showing 12 changed files with 401 additions and 267 deletions.
2 changes: 1 addition & 1 deletion orderer/common/blockcutter/blockcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}

// Target defines a sink for the ordered broadcast messages
// Receiver defines a sink for the ordered broadcast messages
type Receiver interface {
// Ordered should be invoked sequentially as messages are ordered
// If the message is a valid normal message and does not fill the batch, nil, true is returned
Expand Down
57 changes: 33 additions & 24 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package broadcast

import (
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/common/configtx"
"github.com/hyperledger/fabric/orderer/multichain"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/golang/protobuf/proto"
"github.com/op/go-logging"
)

Expand All @@ -31,34 +32,24 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}

// Target defines an interface which the broadcast handler will direct broadcasts to
type Target interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool
}

// Handler defines an interface which handles broadcasts
type Handler interface {
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}

type handlerImpl struct {
queueSize int
target Target
filters *broadcastfilter.RuleSet
configManager configtx.Manager
exitChan chan struct{}
queueSize int
ml multichain.Manager
exitChan chan struct{}
}

// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(queueSize int, target Target, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Handler {
func NewHandlerImpl(queueSize int, ml multichain.Manager) Handler {
return &handlerImpl{
queueSize: queueSize,
filters: filters,
configManager: configManager,
target: target,
exitChan: make(chan struct{}),
queueSize: queueSize,
ml: ml,
exitChan: make(chan struct{}),
}
}

Expand All @@ -70,25 +61,30 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return b.queueEnvelopes(srv)
}

type msgAndChainSupport struct {
msg *cb.Envelope
chainSupport multichain.ChainSupport
}

type broadcaster struct {
bs *handlerImpl
queue chan *cb.Envelope
queue chan *msgAndChainSupport
}

func newBroadcaster(bs *handlerImpl) *broadcaster {
b := &broadcaster{
bs: bs,
queue: make(chan *cb.Envelope, bs.queueSize),
queue: make(chan *msgAndChainSupport, bs.queueSize),
}
return b
}

func (b *broadcaster) drainQueue() {
for {
select {
case msg, ok := <-b.queue:
case msgAndChainSupport, ok := <-b.queue:
if ok {
if !b.bs.target.Enqueue(msg) {
if !msgAndChainSupport.chainSupport.Chain().Enqueue(msgAndChainSupport.msg) {
return
}
} else {
Expand All @@ -108,14 +104,27 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err
return err
}

action, _ := b.bs.filters.Apply(msg)
payload := &cb.Payload{}
err = proto.Unmarshal(msg.Payload, payload)
if payload.Header == nil || payload.Header.ChainHeader == nil || payload.Header.ChainHeader.ChainID == nil {
logger.Debugf("Received malformed message, dropping connection")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

chainSupport, ok := b.bs.ml.GetChain(payload.Header.ChainHeader.ChainID)
if !ok {
// XXX Hook in chain creation logic here
panic("Unimplemented")
}

action, _ := chainSupport.Filters().Apply(msg)

switch action {
case broadcastfilter.Reconfigure:
fallthrough
case broadcastfilter.Accept:
select {
case b.queue <- msg:
case b.queue <- &msgAndChainSupport{msg: msg, chainSupport: chainSupport}:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
default:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
Expand Down
Loading

0 comments on commit a8af1e9

Please sign in to comment.