Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reactor instrumentation to pick up spans from reactor context #4159

Merged
merged 9 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Scope
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -70,6 +73,134 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture nested Mono spans"() {
setup:
def mono = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Mono.just("Value")
})

def result = new TracedWithSpan()
.outer(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "TracedWithSpan.outer"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested spans from current"() {
setup:
Span parent = GlobalOpenTelemetry.getTracer("test")
.spanBuilder("parent").startSpan()

Scope scope = parent.makeCurrent()

def result = new TracedWithSpan()
.mono(Mono.defer({ ->
Span inner = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
inner.end()
return Mono.just("Value")
}))

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

scope.close()
parent.end()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested Flux spans"() {
setup:
def mono = Flux.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Flux.just("Value")
})

def result = new TracedWithSpan()
.flux(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public Mono<String> mono(Mono<String> mono) {
return mono;
}

@WithSpan
public Mono<String> outer(Mono<String> inner) {
return mono(inner);
}

@WithSpan
public Flux<String> flux(Flux<String> flux) {
return flux;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ protected void end(Object result, Throwable error) {

if (asyncValue instanceof Mono) {
Mono<?> mono = (Mono<?>) asyncValue;
return mono.doOnError(notificationConsumer)

return TracingOperator.runInScope(mono, context)
.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
} else {
Flux<?> flux = Flux.from((Publisher<?>) asyncValue);
return flux.doOnError(notificationConsumer)
return TracingOperator.runInScope(flux, context)
.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
.doOnCancel(notificationConsumer::onCancel)
.subscriberContext(ctx -> TracingOperator.storeInContext(ctx, context));
lmolkova marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/** Based on Spring Sleuth's Reactor instrumentation. */
Expand All @@ -44,6 +47,13 @@ public static TracingOperatorBuilder newBuilder() {
return new TracingOperatorBuilder();
}

private static final ContextKey<Context> TRACE_CONTEXT_KEY =
ContextKey.named("otel-trace-context");
lmolkova marked this conversation as resolved.
Show resolved Hide resolved

// have to be re-initialized after hooks are set/reset
private static Mono<String> dummyMono = Mono.just("");
private static Flux<String> dummyFlux = Flux.just("");
trask marked this conversation as resolved.
Show resolved Hide resolved

private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

TracingOperator(boolean captureExperimentalSpanAttributes) {
Expand All @@ -53,6 +63,52 @@ public static TracingOperatorBuilder newBuilder() {
.build();
}

/** Forces Mono to run in traceContext scope. */
static <T> Mono<T> runInScope(Mono<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// created for this publisher and with current() span that refer to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyMono
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeInContext(ctx, tracingContext));
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runInScope(Flux<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// created for this publisher and with current() span that refer to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyFlux
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeInContext(ctx, tracingContext));
}

/**
* Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link
* reactor.util.context.Context}.
*
* @param context Reactor's context to store trace context in.
* @param traceContext Trace context to be stored.
*/
public static reactor.util.context.Context storeInContext(
reactor.util.context.Context context, Context traceContext) {
return context.put(TRACE_CONTEXT_KEY, traceContext);
}

/**
* Gets Trace {@link io.opentelemetry.context.Context} from Reactor {@link
* reactor.util.context.Context}.
*
* @param context Reactor's context to get trace context from.
* @param defaultTraceContext Default value to be returned if no trace context is found on Reactor
* context.
* @return Trace context or default value.
*/
public static Context fromContextOrDefault(
reactor.util.context.Context context, Context defaultTraceContext) {
return context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think of:

  • storeInContext() -> storeOpenTelemetryContext()
  • fromContextOrDefault() -> getOpenTelemetryContext()

also, I think (future) renaming of the class might help:

  • TracingOperator -> ReactorTracing
  • TracingOperatorBuilder -> ReactorTracingBuilder


/**
* Registers a hook that applies to every operator, propagating {@link Context} to downstream
* callbacks to ensure spans in the {@link Context} are available throughout the lifetime of a
Expand All @@ -62,12 +118,16 @@ public static TracingOperatorBuilder newBuilder() {
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public TracingSubscriber(
io.opentelemetry.context.Context contextToPropagate) {
this.subscriber = subscriber;
this.context = ctx;
this.traceContext = contextToPropagate;
this.traceContext = TracingOperator.fromContextOrDefault(ctx, contextToPropagate);
}

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
withActiveSpan(() -> subscriber.onSubscribe(subscription));
}

@Override
Expand Down
Loading