Skip to content

Commit

Permalink
Allow reactor instrumentation to pick up spans from reactor context (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#4159)

* Allow reactor instrumentation to pick up spans from reactor context in addition to current tracing context

* And fix nested WithSpan with defer

* up

* Review comments

* move ReactorTracing to Operation and rename to ContextPropagationOperator

* fix build

* Add reactor-netty client test nested under WithSpan

* Add link to the issue in comments

* clean up
  • Loading branch information
lmolkova authored and RashmiRam committed May 23, 2022
1 parent fc5938b commit 9a259ee
Show file tree
Hide file tree
Showing 14 changed files with 520 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package io.opentelemetry.instrumentation.lettuce.v5_1

import io.lettuce.core.RedisClient
import io.lettuce.core.resource.ClientResources
import io.opentelemetry.instrumentation.reactor.TracingOperator
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared

class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait {
@Shared
TracingOperator tracingOperator = TracingOperator.create()
ContextPropagationOperator tracingOperator = ContextPropagationOperator.create()

@Override
RedisClient createClient(String uri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.reactor.TracingOperator;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
Expand All @@ -34,7 +34,7 @@ public static class ResetOnEachOperatorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void postStaticInitializer() {
TracingOperator.newBuilder()
ContextPropagationOperator.newBuilder()
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBoolean("otel.instrumentation.reactor.experimental-span-attributes", false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Scope
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -70,6 +73,134 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture nested Mono spans"() {
setup:
def mono = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Mono.just("Value")
})

def result = new TracedWithSpan()
.outer(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "TracedWithSpan.outer"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested spans from current"() {
setup:
Span parent = GlobalOpenTelemetry.getTracer("test")
.spanBuilder("parent").startSpan()

Scope scope = parent.makeCurrent()

def result = new TracedWithSpan()
.mono(Mono.defer({ ->
Span inner = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
inner.end()
return Mono.just("Value")
}))

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

scope.close()
parent.end()

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
span(2) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(1)
attributes {
}
}
}
}
}

def "should capture nested Flux spans"() {
setup:
def mono = Flux.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("inner-manual").startSpan()
span.end()
return Flux.just("Value")
})

def result = new TracedWithSpan()
.flux(mono)

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
}
}
span(1) {
name "inner-manual"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "should capture span for already errored Mono"() {
setup:
def error = new IllegalArgumentException("Boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public Mono<String> mono(Mono<String> mono) {
return mono;
}

@WithSpan
public Mono<String> outer(Mono<String> inner) {
return mono(inner);
}

@WithSpan
public Flux<String> flux(Flux<String> flux) {
return flux;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,62 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/** Based on Spring Sleuth's Reactor instrumentation. */
public final class TracingOperator {
public final class ContextPropagationOperator {

public static TracingOperator create() {
public static ContextPropagationOperator create() {
return newBuilder().build();
}

public static TracingOperatorBuilder newBuilder() {
return new TracingOperatorBuilder();
public static ContextPropagationOperatorBuilder newBuilder() {
return new ContextPropagationOperatorBuilder();
}

private final ReactorAsyncOperationEndStrategy asyncOperationEndStrategy;

TracingOperator(boolean captureExperimentalSpanAttributes) {
private static final Object TRACE_CONTEXT_KEY =
new Object() {
@Override
public String toString() {
return "otel-trace-context";
}
};

private static volatile Mono<String> dummyMono = Mono.just("");
private static volatile Flux<String> dummyFlux = Flux.just("");

/**
* Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link
* reactor.util.context.Context}.
*
* @param context Reactor's context to store trace context in.
* @param traceContext Trace context to be stored.
*/
public static reactor.util.context.Context storeOpenTelemetryContext(
reactor.util.context.Context context, Context traceContext) {
return context.put(TRACE_CONTEXT_KEY, traceContext);
}

/**
* Gets Trace {@link io.opentelemetry.context.Context} from Reactor {@link
* reactor.util.context.Context}.
*
* @param context Reactor's context to get trace context from.
* @param defaultTraceContext Default value to be returned if no trace context is found on Reactor
* context.
* @return Trace context or default value.
*/
public static Context getOpenTelemetryContext(
reactor.util.context.Context context, Context defaultTraceContext) {
return context.getOrDefault(TRACE_CONTEXT_KEY, defaultTraceContext);
}

ContextPropagationOperator(boolean captureExperimentalSpanAttributes) {
this.asyncOperationEndStrategy =
ReactorAsyncOperationEndStrategy.newBuilder()
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
Expand All @@ -62,19 +101,47 @@ public static TracingOperatorBuilder newBuilder() {
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
resetDummy();
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
resetDummy();
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
ReactorAsyncOperationEndStrategy asyncOperationEndStrategy) {
return Operators.lift(new Lifter<>(asyncOperationEndStrategy));
}

/** Forces Mono to run in traceContext scope. */
static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyMono
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyFlux
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

private static synchronized void resetDummy() {
// have to be reset as they capture async strategy and lift
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

public static class Lifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package io.opentelemetry.instrumentation.reactor;

public final class TracingOperatorBuilder {
public final class ContextPropagationOperatorBuilder {
private boolean captureExperimentalSpanAttributes;

TracingOperatorBuilder() {}
ContextPropagationOperatorBuilder() {}

public TracingOperatorBuilder setCaptureExperimentalSpanAttributes(
public ContextPropagationOperatorBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
return this;
}

public TracingOperator build() {
return new TracingOperator(captureExperimentalSpanAttributes);
public ContextPropagationOperator build() {
return new ContextPropagationOperator(captureExperimentalSpanAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ protected void end(Object result, Throwable error) {

if (asyncValue instanceof Mono) {
Mono<?> mono = (Mono<?>) asyncValue;
return mono.doOnError(notificationConsumer)

return ContextPropagationOperator.runWithContext(mono, context)
.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
} else {
Flux<?> flux = Flux.from((Publisher<?>) asyncValue);
return flux.doOnError(notificationConsumer)
return ContextPropagationOperator.runWithContext(flux, context)
.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public TracingSubscriber(
io.opentelemetry.context.Context contextToPropagate) {
this.subscriber = subscriber;
this.context = ctx;
this.traceContext = contextToPropagate;
this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate);
}

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
withActiveSpan(() -> subscriber.onSubscribe(subscription));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class HooksTest extends LibraryInstrumentationSpecification {

def "can reset out hooks"() {
setup:
def underTest = TracingOperator.create()
def underTest = ContextPropagationOperator.create()
AtomicReference<CoreSubscriber> subscriber = new AtomicReference<>()

when: "no hook registered"
Expand Down
Loading

0 comments on commit 9a259ee

Please sign in to comment.