From 157479b99f9ccd5311ca27020c7c33c841924555 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Tue, 22 Nov 2016 13:16:58 -0500 Subject: [PATCH] Create multichain manager The multichain manager will ultimately supplant much of what is main.go and generally coordinate the creation of the consenter backed chains and provide an interface for the common broadcast/deliver logic. This changeset introduces the manager but makes no attempt to hook it into the rest of the system (for ease of review). Change-Id: I89072ff497dbf0395f1864587e034d5906217cea Signed-off-by: Jason Yellick Signed-off-by: Yacov Manevich --- orderer/multichain/chainsupport.go | 152 ++++++++++++++++ orderer/multichain/chainsupport_mock_test.go | 65 +++++++ orderer/multichain/manager.go | 174 +++++++++++++++++++ orderer/multichain/manager_test.go | 157 +++++++++++++++++ 4 files changed, 548 insertions(+) create mode 100644 orderer/multichain/chainsupport.go create mode 100644 orderer/multichain/chainsupport_mock_test.go create mode 100644 orderer/multichain/manager.go create mode 100644 orderer/multichain/manager_test.go diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go new file mode 100644 index 00000000000..c2216bd75f4 --- /dev/null +++ b/orderer/multichain/chainsupport.go @@ -0,0 +1,152 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "github.com/hyperledger/fabric/orderer/common/blockcutter" + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/rawledger" + cb "github.com/hyperledger/fabric/protos/common" +) + +const XXXBatchSize = 10 // XXX + +// Consenter defines the backing ordering mechanism +type Consenter interface { + // HandleChain should create a return a reference to a Chain for the given set of resources + // It will only be invoked for a given chain once per process. See the description of Chain + // for more details + HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain +} + +// Chain defines a way to inject messages for ordering +// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer +// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks, +// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows +// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka) +// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft) +type Chain interface { + // Enqueue accepts a message and returns true on acceptance, or false on shutdown + Enqueue(env *cb.Envelope) bool + + // Start should allocate whatever resources are needed for staying up to date with the chain + // Typically, this involves creating a thread which reads from the ordering source, passes those + // messages to a block cutter, and writes the resulting blocks to the ledger + Start() + + // Halt frees the resources which were allocated for this Chain + Halt() +} + +// ChainSupport provides a wrapper for the resources backing a chain +type ChainSupport interface { + // ConfigManager returns the current config for the chain + ConfigManager() configtx.Manager + + // PolicyManager returns the current policy manager as specified by the chain configuration + PolicyManager() policies.Manager + + // Filters returns the set of broadcast filters for this chain + Filters() *broadcastfilter.RuleSet + + // Reader returns the chain Reader for the chain + Reader() rawledger.Reader + + // Chain returns the consenter backed chain + Chain() Chain +} + +type chainSupport struct { + chain Chain + configManager configtx.Manager + policyManager policies.Manager + reader rawledger.Reader + writer rawledger.Writer + filters *broadcastfilter.RuleSet +} + +func newChainSupport(configManager configtx.Manager, policyManager policies.Manager, backing rawledger.ReadWriter, consenters map[string]Consenter) *chainSupport { + batchSize := XXXBatchSize // XXX Pull this from chain config + filters := createBroadcastRuleset(configManager) + cutter := blockcutter.NewReceiverImpl(batchSize, filters, configManager) + consenterType := "solo" // XXX retrieve this from the chain config + consenter, ok := consenters[consenterType] + if !ok { + logger.Fatalf("Error retrieving consenter of type: %s", consenterType) + } + + cs := &chainSupport{ + configManager: configManager, + policyManager: policyManager, + filters: filters, + reader: backing, + writer: newWriteInterceptor(configManager, backing), + } + + cs.chain = consenter.HandleChain(configManager, cutter, cs.writer, nil) + + return cs +} + +func createBroadcastRuleset(configManager configtx.Manager) *broadcastfilter.RuleSet { + return broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ + broadcastfilter.EmptyRejectRule, + // configfilter.New(configManager), + broadcastfilter.AcceptRule, + }) +} + +func (cs *chainSupport) start() { + cs.chain.Start() +} + +func (cs *chainSupport) ConfigManager() configtx.Manager { + return cs.configManager +} + +func (cs *chainSupport) PolicyManager() policies.Manager { + return cs.policyManager +} + +func (cs *chainSupport) Filters() *broadcastfilter.RuleSet { + return cs.filters +} + +func (cs *chainSupport) Reader() rawledger.Reader { + return cs.reader +} + +func (cs *chainSupport) Chain() Chain { + return cs.chain +} + +type writeInterceptor struct { + backing rawledger.Writer +} + +// TODO ultimately set write interception policy by config +func newWriteInterceptor(configManager configtx.Manager, backing rawledger.Writer) *writeInterceptor { + return &writeInterceptor{ + backing: backing, + } +} + +func (wi *writeInterceptor) Append(blockContents []*cb.Envelope, metadata [][]byte) *cb.Block { + return wi.backing.Append(blockContents, metadata) +} diff --git a/orderer/multichain/chainsupport_mock_test.go b/orderer/multichain/chainsupport_mock_test.go new file mode 100644 index 00000000000..1be04b4f792 --- /dev/null +++ b/orderer/multichain/chainsupport_mock_test.go @@ -0,0 +1,65 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "github.com/hyperledger/fabric/orderer/common/blockcutter" + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/rawledger" + cb "github.com/hyperledger/fabric/protos/common" +) + +type mockConsenter struct { +} + +func (mc *mockConsenter) HandleChain(configManager configtx.Manager, cutter blockcutter.Receiver, rl rawledger.Writer, metadata []byte) Chain { + return &mockChain{ + queue: make(chan *cb.Envelope), + ledger: rl, + cutter: cutter, + } +} + +type mockChain struct { + queue chan *cb.Envelope + ledger rawledger.Writer + cutter blockcutter.Receiver +} + +func (mch *mockChain) Enqueue(env *cb.Envelope) bool { + mch.queue <- env + return true +} + +func (mch *mockChain) Start() { + go func() { + for { + msg, ok := <-mch.queue + if !ok { + return + } + batches, _ := mch.cutter.Ordered(msg) + for _, batch := range batches { + mch.ledger.Append(batch, nil) + } + } + }() +} + +func (mch *mockChain) Halt() { + close(mch.queue) +} diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go new file mode 100644 index 00000000000..d23f48afe37 --- /dev/null +++ b/orderer/multichain/manager.go @@ -0,0 +1,174 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "sync" + + "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/policies" + "github.com/hyperledger/fabric/orderer/rawledger" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/golang/protobuf/proto" + "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("orderer/multichain") + +// XXX This crypto helper is a stand in until we have a real crypto handler +// it considers all signatures to be valid +type xxxCryptoHelper struct{} + +func (xxx xxxCryptoHelper) VerifySignature(msg []byte, ids []byte, sigs []byte) bool { + return true +} + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +// Manager coordinates the creation and access of chains +type Manager interface { + // GetChain retrieves the chain support for a chain (and whether it exists) + GetChain(chainID []byte) (ChainSupport, bool) +} + +type multiLedger struct { + chains map[string]*chainSupport + consenters map[string]Consenter + ledgerFactory rawledger.Factory + mutex sync.Mutex +} + +// getConfigTx, this should ultimately be done more intelligently, but for now, we search the whole chain for txs and pick the last config one +func getConfigTx(reader rawledger.Reader) *cb.Envelope { + var lastConfigTx *cb.Envelope + + it, _ := reader.Iterator(ab.SeekInfo_OLDEST, 0) + // Iterate over the blockchain, looking for config transactions, track the most recent one encountered + // this will be the transaction which is returned + for { + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + logger.Fatalf("Error parsing blockchain at startup: %v", status) + } + // ConfigTxs should always be by themselves + if len(block.Data.Data) != 1 { + continue + } + + maybeConfigTx := &cb.Envelope{} + + err := proto.Unmarshal(block.Data.Data[0], maybeConfigTx) + + if err != nil { + logger.Fatalf("Found data which was not an envelope: %s", err) + } + + payload := &cb.Payload{} + err = proto.Unmarshal(maybeConfigTx.Payload, payload) + + if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) { + continue + } + + logger.Debugf("Found configuration transaction for chain %x at block %d", payload.Header.ChainHeader.ChainID, block.Header.Number) + lastConfigTx = maybeConfigTx + default: + return lastConfigTx + } + } +} + +// NewManagerImpl produces an instance of a Manager +func NewManagerImpl(ledgerFactory rawledger.Factory, consenters map[string]Consenter) Manager { + ml := &multiLedger{ + chains: make(map[string]*chainSupport), + ledgerFactory: ledgerFactory, + } + + existingChains := ledgerFactory.ChainIDs() + for _, chainID := range existingChains { + rl, err := ledgerFactory.GetOrCreate(chainID) + if err != nil { + logger.Fatalf("Ledger factory reported chainID %x but could not retrieve it: %s", chainID, err) + } + configTx := getConfigTx(rl) + if configTx == nil { + logger.Fatalf("Could not find configuration transaction for chain %x", chainID) + } + configManager, policyManager, backingLedger := ml.newResources(configTx) + chainID := configManager.ChainID() + ml.chains[string(chainID)] = newChainSupport(configManager, policyManager, backingLedger, consenters) + } + + for _, cs := range ml.chains { + cs.start() + } + + return ml +} + +// GetChain retrieves the chain support for a chain (and whether it exists) +func (ml *multiLedger) GetChain(chainID []byte) (ChainSupport, bool) { + cs, ok := ml.chains[string(chainID)] + return cs, ok +} + +func (ml *multiLedger) newResources(configTx *cb.Envelope) (configtx.Manager, policies.Manager, rawledger.ReadWriter) { + policyManager := policies.NewManagerImpl(xxxCryptoHelper{}) + configHandlerMap := make(map[cb.ConfigurationItem_ConfigurationType]configtx.Handler) + for ctype := range cb.ConfigurationItem_ConfigurationType_name { + rtype := cb.ConfigurationItem_ConfigurationType(ctype) + switch rtype { + case cb.ConfigurationItem_Policy: + configHandlerMap[rtype] = policyManager + default: + configHandlerMap[rtype] = configtx.NewBytesHandler() + } + } + + payload := &cb.Payload{} + err := proto.Unmarshal(configTx.Payload, payload) + if err != nil { + logger.Fatalf("Error unmarshaling a config transaction payload: %s", err) + } + + configEnvelope := &cb.ConfigurationEnvelope{} + err = proto.Unmarshal(payload.Data, configEnvelope) + if err != nil { + logger.Fatalf("Error unmarshaling a config transaction to config envelope: %s", err) + } + + configManager, err := configtx.NewConfigurationManager(configEnvelope, policyManager, configHandlerMap) + if err != nil { + logger.Fatalf("Error unpacking configuration transaction: %s", err) + } + + chainID := configManager.ChainID() + + ledger, err := ml.ledgerFactory.GetOrCreate(chainID) + if err != nil { + logger.Fatalf("Error getting ledger for %x", chainID) + } + + return configManager, policyManager, ledger +} diff --git a/orderer/multichain/manager_test.go b/orderer/multichain/manager_test.go new file mode 100644 index 00000000000..f7e398eed26 --- /dev/null +++ b/orderer/multichain/manager_test.go @@ -0,0 +1,157 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichain + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" + "github.com/hyperledger/fabric/orderer/common/util" + "github.com/hyperledger/fabric/orderer/rawledger/ramledger" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" +) + +var genesisBlock *cb.Block + +func init() { + var err error + genesisBlock, err = static.New().GenesisBlock() + if err != nil { + panic(err) + } +} + +func makeNormalTx(chainID []byte, i int) *cb.Envelope { + payload := &cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + Type: int32(cb.HeaderType_ENDORSER_TRANSACTION), + ChainID: chainID, + }, + }, + Data: []byte(fmt.Sprintf("%d", i)), + } + return &cb.Envelope{ + Payload: util.MarshalOrPanic(payload), + } +} + +func makeConfigTx(chainID []byte, i int) *cb.Envelope { + payload := &cb.Payload{ + Header: &cb.Header{ + ChainHeader: &cb.ChainHeader{ + Type: int32(cb.HeaderType_CONFIGURATION_TRANSACTION), + ChainID: chainID, + }, + }, + Data: util.MarshalOrPanic(&cb.ConfigurationEnvelope{ + Items: []*cb.SignedConfigurationItem{&cb.SignedConfigurationItem{ + ConfigurationItem: util.MarshalOrPanic(&cb.ConfigurationItem{ + Value: []byte(fmt.Sprintf("%d", i)), + }), + }}, + }), + } + return &cb.Envelope{ + Payload: util.MarshalOrPanic(payload), + } +} + +// Tests for a normal chain which contains 3 config transactions and other normal transactions to make sure the right one returned +func TestGetConfigTx(t *testing.T) { + _, rl := ramledger.New(10, genesisBlock) + for i := 0; i < 5; i++ { + rl.Append([]*cb.Envelope{makeNormalTx(static.TestChainID, i)}, nil) + } + rl.Append([]*cb.Envelope{makeConfigTx(static.TestChainID, 5)}, nil) + ctx := makeConfigTx(static.TestChainID, 6) + rl.Append([]*cb.Envelope{ctx}, nil) + rl.Append([]*cb.Envelope{makeNormalTx(static.TestChainID, 7)}, nil) + + pctx := getConfigTx(rl) + + if !reflect.DeepEqual(ctx, pctx) { + t.Fatalf("Did not select most recent config transaction") + } +} + +// Tests a chain which contains blocks with multi-transactions mixed with config txs, and a single tx which is not a config tx, none count as config blocks so nil should return +func TestGetConfigTxFailure(t *testing.T) { + _, rl := ramledger.New(10, genesisBlock) + for i := 0; i < 10; i++ { + rl.Append([]*cb.Envelope{ + makeNormalTx(static.TestChainID, i), + makeConfigTx(static.TestChainID, i), + }, nil) + } + rl.Append([]*cb.Envelope{makeNormalTx(static.TestChainID, 11)}, nil) + pctx := getConfigTx(rl) + + if pctx != nil { + t.Fatalf("Should not have found a configuration tx") + } +} + +// This test essentially brings the entire system up and is ultimately what main.go will replicate +func TestManagerImpl(t *testing.T) { + lf, rl := ramledger.New(10, genesisBlock) + + consenters := make(map[string]Consenter) + consenters["solo"] = &mockConsenter{} + + manager := NewManagerImpl(lf, consenters) + + _, ok := manager.GetChain([]byte("Fake")) + if ok { + t.Errorf("Should not have found a chain that was not created") + } + + chainSupport, ok := manager.GetChain(static.TestChainID) + + if !ok { + t.Fatalf("Should have gotten chain which was initialized by ramledger") + } + + messages := make([]*cb.Envelope, XXXBatchSize) + for i := 0; i < XXXBatchSize; i++ { + messages[i] = makeNormalTx(static.TestChainID, i) + } + + for _, message := range messages { + chainSupport.Chain().Enqueue(message) + } + + it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1) + select { + case <-it.ReadyChan(): + block, status := it.Next() + if status != cb.Status_SUCCESS { + t.Fatalf("Could not retrieve block") + } + for i := 0; i < XXXBatchSize; i++ { + if !reflect.DeepEqual(util.ExtractEnvelopeOrPanic(block, i), messages[i]) { + t.Errorf("Block contents wrong at index %d", i) + } + } + case <-time.After(time.Second): + t.Fatalf("Block 1 not produced after timeout") + } +}