Skip to content

Commit

Permalink
[FAB-5271] Solo to new orderer validation flow
Browse files Browse the repository at this point in the history
With all of the preliminary work finally done.  It is safe for the solo
consensus code to skip the second round of validation on transactions,
so long as the configuration sequence for the channel has not advanced.

This CR checks the current configSeq when processing a message, and if
it has not advanced, commits it directly.  If the confiSeq has advanced,
only then does it revalidate the message, dropping it if it is invalid,
or simply committing it otherwise.

Very unscientific tests on my local laptop saw throughput on Broadcast
for solo jump from 900 tps to 2000 tps.

Change-Id: Ic7340cdb66631539df3826c7a52844a53849b3ad
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jul 27, 2017
1 parent 204f0f4 commit 6506805
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 48 deletions.
5 changes: 4 additions & 1 deletion orderer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Consenter interface {
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// NOTE: The solo/kafka consenters have not been updated to perform the revalidation
// NOTE: The kafka consenter has not been updated to perform the revalidation
// checks conditionally. For now, Order/Configure are essentially Enqueue as before.
// This does not cause data inconsistency, but it wastes cycles and will be required
// to properly support the ConfigUpdate concept once introduced
Expand Down Expand Up @@ -91,6 +91,9 @@ type ConsenterSupport interface {
// WriteBlock commits a block to the ledger, and applies the config update inside.
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block

// Sequence returns the current config squence.
Sequence() uint64

// ChainID returns the channel ID this support is associated with.
ChainID() string

Expand Down
98 changes: 54 additions & 44 deletions orderer/consensus/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import (
"fmt"
"time"

"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
)

Expand All @@ -33,10 +31,16 @@ type consenter struct{}

type chain struct {
support consensus.ConsenterSupport
sendChan chan *cb.Envelope
sendChan chan *message
exitChan chan struct{}
}

type message struct {
configSeq uint64
configMsg *cb.Envelope
initialMsg *cb.Envelope
}

// New creates a new consenter for the solo consensus scheme.
// The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process).
// It accepts messages being delivered via Order/Configure, orders them, and then uses the blockcutter to form the messages
Expand All @@ -52,7 +56,7 @@ func (solo *consenter) HandleChain(support consensus.ConsenterSupport, metadata
func newChain(support consensus.ConsenterSupport) *chain {
return &chain{
support: support,
sendChan: make(chan *cb.Envelope),
sendChan: make(chan *message),
exitChan: make(chan struct{}),
}
}
Expand All @@ -73,17 +77,28 @@ func (ch *chain) Halt() {
// Order accepts normal messages for ordering
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
select {
case ch.sendChan <- env:
case ch.sendChan <- &message{
configSeq: configSeq,
initialMsg: env,
}:
return nil
case <-ch.exitChan:
return fmt.Errorf("Exiting")
}
}

// Order accepts normal messages for ordering
func (ch *chain) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
// TODO, handle this specially
return ch.Order(config, configSeq)
// Configure accepts configuration update messages for ordering
func (ch *chain) Configure(impetus *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
select {
case ch.sendChan <- &message{
configSeq: configSeq,
initialMsg: impetus,
configMsg: config,
}:
return nil
case <-ch.exitChan:
return fmt.Errorf("Exiting")
}
}

// Errored only closes on exit
Expand All @@ -93,44 +108,23 @@ func (ch *chain) Errored() <-chan struct{} {

func (ch *chain) main() {
var timer <-chan time.Time
var err error

for {
seq := ch.support.Sequence()
err = nil
select {
case msg := <-ch.sendChan:
chdr, err := utils.ChannelHeader(msg)
if err != nil {
logger.Panicf("If a message has arrived to this point, it should already have had its header inspected once")
}

class, err := ch.support.ClassifyMsg(chdr)
if err != nil {
logger.Panicf("If a message has arrived to this point, it should already have been classified once: %s", err)
}
switch class {
case msgprocessor.ConfigUpdateMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
if msg.configMsg == nil {
// NormalMsg
if msg.configSeq < seq {
_, err = ch.support.ProcessNormalMsg(msg.initialMsg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}
}

batch := ch.support.BlockCutter().Cut()
if batch != nil {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil)
}

block := ch.support.CreateNextBlock([]*cb.Envelope{msg})
ch.support.WriteConfigBlock(block, nil)
timer = nil
case msgprocessor.NormalMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}

batches, ok := ch.support.BlockCutter().Ordered(msg)
batches, ok := ch.support.BlockCutter().Ordered(msg.initialMsg)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.support.SharedConfig().BatchTimeout())
continue
Expand All @@ -142,8 +136,24 @@ func (ch *chain) main() {
if len(batches) > 0 {
timer = nil
}
default:
logger.Panicf("Unsupported msg classification: %v", class)
} else {
// ConfigMsg
if msg.configSeq < seq {
msg.configMsg, _, err = ch.support.ProcessConfigUpdateMsg(msg.initialMsg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
}
batch := ch.support.BlockCutter().Cut()
if batch != nil {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil)
}

block := ch.support.CreateNextBlock([]*cb.Envelope{msg.configMsg})
ch.support.WriteConfigBlock(block, nil)
timer = nil
}
case <-timer:
//clear the timer
Expand Down
4 changes: 1 addition & 3 deletions orderer/consensus/solo/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
mockmultichannel "github.com/hyperledger/fabric/orderer/mocks/common/multichannel"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -258,8 +257,7 @@ func TestConfigMsg(t *testing.T) {
defer bs.Halt()

syncQueueMessage(testMessage, bs, support.BlockCutterVal)
support.ClassifyMsgVal = msgprocessor.ConfigUpdateMsg
assert.Nil(t, bs.Order(testMessage, 0))
assert.Nil(t, bs.Configure(nil, testMessage, 0))

select {
case <-support.Blocks:
Expand Down
8 changes: 8 additions & 0 deletions orderer/mocks/common/multichannel/multichannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type ConsenterSupport struct {

// ProcessConfigUpdateMsgErr is returned as the error for ProcessConfigUpdateMsg
ProcessConfigUpdateMsgErr error

// SequenceVal is returned by Sequence
SequenceVal uint64
}

// BlockCutter returns BlockCutterVal
Expand Down Expand Up @@ -141,3 +144,8 @@ func (mcs *ConsenterSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint6
func (mcs *ConsenterSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
return mcs.ProcessConfigUpdateMsgVal, mcs.ConfigSeqVal, mcs.ProcessConfigUpdateMsgErr
}

// Sequence returns SequenceVal
func (mcs *ConsenterSupport) Sequence() uint64 {
return mcs.SequenceVal
}

0 comments on commit 6506805

Please sign in to comment.