Skip to content

Commit

Permalink
And fix nested WithSpan with defer
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Oct 4, 2021
1 parent ace2614 commit 38afb0b
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Scope
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -70,6 +73,134 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture nested Mono spans"() {
setup:
def mono = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Mono.just("Value")
})

def result = new TracedWithSpan()
.outer(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "TracedWithSpan.outer"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested spans from current"() {
setup:
Span parent = GlobalOpenTelemetry.getTracer("test")
.spanBuilder("parent").startSpan()

Scope scope = parent.makeCurrent()

def result = new TracedWithSpan()
.mono(Mono.defer({ ->
Span inner = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
inner.end()
return Mono.just("Value")
}))

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

scope.close()
parent.end()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested Flux spans"() {
setup:
def mono = Flux.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Flux.just("Value")
})

def result = new TracedWithSpan()
.flux(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public Mono<String> mono(Mono<String> mono) {
return mono;
}

@WithSpan
public Mono<String> outer(Mono<String> inner) {
return mono(inner);
}

@WithSpan
public Flux<String> flux(Flux<String> flux) {
return flux;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ protected void end(Object result, Throwable error) {

if (asyncValue instanceof Mono) {
Mono<?> mono = (Mono<?>) asyncValue;
return mono.doOnError(notificationConsumer)

return TracingOperator.runInScope(mono, context)
.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
} else {
Flux<?> flux = Flux.from((Publisher<?>) asyncValue);
return flux.doOnError(notificationConsumer)
return TracingOperator.runInScope(flux, context)
.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
.doOnCancel(notificationConsumer::onCancel)
.subscriberContext(ctx -> TracingOperator.storeInContext(ctx, context));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/** Based on Spring Sleuth's Reactor instrumentation. */
Expand All @@ -45,6 +47,13 @@ public static TracingOperatorBuilder newBuilder() {
return new TracingOperatorBuilder();
}

private static final ContextKey<Context> TRACE_CONTEXT_KEY =
ContextKey.named("otel-trace-context");

// have to be re-initialized after hooks are set/reset
private static Mono<String> dummyMono = Mono.just("");
private static Flux<String> dummyFlux = Flux.just("");

private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

TracingOperator(boolean captureExperimentalSpanAttributes) {
Expand All @@ -54,12 +63,29 @@ public static TracingOperatorBuilder newBuilder() {
.build();
}

private static final ContextKey<Context> TRACE_CONTEXT_KEY =
ContextKey.named("otel-trace-context");
/** Forces Mono to run in traceContext scope. */
static <T> Mono<T> runInScope(Mono<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// created for this publisher and with current() span that refer to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyMono
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeInContext(ctx, tracingContext));
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runInScope(Flux<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// created for this publisher and with current() span that refer to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyFlux
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeInContext(ctx, tracingContext));
}

/**
* Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link
* reactor.util.context.Context}
* reactor.util.context.Context}.
*
* @param context Reactor's context to store trace context in.
* @param traceContext Trace context to be stored.
Expand All @@ -71,20 +97,16 @@ public static reactor.util.context.Context storeInContext(

/**
* Gets Trace {@link io.opentelemetry.context.Context} from Reactor {@link
* reactor.util.context.Context}
* reactor.util.context.Context}.
*
* @param context Reactor's context to get trace context from.
* @param traceContext Default value to be returned if no trace context is found on Reactor
* @param defaultTraceContext Default value to be returned if no trace context is found on Reactor
* context.
* @return Trace context or null.
* @return Trace context or default value.
*/
public static Context fromContextOrDefault(
reactor.util.context.Context context, Context traceContext) {
if (context == null) {
return traceContext;
}

return context.getOrDefault(TRACE_CONTEXT_KEY, traceContext);
reactor.util.context.Context context, Context defaultTraceContext) {
return context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext);
}

/**
Expand All @@ -96,12 +118,16 @@ public static Context fromContextOrDefault(
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai
when:
runWithSpan({
return publisherSupplier().transform({ publisher -> traceNonBlocking(publisher, "inner")})
});
})

then:
assertTraces(1) {
Expand Down Expand Up @@ -126,7 +126,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai
.doOnEach({ signal ->
if (signal.isOnError()) {
// reactor 3.1 does not support getting context here yet
Span.current().setStatus(status);
Span.current().setStatus(status)
Span.current().end()
} else if (signal.isOnComplete()) {
Span.current().end()
Expand All @@ -143,7 +143,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai
.startSpan()

return TracingOperator.storeInContext(ctx, parent.with(innerSpan))
});
})
}

def getDummy(def publisher) {
Expand Down

0 comments on commit 38afb0b

Please sign in to comment.