From 2f0aa7df3aedcedf8623cce06974addb58c9f354 Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Thu, 19 Jan 2017 19:32:52 -0500 Subject: [PATCH] [FAB-1623] Add restart support to Kafka orderer https://jira.hyperledger.org/browse/FAB-1623 Allow a restarted ordering shim to pick up ordering exactly from where it left off. This changeset: 1. Adds a `KafkaMetadata` proto message definition to hold the orderer-related metadata that will be written with every block on the local ledger. 2. Modifies the Kafka-based orderer so that (a) it reads that info upon booting up and (b) it then seeks to the appropriate offset. A convoluted and somewhat tautological unit ("unit") test has been added to test this functionality, but we are really starting to hit the limit of what we can do with the provided mock structures. This path will be revisited with BDD tests within the next few weeks. I've also renamed the offset-related variables in all of the tests to make things easier to keep track of. Review starting point: - fabric/protos/orderer/kafka.proto - fabric/orderer/kafka/orderer.go [1] https://jira.hyperledger.org/browse/FAB-1773 [2] https://jira.hyperledger.org/browse/FAB-1623 Change-Id: I1314a2361844dd4adbd78bf57bb08aaee358dc6f Signed-off-by: Kostas Christidis --- orderer/kafka/orderer.go | 56 ++++++---- orderer/kafka/orderer_test.go | 157 +++++++++++++++++++++++------ orderer/multichain/chainsupport.go | 1 + protos/orderer/ab.pb.go | 1 + protos/orderer/kafka.pb.go | 60 +++++++---- protos/orderer/kafka.proto | 16 +++ 6 files changed, 219 insertions(+), 72 deletions(-) diff --git a/orderer/kafka/orderer.go b/orderer/kafka/orderer.go index 18af7d51b35..70dba31272a 100644 --- a/orderer/kafka/orderer.go +++ b/orderer/kafka/orderer.go @@ -79,7 +79,19 @@ type consenterImpl struct { // Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which // is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains. func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) { - return newChain(co, cs), nil + return newChain(co, cs, getLastOffsetPersisted(metadata)), nil +} + +func getLastOffsetPersisted(metadata *cb.Metadata) int64 { + if metadata.Value != nil { + // Extract orderer-related metadata from the tip of the ledger first + kafkaMetadata := &ab.KafkaMetadata{} + if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil { + panic("Ledger may be corrupted: cannot unmarshal orderer metadata in most recent block") + } + return kafkaMetadata.LastOffsetPersisted + } + return (sarama.OffsetOldest - 1) // default } // When testing we need to inject our own broker/producer/consumer. @@ -90,18 +102,19 @@ func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *c // definition of an interface (see testableConsenter below) that will // be satisfied by both the actual and the mock object and will allow // us to retrieve these constructors.
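Before moving into newChain below, a minimal, self-contained sketch of the round-trip that getLastOffsetPersisted performs. The types and helpers are the ones this changeset touches; the main harness and the literal offset 42 are illustrative only, not part of the patch.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

// Mirrors the getLastOffsetPersisted added by this changeset.
func getLastOffsetPersisted(metadata *cb.Metadata) int64 {
	if metadata.Value != nil {
		kafkaMetadata := &ab.KafkaMetadata{}
		if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil {
			panic("Ledger may be corrupted: cannot unmarshal orderer metadata in most recent block")
		}
		return kafkaMetadata.LastOffsetPersisted
	}
	return (sarama.OffsetOldest - 1) // default
}

func main() {
	// What the orderer stores with every block it writes:
	value, err := proto.Marshal(&ab.KafkaMetadata{LastOffsetPersisted: 42})
	if err != nil {
		panic(err)
	}

	// What a restarted orderer recovers from the tip of the ledger:
	fmt.Println(getLastOffsetPersisted(&cb.Metadata{Value: value})) // 42

	// A brand-new chain carries no metadata yet:
	fmt.Println(getLastOffsetPersisted(&cb.Metadata{})) // sarama.OffsetOldest - 1
}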
-func newChain(consenter testableConsenter, support multichain.ConsenterSupport) *chainImpl { +func newChain(consenter testableConsenter, support multichain.ConsenterSupport, lastOffsetPersisted int64) *chainImpl { + logger.Debug("Starting chain with last persisted offset:", lastOffsetPersisted) return &chainImpl{ - consenter: consenter, - support: support, - partition: newChainPartition(support.ChainID(), rawPartition), - batchTimeout: support.SharedConfig().BatchTimeout(), - lastProcessed: sarama.OffsetOldest - 1, // TODO This should be retrieved by ConsenterSupport; also see note in loop() below - producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()), - halted: false, // Redundant as the default value for booleans is false but added for readability - exitChan: make(chan struct{}), - haltedChan: make(chan struct{}), - setupChan: make(chan struct{}), + consenter: consenter, + support: support, + partition: newChainPartition(support.ChainID(), rawPartition), + batchTimeout: support.SharedConfig().BatchTimeout(), + lastOffsetPersisted: lastOffsetPersisted, + producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()), + halted: false, // Redundant as the default value for booleans is false but added for readability + exitChan: make(chan struct{}), + haltedChan: make(chan struct{}), + setupChan: make(chan struct{}), } } @@ -125,10 +138,10 @@ type chainImpl struct { consenter testableConsenter support multichain.ConsenterSupport - partition ChainPartition - batchTimeout time.Duration - lastProcessed int64 - lastCutBlock uint64 + partition ChainPartition + batchTimeout time.Duration + lastOffsetPersisted int64 + lastCutBlock uint64 producer Producer consumer Consumer @@ -156,9 +169,7 @@ func (ch *chainImpl) Start() { } // 2. Set up the listener/consumer for this partition. - // TODO When restart support gets added to the common components level, start - // the consumer from lastProcessed. For now, hard-code to oldest available. 
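The hunk below replaces that TODO: the consumer now always seeks to ch.lastOffsetPersisted+1, which is also why getLastOffsetPersisted defaults to (sarama.OffsetOldest - 1). A short sketch of the arithmetic, relying only on sarama's exported sentinel constant; the seek closure is illustrative:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Mirrors ch.lastOffsetPersisted+1 in Start().
	seek := func(lastOffsetPersisted int64) int64 {
		return lastOffsetPersisted + 1
	}

	// Fresh chain: the default (sarama.OffsetOldest - 1) makes the
	// consumer seek to sarama.OffsetOldest, i.e. the start of the
	// partition, matching the old hard-coded behavior.
	fmt.Println(seek(sarama.OffsetOldest-1) == sarama.OffsetOldest) // true

	// Restarted chain: the last block recorded offset 41, so
	// consumption resumes at 42 and nothing is re-processed.
	fmt.Println(seek(41)) // 42
}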
- consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastProcessed+1) + consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastOffsetPersisted+1) if err != nil { logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err) close(ch.exitChan) @@ -206,6 +217,7 @@ func (ch *chainImpl) loop() { msg := new(ab.KafkaMessage) var timer <-chan time.Time var ttcNumber uint64 + var encodedLastOffsetPersisted []byte defer close(ch.haltedChan) defer ch.producer.Close() @@ -237,7 +249,8 @@ func (ch *chainImpl) loop() { return } block := ch.support.CreateNextBlock(batch) - ch.support.WriteBlock(block, committers, nil) + encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset}) + ch.support.WriteBlock(block, committers, encodedLastOffsetPersisted) ch.lastCutBlock++ logger.Debug("Proper time-to-cut received, just cut block", ch.lastCutBlock) continue @@ -264,7 +277,8 @@ func (ch *chainImpl) loop() { // If !ok, batches == nil, so this will be skipped for i, batch := range batches { block := ch.support.CreateNextBlock(batch) - ch.support.WriteBlock(block, committers[i], nil) + encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset}) + ch.support.WriteBlock(block, committers[i], encodedLastOffsetPersisted) ch.lastCutBlock++ logger.Debug("Batch filled, just cut block", ch.lastCutBlock) } diff --git a/orderer/kafka/orderer_test.go b/orderer/kafka/orderer_test.go index 0624091955e..8c5712112c4 100644 --- a/orderer/kafka/orderer_test.go +++ b/orderer/kafka/orderer_test.go @@ -17,6 +17,7 @@ limitations under the License. 
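Before the test changes, a sketch tying the two halves of the patch together: every block cut by loop() above now carries the encoded offset, and the restart path reads it back with utils.GetMetadataFromBlock, exactly as newChainSupport and the tests below do. The manual block assembly here stands in for what ConsenterSupport.WriteBlock does internally and is an assumption of this sketch:

package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
	"github.com/hyperledger/fabric/protos/utils"
)

func main() {
	// What loop() computes when it cuts a block at Kafka offset 5:
	encoded := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: 5})

	// Assumption: WriteBlock wraps the value in a cb.Metadata message and
	// stores it under the ORDERER index of the block metadata.
	block := &cb.Block{
		Header:   &cb.BlockHeader{Number: 1},
		Data:     &cb.BlockData{},
		Metadata: &cb.BlockMetadata{Metadata: make([][]byte, len(cb.BlockMetadataIndex_name))},
	}
	block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encoded})

	// The read-back used on restart:
	metadata, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_ORDERER)
	if err != nil {
		panic(err)
	}
	kafkaMetadata := &ab.KafkaMetadata{}
	if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil {
		panic(err)
	}
	fmt.Println(kafkaMetadata.LastOffsetPersisted) // 5
}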
package kafka import ( + "fmt" "sync" "testing" "time" @@ -39,11 +40,6 @@ func newMockSharedConfigManager() *mocksharedconfig.Manager { return &mocksharedconfig.Manager{KafkaBrokersVal: testConf.Kafka.Brokers} } -func syncQueueMessage(msg *cb.Envelope, chain multichain.Chain, bc *mockblockcutter.Receiver) { - chain.Enqueue(msg) - bc.Block <- struct{}{} -} - type mockConsenterImpl struct { consenterImpl prodDisk, consDisk chan *ab.KafkaMessage @@ -51,7 +47,7 @@ type mockConsenterImpl struct { t *testing.T } -func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) *mockConsenterImpl { +func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, nextProducedOffset int64) *mockConsenterImpl { prodDisk := make(chan *ab.KafkaMessage) consDisk := make(chan *ab.KafkaMessage) @@ -59,10 +55,14 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio return mockNewBroker(t, cp) } mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer { - return mockNewProducer(t, cp, testOldestOffset, prodDisk) + // The first Send on this producer will return a blob with offset #nextProducedOffset + return mockNewProducer(t, cp, nextProducedOffset, prodDisk) } - mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) { - return mockNewConsumer(t, cp, offset, consDisk) + mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, lastPersistedOffset int64) (Consumer, error) { + if lastPersistedOffset != nextProducedOffset { + panic(fmt.Errorf("Mock objects about to be set up incorrectly (consumer to seek to %d, producer to post %d)", lastPersistedOffset, nextProducedOffset)) + } + return mockNewConsumer(t, cp, lastPersistedOffset, consDisk) } return &mockConsenterImpl{ @@ -96,6 +96,11 @@ func prepareMockObjectDisks(t *testing.T, co *mockConsenterImpl, ch *chainImpl) } } +func syncQueueMessage(msg *cb.Envelope, chain multichain.Chain, bc *mockblockcutter.Receiver) { + chain.Enqueue(msg) + bc.Block <- struct{}{} +} + func waitableSyncQueueMessage(env *cb.Envelope, messagesToPickUp int, wg *sync.WaitGroup, co *mockConsenterImpl, cs *mockmultichain.ConsenterSupport, ch *chainImpl) { wg.Add(1) @@ -128,9 +133,10 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -162,9 +168,10 @@ func TestKafkaConsenterBatchTimer(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -213,9 +220,10 @@ func TestKafkaConsenterTimerHaltOnFilledBatch(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, 
testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -272,9 +280,10 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -321,9 +330,10 @@ func TestKafkaConsenterTimeToCutForced(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -377,9 +387,10 @@ func TestKafkaConsenterTimeToCutDuplicate(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -465,9 +476,10 @@ func TestKafkaConsenterTimeToCutStale(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -523,9 +535,10 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) { } defer close(cs.BlockCutterVal.Block) - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) go ch.Start() defer ch.Halt() @@ -574,3 +587,81 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) { case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) } } + +func TestGetLastOffsetPersistedEmpty(t *testing.T) { + expected := sarama.OffsetOldest - 1 + actual := getLastOffsetPersisted(&cb.Metadata{}) + if actual != expected { + t.Fatalf("Expected last offset %d, got %d", expected, actual) + } +} + +func TestGetLastOffsetPersistedRight(t *testing.T) { + expected := int64(100) + actual := getLastOffsetPersisted(&cb.Metadata{Value: 
utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: expected})}) + if actual != expected { + t.Fatalf("Expected last offset %d, got %d", expected, actual) + } +} + +func TestKafkaConsenterRestart(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + batchTimeout, _ := time.ParseDuration("1ms") + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout}, + } + defer close(cs.BlockCutterVal.Block) + + lastPersistedOffset := testOldestOffset - 1 + nextProducedOffset := lastPersistedOffset + 1 + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch := newChain(co, cs, lastPersistedOffset) + + go ch.Start() + defer ch.Halt() + + prepareMockObjectDisks(t, co, ch) + + // The second message that will be picked up is the time-to-cut message + // that will be posted when the short timer expires + waitableSyncQueueMessage(newTestEnvelope("one"), 2, &wg, co, cs, ch) + + select { + case <-cs.Batches: // This is the success path + case <-time.After(testTimePadding): + t.Fatal("Expected block to be cut because batch timer expired") + } + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } + + lastBlock := cs.WriteBlockVal + metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER) + if err != nil { + t.Fatalf("Error extracting orderer metadata for chain %x: %s", cs.ChainIDVal, err) + } + + lastPersistedOffset = getLastOffsetPersisted(metadata) + nextProducedOffset = lastPersistedOffset + 1 + + co = mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset) + ch = newChain(co, cs, lastPersistedOffset) + go ch.Start() + prepareMockObjectDisks(t, co, ch) + + actual := ch.producer.(*mockProducerImpl).producedOffset + if actual != nextProducedOffset { + t.Fatalf("Restarted orderer post-connect should have been at offset %d, got %d instead", nextProducedOffset, actual) + } +} diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index 1d4f58c2de6..00e6217f682 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -139,6 +139,7 @@ func newChainSupport( if err != nil { logger.Fatalf("Error extracting orderer metadata for chain %x: %s", configManager.ChainID(), err) } + logger.Debugf("Retrieved metadata for tip of chain (block #%d): %+v", cs.Reader().Height()-1, metadata) cs.chain, err = consenter.HandleChain(cs, metadata) if err != nil { diff --git a/protos/orderer/ab.pb.go b/protos/orderer/ab.pb.go index 999775d4b58..0809e4b841c 100644 --- a/protos/orderer/ab.pb.go +++ b/protos/orderer/ab.pb.go @@ -30,6 +30,7 @@ It has these top-level messages: KafkaMessageRegular KafkaMessageTimeToCut KafkaMessageConnect + KafkaMetadata */ package orderer diff --git a/protos/orderer/kafka.pb.go b/protos/orderer/kafka.pb.go index 2d1194a68d6..9e21b58c736 100644 --- a/protos/orderer/kafka.pb.go +++ b/protos/orderer/kafka.pb.go @@ -13,6 +13,8 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +// KafkaMessage is a wrapper type for the messages +// that the Kafka-based orderer deals with.
type KafkaMessage struct { // Types that are valid to be assigned to Type: // *KafkaMessage_Regular @@ -165,6 +167,7 @@ func _KafkaMessage_OneofSizer(msg proto.Message) (n int) { return n } +// KafkaMessageRegular wraps a marshalled envelope. type KafkaMessageRegular struct { Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` } @@ -174,6 +177,8 @@ func (m *KafkaMessageRegular) String() string { return proto.CompactT func (*KafkaMessageRegular) ProtoMessage() {} func (*KafkaMessageRegular) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} } +// KafkaMessageTimeToCut is used to signal to the orderers +// that it is time to cut block <block_number>. type KafkaMessageTimeToCut struct { BlockNumber uint64 `protobuf:"varint,1,opt,name=block_number,json=blockNumber" json:"block_number,omitempty"` } @@ -183,6 +188,10 @@ func (m *KafkaMessageTimeToCut) String() string { return proto.Compac func (*KafkaMessageTimeToCut) ProtoMessage() {} func (*KafkaMessageTimeToCut) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} } +// KafkaMessageConnect is posted by an orderer upon booting up. +// It is used to prevent the panic that would be caused if we +// were to consume an empty partition. It is ignored by all +// orderers when processing the partition. type KafkaMessageConnect struct { Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` } @@ -192,32 +201,47 @@ func (m *KafkaMessageConnect) String() string { return proto.CompactT func (*KafkaMessageConnect) ProtoMessage() {} func (*KafkaMessageConnect) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{3} } +// KafkaMetadata holds the orderer-related metadata (currently the +// last offset persisted) that is encoded into the ORDERER index of +// the block metadata for the Kafka-based orderer.
+type KafkaMetadata struct { + LastOffsetPersisted int64 `protobuf:"varint,1,opt,name=last_offset_persisted,json=lastOffsetPersisted" json:"last_offset_persisted,omitempty"` +} + +func (m *KafkaMetadata) Reset() { *m = KafkaMetadata{} } +func (m *KafkaMetadata) String() string { return proto.CompactTextString(m) } +func (*KafkaMetadata) ProtoMessage() {} +func (*KafkaMetadata) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{4} } + func init() { proto.RegisterType((*KafkaMessage)(nil), "orderer.KafkaMessage") proto.RegisterType((*KafkaMessageRegular)(nil), "orderer.KafkaMessageRegular") proto.RegisterType((*KafkaMessageTimeToCut)(nil), "orderer.KafkaMessageTimeToCut") proto.RegisterType((*KafkaMessageConnect)(nil), "orderer.KafkaMessageConnect") + proto.RegisterType((*KafkaMetadata)(nil), "orderer.KafkaMetadata") } func init() { proto.RegisterFile("orderer/kafka.proto", fileDescriptor2) } var fileDescriptor2 = []byte{ - // 266 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x91, 0x3f, 0x4b, 0xc4, 0x30, - 0x18, 0xc6, 0xaf, 0x7a, 0x5c, 0x31, 0xed, 0xd4, 0x43, 0xe8, 0x20, 0xa2, 0x9d, 0x1c, 0x24, 0x01, - 0x5d, 0xc4, 0x49, 0xee, 0x96, 0x03, 0xd1, 0x21, 0x74, 0x72, 0x29, 0x49, 0xfa, 0x5e, 0xaf, 0xf4, - 0x4f, 0x4a, 0x9a, 0x0c, 0xfd, 0x8e, 0x7e, 0x28, 0x69, 0x9a, 0x82, 0x48, 0xbd, 0xf1, 0x79, 0xf2, - 0x7b, 0x79, 0x9e, 0x37, 0x2f, 0xda, 0x4a, 0x95, 0x83, 0x02, 0x45, 0x2a, 0x76, 0xac, 0x18, 0xee, - 0x94, 0xd4, 0x32, 0xf2, 0x9d, 0x99, 0x7c, 0x7b, 0x28, 0x7c, 0x1f, 0x1f, 0x3e, 0xa0, 0xef, 0x59, - 0x01, 0xd1, 0x0b, 0xf2, 0x15, 0x14, 0xa6, 0x66, 0x2a, 0xf6, 0xee, 0xbc, 0x87, 0xe0, 0xe9, 0x06, - 0x3b, 0x16, 0xff, 0xe6, 0xe8, 0xc4, 0x1c, 0x56, 0x74, 0xc6, 0xa3, 0x37, 0x14, 0xe8, 0xb2, 0x81, - 0x4c, 0xcb, 0x4c, 0x18, 0x1d, 0x5f, 0xd8, 0xe9, 0xdb, 0xc5, 0xe9, 0xb4, 0x6c, 0x20, 0x95, 0x7b, - 0xa3, 0x0f, 0x2b, 0x7a, 0xa5, 0x67, 0x31, 0x66, 0x0b, 0xd9, 0xb6, 0x20, 0x74, 0x7c, 0x79, 0x26, - 0x7b, 0x3f, 0x31, 0x63, 0xb6, 0xc3, 0x77, 0x1b, 0xb4, 0x4e, 0x87, 0x0e, 0x12, 0x82, 0xb6, 0x0b, - 0x2d, 0xa3, 0x18, 0xf9, 0x1d, 0x1b, 0x6a, 0xc9, 0x72, 0xbb, 0x54, 0x48, 0x67, 0x99, 0xbc, 0xa2, - 0xeb, 0xc5, 0x62, 0xd1, 0x3d, 0x0a, 0x79, 0x2d, 0x45, 0x95, 0xb5, 0xa6, 0xe1, 0x30, 0x7d, 0xc6, - 0x9a, 0x06, 0xd6, 0xfb, 0xb4, 0xd6, 0xdf, 0x30, 0x57, 0xeb, 0xff, 0xb0, 0x1d, 0xfe, 0x7a, 0x2c, - 0x4a, 0x7d, 0x32, 0x1c, 0x0b, 0xd9, 0x90, 0xd3, 0xd0, 0x81, 0xaa, 0x21, 0x2f, 0x40, 0x91, 0x23, - 0xe3, 0xaa, 0x14, 0xc4, 0x1e, 0xa7, 0x27, 0x6e, 0x69, 0xbe, 0xb1, 0xfa, 0xf9, 0x27, 0x00, 0x00, - 0xff, 0xff, 0x9c, 0x1e, 0x0b, 0x94, 0xc3, 0x01, 0x00, 0x00, + // 304 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x91, 0x3d, 0x6b, 0xf3, 0x30, + 0x14, 0x85, 0x93, 0x37, 0x21, 0xe1, 0x55, 0xd2, 0x45, 0x21, 0xe0, 0xa1, 0x94, 0x36, 0x53, 0x87, + 0x62, 0x41, 0xba, 0x94, 0x4e, 0x25, 0x59, 0x02, 0xa5, 0x1f, 0x08, 0x4f, 0x5d, 0x8c, 0x2c, 0x5f, + 0x3b, 0xc6, 0x1f, 0x32, 0xd2, 0xf5, 0xe0, 0xff, 0xd8, 0x1f, 0x55, 0x2c, 0xcb, 0x50, 0x8a, 0xc9, + 0x78, 0xef, 0x79, 0x0e, 0xe7, 0xe8, 0x8a, 0x6c, 0x94, 0x8e, 0x41, 0x83, 0x66, 0xb9, 0x48, 0x72, + 0xe1, 0xd7, 0x5a, 0xa1, 0xa2, 0x4b, 0xb7, 0xdc, 0x7d, 0x4f, 0xc9, 0xfa, 0xb5, 0x13, 0xde, 0xc0, + 0x18, 0x91, 0x02, 0x7d, 0x22, 0x4b, 0x0d, 0x69, 0x53, 0x08, 0xed, 0x4d, 0x6f, 0xa7, 0xf7, 0xab, + 0xfd, 0xb5, 0xef, 0x58, 0xff, 0x37, 0xc7, 0x7b, 0xe6, 0x34, 0xe1, 0x03, 0x4e, 0x5f, 0xc8, 0x0a, + 0xb3, 0x12, 0x42, 0x54, 0xa1, 0x6c, 0xd0, 0xfb, 0x67, 0xdd, 0x37, 0xa3, 0xee, 0x20, 0x2b, 0x21, + 
0x50, 0xc7, 0x06, 0x4f, 0x13, 0xfe, 0x1f, 0x87, 0xa1, 0xcb, 0x96, 0xaa, 0xaa, 0x40, 0xa2, 0x37, + 0xbb, 0x90, 0x7d, 0xec, 0x99, 0x2e, 0xdb, 0xe1, 0x87, 0x05, 0x99, 0x07, 0x6d, 0x0d, 0x3b, 0x46, + 0x36, 0x23, 0x2d, 0xa9, 0x47, 0x96, 0xb5, 0x68, 0x0b, 0x25, 0x62, 0xfb, 0xa8, 0x35, 0x1f, 0xc6, + 0xdd, 0x33, 0xd9, 0x8e, 0x16, 0xa3, 0x77, 0x64, 0x1d, 0x15, 0x4a, 0xe6, 0x61, 0xd5, 0x94, 0x11, + 0xf4, 0xc7, 0x98, 0xf3, 0x95, 0xdd, 0xbd, 0xdb, 0xd5, 0xdf, 0x30, 0x57, 0xeb, 0x42, 0xd8, 0x91, + 0x5c, 0x39, 0x03, 0x8a, 0x58, 0xa0, 0xa0, 0x7b, 0xb2, 0x2d, 0x84, 0xc1, 0x50, 0x25, 0x89, 0x01, + 0x0c, 0x6b, 0xd0, 0x26, 0x33, 0x08, 0xbd, 0x71, 0xc6, 0x37, 0x9d, 0xf8, 0x61, 0xb5, 0xcf, 0x41, + 0x3a, 0xf8, 0x5f, 0x0f, 0x69, 0x86, 0xe7, 0x26, 0xf2, 0xa5, 0x2a, 0xd9, 0xb9, 0xad, 0x41, 0x17, + 0x10, 0xa7, 0xa0, 0x59, 0x22, 0x22, 0x9d, 0x49, 0x66, 0x7f, 0xd8, 0x30, 0x77, 0xb9, 0x68, 0x61, + 0xe7, 0xc7, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x38, 0x89, 0x66, 0xa4, 0x08, 0x02, 0x00, 0x00, } diff --git a/protos/orderer/kafka.proto b/protos/orderer/kafka.proto index bff6567f08c..ac0ee639c1f 100644 --- a/protos/orderer/kafka.proto +++ b/protos/orderer/kafka.proto @@ -20,6 +20,8 @@ option go_package = "github.com/hyperledger/fabric/protos/orderer"; package orderer; +// KafkaMessage is a wrapper type for the messages +// that the Kafka-based orderer deals with. message KafkaMessage { oneof Type { KafkaMessageRegular regular = 1; @@ -28,14 +30,28 @@ message KafkaMessage { } } +// KafkaMessageRegular wraps a marshalled envelope. message KafkaMessageRegular { bytes payload = 1; } +// KafkaMessageTimeToCut is used to signal to the orderers +// that it is time to cut block <block_number>. message KafkaMessageTimeToCut { uint64 block_number = 1; } +// KafkaMessageConnect is posted by an orderer upon booting up. +// It is used to prevent the panic that would be caused if we +// were to consume an empty partition. It is ignored by all +// orderers when processing the partition. message KafkaMessageConnect { bytes payload = 1; } + +// KafkaMetadata holds the orderer-related metadata (currently the +// last offset persisted) that is encoded into the ORDERER index of +// the block metadata for the Kafka-based orderer. +message KafkaMetadata { + int64 last_offset_persisted = 1; +}
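For completeness, a sketch of how the oneof defined above is produced and consumed with the generated Go bindings. The constructor helpers here are illustrative stand-ins for the equivalents in the orderer's kafka package; the wrapper type names follow from the generated code shown earlier in kafka.pb.go:

package main

import (
	"fmt"

	ab "github.com/hyperledger/fabric/protos/orderer"
)

func newRegularMessage(payload []byte) *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Regular{
			Regular: &ab.KafkaMessageRegular{Payload: payload},
		},
	}
}

func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_TimeToCut{
			TimeToCut: &ab.KafkaMessageTimeToCut{BlockNumber: blockNumber},
		},
	}
}

func newConnectMessage() *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Connect{
			Connect: &ab.KafkaMessageConnect{Payload: nil},
		},
	}
}

func main() {
	// The consumer side switches on the oneof, much as loop() does:
	for _, msg := range []*ab.KafkaMessage{
		newConnectMessage(), newRegularMessage([]byte("env")), newTimeToCutMessage(7),
	} {
		switch t := msg.Type.(type) {
		case *ab.KafkaMessage_Connect:
			fmt.Println("connect message, ignored during processing")
		case *ab.KafkaMessage_Regular:
			fmt.Println("regular message:", len(t.Regular.Payload), "bytes")
		case *ab.KafkaMessage_TimeToCut:
			fmt.Println("time-to-cut for block", t.TimeToCut.BlockNumber)
		}
	}
}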