diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts index ebd165acfebf..c4e08e587c53 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts @@ -69,6 +69,7 @@ tasks { test { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/AbstractMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/AbstractMessageListenerContainerInstrumentation.java index 1b65c059fee0..a80a510990cd 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/AbstractMessageListenerContainerInstrumentation.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/AbstractMessageListenerContainerInstrumentation.java @@ -16,7 +16,6 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.RecordInterceptor; public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation { @@ -28,16 +27,11 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - // getBatchInterceptor() is called internally by AbstractMessageListenerContainer - // implementations - transformer.applyAdviceToMethod( - named("getBatchInterceptor") - .and(isProtected()) - .and(takesArguments(0)) - .and(returns(named("org.springframework.kafka.listener.BatchInterceptor"))), - this.getClass().getName() + "$GetBatchInterceptorAdvice"); // getRecordInterceptor() is called internally by AbstractMessageListenerContainer // implementations + // for batch listeners we don't instrument getBatchInterceptor() here but instead instrument + // KafkaMessageListenerContainer$ListenerConsumer because spring doesn't always call the success + // and failure methods on a batch interceptor transformer.applyAdviceToMethod( named("getRecordInterceptor") .and(isProtected()) @@ -46,24 +40,6 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$GetRecordInterceptorAdvice"); } - @SuppressWarnings("unused") - public static class GetBatchInterceptorAdvice { - - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Return(readOnly = false) BatchInterceptor interceptor) { - - if (interceptor == null - || !interceptor - .getClass() - .getName() - .equals( - "io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedBatchInterceptor")) { - interceptor = telemetry().createBatchInterceptor(interceptor); - } - } - } - @SuppressWarnings("unused") public static class GetRecordInterceptorAdvice { diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/ListenerConsumerInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/ListenerConsumerInstrumentation.java index 48c40f89aee8..9e9900f468d4 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/ListenerConsumerInstrumentation.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/ListenerConsumerInstrumentation.java @@ -5,9 +5,16 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka.v2_7; +import static io.opentelemetry.javaagent.instrumentation.spring.kafka.v2_7.SpringKafkaSingletons.batchProcessInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.bootstrap.spring.SpringSchedulingTaskTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -15,6 +22,8 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; public class ListenerConsumerInstrumentation implements TypeInstrumentation { @@ -29,6 +38,10 @@ public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod(named("run"), this.getClass().getName() + "$RunLoopAdvice"); transformer.applyAdviceToMethod( isConstructor(), this.getClass().getName() + "$ConstructorAdvice"); + transformer.applyAdviceToMethod( + named("invokeBatchOnMessageWithRecordsOrList") + .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerRecords"))), + this.getClass().getName() + "$InvokeBatchAdvice"); } // this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation @@ -60,4 +73,41 @@ public static void onExit(@Advice.Enter boolean previousValue) { SpringSchedulingTaskTracing.setEnabled(previousValue); } } + + @SuppressWarnings("unused") + public static class InvokeBatchAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) ConsumerRecords records, + @Advice.FieldValue("consumer") Consumer consumer, + @Advice.Local("otelRequest") KafkaReceiveRequest request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + Context receiveContext = consumerContext.getContext(); + + // use the receive CONSUMER span as parent if it's available + Context parentContext = receiveContext != null ? receiveContext : Context.current(); + + request = KafkaReceiveRequest.create(records, consumer); + if (batchProcessInstrumenter().shouldStart(parentContext, request)) { + context = batchProcessInstrumenter().start(parentContext, request); + scope = context.makeCurrent(); + } + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelRequest") KafkaReceiveRequest request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + if (scope == null) { + return; + } + scope.close(); + batchProcessInstrumenter().end(context, request, null, throwable); + } + } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaSingletons.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaSingletons.java index 89f2e63a86de..a1e150bca3c0 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaSingletons.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaSingletons.java @@ -6,24 +6,48 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka.v2_7; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; +import io.opentelemetry.instrumentation.spring.kafka.v2_7.internal.SpringKafkaErrorCauseExtractor; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; public final class SpringKafkaSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; private static final SpringKafkaTelemetry TELEMETRY = SpringKafkaTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setCaptureExperimentalSpanAttributes( InstrumentationConfig.get() .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) .setMessagingReceiveInstrumentationEnabled( ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) .build(); + private static final Instrumenter BATCH_PROCESS_INSTRUMENTER; + + static { + KafkaInstrumenterFactory factory = + new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .setCaptureExperimentalSpanAttributes( + InstrumentationConfig.get() + .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) + .setMessagingReceiveInstrumentationEnabled( + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE); + BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter(); + } public static SpringKafkaTelemetry telemetry() { return TELEMETRY; } + public static Instrumenter batchProcessInstrumenter() { + return BATCH_PROCESS_INSTRUMENTER; + } + private SpringKafkaSingletons() {} } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java index e3e62c5b10da..a58f3f1eefbe 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java @@ -14,15 +14,21 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.SemanticAttributes; import io.opentelemetry.testing.AbstractSpringKafkaTest; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.assertj.core.api.AbstractLongAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -144,8 +150,42 @@ void shouldHandleFailureInKafkaListener() { }); }); - AtomicReference producer = new AtomicReference<>(); + Consumer receiveSpanAssert = + span -> + span.hasName("testSingleTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo( + SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)); + List processAttributes = + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative), + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + satisfies(longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative)); + AtomicReference producer = new AtomicReference<>(); testing.waitAndAssertSortedTraces( orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), trace -> { @@ -174,22 +214,19 @@ void shouldHandleFailureInKafkaListener() { }, trace -> trace.hasSpansSatisfyingExactly( + receiveSpanAssert, span -> - span.hasName("testSingleTopic receive") + span.hasName("testSingleTopic process") .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo( - SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), - equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener"), - satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer")), - equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)), + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))), + trace -> + trace.hasSpansSatisfyingExactly( + receiveSpanAssert, span -> span.hasName("testSingleTopic process") .hasKind(SpanKind.CONSUMER) @@ -197,30 +234,17 @@ void shouldHandleFailureInKafkaListener() { .hasLinks(LinkData.create(producer.get().getSpanContext())) .hasStatus(StatusData.error()) .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo( - SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), - satisfies( - SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, - AbstractLongAssert::isNotNegative), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, - AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), - equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener"), - satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer")), - satisfies( - longKey("kafka.record.queue_time_ms"), - AbstractLongAssert::isNotNegative)), + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))), + trace -> + trace.hasSpansSatisfyingExactly( + receiveSpanAssert, + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes), span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); } @@ -333,8 +357,8 @@ void shouldHandleFailureInKafkaBatchListener() { AtomicReference producer = new AtomicReference<>(); - testing.waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + List> assertions = new ArrayList<>(); + assertions.add( trace -> { trace.hasSpansSatisfyingExactly( span -> span.hasName("producer"), @@ -358,44 +382,76 @@ void shouldHandleFailureInKafkaBatchListener() { stringAssert -> stringAssert.startsWith("producer")))); producer.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testBatchTopic receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfyingExactly( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo( - SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), - equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener"), - satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer")), - equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)), - span -> - span.hasName("testBatchTopic process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producer.get().getSpanContext())) - .hasStatus(StatusData.error()) - .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo( - SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), - equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener"), - satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer")), - equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)), - span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + }); + + if (Boolean.getBoolean("testLatestDeps")) { + // latest dep tests call receive once and only retry the failed process step + assertions.add( + trace -> + trace.hasSpansSatisfyingExactly( + SpringKafkaTest::assertReceiveSpan, + span -> assertProcessSpan(span, trace, producer.get(), true), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)), + span -> assertProcessSpan(span, trace, producer.get(), true), + span -> span.hasName("consumer").hasParent(trace.getSpan(3)), + span -> assertProcessSpan(span, trace, producer.get(), false), + span -> span.hasName("consumer").hasParent(trace.getSpan(5)))); + } else { + assertions.addAll( + Arrays.asList( + trace -> + trace.hasSpansSatisfyingExactly( + SpringKafkaTest::assertReceiveSpan, + span -> assertProcessSpan(span, trace, producer.get(), true), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))), + trace -> + trace.hasSpansSatisfyingExactly( + SpringKafkaTest::assertReceiveSpan, + span -> assertProcessSpan(span, trace, producer.get(), true), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))), + trace -> + trace.hasSpansSatisfyingExactly( + SpringKafkaTest::assertReceiveSpan, + span -> assertProcessSpan(span, trace, producer.get(), false), + span -> span.hasName("consumer").hasParent(trace.getSpan(1))))); + } + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), assertions); + } + + private static void assertReceiveSpan(SpanDataAssert span) { + span.hasName("testBatchTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)); + } + + private static void assertProcessSpan( + SpanDataAssert span, TraceAssert trace, SpanData producer, boolean failed) { + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)); + if (failed) { + span.hasStatus(StatusData.error()).hasException(new IllegalArgumentException("boom")); + } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java index 4f3c74786167..e37ebe1efa28 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java @@ -27,4 +27,9 @@ protected InstrumentationExtension testing() { protected List> additionalSpringConfigs() { return emptyList(); } + + @Override + protected boolean isLibraryInstrumentationTest() { + return false; + } } diff --git a/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts index 66499b5c303f..9fe8482fb8ce 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts +++ b/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts @@ -24,6 +24,7 @@ dependencies { tasks.withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) } val latestDepTest = findProperty("testLatestDeps") as Boolean diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java index 4d30d8c97b15..20af82bbff99 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; +import java.lang.ref.WeakReference; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -21,6 +22,8 @@ final class InstrumentedBatchInterceptor implements BatchInterceptor private static final VirtualField, State> stateField = VirtualField.find(ConsumerRecords.class, State.class); + private static final ThreadLocal>> lastProcessed = + new ThreadLocal<>(); private final Instrumenter batchProcessInstrumenter; @Nullable private final BatchInterceptor decorated; @@ -37,7 +40,7 @@ public ConsumerRecords intercept(ConsumerRecords records, Consumer intercept(ConsumerRecords records, Consumer records) { + // When retrying failed listener interceptors work as expected only in the earlier versions that + // we test (e.g spring-kafka:2.7.1). In later versions interceptor isn't called at all during + // the retry, which results in missing process span, or worse, the intercept method is called, + // but neither success nor failure is called, which results in a context leak. Here we attempt + // to prevent the context leak by observing whether intercept is called with the same + // ConsumerRecords as on previous call, and if it is, we skip creating the process span. + WeakReference> reference = lastProcessed.get(); + return reference != null && reference.get() == records; + } + private static Context getParentContext(ConsumerRecords records) { KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); Context receiveContext = consumerContext.getContext(); @@ -77,6 +91,7 @@ private void end(ConsumerRecords records, @Nullable Throwable error) { KafkaReceiveRequest request = state.request(); state.scope().close(); batchProcessInstrumenter.end(state.context(), request, null, error); + lastProcessed.set(new WeakReference<>(records)); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java index 0edd8a78f55c..dc74cd179fc2 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java @@ -10,6 +10,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.spring.kafka.v2_7.internal.SpringKafkaErrorCauseExtractor; import java.util.List; /** A builder of {@link SpringKafkaTelemetry}. */ diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/internal/SpringKafkaErrorCauseExtractor.java similarity index 65% rename from instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java rename to instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/internal/SpringKafkaErrorCauseExtractor.java index 463ce1b13b0d..bc9a59629fb3 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/internal/SpringKafkaErrorCauseExtractor.java @@ -3,12 +3,16 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.spring.kafka.v2_7; +package io.opentelemetry.instrumentation.spring.kafka.v2_7.internal; import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; import org.springframework.kafka.listener.ListenerExecutionFailedException; -enum SpringKafkaErrorCauseExtractor implements ErrorCauseExtractor { +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public enum SpringKafkaErrorCauseExtractor implements ErrorCauseExtractor { INSTANCE; @Override diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java index 49980d63f72a..e4e96db2926d 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java @@ -34,6 +34,11 @@ protected List> additionalSpringConfigs() { return singletonList(KafkaInstrumentationConfig.class); } + @Override + protected boolean isLibraryInstrumentationTest() { + return true; + } + @Configuration public static class KafkaInstrumentationConfig { diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java index 222dd76f75fd..10e00c5d7d35 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -6,14 +6,18 @@ package io.opentelemetry.testing; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.SemanticAttributes; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; @@ -21,6 +25,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaTest { + protected abstract boolean isLibraryInstrumentationTest(); + @Test void shouldCreateSpansForSingleRecordProcess() { testing() @@ -101,6 +107,25 @@ void shouldHandleFailureInKafkaListener() { }); }); + List processAttributes = + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, + AbstractLongAssert::isNotNegative), + equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"), + satisfies( + SemanticAttributes.MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer"))); + testing() .waitAndAssertTraces( trace -> @@ -132,29 +157,23 @@ void shouldHandleFailureInKafkaListener() { .hasParent(trace.getSpan(1)) .hasStatus(StatusData.error()) .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo( - SemanticAttributes.MESSAGING_DESTINATION_NAME, - "testSingleTopic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), - satisfies( - SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, - AbstractLongAssert::isNotNegative), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, - AbstractLongAssert::isNotNegative), - satisfies( - SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, - AbstractLongAssert::isNotNegative), - equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), - equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testSingleListener"), - satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer"))), - span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(4)), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(6)))); } @Test @@ -256,9 +275,20 @@ void shouldHandleFailureInKafkaBatchListener() { AtomicReference producer = new AtomicReference<>(); + List processAttributes = + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"), + satisfies( + SemanticAttributes.MESSAGING_CLIENT_ID, + stringAssert -> stringAssert.startsWith("consumer")), + equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)); + testing() .waitAndAssertSortedTraces( - orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + orderByRootSpanName("producer", "testBatchTopic process", "consumer"), trace -> { trace.hasSpansSatisfyingExactly( span -> span.hasName("producer"), @@ -293,19 +323,40 @@ void shouldHandleFailureInKafkaBatchListener() { .hasLinksSatisfying(links(producer.get().getSpanContext())) .hasStatus(StatusData.error()) .hasException(new IllegalArgumentException("boom")) - .hasAttributesSatisfyingExactly( - equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), - equalTo( - SemanticAttributes.MESSAGING_DESTINATION_NAME, - "testBatchTopic"), - equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), - equalTo( - SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, - "testBatchListener"), - satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer")), - equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)), - span -> span.hasName("consumer").hasParent(trace.getSpan(0)))); + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(0))), + trace -> { + if (isLibraryInstrumentationTest() && Boolean.getBoolean("testLatestDeps")) { + // in latest dep tests process spans are not created for retries because spring does + // not call the success/failure methods on the BatchInterceptor for reties + trace.hasSpansSatisfyingExactly(span -> span.hasName("consumer").hasNoParent()); + } else { + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinksSatisfying(links(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(0))); + } + }, + trace -> { + if (isLibraryInstrumentationTest() && Boolean.getBoolean("testLatestDeps")) { + trace.hasSpansSatisfyingExactly(span -> span.hasName("consumer").hasNoParent()); + } else { + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinksSatisfying(links(producer.get().getSpanContext())) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly(processAttributes), + span -> span.hasName("consumer").hasParent(trace.getSpan(0))); + } + }); } } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/BatchRecordListener.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/BatchRecordListener.java index b0bbeea4d205..ea3bb655b592 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/BatchRecordListener.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/BatchRecordListener.java @@ -18,6 +18,7 @@ public class BatchRecordListener { private static final AtomicInteger lastBatchSize = new AtomicInteger(); private static volatile CountDownLatch messageReceived = new CountDownLatch(2); + private int failureCount; @KafkaListener( id = "testBatchListener", @@ -30,7 +31,8 @@ public void listener(List> records) { GlobalTraceUtil.runWithSpan("consumer", () -> {}); records.forEach( record -> { - if (record.value().equals("error")) { + if (record.value().equals("error") && failureCount < 2) { + failureCount++; throw new IllegalArgumentException("boom"); } }); diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java index 4c97068197f7..78fb2607b349 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java @@ -5,7 +5,6 @@ package io.opentelemetry.testing; -import java.lang.reflect.Method; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.SpringBootConfiguration; @@ -16,8 +15,6 @@ import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.util.backoff.BackOff; -import org.springframework.util.backoff.FixedBackOff; @SpringBootConfiguration @EnableAutoConfiguration @@ -52,14 +49,6 @@ public ConcurrentKafkaListenerContainerFactory batchFactory( customizerProvider) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - // do not retry failed records - try { - Class.forName("org.springframework.kafka.listener.BatchErrorHandler"); - ErrorHandlerSetter.setBatchErrorHandler(factory); - } catch (ClassNotFoundException ignored) { - // org.springframework.kafka.listener.BatchErrorHandler is missing in latest - setCommonErrorHandler(factory); - } factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); factory.setAutoStartup(true); @@ -76,35 +65,10 @@ public ConcurrentKafkaListenerContainerFactory singleFactory( customizerProvider) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - // do not retry failed records - try { - Class.forName("org.springframework.kafka.listener.ErrorHandler"); - ErrorHandlerSetter.setErrorHandler(factory); - } catch (ClassNotFoundException ignored) { - // org.springframework.kafka.listener.ErrorHandler is missing in latest - setCommonErrorHandler(factory); - } factory.setConsumerFactory(consumerFactory); factory.setBatchListener(false); factory.setAutoStartup(true); customizerProvider.ifAvailable(factory::setContainerCustomizer); return factory; } - - private static void setCommonErrorHandler( - ConcurrentKafkaListenerContainerFactory factory) { - try { - Class handlerClass = - Class.forName("org.springframework.kafka.listener.CommonErrorHandler"); - Class defaultHandlerClass = - Class.forName("org.springframework.kafka.listener.DefaultErrorHandler"); - BackOff backOff = new FixedBackOff(0, 0); - Object handler = - defaultHandlerClass.getDeclaredConstructor(BackOff.class).newInstance(backOff); - Method method = factory.getClass().getMethod("setCommonErrorHandler", handlerClass); - method.invoke(factory, handler); - } catch (Exception exception) { - // ignored - } - } } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/SingleRecordListener.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/SingleRecordListener.java index bf05ad33f4bd..17f2c1057d04 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/SingleRecordListener.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/SingleRecordListener.java @@ -10,6 +10,7 @@ import org.springframework.kafka.annotation.KafkaListener; public class SingleRecordListener { + private int failureCount; @KafkaListener( id = "testSingleListener", @@ -17,7 +18,8 @@ public class SingleRecordListener { containerFactory = "singleFactory") public void listener(ConsumerRecord record) { GlobalTraceUtil.runWithSpan("consumer", () -> {}); - if (record.value().equals("error")) { + if (record.value().equals("error") && failureCount < 2) { + failureCount++; throw new IllegalArgumentException("boom"); } }