Commit
Add example of producing and consuming avro messages
Signed-off-by: Ben Pope <ben@vectorized.io>
BenPope committed Jan 29, 2022
1 parent 4d33ba5 commit 8091488
Showing 6 changed files with 350 additions and 0 deletions.
12 changes: 12 additions & 0 deletions Dockerfile
@@ -65,3 +65,15 @@ COPY dxfeed/requirements.txt .
RUN pip install -r requirements.txt
COPY dxfeed/publish_series_table.py .
CMD [ "python3", "publish_series_table.py"]

FROM python:3.8 AS avro-producer
COPY avro/requirements.txt .
RUN pip install -r requirements.txt
COPY avro/avro_producer.py .
CMD [ "python3", "avro_producer.py", "-b", "redpanda:29092", "-s", "http://redpanda:8081", "-c", "1000000"]

FROM python:3.8 AS avro-consumer
COPY avro/requirements.txt .
RUN pip install -r requirements.txt
COPY avro/avro_consumer.py .
CMD [ "python3", "avro_consumer.py", "-b", "redpanda:29092", "-s", "http://redpanda:8081"]
157 changes: 157 additions & 0 deletions avro/avro_consumer.py
@@ -0,0 +1,157 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# This is a simple example of the DeserializingConsumer using Avro.
#
import argparse
import time

from confluent_kafka import DeserializingConsumer
from confluent_kafka.error import ConsumeError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer


class User(object):
"""
User record
Args:
name (str): User's name
favorite_number (int): User's favorite number
favorite_color (str): User's favorite color
"""

def __init__(self, name=None, favorite_number=None, favorite_color=None):
self.name = name
self.favorite_number = favorite_number
self.favorite_color = favorite_color


def dict_to_user(obj, ctx):
"""
Converts object literal(dict) to a User instance.
Args:
obj (dict): Object literal(dict)
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
"""
if obj is None:
return None

return User(name=obj['name'],
favorite_number=obj['favorite_number'],
favorite_color=obj['favorite_color'])


def main(args):
topic = args.topic
subject = f"{topic}-value"

schema_str = """
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""

sr_conf = {'url': args.schema_registry}
schema_registry_client = SchemaRegistryClient(sr_conf)

avro_deserializer = AvroDeserializer(schema_registry_client,
schema_str,
dict_to_user)
string_deserializer = StringDeserializer('utf_8')

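    # auto.offset.reset='earliest' makes a brand-new consumer group start
    # from the earliest available records, so messages produced before this
    # consumer came up are not missed.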
consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': args.group,
'auto.offset.reset': "earliest"}

    consumer = None

    # Wait until the producer has registered the subject, then subscribe.
    # Reuse the SchemaRegistryClient created above; construction does not
    # connect, so only the request itself needs to be retried.
    for _ in range(60):
        try:
            if subject in schema_registry_client.get_subjects():
                consumer = DeserializingConsumer(consumer_conf)
                consumer.subscribe([topic])
                break
            print(f"Failed to get subject: {subject}, retrying", flush=True)
            time.sleep(1)
        except Exception as e:
            print(f"Failed to connect: {e}, retrying", flush=True)
            time.sleep(1)

while consumer:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue

            user = msg.value()
            if user is not None:
                print("User record {}:\n"
                      "\tname: {}\n"
                      "\tfavorite_number: {}\n"
                      "\tfavorite_color: {}\n"
                      .format(msg.key(), user.name,
                              user.favorite_number,
                              user.favorite_color))
except KeyboardInterrupt:
break
except ConsumeError as e:
print(f"Error consuming, retrying {e}")
time.sleep(1)

    if consumer is not None:
        consumer.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Consumer Example client with "
"serialization capabilities")
parser.add_argument('-b', dest="bootstrap_servers", required=True,
help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('-s', dest="schema_registry", required=True,
                        help="Schema Registry (http(s)://host[:port])")
parser.add_argument('-t', dest="topic", default="example_serde_avro",
help="Topic name")
parser.add_argument('-g', dest="group", default="example_serde_avro",
help="Consumer group")

main(parser.parse_args())
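Once both containers are up, the registered schema can be inspected with the same SchemaRegistryClient API the consumer uses. A minimal sketch (not part of this commit), assuming the registry is reachable on localhost:8081:

# Hypothetical inspection snippet; the localhost URL is an assumption.
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({'url': 'http://localhost:8081'})
for subject in client.get_subjects():
    registered = client.get_latest_version(subject)
    # Expect 'example_serde_avro-value' and the User schema JSON.
    print(subject, registered.version, registered.schema.schema_str)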
166 changes: 166 additions & 0 deletions avro/avro_producer.py
@@ -0,0 +1,166 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# This is a simple example of the SerializingProducer using Avro.
#
import argparse
import time
from uuid import uuid4

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer


class User(object):
"""
User record
Args:
name (str): User's name
favorite_number (int): User's favorite number
favorite_color (str): User's favorite color
"""

def __init__(self, name, favorite_number, favorite_color):
self.name = name
self.favorite_number = favorite_number
self.favorite_color = favorite_color


def user_to_dict(user, ctx):
"""
Returns a dict representation of a User instance for serialization.
Args:
user (User): User instance.
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
Returns:
dict: Dict populated with user attributes to be serialized.
"""
return dict(name=user.name,
favorite_number=user.favorite_number,
favorite_color=user.favorite_color)


def delivery_report(err, msg):
"""
Reports the failure or success of a message delivery.
Args:
        err (KafkaError): The error that occurred, or None on success.
msg (Message): The message that was produced or failed.
Note:
In the delivery report callback the Message.key() and Message.value()
will be the binary format as encoded by any configured Serializers and
not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to the
        delivery report callback, we recommend a bound callback or lambda
        that passes the objects along (see the sketch after this file).
"""
if err is not None:
print("Delivery failed for User record {}: {}".format(msg.key(), err))
return
print('User record {} successfully produced to {} [{}] at offset {}'.format(
msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main(args):
topic = args.topic

schema_str = """
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""
schema_registry_conf = {'url': args.schema_registry}

    # Create the client once; construction does not connect, so retry the
    # first request until the registry is reachable, then stop.
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    for _ in range(60):
        try:
            schema_registry_client.get_subjects()
            break
        except Exception:
            print("Failed to connect, retrying", flush=True)
            time.sleep(1)

avro_serializer = AvroSerializer(schema_registry_client,
schema_str,
user_to_dict)

producer_conf = {'bootstrap.servers': args.bootstrap_servers,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': avro_serializer}

producer = SerializingProducer(producer_conf)

print("Producing user records to topic {}. ^C to exit.".format(topic))
for i in range(int(args.count)):
# Prevent overflow of buffer
while len(producer) > 50000:
# Serve on_delivery callbacks from previous calls to produce()
producer.poll(0.001)
try:
user_name = "Ben"
user_favorite_number = i
user_favorite_color = "blue"
user = User(name=user_name,
favorite_color=user_favorite_color,
favorite_number=user_favorite_number)
producer.produce(topic=topic, key=str(uuid4()), value=user,
on_delivery=delivery_report)
except KeyboardInterrupt:
break
except ValueError:
print("Invalid input, discarding record...")
continue

print("\nFlushing records...")
producer.flush()


if __name__ == '__main__':
parser = argparse.ArgumentParser(description="SerializingProducer Example")
parser.add_argument('-b', dest="bootstrap_servers", required=True,
help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('-s', dest="schema_registry", required=True,
                        help="Schema Registry (http(s)://host[:port])")
parser.add_argument('-t', dest="topic", default="example_serde_avro",
help="Topic name")
parser.add_argument('-c', dest="count", default="1000000",
help="Number of messages to send")

main(parser.parse_args())
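The delivery_report docstring above recommends a bound callback when the original objects are wanted in the report, since the callback only sees serialized bytes. A minimal sketch of that pattern with functools.partial (the helper name is hypothetical, not part of this commit):

from functools import partial

def delivery_report_with_user(user, err, msg):
    # 'user' is the original User instance, bound at produce() time;
    # msg.value() here would be the Avro-encoded bytes.
    if err is not None:
        print(f"Delivery failed for {user.name}: {err}")
        return
    print(f"{user.name} delivered to {msg.topic()} [{msg.partition()}]")

# Usage inside the produce loop:
# producer.produce(topic=topic, key=str(uuid4()), value=user,
#                  on_delivery=partial(delivery_report_with_user, user))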
3 changes: 3 additions & 0 deletions avro/requirements.txt
@@ -0,0 +1,3 @@
confluent_kafka==1.7.0
fastavro==1.4.9
requests==2.27.1
10 changes: 10 additions & 0 deletions docker-compose.yml
@@ -85,6 +85,16 @@ services:
dxfeed-series:
image: redpanda-dxfeed-financial-data/dxfeed-publish-series:latest

avro-producer:
image: redpanda-dxfeed-financial-data/avro-producer:latest
depends_on:
- redpanda

avro-consumer:
image: redpanda-dxfeed-financial-data/avro-consumer:latest
depends_on:
- redpanda

volumes:
web-tmp:
api-cache:
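With both services depending on redpanda, a quick liveness check once compose is up is to ask the broker for metadata. A sketch under the assumption that the Kafka API is mapped to localhost:9092 (the port mapping is outside this diff):

# Hypothetical liveness check; localhost:9092 is an assumption.
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
metadata = admin.list_topics(timeout=10)
print(sorted(metadata.topics))  # 'example_serde_avro' appears once produced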
2 changes: 2 additions & 0 deletions redpanda_start.sh
@@ -10,4 +10,6 @@ docker build --target dxfeed-publish-order -t redpanda-dxfeed-financial-data/dxf
docker build --target dxfeed-publish-underlying -t redpanda-dxfeed-financial-data/dxfeed-publish-underlying:latest .
docker build --target dxfeed-publish-timeandsale -t redpanda-dxfeed-financial-data/dxfeed-publish-timeandsale:latest .
docker build --target dxfeed-publish-series -t redpanda-dxfeed-financial-data/dxfeed-publish-series:latest .
docker build --target avro-producer -t redpanda-dxfeed-financial-data/avro-producer:latest .
docker build --target avro-consumer -t redpanda-dxfeed-financial-data/avro-consumer:latest .
docker-compose up -d
