diff --git a/kaskade/__init__.py b/kaskade/__init__.py index 8ddfb03..fedcc26 100644 --- a/kaskade/__init__.py +++ b/kaskade/__init__.py @@ -34,3 +34,4 @@ def get_kaskade_home() -> Path: logger = logging.getLogger() logger.addHandler(logger_handler) logger.setLevel(logging.INFO) +logging.captureWarnings(True) diff --git a/kaskade/services.py b/kaskade/services.py index 9818768..572fdcf 100644 --- a/kaskade/services.py +++ b/kaskade/services.py @@ -18,7 +18,9 @@ ConfigSource, ) from confluent_kafka.cimpl import NewTopic, NewPartitions +from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer from google.protobuf import descriptor_pb2, message_factory, json_format +from google.protobuf.message import DecodeError from kaskade import logger from kaskade.models import ( @@ -432,6 +434,8 @@ def sort_by_topic_name(topic: TopicMetadata) -> Any: if __name__ == "__main__": # protoc --include_imports --proto_path=. --python_out=. --descriptor_set_out=./Invoice.desc ./Invoice.proto + # https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + file_desc = "/home/saul/Workspace/kafka-sandbox/kafka-protobuf/src/main/proto/Invoice.desc" with open(file_desc, "rb") as f: file_content = f.read() @@ -442,9 +446,17 @@ def sort_by_topic_name(topic: TopicMetadata) -> Any: def deserialize(raw_message: bytes | None) -> dict[str, Any] | None: if raw_message is None: return None - new_message = deserialization_class() - new_message.ParseFromString(raw_message) - return json_format.MessageToDict(new_message) + + try: + new_message = deserialization_class() + new_message.ParseFromString(raw_message) + return json_format.MessageToDict(new_message, always_print_fields_with_no_presence=True) + except DecodeError: + protobuf_deserializer = ProtobufDeserializer( + deserialization_class, {"use.deprecated.format": False} + ) + new_message = protobuf_deserializer(raw_message, None) + return json_format.MessageToDict(new_message, always_print_fields_with_no_presence=True) async def main() -> None: deserializer = DeserializerFactory() @@ -454,7 +466,7 @@ async def main() -> None: deserializer, Format.STRING, Format.BYTES, - page_size=5, + page_size=15, ) messages = await consumer_service.consume() for message in messages: