diff --git a/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts b/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts index 01e2416df29d..1a1cca49c0aa 100644 --- a/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts +++ b/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts @@ -22,6 +22,8 @@ dependencies { testImplementation("com.fasterxml.jackson.core:jackson-databind:2.14.2") testImplementation("org.testcontainers:elasticsearch") + + latestDepTestLibrary("co.elastic.clients:elasticsearch-java:8.0.+") } tasks { diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts index 68a9b40d28a1..43af8a95939b 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -22,9 +22,9 @@ dependencies { implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) implementation(project(":instrumentation:reactor:reactor-3.1:library")) - // using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3 + // using 1.3 to be able to implement several new KafkaReceiver methods added in 1.3.3 and 1.3.21 // @NoMuzzle is used to ensure that this does not break muzzle checks - compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3") + compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.21") testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent")) @@ -60,6 +60,27 @@ testing { } } } + + val testV1_3_21 by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing")) + + if (testLatestDeps) { + implementation("io.projectreactor.kafka:reactor-kafka:+") + implementation("io.projectreactor:reactor-core:3.4.+") + } else { + implementation("io.projectreactor.kafka:reactor-kafka:1.3.21") + } + } + + targets { + all { + testTask.configure { + systemProperty("hasConsumerGroupAndId", true) + } + } + } + } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java index 62b827f2fdf7..a8342732177d 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java @@ -25,7 +25,7 @@ public InstrumentedKafkaReceiver(KafkaReceiver actual) { // added in 1.3.3 @Override public Flux> receive(Integer prefetch) { - return wrap(KafkaReceiver133Access.receive(actual, prefetch)); + return wrap(KafkaReceiver13Access.receive(actual, prefetch)); } @Override @@ -36,7 +36,7 @@ public Flux> receive() { // added in 1.3.3 @Override public Flux>> receiveAutoAck(Integer prefetch) { - return KafkaReceiver133Access.receiveAutoAck(actual, prefetch) + return KafkaReceiver13Access.receiveAutoAck(actual, prefetch) .map(InstrumentedKafkaReceiver::wrap); } @@ -48,7 +48,7 @@ public Flux>> receiveAutoAck() { // added in 1.3.3 @Override public Flux> receiveAtmostOnce(Integer prefetch) { - return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch)); + return wrap(KafkaReceiver13Access.receiveAtmostOnce(actual, prefetch)); } @Override @@ -66,7 +66,7 @@ public Flux>> receiveExactlyOnce( @Override public Flux>> receiveExactlyOnce( TransactionManager transactionManager, Integer prefetch) { - return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch) + return KafkaReceiver13Access.receiveExactlyOnce(actual, transactionManager, prefetch) .map(InstrumentedKafkaReceiver::wrap); } @@ -75,6 +75,19 @@ public Mono doOnConsumer(Function, ? extends T> function) return actual.doOnConsumer(function); } + // added in 1.3.21 + @Override + public Flux>> receiveBatch(Integer prefetch) { + return KafkaReceiver13Access.receiveBatch(actual, prefetch) + .map(InstrumentedKafkaReceiver::wrap); + } + + // added in 1.3.21 + @Override + public Flux>> receiveBatch() { + return KafkaReceiver13Access.receiveBatch(actual).map(InstrumentedKafkaReceiver::wrap); + } + private static > Flux wrap(Flux flux) { return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux); } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver13Access.java similarity index 75% rename from instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java rename to instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver13Access.java index 2e21f39d8829..de50fa9b353e 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver13Access.java @@ -12,7 +12,7 @@ import reactor.kafka.receiver.ReceiverRecord; import reactor.kafka.sender.TransactionManager; -final class KafkaReceiver133Access { +final class KafkaReceiver13Access { @NoMuzzle static Flux> receive(KafkaReceiver receiver, Integer prefetch) { @@ -37,5 +37,16 @@ static Flux>> receiveExactlyOnce( return receiver.receiveExactlyOnce(transactionManager, prefetch); } - private KafkaReceiver133Access() {} + @NoMuzzle + static Flux>> receiveBatch( + KafkaReceiver receiver, Integer prefetch) { + return receiver.receiveBatch(prefetch); + } + + @NoMuzzle + static Flux>> receiveBatch(KafkaReceiver receiver) { + return receiver.receiveBatch(); + } + + private KafkaReceiver13Access() {} } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java new file mode 100644 index 000000000000..c146c009cb5f --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import org.junit.jupiter.api.Test; + +class ReactorKafka1321InstrumentationTest extends AbstractReactorKafkaTest { + + @Test + void receiveBatch() { + testSingleRecordProcess( + recordConsumer -> + receiver + .receiveBatch() + .concatMap(r -> r) + .doOnNext(r -> r.receiverOffset().acknowledge()) + .subscribe(recordConsumer)); + } + + @Test + void receiveBatchWithSize() { + testSingleRecordProcess( + recordConsumer -> + receiver + .receiveBatch(1) + .concatMap(r -> r) + .doOnNext(r -> r.receiverOffset().acknowledge()) + .subscribe(recordConsumer)); + } +}