Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactor instrumentation: do not make root context current #6593

Merged
merged 6 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -34,6 +35,7 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
private final io.opentelemetry.context.Context traceContext;
private final Subscriber<? super T> subscriber;
private final Context context;
private final boolean hasContextToPropagate;

public TracingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
this(subscriber, ctx, io.opentelemetry.context.Context.current());
Expand All @@ -46,6 +48,8 @@ public TracingSubscriber(
this.subscriber = subscriber;
this.context = ctx;
this.traceContext = ContextPropagationOperator.getOpenTelemetryContext(ctx, contextToPropagate);
this.hasContextToPropagate =
traceContext == null ? false : Span.fromContext(traceContext).getSpanContext().isValid();
}

@Override
Expand Down Expand Up @@ -74,7 +78,7 @@ public Context currentContext() {
}

private void withActiveSpan(Runnable runnable) {
if (traceContext != null) {
if (hasContextToPropagate) {
try (Scope ignored = traceContext.makeCurrent()) {
runnable.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -22,6 +25,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.test.StepVerifier;

class ReactorCoreTest extends AbstractReactorCoreTest {

Expand Down Expand Up @@ -229,6 +233,135 @@ void fluxParentsAccessible() {
.isPresent();
}

@Test
void doesNotOverrideInnerCurrentSpans() {
Flux<Object> publish =
Flux.create(
sink -> {
for (int i = 0; i < 2; i++) {
Span s = tracer.spanBuilder("inner").startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(i);
} finally {
s.end();
}
}
});

// as a result we'll have
// 1. publish subscriber that creates inner spans
// 2. tracing subscriber without current context - subscription was done outside any scope
// 3. inner subscriber that will add onNext attribute to inner spans
// I.e. tracing subscriber context (root) at subscription time will be different from inner in
// onNext
publish
.take(2)
.subscribe(
n -> {
assertThat(Span.current().getSpanContext().isValid()).isTrue();
Span.current().setAttribute("onNext", true);
},
error -> fail(error.getMessage()));

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("inner")
.hasNoParent()
.hasAttributes(attributeEntry("onNext", true))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("inner")
.hasNoParent()
.hasAttributes(attributeEntry("onNext", true))));
}

@Test
void doesNotOverrideInnerCurrentSpansAsync() {
Flux<Object> publish =
Flux.create(
sink -> {
Span s = tracer.spanBuilder("inner").startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(s);
} finally {
s.end();
}
});

publish
.take(1)
.delayElements(Duration.ofMillis(1))
.doOnNext(
span -> {
assertThat(Span.current().getSpanContext().isValid()).isTrue();
assertThat(Span.current()).isSameAs(span);
})
.subscribe(
span -> assertThat(Span.current()).isSameAs(span), error -> fail(error.getMessage()));

testing.waitAndAssertTraces(
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("inner").hasNoParent()));
}

@Test
void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() {
Flux<Object> publish =
Flux.create(
sink -> {
for (int i = 0; i < 2; i++) {
Span s = tracer.spanBuilder("inner").startSpan();
try (Scope scope = s.makeCurrent()) {
sink.next(i);
} finally {
s.end();
}
}
});

// as a result we'll have
// 1. publish subscriber that creates inner spans
// 2. tracing subscriber with outer context - it was active at subscription time
// 3. inner subscriber that will add onNext attribute
// I.e. tracing subscriber context at subscription time will be different from inner in onNext
Span outer = tracer.spanBuilder("outer").startSpan();
try (Scope scope = outer.makeCurrent()) {
StepVerifier.create(
publish
.take(2)
.doOnNext(
n -> {
assertThat(Span.current().getSpanContext().isValid()).isTrue();
Span.current().setAttribute("onNext", true);
})
.subscriberContext(
// subscribers that know that their subscription can happen
// ahead of time and in the 'wrong' context, has to clean up 'wrong' context
context ->
ContextPropagationOperator.storeOpenTelemetryContext(
context, Context.root())))
.expectNextCount(2)
.verifyComplete();

outer.end();
}

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("outer").hasNoParent(),
span ->
span.hasName("inner")
.hasParent(trace.getSpan(0))
.hasAttributes(attributeEntry("onNext", true)),
span ->
span.hasName("inner")
.hasParent(trace.getSpan(0))
.hasAttributes(attributeEntry("onNext", true))));
}

private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {
return ContextPropagationOperator.ScalarPropagatingMono.create(mono)
.doOnEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ protected void configure(HttpClientTestOptions options) {
options.disableTestRedirects();
options.enableTestReadTimeout();
options.setUserAgent(USER_AGENT);
options.enableTestCallbackWithImplicitParent();

options.setClientSpanErrorMapper(
(uri, exception) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
false
}

@Override
boolean testCallbackWithImplicitParent() {
true
}

@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
def attributes = super.httpAttributes(uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.test.base

import static org.junit.jupiter.api.Assumptions.assumeTrue
import static org.junit.jupiter.api.Assumptions.assumeFalse

import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.SpanId
Expand Down Expand Up @@ -221,6 +222,11 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
return HttpClientTest.this.testCallbackWithParent()
}

@Override
protected boolean testCallbackWithImplicitParent() {
return HttpClientTest.this.testCallbackWithImplicitParent()
}

@Override
protected boolean testErrorWithCallback() {
return HttpClientTest.this.testErrorWithCallback()
Expand Down Expand Up @@ -294,10 +300,18 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
def "trace request with callback and no parent"() {
assumeTrue(testCallback())
assumeFalse(testCallbackWithImplicitParent())
expect:
junitTest.requestWithCallbackAndNoParent()
}
def "trace request with callback and implicit parent"() {
assumeTrue(testCallback())
assumeTrue(testCallbackWithImplicitParent())
expect:
junitTest.requestWithCallbackAndImplicitParent()
}
def "basic request with 1 redirect"() {
assumeTrue(testRedirects())
expect:
Expand Down Expand Up @@ -497,6 +511,13 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
true
}
boolean testCallbackWithImplicitParent() {
// depending on async behavior callback can be executed within
// parent span scope or outside of the scope, e.g. in reactor-netty or spring
// callback is correlated.
false
}
boolean testErrorWithCallback() {
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.junit.Assume.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import io.opentelemetry.api.common.AttributeKey;
Expand Down Expand Up @@ -197,7 +198,9 @@ void setupOptions() {
if (!testErrorWithCallback()) {
options.disableTestErrorWithCallback();
}

if (testCallbackWithImplicitParent()) {
options.enableTestCallbackWithImplicitParent();
}
configure(options);
}

Expand Down Expand Up @@ -306,6 +309,7 @@ void requestWithCallbackAndParent() throws Throwable {
@Test
void requestWithCallbackAndNoParent() throws Throwable {
assumeTrue(options.testCallback);
assumeFalse(options.testCallbackWithImplicitParent);

String method = "GET";
URI uri = resolveAddress("/success");
Expand All @@ -326,6 +330,29 @@ void requestWithCallbackAndNoParent() throws Throwable {
span -> span.hasName("callback").hasKind(SpanKind.INTERNAL).hasNoParent()));
}

@Test
void requestWithCallbackAndImplicitParent() throws Throwable {
assumeTrue(options.testCallbackWithImplicitParent);

String method = "GET";
URI uri = resolveAddress("/success");

RequestResult result =
doRequestWithCallback(method, uri, () -> testing.runWithSpan("callback", () -> {}));

assertThat(result.get()).isEqualTo(200);

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertClientSpan(span, uri, method, 200).hasNoParent(),
span -> assertServerSpan(span).hasParent(trace.getSpan(0)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}

@Test
void basicRequestWith1Redirect() throws Exception {
// TODO quite a few clients create an extra span for the redirect
Expand Down Expand Up @@ -1113,6 +1140,13 @@ protected boolean testCallbackWithParent() {
return true;
}

protected boolean testCallbackWithImplicitParent() {
// depending on async behavior callback can be executed within
// parent span scope or outside of the scope, e.g. in reactor-netty or spring
// callback is correlated.
return false;
}

protected boolean testErrorWithCallback() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class HttpClientTestOptions {
boolean testCausalityWithCallback = true;
boolean testCallback = true;
boolean testCallbackWithParent = true;
boolean testCallbackWithImplicitParent = false;
boolean testErrorWithCallback = true;

HttpClientTestOptions() {}
Expand Down Expand Up @@ -159,6 +160,11 @@ public HttpClientTestOptions disableTestCallbackWithParent() {
return this;
}

public HttpClientTestOptions enableTestCallbackWithImplicitParent() {
testCallbackWithImplicitParent = true;
return this;
}

public HttpClientTestOptions disableTestErrorWithCallback() {
testErrorWithCallback = false;
return this;
Expand Down