Skip to content

Commit

Permalink
Schema Registry: Change Encoder to Serializer (Azure#27662)
Browse files Browse the repository at this point in the history
* Adding explicit dependency on azure-core.

* Updating class name from Encoder -> Serializer.

* Rename encode/decode to deserialize/serialize.

* Rename playback files.

* Fix README file snippets.

* Fix CHANGELOG entries.
  • Loading branch information
conniey committed Mar 15, 2022
1 parent 6904105 commit 48cb91c
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Breaking Changes

- Changed `SchemaRegistryApacheAvroEncoder` to `SchemaRegistryApacheAvroSerializer`.
- Changed `decodeMessageData` and `decodeMessageDataAsync` to `deserializeMessageData` and `deserializeMessageDataAsync`.
- Changed `encodeMessageData` and `encodeMessageDataAsync` to `serializeMessageData` and `serializeMessageDataAsync`.

### Bugs Fixed

### Other Changes
Expand All @@ -14,7 +18,7 @@

### Features Added

- Changed `SchemaRegistryApacheAvroEncoder` to deserialize `MessageWithMetadata` rather than tied to a binary format
- Changed `SchemaRegistryApacheAvroEncoder` to deserialize `MessageWithMetadata` rather than tied to a binary format
with preamble. Backwards compatibility with preamble format supported for this release. See issue #26449.

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBu

#### Create `SchemaRegistryAvroSerializer` through the builder

```java readme-sample-createSchemaRegistryAvroEncoder
SchemaRegistryApacheAvroEncoder encoder = new SchemaRegistryApacheAvroEncoderBuilder()
```java readme-sample-createSchemaRegistryAvroSerializer
SchemaRegistryApacheAvroSerializer serializer = new SchemaRegistryApacheAvroSerializerBuilder()
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
.schemaGroup("{schema-group}")
.buildEncoder();
.buildSerializer();
```

## Key concepts
Expand Down Expand Up @@ -105,13 +105,13 @@ The serializer in this library creates messages in a wire format. The format is
### Serialize
Serialize a strongly-typed object into Schema Registry-compatible avro payload.

```java readme-sample-encodeSample
```java readme-sample-serializeSample
PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);

MessageWithMetadata message = encoder.encodeMessageData(playingCard,
MessageWithMetadata message = serializer.serializeMessageData(playingCard,
TypeReference.createInstance(MessageWithMetadata.class));
```

Expand All @@ -121,10 +121,10 @@ The avro type `PlayingCard` is available in samples package
### Deserialize
Deserialize a Schema Registry-compatible avro payload into a strongly-type object.

```java readme-sample-decodeSample
SchemaRegistryApacheAvroEncoder encoder = createAvroSchemaRegistryEncoder();
```java readme-sample-deserializeSample
SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();
MessageWithMetadata message = getSchemaRegistryAvroMessage();
PlayingCard playingCard = encoder.decodeMessageData(message, TypeReference.createInstance(PlayingCard.class));
PlayingCard playingCard = serializer.deserializeMessageData(message, TypeReference.createInstance(PlayingCard.class));
```

## Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
</properties>

<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.26.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-experimental</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
/**
* Schema Registry-based serializer implementation for Avro data format using Apache Avro.
*/
public final class SchemaRegistryApacheAvroEncoder {
public final class SchemaRegistryApacheAvroSerializer {
static final String AVRO_MIME_TYPE = "avro/binary";
static final byte[] RECORD_FORMAT_INDICATOR = new byte[]{0x00, 0x00, 0x00, 0x00};
static final int RECORD_FORMAT_INDICATOR_SIZE = RECORD_FORMAT_INDICATOR.length;
static final int SCHEMA_ID_SIZE = 32;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroEncoder.class);
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializer.class);
private final SchemaRegistryAsyncClient schemaRegistryClient;
private final AvroSerializer avroSerializer;
private final SerializerOptions serializerOptions;
Expand All @@ -46,7 +46,7 @@ public final class SchemaRegistryApacheAvroEncoder {
* @param avroSerializer Serializer implemented using Apache Avro.
* @param serializerOptions Options to configure the serializer with.
*/
SchemaRegistryApacheAvroEncoder(SchemaRegistryAsyncClient schemaRegistryClient,
SchemaRegistryApacheAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
AvroSerializer avroSerializer, SerializerOptions serializerOptions) {
this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryClient,
"'schemaRegistryClient' cannot be null.");
Expand All @@ -56,84 +56,84 @@ public final class SchemaRegistryApacheAvroEncoder {
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return The message encoded or {@code null} if the message could not be encoded.
* @return The message encoded or {@code null} if the message could not be serialized.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeReference<T> typeReference) {
return encodeMessageDataAsync(object, typeReference).block();
public <T extends MessageWithMetadata> T serializeMessageData(Object object, TypeReference<T> typeReference) {
return serializeMessageDataAsync(object, typeReference).block();
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param messageFactory Factory to create an instance given the serialized Avro.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return The message encoded or {@code null} if the message could not be encoded.
* @return The message encoded or {@code null} if the message could not be serialized.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeReference<T> typeReference,
public <T extends MessageWithMetadata> T serializeMessageData(Object object, TypeReference<T> typeReference,
Function<BinaryData, T> messageFactory) {
return encodeMessageDataAsync(object, typeReference, messageFactory).block();
return serializeMessageDataAsync(object, typeReference, messageFactory).block();
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return A Mono that completes with the encoded message.
* @return A Mono that completes with the serialized message.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object object,
public <T extends MessageWithMetadata> Mono<T> serializeMessageDataAsync(Object object,
TypeReference<T> typeReference) {

return encodeMessageDataAsync(object, typeReference, null);
return serializeMessageDataAsync(object, typeReference, null);
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param messageFactory Factory to create an instance given the serialized Avro. If null is passed in, then the
* no argument constructor will be used.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return A Mono that completes with the encoded message.
* @return A Mono that completes with the serialized message.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object object,
public <T extends MessageWithMetadata> Mono<T> serializeMessageDataAsync(Object object,
TypeReference<T> typeReference, Function<BinaryData, T> messageFactory) {

if (object == null) {
Expand Down Expand Up @@ -189,33 +189,33 @@ public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object obj
}

/**
* Decodes a message into its object.
* Deserializes a message into its object.
*
* @param message Object to encode.
* @param typeReference Message to encode to.
* @param message Object to deserialize.
* @param typeReference Message type to deserialize to.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return The message encoded.
* @return The message deserialized.
*
* @throws NullPointerException if {@code message} or {@code typeReference} is null.
*/
public <T> T decodeMessageData(MessageWithMetadata message, TypeReference<T> typeReference) {
return decodeMessageDataAsync(message, typeReference).block();
public <T> T deserializeMessageData(MessageWithMetadata message, TypeReference<T> typeReference) {
return deserializeMessageDataAsync(message, typeReference).block();
}

/**
* Decodes a message into its object.
* Deserializes a message into its object.
*
* @param message Object to encode.
* @param typeReference Message to encode to.
* @param message Object to deserialize.
* @param typeReference Message to deserialize to.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return A Mono that completes when the message encoded. If {@code message.getBodyAsBinaryData()} is null or
* empty, then an empty Mono is returned.
*
* @throws NullPointerException if {@code message} or {@code typeReference} is null.
*/
public <T> Mono<T> decodeMessageDataAsync(MessageWithMetadata message, TypeReference<T> typeReference) {
public <T> Mono<T> deserializeMessageDataAsync(MessageWithMetadata message, TypeReference<T> typeReference) {
if (message == null) {
return monoError(logger, new NullPointerException("'message' cannot be null."));
} else if (typeReference == null) {
Expand Down Expand Up @@ -282,10 +282,10 @@ public <T> Mono<T> decodeMessageDataAsync(MessageWithMetadata message, TypeRefer
contents.reset();
}

return decodeMessageDataAsync(schemaId, contents, typeReference);
return deserializeMessageDataAsync(schemaId, contents, typeReference);
}

private <T> Mono<T> decodeMessageDataAsync(String schemaId, ByteBuffer buffer, TypeReference<T> typeReference) {
private <T> Mono<T> deserializeMessageDataAsync(String schemaId, ByteBuffer buffer, TypeReference<T> typeReference) {
return this.schemaRegistryClient.getSchema(schemaId)
.handle((registryObject, sink) -> {
final byte[] payloadSchema = registryObject.getDefinition().getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
import java.util.Objects;

/**
* The builder implementation for building {@link SchemaRegistryApacheAvroEncoder}.
* The builder implementation for building {@link SchemaRegistryApacheAvroSerializer}.
*
* @see SchemaRegistryApacheAvroEncoder
* @see SchemaRegistryApacheAvroSerializer
*/
public final class SchemaRegistryApacheAvroEncoderBuilder {
public final class SchemaRegistryApacheAvroSerializerBuilder {
private static final boolean AVRO_SPECIFIC_READER_DEFAULT = false;
private static final int MAX_CACHE_SIZE = 128;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroEncoderBuilder.class);
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializerBuilder.class);
private Boolean autoRegisterSchemas;
private Boolean avroSpecificReader;
private SchemaRegistryAsyncClient schemaRegistryAsyncClient;
Expand All @@ -32,7 +32,7 @@ public final class SchemaRegistryApacheAvroEncoderBuilder {
/**
* Instantiates instance of Builder class. Supplies client defaults.
*/
public SchemaRegistryApacheAvroEncoderBuilder() {
public SchemaRegistryApacheAvroSerializerBuilder() {
this.autoRegisterSchemas = false;
this.avroSpecificReader = false;
}
Expand All @@ -46,9 +46,9 @@ public SchemaRegistryApacheAvroEncoderBuilder() {
*
* @param schemaGroup Azure Schema Registry schema group
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
*/
public SchemaRegistryApacheAvroEncoderBuilder schemaGroup(String schemaGroup) {
public SchemaRegistryApacheAvroSerializerBuilder schemaGroup(String schemaGroup) {
this.schemaGroup = schemaGroup;
return this;
}
Expand All @@ -64,9 +64,9 @@ public SchemaRegistryApacheAvroEncoderBuilder schemaGroup(String schemaGroup) {
*
* @param autoRegisterSchemas flag for schema auto-registration
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
*/
public SchemaRegistryApacheAvroEncoderBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
public SchemaRegistryApacheAvroSerializerBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
this.autoRegisterSchemas = autoRegisterSchemas;
return this;
}
Expand All @@ -78,9 +78,9 @@ public SchemaRegistryApacheAvroEncoderBuilder autoRegisterSchema(boolean autoReg
* @param avroSpecificReader {@code true} to deserialize into {@link SpecificRecord} via {@link
* SpecificDatumReader}; {@code false} otherwise.
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance.
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
*/
public SchemaRegistryApacheAvroEncoderBuilder avroSpecificReader(boolean avroSpecificReader) {
public SchemaRegistryApacheAvroSerializerBuilder avroSpecificReader(boolean avroSpecificReader) {
this.avroSpecificReader = avroSpecificReader;
return this;
}
Expand All @@ -90,9 +90,9 @@ public SchemaRegistryApacheAvroEncoderBuilder avroSpecificReader(boolean avroSpe
*
* @param schemaRegistryAsyncClient The {@link SchemaRegistryAsyncClient}.
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance.
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
*/
public SchemaRegistryApacheAvroEncoderBuilder schemaRegistryAsyncClient(
public SchemaRegistryApacheAvroSerializerBuilder schemaRegistryAsyncClient(
SchemaRegistryAsyncClient schemaRegistryAsyncClient) {
this.schemaRegistryAsyncClient = schemaRegistryAsyncClient;
return this;
Expand All @@ -101,13 +101,13 @@ public SchemaRegistryApacheAvroEncoderBuilder schemaRegistryAsyncClient(
/**
* Creates a new instance of Schema Registry serializer.
*
* @return A new instance of {@link SchemaRegistryApacheAvroEncoder}.
* @return A new instance of {@link SchemaRegistryApacheAvroSerializer}.
*
* @throws NullPointerException if {@link #schemaRegistryAsyncClient(SchemaRegistryAsyncClient)} is {@code null}
* @throws IllegalStateException if {@link #autoRegisterSchema(boolean)} is {@code true} but {@link
* #schemaGroup(String) schemaGroup} is {@code null}.
*/
public SchemaRegistryApacheAvroEncoder buildEncoder() {
public SchemaRegistryApacheAvroSerializer buildSerializer() {
final boolean isAutoRegister = autoRegisterSchemas != null && autoRegisterSchemas;

if (Objects.isNull(schemaRegistryAsyncClient)) {
Expand All @@ -126,6 +126,6 @@ public SchemaRegistryApacheAvroEncoder buildEncoder() {
EncoderFactory.get(), DecoderFactory.get());
final SerializerOptions options = new SerializerOptions(schemaGroup, isAutoRegister, MAX_CACHE_SIZE);

return new SchemaRegistryApacheAvroEncoder(schemaRegistryAsyncClient, codec, options);
return new SchemaRegistryApacheAvroSerializer(schemaRegistryAsyncClient, codec, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ This project welcomes contributions and suggestions. See [Contributing][sdk_read
[sdk_readme_troubleshooting]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#troubleshooting
[sdk_readme_next_steps]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#next-steps
[sdk_readme_contributing]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#contributing
[sample_avro_serialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroEncoderSample.java
[sample_avro_serialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerSample.java
[sample_avro_deserialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryAvroDeserializationSample.java

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%schemaregistry%2Fazure-data-schemaregistry-apacheavro%2Fsrc%2Fsamples%2README.png)
Loading

0 comments on commit 48cb91c

Please sign in to comment.