From 9a259ee90de13bdc45ca4120e10bfe5a3fafc53f Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Tue, 12 Oct 2021 11:02:57 -0700 Subject: [PATCH] Allow reactor instrumentation to pick up spans from reactor context (#4159) * Allow reactor instrumentation to pick up spans from reactor context in addition to current tracing context * And fix nested WithSpan with defer * up * Review comments * move ReactorTracing to Operation and rename to ContextPropagationOperator * fix build * Add reactor-netty client test nested under WithSpan * Add link to the issue in comments * clean up --- .../v5_1/LettuceReactiveClientTest.groovy | 4 +- .../reactor/HooksInstrumentation.java | 4 +- .../ReactorWithSpanInstrumentationTest.groovy | 131 ++++++++++++ .../reactor/TracedWithSpan.java | 5 + ...r.java => ContextPropagationOperator.java} | 77 ++++++- ...=> ContextPropagationOperatorBuilder.java} | 10 +- .../ReactorAsyncOperationEndStrategy.java | 7 +- .../reactor/TracingSubscriber.java | 4 +- .../instrumentation/reactor/HooksTest.groovy | 2 +- .../reactor/ReactorCoreTest.groovy | 193 +++++++++++++++++- .../reactor/SubscriptionTest.groovy | 2 +- .../javaagent/build.gradle.kts | 4 + .../groovy/ReactorNettyWithSpanTest.groovy | 81 ++++++++ .../test/reactor/netty/TracedWithSpan.java | 17 ++ 14 files changed, 520 insertions(+), 21 deletions(-) rename instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/{TracingOperator.java => ContextPropagationOperator.java} (53%) rename instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/{TracingOperatorBuilder.java => ContextPropagationOperatorBuilder.java} (53%) create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java diff --git a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy index af26609719d7..021e4c859516 100644 --- a/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy +++ b/instrumentation/lettuce/lettuce-5.1/library/src/test/groovy/io/opentelemetry/instrumentation/lettuce/v5_1/LettuceReactiveClientTest.groovy @@ -7,13 +7,13 @@ package io.opentelemetry.instrumentation.lettuce.v5_1 import io.lettuce.core.RedisClient import io.lettuce.core.resource.ClientResources -import io.opentelemetry.instrumentation.reactor.TracingOperator +import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator import io.opentelemetry.instrumentation.test.LibraryTestTrait import spock.lang.Shared class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait { @Shared - TracingOperator tracingOperator = TracingOperator.create() + ContextPropagationOperator tracingOperator = ContextPropagationOperator.create() @Override RedisClient createClient(String uri) { diff --git a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java index 624818ef7abf..679cbf1528f7 100644 --- a/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java +++ b/instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/HooksInstrumentation.java @@ -9,7 +9,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import io.opentelemetry.instrumentation.api.config.Config; -import io.opentelemetry.instrumentation.reactor.TracingOperator; +import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; @@ -34,7 +34,7 @@ public static class ResetOnEachOperatorAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void postStaticInitializer() { - TracingOperator.newBuilder() + ContextPropagationOperator.newBuilder() .setCaptureExperimentalSpanAttributes( Config.get() .getBoolean("otel.instrumentation.reactor.experimental-span-attributes", false)) diff --git a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy index e2b0940cacaa..7601d489480b 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy +++ b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy @@ -3,8 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.SpanKind import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.context.Scope import io.opentelemetry.instrumentation.reactor.TracedWithSpan import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import reactor.core.publisher.Flux @@ -70,6 +73,134 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture nested Mono spans"() { + setup: + def mono = Mono.defer({ -> + Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan() + span.end() + return Mono.just("Value") + }) + + def result = new TracedWithSpan() + .outer(mono) + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + expect: + assertTraces(1) { + trace(0, 3) { + span(0) { + name "TracedWithSpan.outer" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + span(1) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + childOf span(0) + attributes { + } + } + span(2) { + name "inner-manual" + kind SpanKind.INTERNAL + childOf span(1) + attributes { + } + } + } + } + } + + def "should capture nested spans from current"() { + setup: + Span parent = GlobalOpenTelemetry.getTracer("test") + .spanBuilder("parent").startSpan() + + Scope scope = parent.makeCurrent() + + def result = new TracedWithSpan() + .mono(Mono.defer({ -> + Span inner = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan() + inner.end() + return Mono.just("Value") + })) + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + scope.close() + parent.end() + + expect: + assertTraces(1) { + trace(0, 3) { + span(0) { + name "parent" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + span(1) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + childOf span(0) + attributes { + } + } + span(2) { + name "inner-manual" + kind SpanKind.INTERNAL + childOf span(1) + attributes { + } + } + } + } + } + + def "should capture nested Flux spans"() { + setup: + def mono = Flux.defer({ -> + Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan() + span.end() + return Flux.just("Value") + }) + + def result = new TracedWithSpan() + .flux(mono) + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + expect: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + span(1) { + name "inner-manual" + kind SpanKind.INTERNAL + childOf span(0) + attributes { + } + } + } + } + } + def "should capture span for already errored Mono"() { setup: def error = new IllegalArgumentException("Boom") diff --git a/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java b/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java index e9cdb703945e..8abfb52b0652 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java +++ b/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java @@ -15,6 +15,11 @@ public Mono mono(Mono mono) { return mono; } + @WithSpan + public Mono outer(Mono inner) { + return mono(inner); + } + @WithSpan public Flux flux(Flux flux) { return flux; diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java similarity index 53% rename from instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java rename to instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java index cedbadae8173..0de6fec7daa1 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java @@ -30,23 +30,62 @@ import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; /** Based on Spring Sleuth's Reactor instrumentation. */ -public final class TracingOperator { +public final class ContextPropagationOperator { - public static TracingOperator create() { + public static ContextPropagationOperator create() { return newBuilder().build(); } - public static TracingOperatorBuilder newBuilder() { - return new TracingOperatorBuilder(); + public static ContextPropagationOperatorBuilder newBuilder() { + return new ContextPropagationOperatorBuilder(); } private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy; - TracingOperator(boolean captureExperimentalSpanAttributes) { + private static final Object TRACE_CONTEXT_KEY = + new Object() { + @Override + public String toString() { + return "otel-trace-context"; + } + }; + + private static volatile Mono dummyMono = Mono.just(""); + private static volatile Flux dummyFlux = Flux.just(""); + + /** + * Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link + * reactor.util.context.Context}. + * + * @param context Reactor's context to store trace context in. + * @param traceContext Trace context to be stored. + */ + public static reactor.util.context.Context storeOpenTelemetryContext( + reactor.util.context.Context context, Context traceContext) { + return context.put(TRACE_CONTEXT_KEY, traceContext); + } + + /** + * Gets Trace {@link io.opentelemetry.context.Context} from Reactor {@link + * reactor.util.context.Context}. + * + * @param context Reactor's context to get trace context from. + * @param defaultTraceContext Default value to be returned if no trace context is found on Reactor + * context. + * @return Trace context or default value. + */ + public static Context getOpenTelemetryContext( + reactor.util.context.Context context, Context defaultTraceContext) { + return context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext); + } + + ContextPropagationOperator(boolean captureExperimentalSpanAttributes) { this.asyncOperationEndStrategy = ReactorAsyncOperationEndStrategy.newBuilder() .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) @@ -62,12 +101,14 @@ public static TracingOperatorBuilder newBuilder() { public void registerOnEachOperator() { Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy)); AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy); + resetDummy(); } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ public void resetOnEachOperator() { Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy); + resetDummy(); } private static Function, ? extends Publisher> tracingLift( @@ -75,6 +116,32 @@ public void resetOnEachOperator() { return Operators.lift(new Lifter<>(asyncOperationEndStrategy)); } + /** Forces Mono to run in traceContext scope. */ + static Mono runWithContext(Mono publisher, Context tracingContext) { + // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` + // (created for this publisher) and with current() span that refers to span created here + // without the hack, publisher runs in the onAssembly stage, before traceContext is made current + return dummyMono + .flatMap(i -> publisher) + .subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext)); + } + + /** Forces Flux to run in traceContext scope. */ + static Flux runWithContext(Flux publisher, Context tracingContext) { + // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` + // (created for this publisher) and with current() span that refers to span created here + // without the hack, publisher runs in the onAssembly stage, before traceContext is made current + return dummyFlux + .flatMap(i -> publisher) + .subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext)); + } + + private static synchronized void resetDummy() { + // have to be reset as they capture async strategy and lift + dummyMono = Mono.just(""); + dummyFlux = Flux.just(""); + } + public static class Lifter implements BiFunction, CoreSubscriber> { diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperatorBuilder.java similarity index 53% rename from instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java rename to instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperatorBuilder.java index 99889cfa3cc2..ebe4f255fab7 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperatorBuilder.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperatorBuilder.java @@ -5,18 +5,18 @@ package io.opentelemetry.instrumentation.reactor; -public final class TracingOperatorBuilder { +public final class ContextPropagationOperatorBuilder { private boolean captureExperimentalSpanAttributes; - TracingOperatorBuilder() {} + ContextPropagationOperatorBuilder() {} - public TracingOperatorBuilder setCaptureExperimentalSpanAttributes( + public ContextPropagationOperatorBuilder setCaptureExperimentalSpanAttributes( boolean captureExperimentalSpanAttributes) { this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; return this; } - public TracingOperator build() { - return new TracingOperator(captureExperimentalSpanAttributes); + public ContextPropagationOperator build() { + return new ContextPropagationOperator(captureExperimentalSpanAttributes); } } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java index 5e2f7aab0d53..110ed77c101f 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java @@ -59,12 +59,15 @@ protected void end(Object result, Throwable error) { if (asyncValue instanceof Mono) { Mono mono = (Mono) asyncValue; - return mono.doOnError(notificationConsumer) + + return ContextPropagationOperator.runWithContext(mono, context) + .doOnError(notificationConsumer) .doOnSuccess(notificationConsumer::onSuccess) .doOnCancel(notificationConsumer::onCancel); } else { Flux flux = Flux.from((Publisher) asyncValue); - return flux.doOnError(notificationConsumer) + return ContextPropagationOperator.runWithContext(flux, context) + .doOnError(notificationConsumer) .doOnComplete(notificationConsumer) .doOnCancel(notificationConsumer::onCancel); } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java index a851ce5412ca..fae0af530fa4 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java @@ -45,12 +45,12 @@ public TracingSubscriber( io.opentelemetry.context.Context contextToPropagate) { this.subscriber = subscriber; this.context = ctx; - this.traceContext = contextToPropagate; + this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate); } @Override public void onSubscribe(Subscription subscription) { - subscriber.onSubscribe(subscription); + withActiveSpan(() -> subscriber.onSubscribe(subscription)); } @Override diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy index 70aa49658e3d..11143228ee99 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/HooksTest.groovy @@ -15,7 +15,7 @@ class HooksTest extends LibraryInstrumentationSpecification { def "can reset out hooks"() { setup: - def underTest = TracingOperator.create() + def underTest = ContextPropagationOperator.create() AtomicReference subscriber = new AtomicReference<>() when: "no hook registered" diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy index 9d7e65acbb71..6972fae28173 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy @@ -5,12 +5,19 @@ package io.opentelemetry.instrumentation.reactor +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.test.LibraryTestTrait +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import spock.lang.Shared class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrait { @Shared - TracingOperator tracingOperator = TracingOperator.create() + ContextPropagationOperator tracingOperator = ContextPropagationOperator.create() def setupSpec() { tracingOperator.registerOnEachOperator() @@ -19,4 +26,188 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai def cleanupSpec() { tracingOperator.resetOnEachOperator() } + + def "Current in non-blocking publisher assembly"() { + when: + runWithSpan({ + return publisherSupplier().transform({ publisher -> traceNonBlocking(publisher, "inner")}) + }) + + then: + assertTraces(1) { + trace(0, 3) { + span(0) { + name "trace-parent" + hasNoParent() + attributes { + } + } + + span(1) { + name "publisher-parent" + kind SpanKind.INTERNAL + childOf span(0) + } + + span(2) { + name "inner" + childOf span(1) + attributes { + "inner" "foo" + } + } + } + } + + where: + paramName | publisherSupplier + "basic mono" | { -> Mono.fromCallable({ i -> + Span.current().setAttribute("inner", "foo") + return 1 + }) } + "basic flux" | { -> Flux.defer({ + Span.current().setAttribute("inner", "foo") + return Flux.just([5,6].toArray()) + })} + } + + def "Nested non-blocking"() { + when: + def result = runWithSpan({ + Mono.defer({ -> + Span.current().setAttribute("middle", "foo") + return Mono.fromCallable({ -> + Span.current().setAttribute("inner", "bar") + return 1 + }) + .transform({ i -> traceNonBlocking(i, "inner") }) + }) + .transform({ m -> traceNonBlocking(m, "middle")}) + }) + + then: + result == 1 + and: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "trace-parent" + hasNoParent() + attributes { + } + } + + span(1) { + name "publisher-parent" + kind SpanKind.INTERNAL + childOf span(0) + } + + span(2) { + name "middle" + childOf span(1) + attributes { + "middle" "foo" + } + } + + span(3) { + name "inner" + childOf span(2) + attributes { + "inner" "bar" + } + } + } + } + } + + + def "No tracing before registration"() { + when: + tracingOperator.resetOnEachOperator() + + def result1 = Mono.fromCallable({ -> + assert !Span.current().getSpanContext().isValid() : "current span is not set" + return 1 + }) + .transform({ i -> + + def beforeSpan = GlobalOpenTelemetry.getTracer("test").spanBuilder("before").startSpan() + + return ContextPropagationOperator + .runWithContext(i, Context.root().with(beforeSpan)) + .doOnEach({ signal -> + assert !Span.current().getSpanContext().isValid() : "current span is not set" + })}).block() + + tracingOperator.registerOnEachOperator() + def result2 = Mono.fromCallable({ -> + assert Span.current().getSpanContext().isValid() : "current span is set" + return 2 + }) + .transform({ i -> + + def afterSpan = GlobalOpenTelemetry.getTracer("test").spanBuilder("after").startSpan() + + return ContextPropagationOperator + .runWithContext(i, Context.root().with(afterSpan)) + .doOnEach({ signal -> + assert Span.current().getSpanContext().isValid() : "current span is set" + if (signal.isOnComplete()) { + Span.current().end()n + + } + })}).block() + + then: + result1 == 1 + result2 == 2 + and: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "after" + hasNoParent() + attributes { + } + } + } + } + } + + def traceNonBlocking(def publisher, def spanName) { + return getDummy(publisher) + .flatMap({ i -> publisher }) + .doOnEach({ signal -> + if (signal.isOnError()) { + // reactor 3.1 does not support getting context here yet + Span.current().setStatus(StatusCode.ERROR) + Span.current().end() + } else if (signal.isOnComplete()) { + Span.current().end() + } + }) + .subscriberContext({ ctx -> + + def parent = ContextPropagationOperator.getOpenTelemetryContext(ctx, Context.current()) + + def innerSpan = GlobalOpenTelemetry.getTracer("test") + .spanBuilder(spanName) + .setParent(parent) + .startSpan() + + return ContextPropagationOperator.storeOpenTelemetryContext(ctx, parent.with(innerSpan)) + }) + } + + def getDummy(def publisher) { + if (publisher instanceof Mono) { + return Mono.just("") + } else if (publisher instanceof Flux) { + return Flux.just("") + } + + throw new IllegalStateException("Unknown publisher") + } } diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy index 9bfc899474a6..a2653bdd9b6d 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/SubscriptionTest.groovy @@ -10,7 +10,7 @@ import spock.lang.Shared class SubscriptionTest extends AbstractSubscriptionTest implements LibraryTestTrait { @Shared - TracingOperator tracingOperator = TracingOperator.create() + ContextPropagationOperator tracingOperator = ContextPropagationOperator.create() def setupSpec() { tracingOperator.registerOnEachOperator() diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts index 5453a23f29b5..a4b9fc91ea89 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts @@ -23,6 +23,10 @@ dependencies { testInstrumentation(project(":instrumentation:reactor-netty:reactor-netty-0.9:javaagent")) testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) testInstrumentation(project(":instrumentation:reactor-3.1:javaagent")) + + testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE") + testImplementation("io.opentelemetry:opentelemetry-extension-annotations") + testInstrumentation(project(":instrumentation:opentelemetry-annotations-1.0:javaagent")) } tasks { diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy new file mode 100644 index 000000000000..51a80bed60fc --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/ReactorNettyWithSpanTest.groovy @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer +import io.opentelemetry.test.reactor.netty.TracedWithSpan +import reactor.core.publisher.Mono +import reactor.netty.http.client.HttpClient +import reactor.test.StepVerifier +import spock.lang.Shared + +import java.time.Duration + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.SERVER + +class ReactorNettyWithSpanTest extends InstrumentationSpecification implements AgentTestTrait { + + @Shared + private HttpClientTestServer server + + def setupSpec() { + server = new HttpClientTestServer(openTelemetry) + server.start() + } + + def cleanupSpec() { + server.stop() + } + + def "test successful nested under WithSpan"() { + when: + def httpClient = HttpClient.create() + + def httpRequest = Mono.defer({ -> + httpClient.get().uri("http://localhost:${server.httpPort()}/success") + .responseSingle ({ resp, content -> + // Make sure to consume content since that's when we close the span. + content.map { resp } + }) + .map({ r -> r.status().code() }) + }) + + def getResponse = new TracedWithSpan().mono( + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4348 + // our HTTP server is synchronous, i.e. it returns Mono.just with response + // which is not supported by TracingSubscriber - it does not instrument scalar calls + // so we delay here to fake async http request and let Reactor context instrumentation work + Mono.delay(Duration.ofMillis(1)).then(httpRequest)) + + then: + StepVerifier.create(getResponse) + .expectNext(200) + .expectComplete() + .verify() + + assertTraces(1) { + trace(0, 3) { + span(0) { + name "TracedWithSpan.mono" + kind INTERNAL + hasNoParent() + } + span(1) { + name "HTTP GET" + kind CLIENT + childOf(span(0)) + } + span(2) { + name "test-http-server" + kind SERVER + childOf(span(1)) + } + } + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java new file mode 100644 index 000000000000..7a208056e28d --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/test/reactor/netty/TracedWithSpan.java @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.test.reactor.netty; + +import io.opentelemetry.extension.annotations.WithSpan; +import reactor.core.publisher.Mono; + +public class TracedWithSpan { + + @WithSpan + public Mono mono(Mono mono) { + return mono; + } +}