From 34e019f9d3d42e77395e193aba4125b46f396b8f Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Fri, 19 May 2023 14:22:01 +0200 Subject: [PATCH 1/2] Implement support for reactor-kafka 1.3+ --- .../javaagent/build.gradle.kts | 49 +++++-- .../v1_0/ConsumerHandlerInstrumentation.java | 43 ++++++ .../kafka/v1_0/InstrumentedKafkaReceiver.java | 39 ++++- .../kafka/v1_0/KafkaReceiver133Access.java | 41 ++++++ .../ReactorKafkaInstrumentationModule.java | 3 +- .../v1_0/ReactorKafkaInstrumentationTest.java | 66 ++------- .../ReactorKafka133InstrumentationTest.java | 29 ++++ .../testing/build.gradle.kts | 10 ++ .../kafka/v1_0/AbstractReactorKafkaTest.java | 136 +++++++++++++++--- settings.gradle.kts | 1 + 10 files changed, 331 insertions(+), 86 deletions(-) create mode 100644 instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java create mode 100644 instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java create mode 100644 instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_3/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka133InstrumentationTest.java create mode 100644 instrumentation/reactor/reactor-kafka-1.0/testing/build.gradle.kts rename instrumentation/reactor/reactor-kafka-1.0/{javaagent/src/test => testing/src/main}/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java (52%) diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts index 4f4e4a23a4eb..68a9b40d28a1 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -6,12 +6,14 @@ muzzle { pass { group.set("io.projectreactor.kafka") module.set("reactor-kafka") - // TODO: add support for 1.3 - versions.set("[1.0.0,1.3.0)") + versions.set("[1.0.0,)") + assertInverse.set(true) } } dependencies { + compileOnly(project(":muzzle")) + compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") @@ -20,28 +22,59 @@ dependencies { implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) implementation(project(":instrumentation:reactor:reactor-3.1:library")) - library("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE") + // using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3 + // @NoMuzzle is used to ensure that this does not break muzzle checks + compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3") testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent")) - testImplementation("org.testcontainers:kafka") + testImplementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing")) - testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE") + testLibrary("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE") latestDepTestLibrary("io.projectreactor:reactor-core:3.4.+") - // TODO: add support for 1.3 - latestDepTestLibrary("io.projectreactor.kafka:reactor-kafka:1.2.+") +} + +val testLatestDeps = findProperty("testLatestDeps") as Boolean + +testing { + suites { + val testV1_3_3 by 
registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing")) + + if (testLatestDeps) { + implementation("io.projectreactor.kafka:reactor-kafka:+") + implementation("io.projectreactor:reactor-core:3.4.+") + } else { + implementation("io.projectreactor.kafka:reactor-kafka:1.3.3") + } + } + + targets { + all { + testTask.configure { + systemProperty("hasConsumerGroupAndId", true) + } + } + } + } + } } tasks { - test { + withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } + test { + systemProperty("hasConsumerGroupAndId", testLatestDeps) + } + check { dependsOn(testing.suites) } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java new file mode 100644 index 000000000000..f2857879ba74 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ConsumerHandlerInstrumentation.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.publisher.Flux; + +// handles versions 1.3.+ +public class ConsumerHandlerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("reactor.kafka.receiver.internals.ConsumerHandler"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("receive").and(returns(named("reactor.core.publisher.Flux"))), + this.getClass().getName() + "$ReceiveAdvice"); + } + + @SuppressWarnings("unused") + public static class ReceiveAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return(readOnly = false) Flux flux) { + if (!(flux instanceof TracingDisablingKafkaFlux)) { + flux = new TracingDisablingKafkaFlux<>(flux); + } + } + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java index 2c24c0095246..62b827f2fdf7 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java +++ 
b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java
@@ -22,29 +22,60 @@ public InstrumentedKafkaReceiver(KafkaReceiver<K, V> actual) {
     this.actual = actual;
   }
 
+  // added in 1.3.3
+  @Override
+  public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
+    return wrap(KafkaReceiver133Access.receive(actual, prefetch));
+  }
+
   @Override
   public Flux<ReceiverRecord<K, V>> receive() {
-    return new InstrumentedKafkaFlux<>(actual.receive());
+    return wrap(actual.receive());
+  }
+
+  // added in 1.3.3
+  @Override
+  public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
+    return KafkaReceiver133Access.receiveAutoAck(actual, prefetch)
+        .map(InstrumentedKafkaReceiver::wrap);
   }
 
   @Override
   public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
-    return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
+    return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
+  }
+
+  // added in 1.3.3
+  @Override
+  public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
+    return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch));
   }
 
   @Override
   public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
-    return new InstrumentedKafkaFlux<>(actual.receiveAtmostOnce());
+    return wrap(actual.receiveAtmostOnce());
   }
 
   @Override
   public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
       TransactionManager transactionManager) {
-    return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
+    return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap);
+  }
+
+  // added in 1.3.3
+  @Override
+  public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
+      TransactionManager transactionManager, Integer prefetch) {
+    return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch)
+        .map(InstrumentedKafkaReceiver::wrap);
   }
 
   @Override
   public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
     return actual.doOnConsumer(function);
   }
+
+  private static <K, V, R extends ConsumerRecord<K, V>> Flux<R> wrap(Flux<R> flux) {
+    return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux);
+  }
 }
diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java
new file mode 100644
index 000000000000..2e21f39d8829
--- /dev/null
+++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
+
+import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import reactor.core.publisher.Flux;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverRecord;
+import reactor.kafka.sender.TransactionManager;
+
+final class KafkaReceiver133Access {
+
+  @NoMuzzle
+  static <K, V> Flux<ReceiverRecord<K, V>> receive(KafkaReceiver<K, V> receiver, Integer prefetch) {
+    return receiver.receive(prefetch);
+  }
+
+  @NoMuzzle
+  static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(
+      KafkaReceiver<K, V> receiver, Integer prefetch) {
+    return receiver.receiveAutoAck(prefetch);
+  }
+
+  @NoMuzzle
+  static <K, V> Flux<ConsumerRecord<K, V>> receiveAtmostOnce(
+      KafkaReceiver<K, V> receiver, Integer prefetch) {
+    return receiver.receiveAtmostOnce(prefetch);
+  }
+
+  @NoMuzzle
+  static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
+      KafkaReceiver<K, V> receiver, TransactionManager transactionManager, Integer prefetch) {
+    return receiver.receiveExactlyOnce(transactionManager, prefetch);
+  }
+
+  private KafkaReceiver133Access() {}
+}
diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java
index 43f19a9b1ceb..47524c7fec23
--- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java
+++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationModule.java
@@ -24,6 +24,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
     return asList(
         new KafkaReceiverInstrumentation(),
         new ReceiverRecordInstrumentation(),
-        new DefaultKafkaReceiverInstrumentation());
+        new DefaultKafkaReceiverInstrumentation(),
+        new ConsumerHandlerInstrumentation());
   }
 }
diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java
index 65a3f77dc2f7..c04cf040fe42
--- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java
+++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafkaInstrumentationTest.java
@@ -5,63 +5,25 @@ package
io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; -import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; - -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; -import io.opentelemetry.sdk.trace.data.LinkData; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.kafka.sender.SenderRecord; - -public class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest { - @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); +class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest { @Test - void shouldCreateSpansForSingleRecordProcess() { - Disposable disposable = - receiver.receive().subscribe(record -> testing.runWithSpan("consumer", () -> {})); - cleanup.deferCleanup(disposable::dispose); - - SenderRecord record = - SenderRecord.create("testTopic", 0, null, "10", "testSpan", null); - Flux producer = sender.send(Flux.just(record)); - testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30))); - - AtomicReference producerSpan = new AtomicReference<>(); + void testReceive() { + testSingleRecordProcess(recordConsumer -> receiver.receive().subscribe(recordConsumer)); + } - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("producer"), - span -> - span.hasName("testTopic send") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(sendAttributes(record))); + @Test + void testReceiveAutoAck() { + testSingleRecordProcess( + recordConsumer -> + receiver.receiveAutoAck().subscribe(records -> records.subscribe(recordConsumer))); + } - producerSpan.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testTopic receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly(receiveAttributes("testTopic")), - span -> - span.hasName("testTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) - .hasAttributesSatisfyingExactly(processAttributes(record)), - span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + @Test + void testReceiveAtMostOnce() { + testSingleRecordProcess( + recordConsumer -> receiver.receiveAtmostOnce().subscribe(recordConsumer)); } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_3/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka133InstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_3/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka133InstrumentationTest.java new file mode 100644 index 000000000000..3eed937fca28 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_3/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka133InstrumentationTest.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import org.junit.jupiter.api.Test; + +class ReactorKafka133InstrumentationTest extends AbstractReactorKafkaTest { + + @Test + void testReceive() { + testSingleRecordProcess(recordConsumer -> receiver.receive(1).subscribe(recordConsumer)); + } + + @Test + void testReceiveAutoAck() { + testSingleRecordProcess( + recordConsumer -> + receiver.receiveAutoAck(1).subscribe(records -> records.subscribe(recordConsumer))); + } + + @Test + void testReceiveAtMostOnce() { + testSingleRecordProcess( + recordConsumer -> receiver.receiveAtmostOnce(1).subscribe(recordConsumer)); + } +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/testing/build.gradle.kts new file mode 100644 index 000000000000..a40de9c094c7 --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/build.gradle.kts @@ -0,0 +1,10 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + api(project(":testing-common")) + + compileOnly("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE") + implementation("org.testcontainers:kafka") +} diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java similarity index 52% rename from instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java rename to instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index 0d9a2d6322f9..8ed6c051bd8b 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -5,21 +5,32 @@ package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static java.util.Collections.singleton; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import 
java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -33,21 +44,26 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; -abstract class AbstractReactorKafkaTest { +public abstract class AbstractReactorKafkaTest { private static final Logger logger = LoggerFactory.getLogger(AbstractReactorKafkaTest.class); @RegisterExtension protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + static KafkaContainer kafka; - static KafkaSender sender; - static KafkaReceiver receiver; + protected static KafkaSender sender; + protected static KafkaReceiver receiver; @BeforeAll static void setUpAll() { @@ -59,11 +75,8 @@ static void setUpAll() { .withStartupTimeout(Duration.ofMinutes(1)); kafka.start(); - sender = KafkaSender.create(SenderOptions.create(producerProps())); - receiver = - KafkaReceiver.create( - ReceiverOptions.create(consumerProps()) - .subscription(singleton("testTopic"))); + sender = KafkaSender.create(senderOptions()); + receiver = KafkaReceiver.create(receiverOptions()); } @AfterAll @@ -74,16 +87,26 @@ static void tearDownAll() { kafka.stop(); } - private static Properties producerProps() { + @SuppressWarnings("unchecked") + private static SenderOptions senderOptions() { Properties props = new Properties(); props.put("bootstrap.servers", kafka.getBootstrapServers()); props.put("retries", 0); props.put("key.serializer", StringSerializer.class); props.put("value.serializer", StringSerializer.class); - return props; + + try { + // SenderOptions changed from a class to an interface in 1.3.3, using reflection to avoid + // linkage error + return (SenderOptions) + SenderOptions.class.getMethod("create", Properties.class).invoke(null, props); + } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { + throw new IllegalStateException(e); + } } - private static Properties consumerProps() { + @SuppressWarnings("unchecked") + private static ReceiverOptions receiverOptions() { Properties props = new Properties(); props.put("bootstrap.servers", kafka.getBootstrapServers()); props.put("group.id", "test"); @@ -93,7 +116,62 @@ private static Properties consumerProps() { props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); - return props; + + try { + // SenderOptions changed from a class to an interface in 1.3.3, using reflection to avoid + // linkage error + ReceiverOptions receiverOptions = + (ReceiverOptions) + ReceiverOptions.class.getMethod("create", Properties.class).invoke(null, props); + return (ReceiverOptions) + ReceiverOptions.class + .getMethod("subscription", Collection.class) + .invoke(receiverOptions, singleton("testTopic")); + } catch (InvocationTargetException | IllegalAccessException | 
NoSuchMethodException e) { + throw new IllegalStateException(e); + } + } + + protected void testSingleRecordProcess( + Function>, Disposable> subscriptionFunction) { + Disposable disposable = + subscriptionFunction.apply(record -> testing.runWithSpan("consumer", () -> {})); + cleanup.deferCleanup(disposable::dispose); + + SenderRecord record = + SenderRecord.create("testTopic", 0, null, "10", "testSpan", null); + Flux producer = sender.send(Flux.just(record)); + testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30))); + + AtomicReference producerSpan = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(sendAttributes(record))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes("testTopic")), + span -> + span.hasName("testTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes(record)), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } protected static List sendAttributes(ProducerRecord record) { @@ -120,15 +198,24 @@ protected static List sendAttributes(ProducerRecord receiveAttributes(String topic) { - return new ArrayList<>( - Arrays.asList( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic), - equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer")))); + ArrayList assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")))); + if (Boolean.getBoolean("hasConsumerGroupAndId")) { + assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + assertions.add( + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); + } + return assertions; } protected static List processAttributes( @@ -149,6 +236,13 @@ protected static List processAttributes( satisfies( SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative))); + if (Boolean.getBoolean("hasConsumerGroupAndId")) { + assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + assertions.add( + satisfies( + SemanticAttributes.MESSAGING_CONSUMER_ID, + stringAssert -> stringAssert.startsWith("test - consumer"))); + } if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { assertions.add( satisfies( diff --git a/settings.gradle.kts b/settings.gradle.kts index cda3235a610b..e3afe3b9c679 100644 --- a/settings.gradle.kts +++ 
b/settings.gradle.kts @@ -420,6 +420,7 @@ hideFromDependabot(":instrumentation:reactor:reactor-3.1:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-3.1:library") hideFromDependabot(":instrumentation:reactor:reactor-3.1:testing") hideFromDependabot(":instrumentation:reactor:reactor-kafka-1.0:javaagent") +hideFromDependabot(":instrumentation:reactor:reactor-kafka-1.0:testing") hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-0.9:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent") hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent-unit-tests") From 20dba68498a36e43f4a455675be1ee3d4c01a3db Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 24 May 2023 14:36:54 +0200 Subject: [PATCH 2/2] static imports --- .../kafka/v1_0/AbstractReactorKafkaTest.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java index 8ed6c051bd8b..05b0f22dbff4 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java +++ b/instrumentation/reactor/reactor-kafka-1.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/AbstractReactorKafkaTest.java @@ -5,12 +5,13 @@ package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; +import static io.opentelemetry.api.common.AttributeKey.longKey; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Arrays.asList; import static java.util.Collections.singleton; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; @@ -23,7 +24,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Properties; @@ -177,7 +177,7 @@ protected void testSingleRecordProcess( protected static List sendAttributes(ProducerRecord record) { List assertions = new ArrayList<>( - Arrays.asList( + asList( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), @@ -200,7 +200,7 @@ protected static List sendAttributes(ProducerRecord receiveAttributes(String topic) { ArrayList assertions = new ArrayList<>( - Arrays.asList( + asList( equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), @@ -222,7 +222,7 @@ protected static List processAttributes( ProducerRecord record) { List assertions = new ArrayList<>( - Arrays.asList( + asList( 
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()), equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), @@ -245,9 +245,7 @@ protected static List processAttributes( } if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) { assertions.add( - satisfies( - AttributeKey.longKey("kafka.record.queue_time_ms"), - AbstractLongAssert::isNotNegative)); + satisfies(longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative)); } String messageKey = record.key(); if (messageKey != null) {