Skip to content

Commit

Permalink
Create multichain manager
Browse files Browse the repository at this point in the history
The multichain manager will ultimately supplant much of what is main.go
and generally coordinate the creation of the consenter backed chains and
provide an interface for the common broadcast/deliver logic.

This changeset introduces the manager but makes no attempt to hook it
into the rest of the system (for ease of review).

Change-Id: I89072ff497dbf0395f1864587e034d5906217cea
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 28, 2016
1 parent 69a3aa6 commit 157479b
Show file tree
Hide file tree
Showing 4 changed files with 548 additions and 0 deletions.
152 changes: 152 additions & 0 deletions orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multichain

import (
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/common/configtx"
"github.com/hyperledger/fabric/orderer/common/policies"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
)

const XXXBatchSize = 10 // XXX

// Consenter defines the backing ordering mechanism
type Consenter interface {
// HandleChain should create a return a reference to a Chain for the given set of resources
// It will only be invoked for a given chain once per process. See the description of Chain
// for more details
HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain
}

// Chain defines a way to inject messages for ordering
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool

// Start should allocate whatever resources are needed for staying up to date with the chain
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger
Start()

// Halt frees the resources which were allocated for this Chain
Halt()
}

// ChainSupport provides a wrapper for the resources backing a chain
type ChainSupport interface {
// ConfigManager returns the current config for the chain
ConfigManager() configtx.Manager

// PolicyManager returns the current policy manager as specified by the chain configuration
PolicyManager() policies.Manager

// Filters returns the set of broadcast filters for this chain
Filters() *broadcastfilter.RuleSet

// Reader returns the chain Reader for the chain
Reader() rawledger.Reader

// Chain returns the consenter backed chain
Chain() Chain
}

type chainSupport struct {
chain Chain
configManager configtx.Manager
policyManager policies.Manager
reader rawledger.Reader
writer rawledger.Writer
filters *broadcastfilter.RuleSet
}

func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, consenters map[string]Consenter) *chainSupport {
batchSize := XXXBatchSize // XXX Pull this from chain config
filters := createBroadcastRuleset(configManager)
cutter := blockcutter.NewReceiverImpl(batchSize, filters, configManager)
consenterType := "solo" // XXX retrieve this from the chain config
consenter, ok := consenters[consenterType]
if !ok {
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}

cs := &chainSupport{
configManager: configManager,
policyManager: policyManager,
filters: filters,
reader: backing,
writer: newWriteInterceptor(configManager, backing),
}

cs.chain = consenter.HandleChain(configManager, cutter, cs.writer, nil)

return cs
}

func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet {
return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{
broadcastfilter.EmptyRejectRule,
// configfilter.New(configManager),
broadcastfilter.AcceptRule,
})
}

func (cs *chainSupport) start() {
cs.chain.Start()
}

func (cs *chainSupport) ConfigManager() configtx.Manager {
return cs.configManager
}

func (cs *chainSupport) PolicyManager() policies.Manager {
return cs.policyManager
}

func (cs *chainSupport) Filters() *broadcastfilter.RuleSet {
return cs.filters
}

func (cs *chainSupport) Reader() rawledger.Reader {
return cs.reader
}

func (cs *chainSupport) Chain() Chain {
return cs.chain
}

type writeInterceptor struct {
backing rawledger.Writer
}

// TODO ultimately set write interception policy by config
func newWriteInterceptor(configManager configtx.Manager, backing rawledger.Writer) *writeInterceptor {
return &writeInterceptor{
backing: backing,
}
}

func (wi *writeInterceptor) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block {
return wi.backing.Append(blockContents, metadata)
}
65 changes: 65 additions & 0 deletions orderer/multichain/chainsupport_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multichain

import (
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/configtx"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
)

type mockConsenter struct {
}

func (mc *mockConsenter) HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain {
return &mockChain{
queue: make(chan *cb.Envelope),
ledger: rl,
cutter: cutter,
}
}

type mockChain struct {
queue chan *cb.Envelope
ledger rawledger.Writer
cutter blockcutter.Receiver
}

func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
mch.queue <- env
return true
}

func (mch *mockChain) Start() {
go func() {
for {
msg, ok := <-mch.queue
if !ok {
return
}
batches, _ := mch.cutter.Ordered(msg)
for _, batch := range batches {
mch.ledger.Append(batch, nil)
}
}
}()
}

func (mch *mockChain) Halt() {
close(mch.queue)
}
174 changes: 174 additions & 0 deletions orderer/multichain/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multichain

import (
"sync"

"github.com/hyperledger/fabric/orderer/common/configtx"
"github.com/hyperledger/fabric/orderer/common/policies"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/golang/protobuf/proto"
"github.com/op/go-logging"
)

var logger = logging.MustGetLogger("orderer/multichain")

// XXX This crypto helper is a stand in until we have a real crypto handler
// it considers all signatures to be valid
type xxxCryptoHelper struct{}

func (xxx xxxCryptoHelper) VerifySignature(msg []byte, ids []byte, sigs []byte) bool {
return true
}

func init() {
logging.SetLevel(logging.DEBUG, "")
}

// Manager coordinates the creation and access of chains
type Manager interface {
// GetChain retrieves the chain support for a chain (and whether it exists)
GetChain(chainID []byte) (ChainSupport, bool)
}

type multiLedger struct {
chains map[string]*chainSupport
consenters map[string]Consenter
ledgerFactory rawledger.Factory
mutex sync.Mutex
}

// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one
func getConfigTx(reader rawledger.Reader) *cb.Envelope {
var lastConfigTx *cb.Envelope

it, _ := reader.Iterator(ab.SeekInfo_OLDEST, 0)
// Iterate over the blockchain, looking for config transactions, track the most recent one encountered
// this will be the transaction which is returned
for {
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
logger.Fatalf("Error parsing blockchain at startup: %v", status)
}
// ConfigTxs should always be by themselves
if len(block.Data.Data) != 1 {
continue
}

maybeConfigTx := &cb.Envelope{}

err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx)

if err != nil {
logger.Fatalf("Found data which was not an envelope: %s", err)
}

payload := &cb.Payload{}
err = proto.Unmarshal(maybeConfigTx.Payload, payload)

if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
continue
}

logger.Debugf("Found configuration transaction for chain %x at block %d", payload.Header.ChainHeader.ChainID, block.Header.Number)
lastConfigTx = maybeConfigTx
default:
return lastConfigTx
}
}
}

// NewManagerImpl produces an instance of a Manager
func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Consenter) Manager {
ml := &multiLedger{
chains: make(map[string]*chainSupport),
ledgerFactory: ledgerFactory,
}

existingChains := ledgerFactory.ChainIDs()
for _, chainID := range existingChains {
rl, err := ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Fatalf("Ledger factory reported chainID %x but could not retrieve it: %s", chainID, err)
}
configTx := getConfigTx(rl)
if configTx == nil {
logger.Fatalf("Could not find configuration transaction for chain %x", chainID)
}
configManager, policyManager, backingLedger := ml.newResources(configTx)
chainID := configManager.ChainID()
ml.chains[string(chainID)] = newChainSupport(configManager, policyManager, backingLedger, consenters)
}

for _, cs := range ml.chains {
cs.start()
}

return ml
}

// GetChain retrieves the chain support for a chain (and whether it exists)
func (ml *multiLedger) GetChain(chainID []byte) (ChainSupport, bool) {
cs, ok := ml.chains[string(chainID)]
return cs, ok
}

func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter) {
policyManager := policies.NewManagerImpl(xxxCryptoHelper{})
configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler)
for ctype := range cb.ConfigurationItem_ConfigurationType_name {
rtype := cb.ConfigurationItem_ConfigurationType(ctype)
switch rtype {
case cb.ConfigurationItem_Policy:
configHandlerMap[rtype] = policyManager
default:
configHandlerMap[rtype] = configtx.NewBytesHandler()
}
}

payload := &cb.Payload{}
err := proto.Unmarshal(configTx.Payload, payload)
if err != nil {
logger.Fatalf("Error unmarshaling a config transaction payload: %s", err)
}

configEnvelope := &cb.ConfigurationEnvelope{}
err = proto.Unmarshal(payload.Data, configEnvelope)
if err != nil {
logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err)
}

configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap)
if err != nil {
logger.Fatalf("Error unpacking configuration transaction: %s", err)
}

chainID := configManager.ChainID()

ledger, err := ml.ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Fatalf("Error getting ledger for %x", chainID)
}

return configManager, policyManager, ledger
}
Loading

0 comments on commit 157479b

Please sign in to comment.