Skip to content

Commit

Permalink
Reactor instrumentation: do not make root context current (open-telem…
Browse files Browse the repository at this point in the history
…etry#6593)

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
2 people authored and LironKS committed Oct 31, 2022
1 parent a979c4f commit 4790a6f
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -34,6 +35,7 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
private final io.opentelemetry.context.Context traceContext;
private final Subscriber<? super T> subscriber;
private final Context context;
private final boolean hasContextToPropagate;

public TracingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
this(subscriber, ctx, io.opentelemetry.context.Context.current());
Expand All @@ -46,6 +48,8 @@ public TracingSubscriber(
this.subscriber = subscriber;
this.context = ctx;
this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate);
this.hasContextToPropagate =
traceContext == null ? false : Span.fromContext(traceContext).getSpanContext().isValid();
}

@Override
Expand Down Expand Up @@ -74,7 +78,7 @@ public Context currentContext() {
}

private void withActiveSpan(Runnable runnable) {
if (traceContext != null) {
if (hasContextToPropagate) {
try (Scope ignored = traceContext.makeCurrent()) {
runnable.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -22,6 +25,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.test.StepVerifier;

class ReactorCoreTest extends AbstractReactorCoreTest {

Expand Down Expand Up @@ -229,6 +233,135 @@ void fluxParentsAccessible() {
.isPresent();
}

@Test
void doesNotOverrideInnerCurrentSpans() {
Flux<Object> publish =
Flux.create(
sink -> {
for (int i = 0; i < 2; i++) {
Span s = tracer.spanBuilder("inner").startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(i);
} finally {
s.end();
}
}
});

// as a result we'll have
// 1. publish subscriber that creates inner spans
// 2. tracing subscriber without current context - subscription was done outside any scope
// 3. inner subscriber that will add onNext attribute to inner spans
// I.e. tracing subscriber context (root) at subscription time will be different from inner in
// onNext
publish
.take(2)
.subscribe(
n -> {
assertThat(Span.current().getSpanContext().isValid()).isTrue();
Span.current().setAttribute("onNext", true);
},
error -> fail(error.getMessage()));

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("inner")
.hasNoParent()
.hasAttributes(attributeEntry("onNext", true))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("inner")
.hasNoParent()
.hasAttributes(attributeEntry("onNext", true))));
}

@Test
void doesNotOverrideInnerCurrentSpansAsync() {
Flux<Object> publish =
Flux.create(
sink -> {
Span s = tracer.spanBuilder("inner").startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(s);
} finally {
s.end();
}
});

publish
.take(1)
.delayElements(Duration.ofMillis(1))
.doOnNext(
span -> {
assertThat(Span.current().getSpanContext().isValid()).isTrue();
assertThat(Span.current()).isSameAs(span);
})
.subscribe(
span -> assertThat(Span.current()).isSameAs(span), error -> fail(error.getMessage()));

testing.waitAndAssertTraces(
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("inner").hasNoParent()));
}

@Test
void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() {
Flux<Object> publish =
Flux.create(
sink -> {
for (int i = 0; i < 2; i++) {
Span s = tracer.spanBuilder("inner").startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(i);
} finally {
s.end();
}
}
});

// as a result we'll have
// 1. publish subscriber that creates inner spans
// 2. tracing subscriber with outer context - it was active at subscription time
// 3. inner subscriber that will add onNext attribute
// I.e. tracing subscriber context at subscription time will be different from inner in onNext
Span outer = tracer.spanBuilder("outer").startSpan();
try (Scope scope = outer.makeCurrent()) {
StepVerifier.create(
publish
.take(2)
.doOnNext(
n -> {
assertThat(Span.current().getSpanContext().isValid()).isTrue();
Span.current().setAttribute("onNext", true);
})
.subscriberContext(
// subscribers that know that their subscription can happen
// ahead of time and in the 'wrong' context, has to clean up 'wrong' context
context ->
ContextPropagationOperator.storeOpenTelemetryContext(
context, Context.root())))
.expectNextCount(2)
.verifyComplete();

outer.end();
}

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("outer").hasNoParent(),
span ->
span.hasName("inner")
.hasParent(trace.getSpan(0))
.hasAttributes(attributeEntry("onNext", true)),
span ->
span.hasName("inner")
.hasParent(trace.getSpan(0))
.hasAttributes(attributeEntry("onNext", true))));
}

private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
return ContextPropagationOperator.ScalarPropagatingMono.create(mono)
.doOnEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ protected void configure(HttpClientTestOptions options) {
options.disableTestRedirects();
options.enableTestReadTimeout();
options.setUserAgent(USER_AGENT);
options.enableTestCallbackWithImplicitParent();

options.setClientSpanErrorMapper(
(uri, exception) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
false
}

@Override
boolean testCallbackWithImplicitParent() {
true
}

@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
def attributes = super.httpAttributes(uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.test.base

import static org.junit.jupiter.api.Assumptions.assumeTrue
import static org.junit.jupiter.api.Assumptions.assumeFalse

import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.SpanId
Expand Down Expand Up @@ -221,6 +222,11 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
return HttpClientTest.this.testCallbackWithParent()
}

@Override
protected boolean testCallbackWithImplicitParent() {
return HttpClientTest.this.testCallbackWithImplicitParent()
}

@Override
protected boolean testErrorWithCallback() {
return HttpClientTest.this.testErrorWithCallback()
Expand Down Expand Up @@ -294,10 +300,18 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
def "trace request with callback and no parent"() {
assumeTrue(testCallback())
assumeFalse(testCallbackWithImplicitParent())
expect:
junitTest.requestWithCallbackAndNoParent()
}
def "trace request with callback and implicit parent"() {
assumeTrue(testCallback())
assumeTrue(testCallbackWithImplicitParent())
expect:
junitTest.requestWithCallbackAndImplicitParent()
}
def "basic request with 1 redirect"() {
assumeTrue(testRedirects())
expect:
Expand Down Expand Up @@ -497,6 +511,13 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
true
}
boolean testCallbackWithImplicitParent() {
// depending on async behavior callback can be executed within
// parent span scope or outside of the scope, e.g. in reactor-netty or spring
// callback is correlated.
false
}
boolean testErrorWithCallback() {
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.junit.Assume.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import io.opentelemetry.api.common.AttributeKey;
Expand Down Expand Up @@ -197,7 +198,9 @@ void setupOptions() {
if (!testErrorWithCallback()) {
options.disableTestErrorWithCallback();
}

if (testCallbackWithImplicitParent()) {
options.enableTestCallbackWithImplicitParent();
}
configure(options);
}

Expand Down Expand Up @@ -306,6 +309,7 @@ void requestWithCallbackAndParent() throws Throwable {
@Test
void requestWithCallbackAndNoParent() throws Throwable {
assumeTrue(options.testCallback);
assumeFalse(options.testCallbackWithImplicitParent);

String method = "GET";
URI uri = resolveAddress("/success");
Expand All @@ -326,6 +330,29 @@ void requestWithCallbackAndNoParent() throws Throwable {
span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
}

@Test
void requestWithCallbackAndImplicitParent() throws Throwable {
assumeTrue(options.testCallbackWithImplicitParent);

String method = "GET";
URI uri = resolveAddress("/success");

RequestResult result =
doRequestWithCallback(method, uri, () -> testing.runWithSpan("callback", () -> {}));

assertThat(result.get()).isEqualTo(200);

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertClientSpan(span, uri, method, 200).hasNoParent(),
span -> assertServerSpan(span).hasParent(trace.getSpan(0)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}

@Test
void basicRequestWith1Redirect() throws Exception {
// TODO quite a few clients create an extra span for the redirect
Expand Down Expand Up @@ -1112,6 +1139,13 @@ protected boolean testCallbackWithParent() {
return true;
}

protected boolean testCallbackWithImplicitParent() {
// depending on async behavior callback can be executed within
// parent span scope or outside of the scope, e.g. in reactor-netty or spring
// callback is correlated.
return false;
}

protected boolean testErrorWithCallback() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class HttpClientTestOptions {
boolean testCausalityWithCallback = true;
boolean testCallback = true;
boolean testCallbackWithParent = true;
boolean testCallbackWithImplicitParent = false;
boolean testErrorWithCallback = true;

HttpClientTestOptions() {}
Expand Down Expand Up @@ -159,6 +160,11 @@ public HttpClientTestOptions disableTestCallbackWithParent() {
return this;
}

public HttpClientTestOptions enableTestCallbackWithImplicitParent() {
testCallbackWithImplicitParent = true;
return this;
}

public HttpClientTestOptions disableTestErrorWithCallback() {
testErrorWithCallback = false;
return this;
Expand Down

0 comments on commit 4790a6f

Please sign in to comment.