diff --git a/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java b/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java index 0faca24a764..8990fa71499 100644 --- a/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java +++ b/json-schema-provider/src/main/java/io/confluent/kafka/schemaregistry/json/JsonSchema.java @@ -262,6 +262,11 @@ public void validate() { } public void validate(Object value) throws JsonProcessingException, ValidationException { + validate(rawSchema(), value); + } + + public static void validate(Schema schema, Object value) + throws JsonProcessingException, ValidationException { Object primitiveValue = NONE_MARKER; if (isPrimitive(value)) { primitiveValue = value; @@ -277,7 +282,7 @@ public void validate(Object value) throws JsonProcessingException, ValidationExc primitiveValue = ((TextNode) value).asText(); } if (primitiveValue != NONE_MARKER) { - rawSchema().validate(primitiveValue); + schema.validate(primitiveValue); } else { Object jsonObject; if (value instanceof ArrayNode) { @@ -289,7 +294,7 @@ public void validate(Object value) throws JsonProcessingException, ValidationExc } else { jsonObject = objectMapper.convertValue(value, JSONObject.class); } - rawSchema().validate(jsonObject); + schema.validate(jsonObject); } } diff --git a/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/AbstractKafkaJsonSchemaDeserializer.java b/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/AbstractKafkaJsonSchemaDeserializer.java index df2caf2b330..428056fc8ae 100644 --- a/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/AbstractKafkaJsonSchemaDeserializer.java +++ b/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/AbstractKafkaJsonSchemaDeserializer.java @@ -24,6 +24,9 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.SerializationException; +import org.everit.json.schema.CombinedSchema; +import org.everit.json.schema.ReferenceSchema; +import org.everit.json.schema.Schema; import org.everit.json.schema.ValidationException; import java.io.ByteArrayInputStream; @@ -115,8 +118,6 @@ protected Object deserialize( int length = buffer.limit() - 1 - idSize; int start = buffer.position() + buffer.arrayOffset(); - String typeName = schema.getString(typeProperty); - JsonNode jsonNode = null; if (validate) { try { @@ -135,19 +136,30 @@ protected Object deserialize( value = jsonNode != null ? objectMapper.convertValue(jsonNode, type) : objectMapper.readValue(buffer.array(), start, length, type); - } else if (typeName != null) { - value = jsonNode != null - ? deriveType(jsonNode, typeName) - : deriveType(buffer, length, start, typeName); - } else if (Object.class.equals(type)) { - value = jsonNode != null - ? objectMapper.convertValue(jsonNode, type) - : objectMapper.readValue(buffer.array(), start, length, type); } else { - // Return JsonNode if type is null - value = jsonNode != null - ? jsonNode - : objectMapper.readTree(new ByteArrayInputStream(buffer.array(), start, length)); + String typeName; + if (schema.rawSchema() instanceof CombinedSchema) { + if (jsonNode == null) { + jsonNode = objectMapper.readValue(buffer.array(), start, length, JsonNode.class); + } + typeName = getTypeName(schema.rawSchema(), jsonNode); + } else { + typeName = schema.getString(typeProperty); + } + if (typeName != null) { + value = jsonNode != null + ? deriveType(jsonNode, typeName) + : deriveType(buffer, length, start, typeName); + } else if (Object.class.equals(type)) { + value = jsonNode != null + ? objectMapper.convertValue(jsonNode, type) + : objectMapper.readValue(buffer.array(), start, length, type); + } else { + // Return JsonNode if type is null + value = jsonNode != null + ? jsonNode + : objectMapper.readTree(new ByteArrayInputStream(buffer.array(), start, length)); + } } if (includeSchemaAndVersion) { @@ -173,6 +185,26 @@ protected Object deserialize( } } + private String getTypeName(Schema schema, JsonNode jsonNode) { + if (schema instanceof CombinedSchema) { + for (Schema subschema : ((CombinedSchema) schema).getSubschemas()) { + boolean valid = false; + try { + JsonSchema.validate(subschema, jsonNode); + valid = true; + } catch (Exception e) { + // noop + } + if (valid) { + return getTypeName(subschema, jsonNode); + } + } + } else if (schema instanceof ReferenceSchema) { + return getTypeName(((ReferenceSchema)schema).getReferredSchema(), jsonNode); + } + return (String) schema.getUnprocessedProperties().get(typeProperty); + } + private Object deriveType( ByteBuffer buffer, int length, int start, String typeName ) throws IOException { diff --git a/json-schema-serializer/src/test/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaSerializerTest.java b/json-schema-serializer/src/test/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaSerializerTest.java index 42d64d995a2..d31d07d1dc5 100644 --- a/json-schema-serializer/src/test/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaSerializerTest.java +++ b/json-schema-serializer/src/test/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaSerializerTest.java @@ -16,9 +16,17 @@ package io.confluent.kafka.serializers.json; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject; import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaString; +import io.confluent.kafka.schemaregistry.annotations.SchemaReference; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider; +import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils; import java.time.LocalDate; +import java.util.Collections; +import java.util.List; import org.apache.kafka.common.errors.SerializationException; import org.junit.Test; @@ -41,6 +49,7 @@ public class KafkaJsonSchemaSerializerTest { private final Properties config; private final SchemaRegistryClient schemaRegistry; private KafkaJsonSchemaSerializer serializer; + private KafkaJsonSchemaSerializer latestSerializer; private KafkaJsonSchemaDeserializer deserializer; private final String topic; @@ -50,9 +59,16 @@ public KafkaJsonSchemaSerializerTest() { config.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus"); config.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, true); config.put(KafkaJsonSchemaSerializerConfig.WRITE_DATES_AS_ISO8601, true); - schemaRegistry = new MockSchemaRegistryClient(); + schemaRegistry = new MockSchemaRegistryClient( + Collections.singletonList(new JsonSchemaProvider())); serializer = new KafkaJsonSchemaSerializer<>(schemaRegistry, new HashMap(config)); deserializer = getDeserializer(Object.class); + Properties latestConfig = new Properties(config); + latestConfig.put(KafkaJsonSchemaSerializerConfig.AUTO_REGISTER_SCHEMAS, false); + latestConfig.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus"); + latestConfig.put(KafkaJsonSchemaSerializerConfig.USE_LATEST_VERSION, true); + latestConfig.put(KafkaJsonSchemaSerializerConfig.LATEST_COMPATIBILITY_STRICT, false); + latestSerializer = new KafkaJsonSchemaSerializer<>(schemaRegistry, new HashMap(latestConfig)); topic = "test"; } @@ -130,6 +146,84 @@ public void serializeInvalidUser() throws Exception { assertEquals(user, deserialized); } + @Test + public void serializeUserRef() throws Exception { + String schema = "{\n" + + " \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" + + " \"title\": \"Schema references\",\n" + + " \"description\": \"List of schema references for multiple types in a single topic\",\n" + + " \"oneOf\": [\n" + + " { \"$ref\": \"customer.json\"},\n" + + " { \"$ref\": \"user.json\"}\n" + + " ]\n" + + "}"; + + Customer customer = new Customer("acme", null); + User user = new User("john", "doe", (short) 50, "jack", null); + JsonSchema userSchema = JsonSchemaUtils.getSchema(user); + JsonSchema customerSchema = JsonSchemaUtils.getSchema(customer); + schemaRegistry.register("user", userSchema); + schemaRegistry.register("customer", customerSchema); + List refs = + ImmutableList.of( + new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference( + "user.json", "user", 1), + new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference( + "customer.json", "customer", 1)); + Map resolvedRefs = ImmutableMap.of( + "user.json", userSchema.canonicalString(), + "customer.json", customerSchema.canonicalString()); + JsonSchema jsonSchema = new JsonSchema(schema, refs, resolvedRefs, null); + schemaRegistry.register(topic + "-value", jsonSchema); + + byte[] bytes = latestSerializer.serialize(topic, user); + + // Test for javaType property + Object deserialized = getDeserializer(null).deserialize(topic, bytes); + assertEquals(user, deserialized); + + bytes = latestSerializer.serialize(topic, customer); + + // Test for javaType property + deserialized = getDeserializer(null).deserialize(topic, bytes); + assertEquals(customer, deserialized); + } + + // Generate javaType property + @JsonSchemaInject(strings = {@JsonSchemaString(path="javaType", + value="io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerTest$Customer")}) + public static class Customer { + @JsonProperty + public String customerName; + @JsonProperty + public LocalDate acquireDate; + + public Customer() {} + + public Customer(String customerName, LocalDate acquireDate) { + this.customerName = customerName; + this.acquireDate = acquireDate; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Customer customer = (Customer) o; + return Objects.equals(customerName, customer.customerName) + && Objects.equals(acquireDate, customer.acquireDate); + } + + @Override + public int hashCode() { + return Objects.hash(customerName, acquireDate); + } + } + // Generate javaType property @JsonSchemaInject(strings = {@JsonSchemaString(path="javaType", value="io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerTest$User")})