Skip to content

Commit

Permalink
Fix reactor context propagation in latest reactor (#4377)
Browse files Browse the repository at this point in the history
* Fix build and minor issue

* codenarc

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
lmolkova and trask committed Oct 14, 2021
1 parent 0825da7 commit dfb1529
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -56,8 +57,7 @@ public String toString() {
}
};

private static volatile Mono<String> dummyMono = Mono.just("");
private static volatile Flux<String> dummyFlux = Flux.just("");
private static volatile boolean enabled = false;

/**
* Stores Trace {@link io.opentelemetry.context.Context} in Reactor {@link
Expand Down Expand Up @@ -101,14 +101,14 @@ public static Context getOpenTelemetryContext(
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift(asyncOperationEndStrategy));
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
resetDummy();
enabled = true;
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy);
resetDummy();
enabled = false;
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift(
Expand All @@ -118,30 +118,32 @@ public void resetOnEachOperator() {

/** Forces Mono to run in traceContext scope. */
static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
if (!enabled) {
return publisher;
}

// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyMono
return ScalarPropagatingMono.INSTANCE
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
if (!enabled) {
return publisher;
}

// this hack forces 'publisher' to run in the onNext callback of `TracingSubscriber`
// (created for this publisher) and with current() span that refers to span created here
// without the hack, publisher runs in the onAssembly stage, before traceContext is made current
return dummyFlux
return ScalarPropagatingFlux.INSTANCE
.flatMap(i -> publisher)
.subscriberContext(ctx -> storeOpenTelemetryContext(ctx, tracingContext));
}

private static synchronized void resetDummy() {
// have to be reset as they capture async strategy and lift
dummyMono = Mono.just("");
dummyFlux = Flux.just("");
}

public static class Lifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

Expand All @@ -162,4 +164,42 @@ public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? sup
return new TracingSubscriber<>(sub, sub.currentContext());
}
}

static void subscribeInActiveSpan(CoreSubscriber<? super Object> actual, Object value) {
Context tracingContextInReactor =
ContextPropagationOperator.getOpenTelemetryContext(actual.currentContext(), null);
if (tracingContextInReactor == null || tracingContextInReactor == Context.current()) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
} else {
try (Scope ignored = tracingContextInReactor.makeCurrent()) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
}
}

static class ScalarPropagatingMono extends Mono<Object> {
public static final Mono<Object> INSTANCE = new ScalarPropagatingMono();

private final Object value = new Object();

private ScalarPropagatingMono() {}

@Override
public void subscribe(CoreSubscriber<? super Object> actual) {
subscribeInActiveSpan(actual, value);
}
}

static class ScalarPropagatingFlux extends Flux<Object> {
public static final Flux<Object> INSTANCE = new ScalarPropagatingFlux();

private final Object value = new Object();

private ScalarPropagatingFlux() {}

@Override
public void subscribe(CoreSubscriber<? super Object> actual) {
subscribeInActiveSpan(actual, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai
.doOnEach({ signal ->
assert Span.current().getSpanContext().isValid() : "current span is set"
if (signal.isOnComplete()) {
Span.current().end()n

Span.current().end()
}
})}).block()

Expand Down Expand Up @@ -203,9 +202,9 @@ class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrai

def getDummy(def publisher) {
if (publisher instanceof Mono) {
return Mono.just("")
return ContextPropagationOperator.ScalarPropagatingMono.INSTANCE
} else if (publisher instanceof Flux) {
return Flux.just("")
return ContextPropagationOperator.ScalarPropagatingFlux.INSTANCE
}

throw new IllegalStateException("Unknown publisher")
Expand Down

0 comments on commit dfb1529

Please sign in to comment.