
Schema registry protobuf compatibility issue #3633

Closed
zimbatm opened this issue Jan 27, 2022 · 4 comments · Fixed by #3663 or #5568
Labels: area/schema-registry, community, kind/bug


zimbatm commented Jan 27, 2022

Version & Environment

Redpanda version: 21.11.3 (inside of Docker)

What went wrong?

When switching from the cp-schema-registry to the redpanda one, our software encounters some issues. The full log is below but I believe there is a hint in this part:

Caused by: java.io.IOException: Invalid schema syntax = "proto3";
package mycompany.public.config_sender_deny_list;
message Key {
  string sender = 1;
}
 with refs [] of type AVRO

"of type AVRO", but we only use protobuf in the software.
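As background for where the "id 3" in the log comes from: records serialized with a schema-registry-aware serializer are prefixed with the Confluent wire format, a magic byte `0x00` followed by a 4-byte big-endian schema id, which the consumer uses to fetch the schema. A minimal sketch (not from this codebase; the record bytes are made up) of how a deserializer extracts that id:

```python
import struct

def parse_wire_format_header(payload: bytes) -> int:
    """Extract the schema id from a Confluent wire-format record.

    Layout: 1 magic byte (0x00) + 4-byte big-endian schema id + body.
    (Protobuf records also carry message-index varints after the header,
    which this sketch ignores.)
    """
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("unknown magic byte or truncated payload")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id

# A record serialized against schema id 3, as in the log above.
record = b"\x00\x00\x00\x00\x03" + b"\x0a\x05alice"
print(parse_wire_format_header(record))  # -> 3; the client then fetches schema 3
```

The failure above happens at that fetch step: the client asks the registry for schema 3 and cannot make sense of the response.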

What should have happened instead?

Be compatible with cp-schema-registry.

How to reproduce the issue?

I don't have a minimal repro yet. If the issue is not obvious on your side, I'm happy to try and produce one for you.

Additional information

Full log:

timestamp=14:03:58 level=INFO traceId= parentId= spanId= class=co.ma.co.DebeziumCacheManager thread=executor-thread-2 Assigning topic partitions = [config_sender_deny_list-0] exception= 
timestamp=14:03:58 level=INFO traceId= parentId= spanId= class=co.ma.co.DebeziumCacheManager thread=executor-thread-1 message="Catching up" targetEndOffsets={config_sender_deny_list-0=9} exception= 
timestamp=14:03:58 level=ERROR traceId= parentId= spanId= class=io.co.ka.sc.cl.CachedSchemaRegistryClient thread=executor-thread-1 Invalid schema type AVRO exception= 
timestamp=14:03:58 level=ERROR traceId= parentId= spanId= class=co.ma.co.DebeziumCacheManager thread=executor-thread-2 Failure exception=: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition config_sender_deny_list-0 at offset 0. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at com.mycompany.config.DebeziumCacheManager.poll$lambda-22(DebeziumCacheManager.kt:177)
	at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniEmitOn.subscribe(UniEmitOn.java:22)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$13.runWith(VertxCoreRecorder.java:548)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 3
	at com.mycompany.kafka.serialization.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:159)
	at com.mycompany.kafka.serialization.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:84)
	at com.mycompany.kafka.serialization.KafkaProtobufDeserializer.deserialize(KafkaProtobufDeserializer.java:59)
	at com.mycompany.kafka.serialization.KafkaProtobufDeserializer.deserialize(KafkaProtobufDeserializer.java:11)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1417)
	... 25 more
Caused by: java.io.IOException: Invalid schema syntax = "proto3";
package mycompany.public.config_sender_deny_list;
message Key {
  string sender = 1;
}
 with refs [] of type AVRO
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.lambda$getSchemaByIdFromRegistry$6(CachedSchemaRegistryClient.java:288)
	at java.base/java.util.Optional.orElseThrow(Optional.java:403)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:286)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:406)
	at com.mycompany.kafka.serialization.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:110)
	... 30 more
 
timestamp=14:03:58 level=ERROR traceId= parentId= spanId= class=co.ma.co.DebeziumCacheManager thread=executor-thread-2 Processing failure exception=: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition config_sender_deny_list-0 at offset 0. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at com.mycompany.config.DebeziumCacheManager.poll$lambda-22(DebeziumCacheManager.kt:177)
	at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniEmitOn.subscribe(UniEmitOn.java:22)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$13.runWith(VertxCoreRecorder.java:548)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 3
	at com.mycompany.kafka.serialization.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:159)
	at com.mycompany.kafka.serialization.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:84)
	at com.mycompany.kafka.serialization.KafkaProtobufDeserializer.deserialize(KafkaProtobufDeserializer.java:59)
	at com.mycompany.kafka.serialization.KafkaProtobufDeserializer.deserialize(KafkaProtobufDeserializer.java:11)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1417)
	... 25 more
Caused by: java.io.IOException: Invalid schema syntax = "proto3";
package mycompany.public.config_sender_deny_list;
message Key {
  string sender = 1;
}
 with refs [] of type AVRO
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.lambda$getSchemaByIdFromRegistry$6(CachedSchemaRegistryClient.java:288)
	at java.base/java.util.Optional.orElseThrow(Optional.java:403)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:286)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:406)
	at com.mycompany.kafka.serialization.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:110)
	... 30 more
 
timestamp=14:04:34 level=INFO traceId= parentId= spanId= class=co.ma.rp.pr.GasPricesRebalanceListener thread=smallrye-kafka-consumer-thread-0 message="Seeking" topic=eth_gas_prices partition=0 offset=0 exception= 

@zimbatm zimbatm added the kind/bug Something isn't working label Jan 27, 2022
@zimbatm zimbatm changed the title Schema registry protobug compatibility issue Schema registry protobuf compatibility issue Jan 27, 2022
@BenPope BenPope self-assigned this Jan 27, 2022
@BenPope BenPope added the area/schema-registry Schema Registry service within Redpanda label Jan 27, 2022

BenPope commented Jan 28, 2022

There are a couple of issues here:

  • The error is wrong: it's essentially returning a default-constructed schema, which has type AVRO.
  • There's currently no support for a protobuf-encoded protobuf FileDescriptor, which is what the serializer sends.

I have a proof of concept working on a branch. I will tidy it up and add tests.
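To illustrate the second point: a registry may receive a schema either as a textual `.proto` source (as shown in the error above) or as a serialized `FileDescriptorProto`, which is arbitrary protobuf wire-format bytes. A hypothetical heuristic sketch (not Redpanda's actual implementation) for telling the two apart:

```python
def looks_like_textual_proto(schema: bytes) -> bool:
    """Rough heuristic: a textual .proto file decodes as UTF-8 and starts
    with a known keyword, while a serialized FileDescriptorProto is
    protobuf wire-format bytes that rarely do."""
    try:
        text = schema.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return text.lstrip().startswith(
        ("syntax", "package", "import", "option", "message", "//")
    )

textual = b'syntax = "proto3";\npackage mycompany;\nmessage Key { string sender = 1; }'
binary = bytes([0x0A, 0x0B]) + b"example.proto"  # field 1 (name), length-delimited
print(looks_like_textual_proto(textual))  # -> True
print(looks_like_textual_proto(binary))   # -> False
```

A real implementation would attempt a full `FileDescriptorProto` parse rather than rely on a prefix check, since wire-format bytes can coincidentally decode as UTF-8.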

@BenPope BenPope added this to the v21.11.4 milestone Jan 28, 2022
BenPope added a commit to BenPope/redpanda-dxfeed-financial-data that referenced this issue Jan 29, 2022
Requires fix for: redpanda-data/redpanda#3633

Signed-off-by: Ben Pope <ben@vectorized.io>
BenPope added a commit to BenPope/redpanda that referenced this issue Jan 31, 2022
Support decoding a protobuf FileDescriptor as encoded protobuf.

Fix redpanda-data#3633

Signed-off-by: Ben Pope <ben@vectorized.io>
BenPope added a commit to BenPope/redpanda that referenced this issue Feb 2, 2022
Support decoding a protobuf FileDescriptor as encoded protobuf.

Fix redpanda-data#3633

Signed-off-by: Ben Pope <ben@vectorized.io>
(cherry picked from commit d74b9e5)
ajfabbri pushed a commit to ajfabbri/redpanda that referenced this issue Feb 3, 2022
Support decoding a protobuf FileDescriptor as encoded protobuf.

Fix redpanda-data#3633

Signed-off-by: Ben Pope <ben@vectorized.io>

zimbatm commented Feb 7, 2022

Testing with v21.11.4-beta1, I am still getting the same issue. I would like to send a repro but that requires sharing the whole Java program, unfortunately.

@JuxhinDB

This is also reproducible on v22.1.5, with the same behaviour as described in the issue. Here are some details from debugging that may help.

We noticed that, despite pushing a valid proto3 schema to the schema registry, the sink connector parsed it with the schema type set to AVRO, and then subsequently failed to parse the schema (as it's protobuf). This is likely because the client silently fails and defaults to AVRO on failure:

https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L269-L281
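The fallback described above can be sketched as follows. This is a hypothetical, simplified model of the client behaviour, not the actual Confluent code: the registry response's `schemaType` field is optional (it is conventionally omitted for Avro), so a missing or unrecognized value quietly degrades to AVRO unless the client is strict about it:

```python
def resolve_schema_type(response: dict, strict: bool = False) -> str:
    """Model of how a registry client picks a schema type from a response.

    If schemaType is absent or unrecognized, the lenient path falls back
    to AVRO -- which masks the real problem when the schema is protobuf.
    """
    known = {"AVRO", "PROTOBUF", "JSON"}
    schema_type = response.get("schemaType", "AVRO")  # absent means Avro by convention
    if schema_type not in known:
        if strict:
            raise ValueError(f"unknown schemaType: {schema_type!r}")
        return "AVRO"  # silent fallback
    return schema_type

# A protobuf schema returned without a schemaType field is treated as Avro,
# producing exactly the confusing "of type AVRO" error seen in the log.
print(resolve_schema_type({"schema": 'syntax = "proto3"; ...'}))          # -> AVRO
print(resolve_schema_type({"schemaType": "PROTOBUF", "schema": "..."}))   # -> PROTOBUF
```

Under this model, any registry response that fails schema parsing ends up labelled AVRO, which matches the "Invalid schema ... of type AVRO" message in the original report.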


BenPope commented Jul 21, 2022

Hi @JuxhinDB - do you think you could send me a repro? Perhaps the schema would be useful. If it's sensitive, you can email me or DM me on Slack: https://redpanda.com/slack
