Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema registry api view #13676

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import com.azure.core.experimental.serializer.ObjectSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.AbstractSchemaRegistrySerializer;
import com.azure.data.schemaregistry.SchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import reactor.core.publisher.Mono;
Expand All @@ -16,7 +16,7 @@
/**
* Asynchronous registry-based serializer implementation.
*/
public class SchemaRegistryAvroAsyncSerializer extends AbstractSchemaRegistrySerializer implements ObjectSerializer {
public class SchemaRegistryAvroAsyncSerializer extends SchemaRegistrySerializer implements ObjectSerializer {
private final ClientLogger logger = new ClientLogger(SchemaRegistryAvroAsyncSerializer.class);

/**
Expand All @@ -28,16 +28,7 @@ public class SchemaRegistryAvroAsyncSerializer extends AbstractSchemaRegistrySer
*/
SchemaRegistryAvroAsyncSerializer(CachedSchemaRegistryAsyncClient registryClient, AvroCodec codec,
String schemaGroup, Boolean autoRegisterSchemas) {
super(registryClient, codec, Collections.singletonList(codec));

// send configurations only
if (autoRegisterSchemas != null) {
this.autoRegisterSchemas = autoRegisterSchemas;
}

if (schemaGroup != null) {
this.schemaGroup = schemaGroup;
}
super(registryClient, codec, Collections.singletonList(codec), autoRegisterSchemas, schemaGroup);
}

@Override
Expand All @@ -46,12 +37,12 @@ public <S extends OutputStream> Mono<S> serialize(S s, Object o) {
return Mono.empty();
}

return this.serializeImpl(s, o);
return super.serialize(s, o);
}

@Override
public <T> Mono<T> deserialize(InputStream stream, Class<T> clazz) {
return this.deserializeImpl(stream)
return this.deserialize(stream)
.map(o -> {
if (clazz.isInstance(o)) {
return clazz.cast(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.azure.data.schemaregistry.avro;

import com.azure.data.schemaregistry.AbstractSchemaRegistrySerializer;
import com.azure.data.schemaregistry.SchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;

import java.io.ByteArrayInputStream;
Expand All @@ -17,7 +17,7 @@
*
* Pluggable with the core Azure SDK Serializer interface.
*
* @see AbstractSchemaRegistrySerializer See AbstractSchemaRegistrySerializer for internal serialization implementation
* @see SchemaRegistrySerializer See AbstractSchemaRegistrySerializer for internal serialization implementation
*/
public class SchemaRegistryAvroSerializer {
private final SchemaRegistryAvroAsyncSerializer serializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.azure.data.schemaregistry.avro;

import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.AbstractSchemaRegistrySerializer;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;

Expand All @@ -18,7 +17,7 @@ public final class SchemaRegistryAvroSerializerBuilder {
private TokenCredential credential;
private Boolean autoRegisterSchemas;
private String schemaGroup;
private Integer maxSchemaMapSize;
private Integer maxCacheSize;
private Boolean avroSpecificReader;

/**
Expand All @@ -30,7 +29,7 @@ public SchemaRegistryAvroSerializerBuilder() {
this.credential = null;
this.autoRegisterSchemas = null;
this.schemaGroup = null;
this.maxSchemaMapSize = null;
this.maxCacheSize = null;
this.avroSpecificReader = false;
}

Expand Down Expand Up @@ -102,11 +101,11 @@ public SchemaRegistryAvroSerializerBuilder avroSpecificReader(boolean avroSpecif
* Specifies maximum schema object cache size for underlying CachedSchemaRegistryAsyncClient. If specified cache
* size is exceeded, all caches are recycled.
*
* @param maxSchemaMapSize maximum number of schemas per cache
* @param maxCacheSize maximum number of schemas per cache
* @return updated {@link SchemaRegistryAvroSerializerBuilder} instance
*/
public SchemaRegistryAvroSerializerBuilder maxSchemaMapSize(int maxSchemaMapSize) {
this.maxSchemaMapSize = maxSchemaMapSize;
public SchemaRegistryAvroSerializerBuilder maxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
return this;
}

Expand All @@ -117,8 +116,8 @@ public SchemaRegistryAvroSerializerBuilder maxSchemaMapSize(int maxSchemaMapSize
* @throws NullPointerException if parameters are incorrectly set.
* @throws IllegalArgumentException if credential is not set.
*/
public SchemaRegistryAvroSerializer buildClient() {
return new SchemaRegistryAvroSerializer(this.buildAsyncClient());
public SchemaRegistryAvroSerializer buildSerializer() {
return new SchemaRegistryAvroSerializer(this.buildAsyncSerializer());
}

/**
Expand All @@ -128,19 +127,19 @@ public SchemaRegistryAvroSerializer buildClient() {
* @throws NullPointerException if parameters are incorrectly set.
* @throws IllegalArgumentException if credential is not set.
*/
public SchemaRegistryAvroAsyncSerializer buildAsyncClient() {
public SchemaRegistryAvroAsyncSerializer buildAsyncSerializer() {
CachedSchemaRegistryClientBuilder builder = new CachedSchemaRegistryClientBuilder()
.endpoint(registryUrl)
.credential(credential);

if (maxSchemaMapSize != null) {
builder.maxSchemaMapSize(maxSchemaMapSize);
if (maxCacheSize != null) {
builder.maxCacheSize(maxCacheSize);
}

AvroCodec codec = new AvroCodec(this.avroSpecificReader);

CachedSchemaRegistryAsyncClient client = builder
.addSchemaParser(codec)
.addCodec(codec)
.buildAsyncClient();

return new SchemaRegistryAvroAsyncSerializer(client, codec, this.schemaGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
/**
* Common implementation for all registry-based serializers.
*/
public abstract class AbstractSchemaRegistrySerializer {
private final ClientLogger logger = new ClientLogger(AbstractSchemaRegistrySerializer.class);
public abstract class SchemaRegistrySerializer {
private final ClientLogger logger = new ClientLogger(SchemaRegistrySerializer.class);

public static final Boolean AUTO_REGISTER_SCHEMAS_DEFAULT = false;
public static final String SCHEMA_GROUP_DEFAULT = "$default";
static final Boolean AUTO_REGISTER_SCHEMAS_DEFAULT = false;
static final String SCHEMA_GROUP_DEFAULT = "$default";
static final int SCHEMA_ID_SIZE = 32;

CachedSchemaRegistryAsyncClient schemaRegistryClient;
Expand All @@ -40,8 +40,8 @@ public abstract class AbstractSchemaRegistrySerializer {
private final Map<String, Codec> deserializerCodecMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
private String schemaType;

protected Boolean autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
protected String schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;
Boolean autoRegisterSchemas = SchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
String schemaGroup = SchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;

/**
* Constructor for AbstractSchemaRegistrySerializer implementations.
Expand All @@ -50,8 +50,15 @@ public abstract class AbstractSchemaRegistrySerializer {
* @param serializerCodec Codec to be used for serialization operations
* @param deserializerCodecList list of Codecs to be used to deserialize incoming payloads
*/
public AbstractSchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, List<Codec> deserializerCodecList) {
this(schemaRegistryClient, serializerCodec, deserializerCodecList, null, null);
}

public <T> SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, List<Codec> deserializerCodecList, Boolean autoRegisterSchemas,
String schemaGroup) {

Objects.requireNonNull(serializerCodec);
Objects.requireNonNull(deserializerCodecList);

Expand All @@ -74,6 +81,15 @@ public AbstractSchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRe
}
this.deserializerCodecMap.put(c.getSchemaType(), c);
}

// send configurations only
if (autoRegisterSchemas != null) {
this.autoRegisterSchemas = autoRegisterSchemas;
}

if (schemaGroup != null) {
this.schemaGroup = schemaGroup;
}
}

/**
Expand All @@ -87,7 +103,7 @@ public AbstractSchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRe
* @return byte array containing encoded bytes with prefixed schema ID
* @throws SerializationException if serialization operation fails during runtime.
*/
protected <T extends OutputStream> Mono<T> serializeImpl(T s, Object object) {
protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
if (object == null) {
return monoError(logger, new SerializationException(
"Null object, behavior should be defined in concrete serializer implementation."));
Expand Down Expand Up @@ -133,7 +149,7 @@ protected <T extends OutputStream> Mono<T> serializeImpl(T s, Object object) {
}
})
.handle((id, sink) -> {
ByteBuffer idBuffer = ByteBuffer.allocate(AbstractSchemaRegistrySerializer.SCHEMA_ID_SIZE)
ByteBuffer idBuffer = ByteBuffer.allocate(SchemaRegistrySerializer.SCHEMA_ID_SIZE)
.put(id.getBytes(StandardCharsets.UTF_8));
try {
s.write(idBuffer.array());
Expand All @@ -154,7 +170,7 @@ protected <T extends OutputStream> Mono<T> serializeImpl(T s, Object object) {
* @return object, deserialized with the prefixed schema
* @throws SerializationException if deserialization of registry schema or message payload fails.
*/
protected Mono<Object> deserializeImpl(InputStream s) throws SerializationException {
protected Mono<Object> deserialize(InputStream s) throws SerializationException {
if (s == null) {
return Mono.empty();
}
Expand All @@ -173,9 +189,9 @@ protected Mono<Object> deserializeImpl(InputStream s) throws SerializationExcept
String schemaId = getSchemaIdFromPayload(buffer);
System.out.println(schemaId);

SchemaRegistryObject block = this.schemaRegistryClient.getSchemaById(schemaId).block();
SchemaRegistryObject block = this.schemaRegistryClient.getSchema(schemaId).block();
System.out.println(block);
return this.schemaRegistryClient.getSchemaById(schemaId)
return this.schemaRegistryClient.getSchema(schemaId)
.onErrorMap(IOException.class,
e -> logger.logExceptionAsError(new SerializationException(e.getMessage(), e)))
.handle((registryObject, sink) -> {
Expand All @@ -190,7 +206,7 @@ protected Mono<Object> deserializeImpl(InputStream s) throws SerializationExcept
}

int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - AbstractSchemaRegistrySerializer.SCHEMA_ID_SIZE;
int length = buffer.limit() - SchemaRegistrySerializer.SCHEMA_ID_SIZE;
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);

Codec codec = getDeserializerCodec(registryObject);
Expand Down Expand Up @@ -245,14 +261,14 @@ private Codec getDeserializerCodec(SchemaRegistryObject registryObject) throws S
* @throws SerializationException if schema ID could not be extracted from payload
*/
private String getSchemaIdFromPayload(ByteBuffer buffer) throws SerializationException {
byte[] schemaGuidByteArray = new byte[AbstractSchemaRegistrySerializer.SCHEMA_ID_SIZE];
byte[] schemaGuidByteArray = new byte[SchemaRegistrySerializer.SCHEMA_ID_SIZE];
try {
buffer.get(schemaGuidByteArray);
} catch (BufferUnderflowException e) {
throw logger.logExceptionAsError(new SerializationException("Payload too short, no readable guid.", e));
}

return new String(schemaGuidByteArray, schemaRegistryClient.getEncoding());
return new String(schemaGuidByteArray, StandardCharsets.UTF_8);
}

/**
Expand All @@ -270,7 +286,7 @@ private String getSchemaIdFromPayload(ByteBuffer buffer) throws SerializationExc
private Mono<String> maybeRegisterSchema(
String schemaGroup, String schemaName, String schemaString, String schemaType) {
if (this.autoRegisterSchemas) {
return this.schemaRegistryClient.register(schemaGroup, schemaName, schemaString, schemaType)
return this.schemaRegistryClient.registerSchema(schemaGroup, schemaName, schemaString, schemaType)
.map(SchemaRegistryObject::getSchemaId);
} else {
return this.schemaRegistryClient.getSchemaId(
Expand Down
Loading