Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update schema registry API names #23886

Merged
merged 8 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,57 +73,56 @@ public final class SchemaRegistryAsyncClient {
* Registers a new schema in the specified schema group with the given schema name. If the schema name already
* exists in this schema group, a new version with the updated schema string will be registered.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
* @param schemaString The string representation of the schema.
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param serializationType The serialization type of this schema.
*
* @return The {@link SchemaProperties} of a successfully registered schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaProperties> registerSchema(
String schemaGroup, String schemaName, String schemaString, SerializationType serializationType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, serializationType)
public Mono<SchemaProperties> registerSchema(String groupName, String name, String content,
SerializationType serializationType) {
return registerSchemaWithResponse(groupName, name, content, serializationType)
.map(Response::getValue);
}

/**
* Registers a new schema in the specified schema group with the given schema name. If the schema name already
* exists in this schema group, a new version with the updated schema string will be registered.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
* @param schemaString The string representation of the schema.
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param serializationType The serialization type of this schema.
*
* @return The schema properties on successful registration of the schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, SerializationType serializationType) {
return FluxUtil.withContext(context -> registerSchemaWithResponse(schemaGroup, schemaName, schemaString,
Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name, String content,
conniey marked this conversation as resolved.
Show resolved Hide resolved
SerializationType serializationType) {
return FluxUtil.withContext(context -> registerSchemaWithResponse(groupName, name, content,
serializationType, context));
}

Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, SerializationType serializationType, Context context) {
Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name, String content,
SerializationType serializationType, Context context) {
logger.verbose("Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'",
schemaGroup, schemaName, serializationType, schemaString);
groupName, name, serializationType, content);

return this.restService.getSchemas().registerWithResponseAsync(schemaGroup, schemaName,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString)
return this.restService.getSchemas().registerWithResponseAsync(groupName, name,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, content)
.handle((response, sink) -> {
SchemaId schemaId = response.getValue();
SchemaProperties registered = new SchemaProperties(schemaId.getId(),
serializationType,
schemaName,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
name,
content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));

schemaStringCache.putIfAbsent(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString),
schemaStringCache.putIfAbsent(getSchemaStringCacheKey(groupName, name, content),
registered);
idCache.putIfAbsent(schemaId.getId(), registered);

logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
logger.verbose("Cached schema string. Group: '{}', name: '{}'", groupName, name);
SimpleResponse<SchemaProperties> schemaRegistryObjectSimpleResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), registered);
Expand All @@ -132,34 +131,35 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup,
}

/**
* Gets the schema properties of the schema associated with the unique schemaId.
* @param schemaId The unique identifier of the schema.
* Gets the schema properties of the schema associated with the unique schema id.
*
* @param id The unique identifier of the schema.
*
* @return The {@link SchemaProperties} associated with the given {@code schemaId}.
* @return The {@link SchemaProperties} associated with the given {@code id}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaProperties> getSchema(String schemaId) {
if (idCache.containsKey(schemaId)) {
logger.verbose("Cache hit for schema id '{}'", schemaId);
return Mono.fromCallable(() -> idCache.get(schemaId));
public Mono<SchemaProperties> getSchema(String id) {
if (idCache.containsKey(id)) {
logger.verbose("Cache hit for schema id '{}'", id);
return Mono.fromCallable(() -> idCache.get(id));
}
return getSchemaWithResponse(schemaId).map(Response::getValue);
return getSchemaWithResponse(id).map(Response::getValue);
}

/**
* Gets the schema properties of the schema associated with the unique schemaId.
* @param schemaId The unique identifier of the schema.
* Gets the schema properties of the schema associated with the unique schema id.
*
* @return The {@link SchemaProperties} associated with the given {@code schemaId} along with the HTTP
* response.
* @param id The unique identifier of the schema.
*
* @return The {@link SchemaProperties} associated with the given {@code id} along with the HTTP response.
*/
Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId) {
return FluxUtil.withContext(context -> getSchemaWithResponse(schemaId, context));
Mono<Response<SchemaProperties>> getSchemaWithResponse(String id) {
return FluxUtil.withContext(context -> getSchemaWithResponse(id, context));
}

Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId, Context context) {
Objects.requireNonNull(schemaId, "'schemaId' should not be null");
return this.restService.getSchemas().getByIdWithResponseAsync(schemaId)
Mono<Response<SchemaProperties>> getSchemaWithResponse(String id, Context context) {
Objects.requireNonNull(id, "'id' should not be null");
return this.restService.getSchemas().getByIdWithResponseAsync(id)
.handle((response, sink) -> {
final SerializationType serializationType =
SerializationType.fromString(response.getDeserializedHeaders().getSchemaType());
Expand All @@ -175,17 +175,17 @@ Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId, Context

final String schemaGroup = matcher.group("schemaGroup");
final String schemaName = matcher.group("schemaName");
final SchemaProperties schemaObject = new SchemaProperties(schemaId,
final SchemaProperties schemaObject = new SchemaProperties(id,
serializationType,
schemaName,
response.getValue());
final String schemaCacheKey = getSchemaStringCacheKey(schemaGroup, schemaName,
new String(response.getValue(), SCHEMA_REGISTRY_SERVICE_ENCODING));
new String(response.getValue(), SCHEMA_REGISTRY_SERVICE_ENCODING));

schemaStringCache.putIfAbsent(schemaCacheKey, schemaObject);
idCache.putIfAbsent(schemaId, schemaObject);
idCache.putIfAbsent(id, schemaObject);

logger.verbose("Cached schema object. Path: '{}'", schemaId);
logger.verbose("Cached schema object. Path: '{}'", id);

SimpleResponse<SchemaProperties> schemaResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand All @@ -199,73 +199,74 @@ Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId, Context
* Gets the schema identifier associated with the given schema. Gets a cached value if it exists, otherwise makes a
* call to the service.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
* @param schemaString The string representation of the schema.
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param serializationType The serialization type of this schema.
*
* @return The unique identifier for this schema.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<String> getSchemaId(String schemaGroup, String schemaName, String schemaString,
public Mono<String> getSchemaId(String groupName, String name, String content,
SerializationType serializationType) {

String schemaStringCacheKey = getSchemaStringCacheKey(schemaGroup, schemaName, schemaString);
String schemaStringCacheKey = getSchemaStringCacheKey(groupName, name, content);

if (schemaStringCache.containsKey(schemaStringCacheKey)) {
return Mono.fromCallable(() -> {
logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", groupName, name);
return schemaStringCache.get(schemaStringCacheKey).getSchemaId();
});
}

return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType)
return getSchemaIdWithResponse(groupName, name, content, serializationType)
.map(response -> response.getValue());
}

/**
* Gets the schema identifier associated with the given schema. Always makes a call to the service.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
* @param schemaString The string representation of the schema.
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param serializationType The serialization type of this schema.
*
* @return The unique identifier for this schema.
*/
Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, String content,
SerializationType serializationType) {

return FluxUtil.withContext(context ->
getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType, context));
getSchemaIdWithResponse(groupName, name, content, serializationType, context));
}

/**
* Gets the schema id associated with the schema name a string representation of the schema.
*
* @param schemaGroup The schema group.
* @param schemaName The schema name.
* @param schemaString The string representation of the schema.
* @param groupName The schema group.
* @param name The schema name.
* @param content The string representation of the schema.
* @param serializationType The serialization type of this schema.
* @param context Context to pass along with this request.
*
* @return A mono that completes with the schema id.
*/
Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, String content,
SerializationType serializationType, Context context) {

return this.restService.getSchemas()
.queryIdByContentWithResponseAsync(schemaGroup, schemaName,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString)
.queryIdByContentWithResponseAsync(groupName, name,
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, content)
.handle((response, sink) -> {
SchemaId schemaId = response.getValue();
SchemaProperties properties = new SchemaProperties(schemaId.getId(), serializationType, schemaName,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
SchemaProperties properties = new SchemaProperties(schemaId.getId(), serializationType, name,
content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));

schemaStringCache.putIfAbsent(
getSchemaStringCacheKey(schemaGroup, schemaName, schemaString), properties);
getSchemaStringCacheKey(groupName, name, content), properties);
idCache.putIfAbsent(schemaId.getId(), properties);

logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
logger.verbose("Cached schema string. Group: '{}', name: '{}'", groupName, name);

SimpleResponse<String> schemaIdResponse = new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand All @@ -274,16 +275,7 @@ Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schema
});
}

/**
* Explicit call to clear all caches.
*/
void clearCache() {
idCache.clear();
schemaStringCache.clear();
typeParserMap.clear();
}

private static String getSchemaStringCacheKey(String schemaGroup, String schemaName, String schemaString) {
return schemaGroup + schemaName + schemaString;
private static String getSchemaStringCacheKey(String groupName, String name, String content) {
return groupName + name + content;
}
}
Loading