From a99e792ad9668193b9f64f31f22378e341a467ec Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Sun, 11 Dec 2016 00:51:46 -0500 Subject: [PATCH] [FAB-1365] Introduce Kafka container message types https://jira.hyperledger.org/browse/FAB-1365 The revised Kafka consenter needs two special messages: 1. A time-to-cut message that is used to mark the end of a block, and 2. A no-op message that each shim posts when bootstrapped by the multichain manager to prevent the possibility of "listening in" (seeking and consuming) on a topic/partition that nobody has posted to yet [1]. This is an operation that panics in Kafka: "[ERROR] Cannot retrieve required offset from Kafka cluster: kafka server: The request attempted to perform an operation on an invalid topic." These messages are special because they don't carry transactions, and because the Kafka consenter will treat them in a special way: it will ignore every time-to-cut message (for a specific block number) besides the first one, and it will ignore all "no-op" messages when processing incoming messages from the chain partition. This changeset defines the types that will carry these messages, as well as helper functions to generate them. Note that these are not hooked into the main path yet, though a preview of these in action can be found here: https://github.com/kchristidis/fabric/blob/47752ed61fcab1b26207a9e9075c1c793d723912/orderer/kafka/main.go#L142 https://github.com/kchristidis/fabric/blob/47752ed61fcab1b26207a9e9075c1c793d723912/orderer/kafka/main.go#L164 https://github.com/kchristidis/fabric/blob/47752ed61fcab1b26207a9e9075c1c793d723912/orderer/kafka/main.go#L204 These changes will be hooked into the main path in a follow-up changeset that introduces the revised Kafka consenter. [1] We ask the consenter to "listen in" on an empty topic/partition every time a new chain is created, since we never actually post the genesis block to that chain's partition. 
Change-Id: Ic7ebbf2585e6e8e5080866e0d110d9cff5a16de5 Signed-off-by: Kostas Christidis --- orderer/kafka/util.go | 31 ++++ protos/orderer/ab.pb.go | 5 + protos/orderer/configuration.pb.go | 24 +++- protos/orderer/kafka.pb.go | 223 +++++++++++++++++++++++++++++ protos/orderer/kafka.proto | 41 ++++++ 5 files changed, 317 insertions(+), 7 deletions(-) create mode 100644 protos/orderer/kafka.pb.go create mode 100644 protos/orderer/kafka.proto diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go index bba74d1aaea..ad31b5021f7 100644 --- a/orderer/kafka/util.go +++ b/orderer/kafka/util.go @@ -19,6 +19,7 @@ package kafka import ( "github.com/Shopify/sarama" "github.com/hyperledger/fabric/orderer/localconfig" + ab "github.com/hyperledger/fabric/protos/orderer" ) const ( @@ -41,6 +42,36 @@ func newBrokerConfig(conf *config.TopLevel) *sarama.Config { return brokerConfig } +func newConnectMessage() *ab.KafkaMessage { + return &ab.KafkaMessage{ + Type: &ab.KafkaMessage_Connect{ + Connect: &ab.KafkaMessageConnect{ + Payload: nil, + }, + }, + } +} + +func newRegularMessage(payload []byte) *ab.KafkaMessage { + return &ab.KafkaMessage{ + Type: &ab.KafkaMessage_Regular{ + Regular: &ab.KafkaMessageRegular{ + Payload: payload, + }, + }, + } +} + +func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage { + return &ab.KafkaMessage{ + Type: &ab.KafkaMessage_TimeToCut{ + TimeToCut: &ab.KafkaMessageTimeToCut{ + BlockNumber: blockNumber, + }, + }, + } +} + func newMsg(payload []byte, topic string) *sarama.ProducerMessage { return &sarama.ProducerMessage{ Topic: topic, diff --git a/protos/orderer/ab.pb.go b/protos/orderer/ab.pb.go index 49742fc361b..723b92e9ad4 100644 --- a/protos/orderer/ab.pb.go +++ b/protos/orderer/ab.pb.go @@ -8,6 +8,7 @@ Package orderer is a generated protocol buffer package. 
It is generated from these files: orderer/ab.proto orderer/configuration.proto + orderer/kafka.proto It has these top-level messages: BroadcastResponse @@ -20,6 +21,10 @@ It has these top-level messages: CreationPolicy ChainCreators KafkaBrokers + KafkaMessage + KafkaMessageRegular + KafkaMessageTimeToCut + KafkaMessageConnect */ package orderer diff --git a/protos/orderer/configuration.pb.go b/protos/orderer/configuration.pb.go index f9e2e69d0c5..196169c2085 100644 --- a/protos/orderer/configuration.pb.go +++ b/protos/orderer/configuration.pb.go @@ -24,7 +24,8 @@ func (*ConsensusType) ProtoMessage() {} func (*ConsensusType) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } type BatchSize struct { - // Simply specified as messages for now, in the future we may want to allow this to be specified by size in bytes + // Simply specified as messages for now, in the future we may want to allow + // this to be specified by size in bytes Messages uint32 `protobuf:"varint,1,opt,name=messages" json:"messages,omitempty"` } @@ -33,12 +34,16 @@ func (m *BatchSize) String() string { return proto.CompactTextString( func (*BatchSize) ProtoMessage() {} func (*BatchSize) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} } -// When submitting a new chain configuration transaction to create a new chain, the first configuration item must of of type -// Orderer with Key CreationPolicy and contents of a Marshaled CreationPolicy. The policy should be set to the policy which -// was supplied by the ordering service for the client's chain creation. The digest should be the hash of the concatenation -// of the remaining ConfigurationItem bytes. The signatures of the configuration item should satisfy the policy for chain creation +// When submitting a new chain configuration transaction to create a new chain, +// the first configuration item must be of type Orderer with Key CreationPolicy +// and contents of a Marshaled CreationPolicy. 
The policy should be set to the +// policy which was supplied by the ordering service for the client's chain +// creation. The digest should be the hash of the concatenation of the remaining +// ConfigurationItem bytes. The signatures of the configuration item should +// satisfy the policy for chain creation. type CreationPolicy struct { - // The name of the policy which should be used to validate the creation of this chain + // The name of the policy which should be used to validate the creation of + // this chain Policy string `protobuf:"bytes,1,opt,name=policy" json:"policy,omitempty"` // The hash of the concatenation of remaining configuration item bytes Digest []byte `protobuf:"bytes,2,opt,name=digest,proto3" json:"digest,omitempty"` @@ -50,7 +55,8 @@ func (*CreationPolicy) ProtoMessage() {} func (*CreationPolicy) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} } type ChainCreators struct { - // A list of policies, any of which may be specified as the chain creation policy in a chain creation request + // A list of policies, any of which may be specified as the chain creation + // policy in a chain creation request Policies []string `protobuf:"bytes,1,rep,name=policies" json:"policies,omitempty"` } @@ -59,7 +65,11 @@ func (m *ChainCreators) String() string { return proto.CompactTextStr func (*ChainCreators) ProtoMessage() {} func (*ChainCreators) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} } +// Carries a list of bootstrap brokers, i.e. this is not the exclusive set of +// brokers an ordering service may use type KafkaBrokers struct { + // Each broker here should be identified using the (IP|host):port notation, + // e.g. 
127.0.0.1:7050, or localhost:7050 are valid entries Brokers []string `protobuf:"bytes,1,rep,name=brokers" json:"brokers,omitempty"` } diff --git a/protos/orderer/kafka.pb.go b/protos/orderer/kafka.pb.go new file mode 100644 index 00000000000..2d1194a68d6 --- /dev/null +++ b/protos/orderer/kafka.pb.go @@ -0,0 +1,223 @@ +// Code generated by protoc-gen-go. +// source: orderer/kafka.proto +// DO NOT EDIT! + +package orderer + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type KafkaMessage struct { + // Types that are valid to be assigned to Type: + // *KafkaMessage_Regular + // *KafkaMessage_TimeToCut + // *KafkaMessage_Connect + Type isKafkaMessage_Type `protobuf_oneof:"Type"` +} + +func (m *KafkaMessage) Reset() { *m = KafkaMessage{} } +func (m *KafkaMessage) String() string { return proto.CompactTextString(m) } +func (*KafkaMessage) ProtoMessage() {} +func (*KafkaMessage) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} } + +type isKafkaMessage_Type interface { + isKafkaMessage_Type() +} + +type KafkaMessage_Regular struct { + Regular *KafkaMessageRegular `protobuf:"bytes,1,opt,name=regular,oneof"` +} +type KafkaMessage_TimeToCut struct { + TimeToCut *KafkaMessageTimeToCut `protobuf:"bytes,2,opt,name=time_to_cut,json=timeToCut,oneof"` +} +type KafkaMessage_Connect struct { + Connect *KafkaMessageConnect `protobuf:"bytes,3,opt,name=connect,oneof"` +} + +func (*KafkaMessage_Regular) isKafkaMessage_Type() {} +func (*KafkaMessage_TimeToCut) isKafkaMessage_Type() {} +func (*KafkaMessage_Connect) isKafkaMessage_Type() {} + +func (m *KafkaMessage) GetType() isKafkaMessage_Type { + if m != nil { + return m.Type + } + return nil +} + +func (m *KafkaMessage) GetRegular() *KafkaMessageRegular { + if x, ok := m.GetType().(*KafkaMessage_Regular); ok { + return x.Regular + } + return nil 
+} + +func (m *KafkaMessage) GetTimeToCut() *KafkaMessageTimeToCut { + if x, ok := m.GetType().(*KafkaMessage_TimeToCut); ok { + return x.TimeToCut + } + return nil +} + +func (m *KafkaMessage) GetConnect() *KafkaMessageConnect { + if x, ok := m.GetType().(*KafkaMessage_Connect); ok { + return x.Connect + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*KafkaMessage) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _KafkaMessage_OneofMarshaler, _KafkaMessage_OneofUnmarshaler, _KafkaMessage_OneofSizer, []interface{}{ + (*KafkaMessage_Regular)(nil), + (*KafkaMessage_TimeToCut)(nil), + (*KafkaMessage_Connect)(nil), + } +} + +func _KafkaMessage_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*KafkaMessage) + // Type + switch x := m.Type.(type) { + case *KafkaMessage_Regular: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Regular); err != nil { + return err + } + case *KafkaMessage_TimeToCut: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.TimeToCut); err != nil { + return err + } + case *KafkaMessage_Connect: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Connect); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("KafkaMessage.Type has unexpected type %T", x) + } + return nil +} + +func _KafkaMessage_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*KafkaMessage) + switch tag { + case 1: // Type.regular + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(KafkaMessageRegular) + err := b.DecodeMessage(msg) + m.Type = &KafkaMessage_Regular{msg} + return true, err + case 2: // Type.time_to_cut + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := 
new(KafkaMessageTimeToCut) + err := b.DecodeMessage(msg) + m.Type = &KafkaMessage_TimeToCut{msg} + return true, err + case 3: // Type.connect + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(KafkaMessageConnect) + err := b.DecodeMessage(msg) + m.Type = &KafkaMessage_Connect{msg} + return true, err + default: + return false, nil + } +} + +func _KafkaMessage_OneofSizer(msg proto.Message) (n int) { + m := msg.(*KafkaMessage) + // Type + switch x := m.Type.(type) { + case *KafkaMessage_Regular: + s := proto.Size(x.Regular) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *KafkaMessage_TimeToCut: + s := proto.Size(x.TimeToCut) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *KafkaMessage_Connect: + s := proto.Size(x.Connect) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type KafkaMessageRegular struct { + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (m *KafkaMessageRegular) Reset() { *m = KafkaMessageRegular{} } +func (m *KafkaMessageRegular) String() string { return proto.CompactTextString(m) } +func (*KafkaMessageRegular) ProtoMessage() {} +func (*KafkaMessageRegular) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} } + +type KafkaMessageTimeToCut struct { + BlockNumber uint64 `protobuf:"varint,1,opt,name=block_number,json=blockNumber" json:"block_number,omitempty"` +} + +func (m *KafkaMessageTimeToCut) Reset() { *m = KafkaMessageTimeToCut{} } +func (m *KafkaMessageTimeToCut) String() string { return proto.CompactTextString(m) } +func (*KafkaMessageTimeToCut) ProtoMessage() {} +func (*KafkaMessageTimeToCut) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} } + +type KafkaMessageConnect struct { + 
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (m *KafkaMessageConnect) Reset() { *m = KafkaMessageConnect{} } +func (m *KafkaMessageConnect) String() string { return proto.CompactTextString(m) } +func (*KafkaMessageConnect) ProtoMessage() {} +func (*KafkaMessageConnect) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{3} } + +func init() { + proto.RegisterType((*KafkaMessage)(nil), "orderer.KafkaMessage") + proto.RegisterType((*KafkaMessageRegular)(nil), "orderer.KafkaMessageRegular") + proto.RegisterType((*KafkaMessageTimeToCut)(nil), "orderer.KafkaMessageTimeToCut") + proto.RegisterType((*KafkaMessageConnect)(nil), "orderer.KafkaMessageConnect") +} + +func init() { proto.RegisterFile("orderer/kafka.proto", fileDescriptor2) } + +var fileDescriptor2 = []byte{ + // 266 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x91, 0x3f, 0x4b, 0xc4, 0x30, + 0x18, 0xc6, 0xaf, 0x7a, 0x5c, 0x31, 0xed, 0xd4, 0x43, 0xe8, 0x20, 0xa2, 0x9d, 0x1c, 0x24, 0x01, + 0x5d, 0xc4, 0x49, 0xee, 0x96, 0x03, 0xd1, 0x21, 0x74, 0x72, 0x29, 0x49, 0xfa, 0x5e, 0xaf, 0xf4, + 0x4f, 0x4a, 0x9a, 0x0c, 0xfd, 0x8e, 0x7e, 0x28, 0x69, 0x9a, 0x82, 0x48, 0xbd, 0xf1, 0x79, 0xf2, + 0x7b, 0x79, 0x9e, 0x37, 0x2f, 0xda, 0x4a, 0x95, 0x83, 0x02, 0x45, 0x2a, 0x76, 0xac, 0x18, 0xee, + 0x94, 0xd4, 0x32, 0xf2, 0x9d, 0x99, 0x7c, 0x7b, 0x28, 0x7c, 0x1f, 0x1f, 0x3e, 0xa0, 0xef, 0x59, + 0x01, 0xd1, 0x0b, 0xf2, 0x15, 0x14, 0xa6, 0x66, 0x2a, 0xf6, 0xee, 0xbc, 0x87, 0xe0, 0xe9, 0x06, + 0x3b, 0x16, 0xff, 0xe6, 0xe8, 0xc4, 0x1c, 0x56, 0x74, 0xc6, 0xa3, 0x37, 0x14, 0xe8, 0xb2, 0x81, + 0x4c, 0xcb, 0x4c, 0x18, 0x1d, 0x5f, 0xd8, 0xe9, 0xdb, 0xc5, 0xe9, 0xb4, 0x6c, 0x20, 0x95, 0x7b, + 0xa3, 0x0f, 0x2b, 0x7a, 0xa5, 0x67, 0x31, 0x66, 0x0b, 0xd9, 0xb6, 0x20, 0x74, 0x7c, 0x79, 0x26, + 0x7b, 0x3f, 0x31, 0x63, 0xb6, 0xc3, 0x77, 0x1b, 0xb4, 0x4e, 0x87, 0x0e, 0x12, 0x82, 0xb6, 0x0b, + 0x2d, 0xa3, 0x18, 0xf9, 0x1d, 0x1b, 
0x6a, 0xc9, 0x72, 0xbb, 0x54, 0x48, 0x67, 0x99, 0xbc, 0xa2, + 0xeb, 0xc5, 0x62, 0xd1, 0x3d, 0x0a, 0x79, 0x2d, 0x45, 0x95, 0xb5, 0xa6, 0xe1, 0x30, 0x7d, 0xc6, + 0x9a, 0x06, 0xd6, 0xfb, 0xb4, 0xd6, 0xdf, 0x30, 0x57, 0xeb, 0xff, 0xb0, 0x1d, 0xfe, 0x7a, 0x2c, + 0x4a, 0x7d, 0x32, 0x1c, 0x0b, 0xd9, 0x90, 0xd3, 0xd0, 0x81, 0xaa, 0x21, 0x2f, 0x40, 0x91, 0x23, + 0xe3, 0xaa, 0x14, 0xc4, 0x1e, 0xa7, 0x27, 0x6e, 0x69, 0xbe, 0xb1, 0xfa, 0xf9, 0x27, 0x00, 0x00, + 0xff, 0xff, 0x9c, 0x1e, 0x0b, 0x94, 0xc3, 0x01, 0x00, 0x00, +} diff --git a/protos/orderer/kafka.proto b/protos/orderer/kafka.proto new file mode 100644 index 00000000000..bff6567f08c --- /dev/null +++ b/protos/orderer/kafka.proto @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; + +option go_package = "github.com/hyperledger/fabric/protos/orderer"; + +package orderer; + +message KafkaMessage { + oneof Type { + KafkaMessageRegular regular = 1; + KafkaMessageTimeToCut time_to_cut = 2; + KafkaMessageConnect connect = 3; + } +} + +message KafkaMessageRegular { + bytes payload = 1; +} + +message KafkaMessageTimeToCut { + uint64 block_number = 1; +} + +message KafkaMessageConnect { + bytes payload = 1; +}