# Directory holding the generated *_pb2.py modules. It defaults to the current
# directory so that `make clean` can never expand to `rm -f /*_pb2.py` when
# the variable is not supplied; it can still be overridden from the command
# line or the environment (e.g. `make clean TARGET_DIR=build`).
TARGET_DIR ?= .

# `clean` does not produce a file named "clean"; always run it when requested.
.PHONY: clean

# Regenerate the Python protobuf bindings whenever the schema changes.
user_pb2.py: user.proto
	protoc -I=. --python_out=. ./user.proto

# Remove the generated protobuf bindings.
clean:
	rm -f $(TARGET_DIR)/*_pb2.py
def main(args):
    """
    Consume protobuf-encoded User records from a topic and print them.

    The function first waits for the subject ``<topic>-value`` to appear in
    the Schema Registry (up to 60 attempts, one second apart), then subscribes
    a DeserializingConsumer to the topic and prints every record it receives
    until interrupted with ^C.

    Args:
        args (argparse.Namespace): Parsed command line options. The
            attributes read are ``topic``, ``bootstrap_servers``,
            ``schema_registry`` and ``group``.

    Raises:
        SystemExit: If the subject never became available, so no consumer
            could be created.
    """
    topic = args.topic
    subject = f"{topic}-value"

    protobuf_deserializer = ProtobufDeserializer(user_pb2.User)
    string_deserializer = StringDeserializer('utf_8')

    consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
                     'key.deserializer': string_deserializer,
                     'value.deserializer': protobuf_deserializer,
                     'group.id': args.group,
                     'auto.offset.reset': "earliest"}

    schema_registry_conf = {'url': args.schema_registry}

    # Only start consuming once the subject has been registered; until then
    # keep polling the registry.
    consumer = None
    for _ in range(60):
        try:
            schema_registry_client = SchemaRegistryClient(schema_registry_conf)
            if subject in schema_registry_client.get_subjects():
                consumer = DeserializingConsumer(consumer_conf)
                consumer.subscribe([topic])
                break
            print(
                f"Failed to get subject: {subject}, retrying", flush=True)
        except Exception as e:
            print(f"Failed to connect: {e}, retrying", flush=True)
        time.sleep(1)

    if consumer is None:
        # Previously this fell through to consumer.close() on None and died
        # with an AttributeError; fail with an explicit message instead.
        raise SystemExit(
            f"Subject {subject} never became available, giving up")

    try:
        while True:
            try:
                # SIGINT can't be handled when polling, limit timeout to
                # 1 second.
                msg = consumer.poll(1.0)
                if msg is None:
                    continue

                user = msg.value()
                if user is not None:
                    # Argument order must match the labels in the template.
                    print("User record {}:\n"
                          "\tname: {}\n"
                          "\tfavorite_number: {}\n"
                          "\tfavorite_color: {}\n"
                          .format(msg.key(), user.name,
                                  user.favorite_number,
                                  user.favorite_color))
            except ConsumeError as e:
                print(f"Error consuming, retrying {e}")
                time.sleep(1)
    except KeyboardInterrupt:
        # ^C is the documented way to stop; also covers an interrupt that
        # arrives while sleeping after a ConsumeError.
        pass
    finally:
        # Always close the consumer, even on unexpected errors.
        consumer.close()
def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.

        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.

    """
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main(args):
    """
    Produce ``args.count`` protobuf-encoded User records to ``args.topic``.

    The function waits for the Schema Registry to answer (up to 60 attempts,
    one second apart), then produces records keyed by a random UUID and
    flushes the producer before returning. ^C stops producing early; the
    records already queued are still flushed.

    Args:
        args (argparse.Namespace): Parsed command line options. The
            attributes read are ``topic``, ``schema_registry``,
            ``bootstrap_servers`` and ``count``.

    Raises:
        SystemExit: If the Schema Registry could not be reached.
    """
    topic = args.topic

    schema_registry_conf = {'url': args.schema_registry}

    # Stop retrying as soon as the registry answers. Without the `break` the
    # loop always issued 60 requests and never noticed a registry that
    # stayed unreachable.
    for _ in range(60):
        try:
            schema_registry_client = SchemaRegistryClient(schema_registry_conf)
            schema_registry_client.get_subjects()
            break
        except Exception:
            print("Failed to connect, retrying", flush=True)
            time.sleep(1)
    else:
        raise SystemExit("Schema Registry never became reachable, giving up")

    protobuf_serializer = ProtobufSerializer(
        user_pb2.User, schema_registry_client)

    producer_conf = {'bootstrap.servers': args.bootstrap_servers,
                     'key.serializer': StringSerializer('utf_8'),
                     'value.serializer': protobuf_serializer}

    producer = SerializingProducer(producer_conf)

    print("Producing user records to topic {}. ^C to exit.".format(topic))
    try:
        for i in range(int(args.count)):
            # Prevent overflow of buffer
            while len(producer) > 50000:
                # Serve on_delivery callbacks from previous calls to produce()
                producer.poll(0.001)
            try:
                user = user_pb2.User(name="Ben",
                                     favorite_color="blue",
                                     favorite_number=i)
                producer.produce(topic=topic, key=str(uuid4()), value=user,
                                 on_delivery=delivery_report)
            except ValueError:
                print("Invalid input, discarding record...")
    except KeyboardInterrupt:
        # Covers ^C both while producing and while waiting for buffer space,
        # so queued records are still flushed below.
        pass

    print("\nFlushing records...")
    producer.flush()
syntax = "proto3";

// User is the record schema that `protoc` compiles into user_pb2.py (see the
// Makefile in this directory). protobuf_producer.py serializes it as the
// message value and protobuf_consumer.py deserializes and prints it.
message User {
  // Set to a fixed string by protobuf_producer.py.
  string name = 1;
  // Set to the loop index of the produced record by protobuf_producer.py.
  int64 favorite_number = 2;
  // Set to a fixed string by protobuf_producer.py.
  string favorite_color = 3;
}
has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='favorite_number', full_name='User.favorite_number', index=1, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='favorite_color', full_name='User.favorite_color', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=14, + serialized_end=83, +) + +DESCRIPTOR.message_types_by_name['User'] = _USER +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +User = _reflection.GeneratedProtocolMessageType('User', (_message.Message,), { + 'DESCRIPTOR' : _USER, + '__module__' : 'user_pb2' + # @@protoc_insertion_point(class_scope:User) + }) +_sym_db.RegisterMessage(User) + + +# @@protoc_insertion_point(module_scope) diff --git a/redpanda_start.sh b/redpanda_start.sh index 10592ca..4b9b228 100755 --- a/redpanda_start.sh +++ b/redpanda_start.sh @@ -12,4 +12,6 @@ docker build --target dxfeed-publish-timeandsale -t redpanda-dxfeed-financial-da docker build --target dxfeed-publish-series -t redpanda-dxfeed-financial-data/dxfeed-publish-series:latest . 
docker build --target avro-producer -t redpanda-dxfeed-financial-data/avro-producer:latest . docker build --target avro-consumer -t redpanda-dxfeed-financial-data/avro-consumer:latest . +docker build --target protobuf-producer -t redpanda-dxfeed-financial-data/protobuf-producer:latest . +docker build --target protobuf-consumer -t redpanda-dxfeed-financial-data/protobuf-consumer:latest . docker-compose up -d