Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions on Kafka are blocked when using instrumentation for Reactor Kafka receiver with receiveExactlyOnce #10330

Closed
lukny opened this issue Jan 24, 2024 · 0 comments · Fixed by #10333
Labels
bug Something isn't working needs triage New issue that requires triage

Comments

@lukny
Copy link

lukny commented Jan 24, 2024

Describe the bug

Transactions on Kafka are blocked when using kafkaReceiver.receiveExactlyOnce(transactionManager) from Reactor Kafka Reciever with instrumentation attached from Java agent.
Temporary workaround is to use another method signature with two arguments kafkaReceiver.receiveExactlyOnce(transactionManager, null)__

Steps to reproduce

  1. Setup Java project with library io.projectreactor.kafka:reactor-kafka:1.3.22and Apache Kafka
  2. Configure KafkaSender and KafkaReceiver instances
  3. Use receiveExactlyOnce(transactionManager) method to create transactions between consumer and producer, for example:
public void startConsuming() {
        kafkaReceiver
            .receiveExactlyOnce(kafkaProducer.transactionManager())
            .concatMap(consumerRecordFlux -> kafkaProducer.send(
                    handleMessages(consumerRecordFlux).map(record -> SenderRecord.create(record, record.key())))
                .concatWith(kafkaProducer.transactionManager().commit())
            )
            .onErrorResume(t -> kafkaProducer.transactionManager().abort().then(Mono.error(t)))
            .subscribe();
    }

Expected behavior

Transaction between consumer and producer works uninterrupted by instrumentation when using receiveExactlyOnce(transactionManager).

Actual behavior

Consumer and producer transactions cannot start because the instrumentation code blocks it.

Javaagent or library instrumentation version

at least higher than 1.30, Javaagent

Environment

JDK: 17 and 21
OS: MacOS Sonoma

Additional context

Implementation of method

public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
    return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
  }

uses receiveAutoAck() instead of receiveExactlyOnce()

@lukny lukny added bug Something isn't working needs triage New issue that requires triage labels Jan 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working needs triage New issue that requires triage
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant