From 9b2a64019bb444406a381a4b17d28fe77330e12d Mon Sep 17 00:00:00 2001 From: swathipil <76007337+swathipil@users.noreply.github.com> Date: Fri, 1 Oct 2021 15:38:35 -0700 Subject: [PATCH] [SchemaRegistry] remove all serializer caches (#21020) --- .../avroserializer/_avro_serializer.py | 38 +++++++++---------- .../_schema_registry_avro_serializer.py | 13 +++---- ...serializer_with_auto_register_schemas.yaml | 8 ++-- ...ializer_without_auto_register_schemas.yaml | 8 ++-- .../tests/test_avro_serializer.py | 4 -- 5 files changed, 33 insertions(+), 38 deletions(-) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py index a190bffe5467..0f40857e6e93 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py @@ -23,7 +23,11 @@ # IN THE SOFTWARE. # # -------------------------------------------------------------------------- -from typing import BinaryIO, Union, TypeVar, Dict +try: + from functools import lru_cache +except ImportError: + from backports.functools_lru_cache import lru_cache +from typing import BinaryIO, Union, TypeVar from io import BytesIO import avro from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder @@ -38,9 +42,18 @@ def __init__(self, codec=None): :param str codec: The writer codec. If None, let the avro library decides. """ self._writer_codec = codec - self._schema_writer_cache = {} # type: Dict[str, DatumWriter] - self._schema_reader_cache = {} # type: Dict[str, DatumReader] + @lru_cache(maxsize=128) + def _get_schema_writer(self, schema): # pylint: disable=no-self-use + schema = avro.schema.parse(schema) + return DatumWriter(schema) + + @lru_cache(maxsize=128) + def _get_schema_reader(self, schema): # pylint: disable=no-self-use + schema = avro.schema.parse(schema) + return DatumReader(writers_schema=schema) + + # pylint: disable=no-self-use def serialize( self, data, # type: ObjectType @@ -60,14 +73,7 @@ def serialize( if not schema: raise ValueError("Schema is required in Avro serializer.") - if not isinstance(schema, avro.schema.Schema): - schema = avro.schema.parse(schema) - - try: - writer = self._schema_writer_cache[str(schema)] - except KeyError: - writer = DatumWriter(schema) - self._schema_writer_cache[str(schema)] = writer + writer = self._get_schema_writer(str(schema)) stream = BytesIO() with stream: @@ -75,6 +81,7 @@ def serialize( encoded_data = stream.getvalue() return encoded_data + # pylint: disable=no-self-use def deserialize( self, data, # type: Union[bytes, BinaryIO] @@ -93,14 +100,7 @@ def deserialize( if not hasattr(data, 'read'): data = BytesIO(data) - if not isinstance(schema, avro.schema.Schema): - schema = avro.schema.parse(schema) - - try: - reader = self._schema_reader_cache[str(schema)] - except KeyError: - reader = DatumReader(writers_schema=schema) - self._schema_reader_cache[str(schema)] = reader + reader = self._get_schema_reader(str(schema)) with data: bin_decoder = BinaryDecoder(data) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py index cbd6c1aefede..df34575dea63 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py @@ -63,7 +63,6 @@ def __init__(self, **kwargs): if self._auto_register_schemas else self._schema_registry_client.get_schema_id ) - self._user_input_schema_cache = {} def __enter__(self): # type: () -> SchemaRegistryAvroSerializer @@ -115,6 +114,11 @@ def _get_schema(self, schema_id, **kwargs): ).schema_content return schema_str + @classmethod + @lru_cache(maxsize=128) + def _parse_schema(cls, schema): + return avro.schema.parse(schema) + def serialize(self, value, **kwargs): # type: (Mapping[str, Any], Any) -> bytes """ @@ -132,13 +136,8 @@ def serialize(self, value, **kwargs): raw_input_schema = kwargs.pop("schema") except KeyError as e: raise TypeError("'{}' is a required keyword.".format(e.args[0])) - try: - cached_schema = self._user_input_schema_cache[raw_input_schema] - except KeyError: - parsed_schema = avro.schema.parse(raw_input_schema) - self._user_input_schema_cache[raw_input_schema] = parsed_schema - cached_schema = parsed_schema + cached_schema = AvroSerializer._parse_schema(raw_input_schema) record_format_identifier = b"\0\0\0\0" schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) data_bytes = self._avro_serializer.serialize(value, cached_schema) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index cfd2920100e2..0e70a2f2a6b4 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -23,12 +23,12 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"f666e373299048fabaa4296f5dbfed46"}' + string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}' headers: content-type: - application/json date: - - Thu, 30 Sep 2021 02:05:53 GMT + - Fri, 01 Oct 2021 22:19:06 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: @@ -38,9 +38,9 @@ interactions: transfer-encoding: - chunked x-schema-id: - - f666e373299048fabaa4296f5dbfed46 + - 7b4eff1c25d9438a975ff7a3d985a5c6 x-schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04 + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04 x-schema-type: - Avro x-schema-version: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index 0feb5392eba4..ad11417efc03 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -23,12 +23,12 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"f666e373299048fabaa4296f5dbfed46"}' + string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}' headers: content-type: - application/json date: - - Thu, 30 Sep 2021 02:05:54 GMT + - Fri, 01 Oct 2021 22:19:07 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: @@ -38,9 +38,9 @@ interactions: transfer-encoding: - chunked x-schema-id: - - f666e373299048fabaa4296f5dbfed46 + - 7b4eff1c25d9438a975ff7a3d985a5c6 x-schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04 + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04 x-schema-type: - Avro x-schema-version: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py index f392d8431917..c93b998ce08c 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py @@ -86,8 +86,6 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str) - assert schema_str in sr_avro_serializer._user_input_schema_cache - assert encoded_data[0:4] == b'\0\0\0\0' schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id assert encoded_data[4:36] == schema_id.encode("utf-8") @@ -111,8 +109,6 @@ def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregi dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str) - assert schema_str in sr_avro_serializer._user_input_schema_cache - assert encoded_data[0:4] == b'\0\0\0\0' schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id assert encoded_data[4:36] == schema_id.encode("utf-8")