Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema registry api view updates #13678

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@
<!-- Suppress the check on code-gen classes -->
<suppress checks="LineLength" files="com.azure.ai.textanalytics.implementation.TextAnalyticsClientImplBuilder"/>
<suppress checks="LineLength" files="com.azure.ai.textanalytics.implementation.TextAnalyticsClientImpl"/>
<suppress checks="." files="com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService"/>
<suppress checks="." files="AzureSchemaRegistryRestService"/>

<!-- Suppress the check on code-gen classes -->
<suppress checks="LineLength" files="com.azure.ai.formrecognizer.implementation.FormRecognizerClientImplBuilder"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.Codec;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.models.SerializationException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
Expand Down Expand Up @@ -79,14 +79,19 @@ public String getSchemaName(Object object) {
return AvroSchemaUtils.getSchema(object).getFullName();
}

@Override
public String getSchemaGroup() {
return "$Default";
}

/**
* Returns ByteArrayOutputStream containing Avro encoding of object parameter
* @param object Object to be encoded into byte stream
* @return closed ByteArrayOutputStream
* @throws SerializationException wraps runtime exceptions
*/
@Override
public ByteArrayOutputStream encode(Object object) {
public byte[] encode(Object object) {
Schema schema = AvroSchemaUtils.getSchema(object);

try {
Expand All @@ -104,7 +109,7 @@ public ByteArrayOutputStream encode(Object object) {
writer.write(object, encoder);
encoder.flush();
}
return out;
return out.toByteArray();
} catch (IOException | RuntimeException e) {
// Avro serialization can throw AvroRuntimeException, NullPointerException, ClassCastException, etc
throw logger.logExceptionAsError(
Expand All @@ -119,7 +124,7 @@ public ByteArrayOutputStream encode(Object object) {
* @return deserialized object
* @throws SerializationException upon deserialization failure
*/
public Object decodeBytes(byte[] b, Object object) {
public Object decode(byte[] b, Object object) {
Objects.requireNonNull(object, "Schema must not be null.");

if (!(object instanceof Schema)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import com.azure.core.experimental.serializer.ObjectSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.SchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.models.SerializationException;
import com.azure.data.schemaregistry.CachedSchemaRegistryAsyncClient;
import reactor.core.publisher.Mono;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.data.schemaregistry.avro;

import com.azure.data.schemaregistry.SchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.models.SerializationException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.azure.data.schemaregistry.avro;

import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.CachedSchemaRegistryClientBuilder;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.schemaregistry.client;
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.client.implementation.models.SchemaId;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.implementation.models.SchemaId;
import com.azure.data.schemaregistry.models.SchemaRegistryClientException;
import com.azure.data.schemaregistry.models.SchemaRegistryObject;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -78,8 +82,10 @@ public final class CachedSchemaRegistryAsyncClient {
this.maxSchemaMapSize = MAX_SCHEMA_MAP_SIZE_DEFAULT;
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaRegistryObject> registerSchema(
String schemaGroup, String schemaName, String schemaString, String schemaType) {

if (schemaStringCache.containsKey(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString))) {
logger.verbose(
"Cache hit schema string. Group: '{}', name: '{}', schema type: '{}', payload: '{}'",
Expand All @@ -92,6 +98,7 @@ public Mono<SchemaRegistryObject> registerSchema(
.map(response -> response.getValue());
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaRegistryObject>> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, String schemaType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE);
Expand Down Expand Up @@ -123,6 +130,8 @@ Mono<Response<SchemaRegistryObject>> registerSchemaWithResponse(String schemaGro

SchemaRegistryObject registered = new SchemaRegistryObject(schemaId.getId(),
schemaType,
schemaName,
schemaGroup,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING),
getParseFunc(schemaType));

Expand All @@ -137,6 +146,7 @@ Mono<Response<SchemaRegistryObject>> registerSchemaWithResponse(String schemaGro
});
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaRegistryObject> getSchema(String schemaId) {
if (idCache.containsKey(schemaId)) {
logger.verbose("Cache hit for schema id '{}'", schemaId);
Expand All @@ -145,6 +155,7 @@ public Mono<SchemaRegistryObject> getSchema(String schemaId) {
return getSchemaWithResponse(schemaId).map(Response::getValue);
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaRegistryObject>> getSchemaWithResponse(String schemaId) {
return getSchemaWithResponse(schemaId, Context.NONE);
}
Expand Down Expand Up @@ -172,6 +183,8 @@ Mono<Response<SchemaRegistryObject>> getSchemaWithResponse(String schemaId, Cont

SchemaRegistryObject schemaObject = new SchemaRegistryObject(schemaId,
schemaType,
null,
null,
response.getValue().getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING),
getParseFunc(schemaType));

Expand All @@ -186,6 +199,7 @@ Mono<Response<SchemaRegistryObject>> getSchemaWithResponse(String schemaId, Cont
}


@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<String> getSchemaId(String schemaGroup, String schemaName, String schemaString, String schemaType) {

if (schemaStringCache.containsKey(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString))) {
Expand All @@ -197,6 +211,7 @@ public Mono<String> getSchemaId(String schemaGroup, String schemaName, String sc
.map(response -> response.getValue());
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
String schemaType) {
return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE);
Expand Down Expand Up @@ -231,6 +246,8 @@ Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schema
new SchemaRegistryObject(
schemaId.getId(),
schemaType,
schemaName,
schemaGroup,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING),
getParseFunc(schemaType)));
logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.azure.data.schemaregistry.client;
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.models.SchemaRegistryObject;

@ServiceClient(
builder = CachedSchemaRegistryClientBuilder.class,
Expand All @@ -15,33 +18,43 @@ public final class CachedSchemaRegistryClient {
this.asyncClient = asyncClient;
}

@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistryObject registerSchema(String schemaGroup, String schemaName, String schemaString,
String schemaType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE).getValue();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistryObject> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, String schemaType, Context context) {
return this.asyncClient.registerSchemaWithResponse(schemaGroup, schemaName, schemaString, schemaType,
context).block();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistryObject getSchema(String schemaId) {
return getSchemaWithResponse(schemaId, Context.NONE).getValue();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistryObject> getSchemaWithResponse(String schemaId, Context context) {
return this.asyncClient.getSchemaWithResponse(schemaId).block();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public String getSchemaId(String schemaGroup, String schemaName, String schemaString, String schemaType) {
return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE).getValue();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Response<String> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
String schemaType, Context context) {
return this.asyncClient.getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, schemaType, context)
.block();
}

public void clearCache() {
this.asyncClient.clearCache();
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.schemaregistry.client;
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.credential.TokenCredential;
Expand All @@ -23,9 +23,8 @@
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.Codec;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestServiceClientBuilder;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestServiceClientBuilder;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.temporal.ChronoUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.azure.data.schemaregistry;

import java.io.ByteArrayOutputStream;
import com.azure.data.schemaregistry.models.SerializationException;

/**
* An interface defining operations required for registry-based serialization and deserialization.
Expand Down Expand Up @@ -34,6 +34,8 @@ public interface Codec {
*/
String getSchemaName(Object object);

String getSchemaGroup();

/**
* Returns string representation of schema object to be stored in the service.
*
Expand All @@ -49,7 +51,7 @@ public interface Codec {
* @return output stream containing byte representation of object
* @throws SerializationException if generating byte representation of object fails
*/
ByteArrayOutputStream encode(Object object);
byte[] encode(Object object);

/**
* Decodes byte array into Object given provided schema object.
Expand All @@ -58,5 +60,5 @@ public interface Codec {
* @return deserialized object
* @throws SerializationException if decode operation fails
*/
Object decodeBytes(byte[] encodedBytes, Object schemaObject);
Object decode(byte[] encodedBytes, Object schemaObject);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.client.SchemaRegistryClientException;
import com.azure.data.schemaregistry.client.SchemaRegistryObject;
import com.azure.data.schemaregistry.models.SchemaRegistryClientException;
import com.azure.data.schemaregistry.models.SchemaRegistryObject;
import com.azure.data.schemaregistry.models.SerializationException;
import reactor.core.publisher.Mono;

import java.io.IOException;
Expand Down Expand Up @@ -55,7 +55,7 @@ public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryCl
this(schemaRegistryClient, serializerCodec, deserializerCodecList, null, null);
}

public <T> SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, List<Codec> deserializerCodecList, Boolean autoRegisterSchemas,
String schemaGroup) {

Expand Down Expand Up @@ -153,7 +153,7 @@ protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
.put(id.getBytes(StandardCharsets.UTF_8));
try {
s.write(idBuffer.array());
serializerCodec.encode(object).writeTo(s);
s.write(serializerCodec.encode(object));
} catch (IOException e) {
sink.error(new SerializationException(e.getMessage(), e));
}
Expand All @@ -170,7 +170,7 @@ protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
* @return object, deserialized with the prefixed schema
* @throws SerializationException if deserialization of registry schema or message payload fails.
*/
protected Mono<Object> deserialize(InputStream s) throws SerializationException {
protected Mono<Object> deserialize(InputStream s) {
if (s == null) {
return Mono.empty();
}
Expand All @@ -195,7 +195,7 @@ protected Mono<Object> deserialize(InputStream s) throws SerializationException
.onErrorMap(IOException.class,
e -> logger.logExceptionAsError(new SerializationException(e.getMessage(), e)))
.handle((registryObject, sink) -> {
Object payloadSchema = registryObject.deserialize();
Object payloadSchema = registryObject.getSchema();

if (payloadSchema == null) {
sink.error(logger.logExceptionAsError(
Expand All @@ -210,7 +210,7 @@ protected Mono<Object> deserialize(InputStream s) throws SerializationException
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);

Codec codec = getDeserializerCodec(registryObject);
sink.next(codec.decodeBytes(b, payloadSchema));
sink.next(codec.decode(b, payloadSchema));
})
.onErrorMap(e -> {
if (e instanceof SchemaRegistryClientException) {
Expand Down

This file was deleted.

Loading