diff --git a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy index e2b0940cacaa..7601d489480b 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy +++ b/instrumentation/reactor-3.1/javaagent/src/test/groovy/ReactorWithSpanInstrumentationTest.groovy @@ -3,8 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.SpanKind import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.context.Scope import io.opentelemetry.instrumentation.reactor.TracedWithSpan import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import reactor.core.publisher.Flux @@ -70,6 +73,134 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati } } + def "should capture nested Mono spans"() { + setup: + def mono = Mono.defer({ -> + Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan() + span.end() + return Mono.just("Value") + }) + + def result = new TracedWithSpan() + .outer(mono) + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + expect: + assertTraces(1) { + trace(0, 3) { + span(0) { + name "TracedWithSpan.outer" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + span(1) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + childOf span(0) + attributes { + } + } + span(2) { + name "inner-manual" + kind SpanKind.INTERNAL + childOf span(1) + attributes { + } + } + } + } + } + + def "should capture nested spans from current"() { + setup: + Span parent = GlobalOpenTelemetry.getTracer("test") + .spanBuilder("parent").startSpan() + + Scope scope = parent.makeCurrent() + + def result = new TracedWithSpan() + .mono(Mono.defer({ -> + Span inner = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan() + inner.end() + return Mono.just("Value") + })) + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + scope.close() + parent.end() + + expect: + assertTraces(1) { + trace(0, 3) { + span(0) { + name "parent" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + span(1) { + name "TracedWithSpan.mono" + kind SpanKind.INTERNAL + childOf span(0) + attributes { + } + } + span(2) { + name "inner-manual" + kind SpanKind.INTERNAL + childOf span(1) + attributes { + } + } + } + } + } + + def "should capture nested Flux spans"() { + setup: + def mono = Flux.defer({ -> + Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan() + span.end() + return Flux.just("Value") + }) + + def result = new TracedWithSpan() + .flux(mono) + + StepVerifier.create(result) + .expectNext("Value") + .verifyComplete() + + expect: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "TracedWithSpan.flux" + kind SpanKind.INTERNAL + hasNoParent() + attributes { + } + } + span(1) { + name "inner-manual" + kind SpanKind.INTERNAL + childOf span(0) + attributes { + } + } + } + } + } + def "should capture span for already errored Mono"() { setup: def error = new IllegalArgumentException("Boom") diff --git a/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java b/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java index e9cdb703945e..8abfb52b0652 100644 --- a/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java +++ b/instrumentation/reactor-3.1/javaagent/src/test/java/io/opentelemetry/instrumentation/reactor/TracedWithSpan.java @@ -15,6 +15,11 @@ public Mono mono(Mono mono) { return mono; } + @WithSpan + public Mono outer(Mono inner) { + return mono(inner); + } + @WithSpan public Flux flux(Flux flux) { return flux; diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java index 5e2f7aab0d53..c151408b12ee 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ReactorAsyncOperationEndStrategy.java @@ -59,14 +59,18 @@ protected void end(Object result, Throwable error) { if (asyncValue instanceof Mono) { Mono mono = (Mono) asyncValue; - return mono.doOnError(notificationConsumer) + + return TracingOperator.runInScope(mono, context) + .doOnError(notificationConsumer) .doOnSuccess(notificationConsumer::onSuccess) .doOnCancel(notificationConsumer::onCancel); } else { Flux flux = Flux.from((Publisher) asyncValue); - return flux.doOnError(notificationConsumer) + return TracingOperator.runInScope(flux, context) + .doOnError(notificationConsumer) .doOnComplete(notificationConsumer) - .doOnCancel(notificationConsumer::onCancel); + .doOnCancel(notificationConsumer::onCancel) + .subscriberContext(ctx -> TracingOperator.storeInContext(ctx, context)); } } diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java index 84d497f8bd6f..da1936fc35a0 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingOperator.java @@ -31,7 +31,9 @@ import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; /** Based on Spring Sleuth's Reactor instrumentation. */ @@ -45,6 +47,13 @@ public static TracingOperatorBuilder newBuilder() { return new TracingOperatorBuilder(); } + private static final ContextKey TRACE_CONTEXT_KEY = + ContextKey.named("otel-trace-context"); + + // have to be re-initialized after hooks are set/reset + private static Mono dummyMono = Mono.just(""); + private static Flux dummyFlux = Flux.just(""); + private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy; TracingOperator(boolean captureExperimentalSpanAttributes) { @@ -54,12 +63,29 @@ public static TracingOperatorBuilder newBuilder() { .build(); } - private static final ContextKey TRACE_CONTEXT_KEY = - ContextKey.named("otel-trace-context"); + /** Forces Mono to run in traceContext scope. */ + static Mono runInScope(Mono publisher, Context tracingContext) { + // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` + // created for this publisher and with current() span that refer to span created here + // without the hack, publisher runs in the onAssembly stage, before traceContext is made current + return dummyMono + .flatMap(i -> publisher) + .subscriberContext(ctx -> storeInContext(ctx, tracingContext)); + } + + /** Forces Flux to run in traceContext scope. */ + static Flux runInScope(Flux publisher, Context tracingContext) { + // this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber` + // created for this publisher and with current() span that refer to span created here + // without the hack, publisher runs in the onAssembly stage, before traceContext is made current + return dummyFlux + .flatMap(i -> publisher) + .subscriberContext(ctx -> storeInContext(ctx, tracingContext)); + } /** * Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link - * reactor.util.context.Context} + * reactor.util.context.Context}. * * @param context Reactor's context to store trace context in. * @param traceContext Trace context to be stored. @@ -71,20 +97,16 @@ public static reactor.util.context.Context storeInContext( /** * Gets Trace {@link io.opentelemetry.context.Context} from Reactor {@link - * reactor.util.context.Context} + * reactor.util.context.Context}. * * @param context Reactor's context to get trace context from. - * @param traceContext Default value to be returned if no trace context is found on Reactor + * @param defaultTraceContext Default value to be returned if no trace context is found on Reactor * context. - * @return Trace context or null. + * @return Trace context or default value. */ public static Context fromContextOrDefault( - reactor.util.context.Context context, Context traceContext) { - if (context == null) { - return traceContext; - } - - return context.getOrDefault(TRACE_CONTEXT_KEY, traceContext); + reactor.util.context.Context context, Context defaultTraceContext) { + return context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext); } /** @@ -96,12 +118,16 @@ public static Context fromContextOrDefault( public void registerOnEachOperator() { Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy)); AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy); + dummyMono = Mono.just(""); + dummyFlux = Flux.just(""); } /** Unregisters the hook registered by {@link #registerOnEachOperator()}. */ public void resetOnEachOperator() { Hooks.resetOnEachOperator(TracingSubscriber.class.getName()); AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy); + dummyMono = Mono.just(""); + dummyFlux = Flux.just(""); } private static Function, ? extends Publisher> tracingLift( diff --git a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy index 988914d33053..82c8a2cde602 100644 --- a/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy +++ b/instrumentation/reactor-3.1/library/src/test/groovy/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.groovy @@ -29,7 +29,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai when: runWithSpan({ return publisherSupplier().transform({ publisher -> traceNonBlocking(publisher, "inner")}) - }); + }) then: assertTraces(1) { @@ -126,7 +126,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai .doOnEach({ signal -> if (signal.isOnError()) { // reactor 3.1 does not support getting context here yet - Span.current().setStatus(status); + Span.current().setStatus(status) Span.current().end() } else if (signal.isOnComplete()) { Span.current().end() @@ -143,7 +143,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai .startSpan() return TracingOperator.storeInContext(ctx, parent.with(innerSpan)) - }); + }) } def getDummy(def publisher) {