Skip to content

Commit

Permalink
Serializer-level updates to API defs (#13668)
Browse files Browse the repository at this point in the history
* remove setting codecs

* update avro serializer constructor

* move codec load to constructor

* javadocs for constructor
  • Loading branch information
arerlend committed Jul 31, 2020
1 parent a69684a commit 6e73407
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ public class AvroCodec implements Codec {
private final ClientLogger logger = new ClientLogger(AvroCodec.class);
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
private final boolean avroSpecificReader;
private static final Boolean AVRO_SPECIFIC_READER_DEFAULT = false;

private final Boolean avroSpecificReader;

/**
* Instantiates AvroCodec instance
* @param avroSpecificReader flag indicating if decoder should decode records as SpecificRecords
*/
public AvroCodec(boolean avroSpecificReader) {
this.avroSpecificReader = avroSpecificReader;
public AvroCodec(Boolean avroSpecificReader) {
if (avroSpecificReader == null) {
this.avroSpecificReader = AvroCodec.AVRO_SPECIFIC_READER_DEFAULT;
}
else {
this.avroSpecificReader = avroSpecificReader;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import reactor.core.publisher.Mono;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;

/**
* Asynchronous registry-based serializer implementation.
Expand All @@ -26,15 +27,17 @@ public class SchemaRegistryAvroAsyncSerializer extends AbstractSchemaRegistrySer
* @param autoRegisterSchemas
*/
SchemaRegistryAvroAsyncSerializer(CachedSchemaRegistryAsyncClient registryClient, AvroCodec codec,
String schemaGroup, boolean autoRegisterSchemas) {
super(registryClient);

setSerializerCodec(codec);
addDeserializerCodec(codec);
String schemaGroup, Boolean autoRegisterSchemas) {
super(registryClient, codec, Collections.singletonList(codec));

// send configurations only
this.autoRegisterSchemas = autoRegisterSchemas;
this.schemaGroup = schemaGroup;
if (autoRegisterSchemas != null) {
this.autoRegisterSchemas = autoRegisterSchemas;
}

if (schemaGroup != null) {
this.schemaGroup = schemaGroup;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
public final class SchemaRegistryAvroSerializerBuilder {
private String registryUrl;
private TokenCredential credential;
private boolean autoRegisterSchemas;
private Boolean autoRegisterSchemas;
private String schemaGroup;
private Integer maxSchemaMapSize;
private boolean avroSpecificReader;
private Boolean avroSpecificReader;

/**
* Instantiates instance of Builder class.
Expand All @@ -28,8 +28,8 @@ public final class SchemaRegistryAvroSerializerBuilder {
public SchemaRegistryAvroSerializerBuilder() {
this.registryUrl = null;
this.credential = null;
this.autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
this.schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;
this.autoRegisterSchemas = null;
this.schemaGroup = null;
this.maxSchemaMapSize = null;
this.avroSpecificReader = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;

import static com.azure.core.util.FluxUtil.monoError;
Expand All @@ -30,42 +32,48 @@ public abstract class AbstractSchemaRegistrySerializer {

public static final Boolean AUTO_REGISTER_SCHEMAS_DEFAULT = false;
public static final String SCHEMA_GROUP_DEFAULT = "$default";
public static final int SCHEMA_ID_SIZE = 32;
static final int SCHEMA_ID_SIZE = 32;

protected CachedSchemaRegistryAsyncClient schemaRegistryClient;
CachedSchemaRegistryAsyncClient schemaRegistryClient;

protected Codec serializerCodec = null;
private Codec serializerCodec;
private final Map<String, Codec> deserializerCodecMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
protected String schemaType;
private String schemaType;

protected Boolean autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
protected String schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;

/**
* @param schemaRegistryClient registry client to be used for storing schemas. Not null.
* Constructor for AbstractSchemaRegistrySerializer implementations.
*
* @param schemaRegistryClient client to be used for interfacing with Schema Registry service
* @param serializerCodec Codec to be used for serialization operations
* @param deserializerCodecList list of Codecs to be used to deserialize incoming payloads
*/
public AbstractSchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, Map<String, Codec> deserializerCodecMap) {
Codec serializerCodec, List<Codec> deserializerCodecList) {
Objects.requireNonNull(serializerCodec);
Objects.requireNonNull(deserializerCodecList);

if (schemaRegistryClient == null) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Schema registry client must be initialized and passed into builder."));
}
this.schemaRegistryClient = schemaRegistryClient;
this.serializerCodec = serializerCodec;
this.deserializerCodecMap.putAll(deserializerCodecMap);
}

/**
* Set Codec class to be used for serialized objects into bytes
*
* @param codec Codec instance
*/
protected void setSerializerCodec(Codec codec) {
if (this.serializerCodec != null) {
if (deserializerCodecList.size() == 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Setting multiple encoders on serializer not permitted"));
new IllegalArgumentException("At least one Codec must be provided for deserialization."));
}

this.schemaRegistryClient = schemaRegistryClient;
this.serializerCodec = serializerCodec;
for (Codec c : deserializerCodecList) {
if (this.deserializerCodecMap.containsKey(c.getSchemaType())) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Only on Codec can be provided per schema serialization type."));
}
this.deserializerCodecMap.put(c.getSchemaType(), c);
}
this.serializerCodec = codec;
this.schemaType = codec.getSchemaType();
}

/**
Expand Down Expand Up @@ -247,19 +255,6 @@ private String getSchemaIdFromPayload(ByteBuffer buffer) throws SerializationExc
return new String(schemaGuidByteArray, schemaRegistryClient.getEncoding());
}

/**
* Loads Codec to be used for decoding message payloads of specified schema type.
*
* @param codec Codec class instance to be loaded
*/
protected void addDeserializerCodec(Codec codec) {
if (codec == null) {
throw logger.logExceptionAsError(new IllegalArgumentException("'codec' cannot be null"));
}

this.deserializerCodecMap.put(codec.getSchemaType(), codec);
}

/**
* If auto-registering is enabled, register schema against Schema Registry.
* If auto-registering is disabled, fetch schema ID for provided schema. Requires pre-registering of schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testRegistryGuidPrefixedToPayload() {
.thenReturn(Mono.just(encoder.getSchemaString(null)));

TestDummySerializer serializer = new TestDummySerializer(
mockRegistryClient, true, false);
mockRegistryClient, false);

try {
ByteArrayOutputStream payload = serializer.serializeImpl(new ByteArrayOutputStream(), 1).block();
Expand Down Expand Up @@ -79,7 +79,6 @@ public void testRegistryGuidPrefixedToPayload() {
public void testNullPayloadThrowsSerializationException() {
TestDummySerializer serializer = new TestDummySerializer(
getMockClient(),
true,
false);

try {
Expand All @@ -90,24 +89,12 @@ public void testNullPayloadThrowsSerializationException() {
}
}

@Test
public void testSerializeWithNullByteEncoderThrows() {
// don't set byte encoder on constructor
TestDummySerializer serializer = new TestDummySerializer(
getMockClient(), false, false);

try {
serializer.serializeImpl(new ByteArrayOutputStream(), null);
} catch (SerializationException e) {
assert (true);
}
}

@Test
public void testIfRegistryNullThenThrow() {
try {
TestDummySerializer serializer = new TestDummySerializer(
null, true, false);
null, false);
fail("Building serializer instance with null registry client failed to throw");
} catch (IllegalArgumentException e) {
assertTrue(true);
Expand Down Expand Up @@ -135,7 +122,7 @@ public void testAddDeserializerCodec() throws IOException, SchemaRegistryClientE
Mockito.when(mockClient.getEncoding()).thenReturn(StandardCharsets.UTF_8);

// constructor loads deserializer codec
TestDummySerializer serializer = new TestDummySerializer(mockClient, true, true);
TestDummySerializer serializer = new TestDummySerializer(mockClient, true);

assertEquals(MOCK_GUID,
serializer.schemaRegistryClient.getSchemaById(MOCK_GUID).block().getSchemaId());
Expand All @@ -153,14 +140,14 @@ public void testAddDeserializerCodec() throws IOException, SchemaRegistryClientE
@Test
public void testNullPayload() throws SchemaRegistryClientException, SerializationException {
TestDummySerializer deserializer = new TestDummySerializer(
getMockClient(), true, true);
getMockClient(), true);
assertNull(deserializer.deserializeImpl(null).block());
}

@Test
public void testIfTooShortPayloadThrow() {
TestDummySerializer serializer = new TestDummySerializer(
getMockClient(), true, true);
getMockClient(), true);

try {
serializer.deserializeImpl(new ByteArrayInputStream("bad payload".getBytes())).block();
Expand All @@ -175,7 +162,7 @@ public void testIfTooShortPayloadThrow() {
@Test
public void testIfRegistryClientNullOnBuildThrow() {
try {
TestDummySerializer deserializer = new TestDummySerializer(null, true, true);
TestDummySerializer deserializer = new TestDummySerializer(null, true);
fail("should not get here.");
} catch (IllegalArgumentException e) {
// good
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,13 @@

import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;

import java.util.Collections;

public class TestDummySerializer extends AbstractSchemaRegistrySerializer {
TestDummySerializer(
CachedSchemaRegistryAsyncClient mockClient,
boolean byteEncoder,
boolean autoRegisterSchemas) {
super(mockClient);

// allows simulating improperly written serializer constructor that does not initialize byte encoder
if (byteEncoder) {
setSerializerCodec(new SampleCodec());
}

this.addDeserializerCodec(new SampleCodec());
super(mockClient, new SampleCodec(), Collections.singletonList(new SampleCodec()));

this.autoRegisterSchemas = autoRegisterSchemas;
}
Expand Down

0 comments on commit 6e73407

Please sign in to comment.