Skip to content

Commit

Permalink
Add example of producing and consuming protobuf messages
Browse files Browse the repository at this point in the history
Requires fix for: redpanda-data/redpanda#3633

Signed-off-by: Ben Pope <ben@vectorized.io>
  • Loading branch information
BenPope committed Jan 29, 2022
1 parent 8091488 commit 7fa9717
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 0 deletions.
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,17 @@ COPY avro/requirements.txt .
RUN pip install -r requirements.txt
COPY avro/avro_consumer.py .
CMD [ "python3", "avro_consumer.py", "-b", "redpanda:29092", "-s", "http://redpanda:8081"]

FROM python:3.8 AS protobuf-producer
COPY protobuf/requirements.txt .
RUN pip install -r requirements.txt
COPY protobuf/user_pb2.py .
COPY protobuf/protobuf_producer.py .
CMD [ "python3", "protobuf_producer.py", "-b", "redpanda:29092", "-s", "http://redpanda:8081", "-c", "1000000"]

FROM python:3.8 AS protobuf-consumer
COPY protobuf/requirements.txt .
RUN pip install -r requirements.txt
COPY protobuf/user_pb2.py .
COPY protobuf/protobuf_consumer.py .
CMD [ "python3", "protobuf_consumer.py", "-b", "redpanda:29092", "-s", "http://redpanda:8081"]
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ services:
depends_on:
- redpanda

protobuf-producer:
image: redpanda-dxfeed-financial-data/protobuf-producer:latest
depends_on:
- redpanda

protobuf-consumer:
image: redpanda-dxfeed-financial-data/protobuf-consumer:latest
depends_on:
- redpanda

volumes:
web-tmp:
api-cache:
6 changes: 6 additions & 0 deletions protobuf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

user_pb2.py: user.proto
protoc -I=. --python_out=. ./user.proto;

clean:
rm -f $(TARGET_DIR)/*_pb2.py
113 changes: 113 additions & 0 deletions protobuf/protobuf_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# This is a simple example of the SerializingProducer using protobuf.
#
# To regenerate Protobuf classes you must first install the protobuf
# compiler. Once installed you may call protoc directly or use make.
#
# See the protocol buffer docs for instructions on installing and using protoc.
# https://developers.google.com/protocol-buffers/docs/pythontutorial
#
# After installing protoc execute the following command from the examples
# directory to regenerate the user_pb2 module.
# `make`
#
import argparse
import time

# Protobuf generated class; resides at ./user_pb2.py
import user_pb2
from confluent_kafka import DeserializingConsumer
from confluent_kafka.error import ConsumeError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
from confluent_kafka.serialization import StringDeserializer


def main(args):
topic = args.topic
subject = f"{topic}-value"

protobuf_deserializer = ProtobufDeserializer(user_pb2.User)
string_deserializer = StringDeserializer('utf_8')

consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
'key.deserializer': string_deserializer,
'value.deserializer': protobuf_deserializer,
'group.id': args.group,
'auto.offset.reset': "earliest"}

consumer = None
schema_registry_conf = {'url': args.schema_registry}

for i in range(60):
try:
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
subs = schema_registry_client.get_subjects()
if subject in subs:
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])
break
else:
print(
f"Failed to get subject: {subject}, retrying", flush=True)
time.sleep(1)
except Exception as e:
print(f"Failed to connect: {e}, retrying", flush=True)
time.sleep(1)

while consumer:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue

user = msg.value()
if user is not None:
print("User record {}:\n"
"\tname: {}\n"
"\tfavorite_number: {}\n"
"\tfavorite_color: {}\n"
.format(msg.key(), user.name,
user.favorite_color,
user.favorite_number))
except KeyboardInterrupt:
break
except ConsumeError as e:
print(f"Error consuming, retrying {e}")
time.sleep(1)

consumer.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="DeserializingConsumer Example")
parser.add_argument('-b', dest="bootstrap_servers", required=True,
help="Bootstrap broker(s) (host[:port])")
parser.add_argument('-s', dest="schema_registry", required=True,
help="Schema Registry (http(s)://host[:port]")
parser.add_argument('-t', dest="topic", default="example_serde_protobuf",
help="Topic name")
parser.add_argument('-g', dest="group", default="example_serde_protobuf",
help="Consumer group")

main(parser.parse_args())
128 changes: 128 additions & 0 deletions protobuf/protobuf_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# This is a simple example of the SerializingProducer using protobuf.
#
# To regenerate Protobuf classes you must first install the protobuf
# compiler. Once installed you may call protoc directly or use make.
#
# See the protocol buffer docs for instructions on installing and using protoc.
# https://developers.google.com/protocol-buffers/docs/pythontutorial
#
# After installing protoc execute the following command from the examples
# directory to regenerate the user_pb2 module.
# `make`
#
import argparse
import time
from uuid import uuid4

# Protobuf generated class; resides at ./user_pb2.py
import user_pb2
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient, SchemaRegistryError
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer


def delivery_report(err, msg):
"""
Reports the failure or success of a message delivery.
Args:
err (KafkaError): The error that occurred on None on success.
msg (Message): The message that was produced or failed.
Note:
In the delivery report callback the Message.key() and Message.value()
will be the binary format as encoded by any configured Serializers and
not the same object that was passed to produce().
If you wish to pass the original object(s) for key and value to delivery
report callback we recommend a bound callback or lambda where you pass
the objects along.
"""
if err is not None:
print("Delivery failed for User record {}: {}".format(msg.key(), err))
return
print('User record {} successfully produced to {} [{}] at offset {}'.format(
msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main(args):
topic = args.topic

schema_registry_conf = {'url': args.schema_registry}
schema_registry_client: SchemaRegistryClient

for i in range(60):
try:
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
schema_registry_client.get_subjects()
except Exception:
print("Failed to connect, retrying", flush=True)
time.sleep(1)

protobuf_serializer = ProtobufSerializer(
user_pb2.User, schema_registry_client)

producer_conf = {'bootstrap.servers': args.bootstrap_servers,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': protobuf_serializer}

producer = SerializingProducer(producer_conf)

print("Producing user records to topic {}. ^C to exit.".format(topic))
for i in range(int(args.count)):
# Prevent overflow of buffer
while len(producer) > 50000:
# Serve on_delivery callbacks from previous calls to produce()
producer.poll(0.001)
try:
user_name = "Ben"
user_favorite_number = i
user_favorite_color = "blue"
user = user_pb2.User(name=user_name,
favorite_color=user_favorite_color,
favorite_number=user_favorite_number)
producer.produce(topic=topic, key=str(uuid4()), value=user,
on_delivery=delivery_report)
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue

print("\nFlushing records...")
producer.flush()


if __name__ == '__main__':
parser = argparse.ArgumentParser(description="SerializingProducer Example")
parser.add_argument('-b', dest="bootstrap_servers", required=True,
help="Bootstrap broker(s) (host[:port])")
parser.add_argument('-s', dest="schema_registry", required=True,
help="Schema Registry (http(s)://host[:port]")
parser.add_argument('-t', dest="topic", default="example_serde_protobuf",
help="Topic name")
parser.add_argument('-c', dest="count", default="1000000",
help="Number of messages to send")

main(parser.parse_args())
3 changes: 3 additions & 0 deletions protobuf/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
confluent_kafka==1.7.0
protobuf==3.19.3
requests==v2.27.1
7 changes: 7 additions & 0 deletions protobuf/user.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto3";

message User {
string name = 1;
int64 favorite_number = 2;
string favorite_color = 3;
}
84 changes: 84 additions & 0 deletions protobuf/user_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions redpanda_start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ docker build --target dxfeed-publish-timeandsale -t redpanda-dxfeed-financial-da
docker build --target dxfeed-publish-series -t redpanda-dxfeed-financial-data/dxfeed-publish-series:latest .
docker build --target avro-producer -t redpanda-dxfeed-financial-data/avro-producer:latest .
docker build --target avro-consumer -t redpanda-dxfeed-financial-data/avro-consumer:latest .
docker build --target protobuf-producer -t redpanda-dxfeed-financial-data/protobuf-producer:latest .
docker build --target protobuf-consumer -t redpanda-dxfeed-financial-data/protobuf-consumer:latest .
docker-compose up -d

0 comments on commit 7fa9717

Please sign in to comment.