From 4790a6f80f2e1fde92ef199999e53ca532027e23 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Wed, 14 Sep 2022 11:23:18 -0700 Subject: [PATCH] Reactor instrumentation: do not make root context current (#6593) Co-authored-by: Trask Stalnaker --- .../reactor/TracingSubscriber.java | 6 +- .../reactor/ReactorCoreTest.java | 133 ++++++++++++++++++ .../AbstractReactorNettyHttpClientTest.java | 1 + .../client/SpringWebfluxHttpClientTest.groovy | 5 + .../test/base/HttpClientTest.groovy | 21 +++ .../junit/http/AbstractHttpClientTest.java | 36 ++++- .../junit/http/HttpClientTestOptions.java | 6 + 7 files changed, 206 insertions(+), 2 deletions(-) diff --git a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java index fae0af530fa4..355068bf1d8e 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java +++ b/instrumentation/reactor/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java @@ -20,6 +20,7 @@ package io.opentelemetry.instrumentation.reactor; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -34,6 +35,7 @@ public class TracingSubscriber implements CoreSubscriber { private final io.opentelemetry.context.Context traceContext; private final Subscriber subscriber; private final Context context; + private final boolean hasContextToPropagate; public TracingSubscriber(Subscriber subscriber, Context ctx) { this(subscriber, ctx, io.opentelemetry.context.Context.current()); @@ -46,6 +48,8 @@ public TracingSubscriber( this.subscriber = subscriber; this.context = ctx; this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate); + this.hasContextToPropagate = + traceContext == null ? false : Span.fromContext(traceContext).getSpanContext().isValid(); } @Override @@ -74,7 +78,7 @@ public Context currentContext() { } private void withActiveSpan(Runnable runnable) { - if (traceContext != null) { + if (hasContextToPropagate) { try (Scope ignored = traceContext.makeCurrent()) { runnable.run(); } diff --git a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java index aefefb572356..eff9c205b0a1 100644 --- a/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java +++ b/instrumentation/reactor/reactor-3.1/library/src/test/java/io/opentelemetry/instrumentation/reactor/ReactorCoreTest.java @@ -7,13 +7,16 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -22,6 +25,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.UnicastProcessor; +import reactor.test.StepVerifier; class ReactorCoreTest extends AbstractReactorCoreTest { @@ -229,6 +233,135 @@ void fluxParentsAccessible() { .isPresent(); } + @Test + void doesNotOverrideInnerCurrentSpans() { + Flux publish = + Flux.create( + sink -> { + for (int i = 0; i < 2; i++) { + Span s = tracer.spanBuilder("inner").startSpan(); + try (Scope scope = s.makeCurrent()) { + sink.next(i); + } finally { + s.end(); + } + } + }); + + // as a result we'll have + // 1. publish subscriber that creates inner spans + // 2. tracing subscriber without current context - subscription was done outside any scope + // 3. inner subscriber that will add onNext attribute to inner spans + // I.e. tracing subscriber context (root) at subscription time will be different from inner in + // onNext + publish + .take(2) + .subscribe( + n -> { + assertThat(Span.current().getSpanContext().isValid()).isTrue(); + Span.current().setAttribute("onNext", true); + }, + error -> fail(error.getMessage())); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("inner") + .hasNoParent() + .hasAttributes(attributeEntry("onNext", true))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("inner") + .hasNoParent() + .hasAttributes(attributeEntry("onNext", true)))); + } + + @Test + void doesNotOverrideInnerCurrentSpansAsync() { + Flux publish = + Flux.create( + sink -> { + Span s = tracer.spanBuilder("inner").startSpan(); + try (Scope scope = s.makeCurrent()) { + sink.next(s); + } finally { + s.end(); + } + }); + + publish + .take(1) + .delayElements(Duration.ofMillis(1)) + .doOnNext( + span -> { + assertThat(Span.current().getSpanContext().isValid()).isTrue(); + assertThat(Span.current()).isSameAs(span); + }) + .subscribe( + span -> assertThat(Span.current()).isSameAs(span), error -> fail(error.getMessage())); + + testing.waitAndAssertTraces( + trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("inner").hasNoParent())); + } + + @Test + void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() { + Flux publish = + Flux.create( + sink -> { + for (int i = 0; i < 2; i++) { + Span s = tracer.spanBuilder("inner").startSpan(); + try (Scope scope = s.makeCurrent()) { + sink.next(i); + } finally { + s.end(); + } + } + }); + + // as a result we'll have + // 1. publish subscriber that creates inner spans + // 2. tracing subscriber with outer context - it was active at subscription time + // 3. inner subscriber that will add onNext attribute + // I.e. tracing subscriber context at subscription time will be different from inner in onNext + Span outer = tracer.spanBuilder("outer").startSpan(); + try (Scope scope = outer.makeCurrent()) { + StepVerifier.create( + publish + .take(2) + .doOnNext( + n -> { + assertThat(Span.current().getSpanContext().isValid()).isTrue(); + Span.current().setAttribute("onNext", true); + }) + .subscriberContext( + // subscribers that know that their subscription can happen + // ahead of time and in the 'wrong' context, has to clean up 'wrong' context + context -> + ContextPropagationOperator.storeOpenTelemetryContext( + context, Context.root()))) + .expectNextCount(2) + .verifyComplete(); + + outer.end(); + } + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("outer").hasNoParent(), + span -> + span.hasName("inner") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("onNext", true)), + span -> + span.hasName("inner") + .hasParent(trace.getSpan(0)) + .hasAttributes(attributeEntry("onNext", true)))); + } + private Mono monoSpan(Mono mono, String spanName) { return ContextPropagationOperator.ScalarPropagatingMono.create(mono) .doOnEach( diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java index 264799fdc9d5..b6a84f5b69c6 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.java @@ -102,6 +102,7 @@ protected void configure(HttpClientTestOptions options) { options.disableTestRedirects(); options.enableTestReadTimeout(); options.setUserAgent(USER_AGENT); + options.enableTestCallbackWithImplicitParent(); options.setClientSpanErrorMapper( (uri, exception) -> { diff --git a/instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebfluxHttpClientTest.groovy b/instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebfluxHttpClientTest.groovy index 9c2317293aee..674a5da83c4a 100644 --- a/instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebfluxHttpClientTest.groovy +++ b/instrumentation/spring/spring-webflux-5.0/javaagent/src/test/groovy/client/SpringWebfluxHttpClientTest.groovy @@ -80,6 +80,11 @@ class SpringWebfluxHttpClientTest extends HttpClientTest> httpAttributes(URI uri) { def attributes = super.httpAttributes(uri) diff --git a/testing-common/src/main/groovy/io/opentelemetry/instrumentation/test/base/HttpClientTest.groovy b/testing-common/src/main/groovy/io/opentelemetry/instrumentation/test/base/HttpClientTest.groovy index 6e0674d10e82..1729cb5a3fa2 100644 --- a/testing-common/src/main/groovy/io/opentelemetry/instrumentation/test/base/HttpClientTest.groovy +++ b/testing-common/src/main/groovy/io/opentelemetry/instrumentation/test/base/HttpClientTest.groovy @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.test.base import static org.junit.jupiter.api.Assumptions.assumeTrue +import static org.junit.jupiter.api.Assumptions.assumeFalse import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.trace.SpanId @@ -221,6 +222,11 @@ abstract class HttpClientTest extends InstrumentationSpecification { return HttpClientTest.this.testCallbackWithParent() } + @Override + protected boolean testCallbackWithImplicitParent() { + return HttpClientTest.this.testCallbackWithImplicitParent() + } + @Override protected boolean testErrorWithCallback() { return HttpClientTest.this.testErrorWithCallback() @@ -294,10 +300,18 @@ abstract class HttpClientTest extends InstrumentationSpecification { def "trace request with callback and no parent"() { assumeTrue(testCallback()) + assumeFalse(testCallbackWithImplicitParent()) expect: junitTest.requestWithCallbackAndNoParent() } + def "trace request with callback and implicit parent"() { + assumeTrue(testCallback()) + assumeTrue(testCallbackWithImplicitParent()) + expect: + junitTest.requestWithCallbackAndImplicitParent() + } + def "basic request with 1 redirect"() { assumeTrue(testRedirects()) expect: @@ -497,6 +511,13 @@ abstract class HttpClientTest extends InstrumentationSpecification { true } + boolean testCallbackWithImplicitParent() { + // depending on async behavior callback can be executed within + // parent span scope or outside of the scope, e.g. in reactor-netty or spring + // callback is correlated. + false + } + boolean testErrorWithCallback() { return true } diff --git a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java index 2a8596b20db5..5249140c0227 100644 --- a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java +++ b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java @@ -8,6 +8,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP; import static org.assertj.core.api.Assertions.catchThrowable; +import static org.junit.Assume.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; import io.opentelemetry.api.common.AttributeKey; @@ -197,7 +198,9 @@ void setupOptions() { if (!testErrorWithCallback()) { options.disableTestErrorWithCallback(); } - + if (testCallbackWithImplicitParent()) { + options.enableTestCallbackWithImplicitParent(); + } configure(options); } @@ -306,6 +309,7 @@ void requestWithCallbackAndParent() throws Throwable { @Test void requestWithCallbackAndNoParent() throws Throwable { assumeTrue(options.testCallback); + assumeFalse(options.testCallbackWithImplicitParent); String method = "GET"; URI uri = resolveAddress("/success"); @@ -326,6 +330,29 @@ void requestWithCallbackAndNoParent() throws Throwable { span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasNoParent())); } + @Test + void requestWithCallbackAndImplicitParent() throws Throwable { + assumeTrue(options.testCallbackWithImplicitParent); + + String method = "GET"; + URI uri = resolveAddress("/success"); + + RequestResult result = + doRequestWithCallback(method, uri, () -> testing.runWithSpan("callback", () -> {})); + + assertThat(result.get()).isEqualTo(200); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> assertClientSpan(span, uri, method, 200).hasNoParent(), + span -> assertServerSpan(span).hasParent(trace.getSpan(0)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)))); + } + @Test void basicRequestWith1Redirect() throws Exception { // TODO quite a few clients create an extra span for the redirect @@ -1112,6 +1139,13 @@ protected boolean testCallbackWithParent() { return true; } + protected boolean testCallbackWithImplicitParent() { + // depending on async behavior callback can be executed within + // parent span scope or outside of the scope, e.g. in reactor-netty or spring + // callback is correlated. + return false; + } + protected boolean testErrorWithCallback() { return true; } diff --git a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestOptions.java b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestOptions.java index acad7093d97f..184a179fbb7e 100644 --- a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestOptions.java +++ b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestOptions.java @@ -56,6 +56,7 @@ public final class HttpClientTestOptions { boolean testCausalityWithCallback = true; boolean testCallback = true; boolean testCallbackWithParent = true; + boolean testCallbackWithImplicitParent = false; boolean testErrorWithCallback = true; HttpClientTestOptions() {} @@ -159,6 +160,11 @@ public HttpClientTestOptions disableTestCallbackWithParent() { return this; } + public HttpClientTestOptions enableTestCallbackWithImplicitParent() { + testCallbackWithImplicitParent = true; + return this; + } + public HttpClientTestOptions disableTestErrorWithCallback() { testErrorWithCallback = false; return this;