Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serializer-level updates to API defs #13668

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ public class AvroCodec implements Codec {
private final ClientLogger logger = new ClientLogger(AvroCodec.class);
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
private final boolean avroSpecificReader;
private static final Boolean AVRO_SPECIFIC_READER_DEFAULT = false;

private final Boolean avroSpecificReader;

/**
* Instantiates AvroCodec instance
* @param avroSpecificReader flag indicating if decoder should decode records as SpecificRecords
*/
public AvroCodec(boolean avroSpecificReader) {
this.avroSpecificReader = avroSpecificReader;
public AvroCodec(Boolean avroSpecificReader) {
if (avroSpecificReader == null) {
this.avroSpecificReader = AvroCodec.AVRO_SPECIFIC_READER_DEFAULT;
}
else {
this.avroSpecificReader = avroSpecificReader;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import reactor.core.publisher.Mono;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;

/**
* Asynchronous registry-based serializer implementation.
Expand All @@ -26,15 +27,17 @@ public class SchemaRegistryAvroAsyncSerializer extends AbstractSchemaRegistrySer
* @param autoRegisterSchemas
*/
SchemaRegistryAvroAsyncSerializer(CachedSchemaRegistryAsyncClient registryClient, AvroCodec codec,
String schemaGroup, boolean autoRegisterSchemas) {
super(registryClient);

setSerializerCodec(codec);
addDeserializerCodec(codec);
String schemaGroup, Boolean autoRegisterSchemas) {
super(registryClient, codec, Collections.singletonList(codec));

// send configurations only
this.autoRegisterSchemas = autoRegisterSchemas;
this.schemaGroup = schemaGroup;
if (autoRegisterSchemas != null) {
this.autoRegisterSchemas = autoRegisterSchemas;
}

if (schemaGroup != null) {
this.schemaGroup = schemaGroup;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
public final class SchemaRegistryAvroSerializerBuilder {
private String registryUrl;
private TokenCredential credential;
private boolean autoRegisterSchemas;
private Boolean autoRegisterSchemas;
private String schemaGroup;
private Integer maxSchemaMapSize;
private boolean avroSpecificReader;
private Boolean avroSpecificReader;

/**
* Instantiates instance of Builder class.
Expand All @@ -28,8 +28,8 @@ public final class SchemaRegistryAvroSerializerBuilder {
public SchemaRegistryAvroSerializerBuilder() {
this.registryUrl = null;
this.credential = null;
this.autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
this.schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;
this.autoRegisterSchemas = null;
this.schemaGroup = null;
this.maxSchemaMapSize = null;
this.avroSpecificReader = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;

import static com.azure.core.util.FluxUtil.monoError;
Expand All @@ -30,42 +32,48 @@ public abstract class AbstractSchemaRegistrySerializer {

public static final Boolean AUTO_REGISTER_SCHEMAS_DEFAULT = false;
public static final String SCHEMA_GROUP_DEFAULT = "$default";
public static final int SCHEMA_ID_SIZE = 32;
static final int SCHEMA_ID_SIZE = 32;

protected CachedSchemaRegistryAsyncClient schemaRegistryClient;
CachedSchemaRegistryAsyncClient schemaRegistryClient;

protected Codec serializerCodec = null;
private Codec serializerCodec;
private final Map<String, Codec> deserializerCodecMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
protected String schemaType;
private String schemaType;

protected Boolean autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
protected String schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;

/**
* @param schemaRegistryClient registry client to be used for storing schemas. Not null.
* Constructor for AbstractSchemaRegistrySerializer implementations.
*
* @param schemaRegistryClient client to be used for interfacing with Schema Registry service
* @param serializerCodec Codec to be used for serialization operations
* @param deserializerCodecList list of Codecs to be used to deserialize incoming payloads
*/
public AbstractSchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, Map<String, Codec> deserializerCodecMap) {
Codec serializerCodec, List<Codec> deserializerCodecList) {
Objects.requireNonNull(serializerCodec);
Objects.requireNonNull(deserializerCodecList);

if (schemaRegistryClient == null) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Schema registry client must be initialized and passed into builder."));
}
this.schemaRegistryClient = schemaRegistryClient;
this.serializerCodec = serializerCodec;
this.deserializerCodecMap.putAll(deserializerCodecMap);
}

/**
* Set Codec class to be used for serialized objects into bytes
*
* @param codec Codec instance
*/
protected void setSerializerCodec(Codec codec) {
if (this.serializerCodec != null) {
if (deserializerCodecList.size() == 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Setting multiple encoders on serializer not permitted"));
new IllegalArgumentException("At least one Codec must be provided for deserialization."));
}

this.schemaRegistryClient = schemaRegistryClient;
this.serializerCodec = serializerCodec;
for (Codec c : deserializerCodecList) {
if (this.deserializerCodecMap.containsKey(c.getSchemaType())) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Only on Codec can be provided per schema serialization type."));
}
this.deserializerCodecMap.put(c.getSchemaType(), c);
}
this.serializerCodec = codec;
this.schemaType = codec.getSchemaType();
}

/**
Expand Down Expand Up @@ -247,19 +255,6 @@ private String getSchemaIdFromPayload(ByteBuffer buffer) throws SerializationExc
return new String(schemaGuidByteArray, schemaRegistryClient.getEncoding());
}

/**
* Loads Codec to be used for decoding message payloads of specified schema type.
*
* @param codec Codec class instance to be loaded
*/
protected void addDeserializerCodec(Codec codec) {
if (codec == null) {
throw logger.logExceptionAsError(new IllegalArgumentException("'codec' cannot be null"));
}

this.deserializerCodecMap.put(codec.getSchemaType(), codec);
}

/**
* If auto-registering is enabled, register schema against Schema Registry.
* If auto-registering is disabled, fetch schema ID for provided schema. Requires pre-registering of schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testRegistryGuidPrefixedToPayload() {
.thenReturn(Mono.just(encoder.getSchemaString(null)));

TestDummySerializer serializer = new TestDummySerializer(
mockRegistryClient, true, false);
mockRegistryClient, false);

try {
ByteArrayOutputStream payload = serializer.serializeImpl(new ByteArrayOutputStream(), 1).block();
Expand Down Expand Up @@ -79,7 +79,6 @@ public void testRegistryGuidPrefixedToPayload() {
public void testNullPayloadThrowsSerializationException() {
TestDummySerializer serializer = new TestDummySerializer(
getMockClient(),
true,
false);

try {
Expand All @@ -90,24 +89,12 @@ public void testNullPayloadThrowsSerializationException() {
}
}

@Test
public void testSerializeWithNullByteEncoderThrows() {
// don't set byte encoder on constructor
TestDummySerializer serializer = new TestDummySerializer(
getMockClient(), false, false);

try {
serializer.serializeImpl(new ByteArrayOutputStream(), null);
} catch (SerializationException e) {
assert (true);
}
}

@Test
public void testIfRegistryNullThenThrow() {
try {
TestDummySerializer serializer = new TestDummySerializer(
null, true, false);
null, false);
fail("Building serializer instance with null registry client failed to throw");
} catch (IllegalArgumentException e) {
assertTrue(true);
Expand Down Expand Up @@ -135,7 +122,7 @@ public void testAddDeserializerCodec() throws IOException, SchemaRegistryClientE
Mockito.when(mockClient.getEncoding()).thenReturn(StandardCharsets.UTF_8);

// constructor loads deserializer codec
TestDummySerializer serializer = new TestDummySerializer(mockClient, true, true);
TestDummySerializer serializer = new TestDummySerializer(mockClient, true);

assertEquals(MOCK_GUID,
serializer.schemaRegistryClient.getSchemaById(MOCK_GUID).block().getSchemaId());
Expand All @@ -153,14 +140,14 @@ public void testAddDeserializerCodec() throws IOException, SchemaRegistryClientE
@Test
public void testNullPayload() throws SchemaRegistryClientException, SerializationException {
TestDummySerializer deserializer = new TestDummySerializer(
getMockClient(), true, true);
getMockClient(), true);
assertNull(deserializer.deserializeImpl(null).block());
}

@Test
public void testIfTooShortPayloadThrow() {
TestDummySerializer serializer = new TestDummySerializer(
getMockClient(), true, true);
getMockClient(), true);

try {
serializer.deserializeImpl(new ByteArrayInputStream("bad payload".getBytes())).block();
Expand All @@ -175,7 +162,7 @@ public void testIfTooShortPayloadThrow() {
@Test
public void testIfRegistryClientNullOnBuildThrow() {
try {
TestDummySerializer deserializer = new TestDummySerializer(null, true, true);
TestDummySerializer deserializer = new TestDummySerializer(null, true);
fail("should not get here.");
} catch (IllegalArgumentException e) {
// good
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,13 @@

import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;

import java.util.Collections;

public class TestDummySerializer extends AbstractSchemaRegistrySerializer {
TestDummySerializer(
CachedSchemaRegistryAsyncClient mockClient,
boolean byteEncoder,
boolean autoRegisterSchemas) {
super(mockClient);

// allows simulating improperly written serializer constructor that does not initialize byte encoder
if (byteEncoder) {
setSerializerCodec(new SampleCodec());
}

this.addDeserializerCodec(new SampleCodec());
super(mockClient, new SampleCodec(), Collections.singletonList(new SampleCodec()));

this.autoRegisterSchemas = autoRegisterSchemas;
}
Expand Down