Skip to content

Commit

Permalink
End reactor-netty HTTP client span properly on Mono#timeout() (open…
Browse files Browse the repository at this point in the history
…-telemetry#6891)

Calling `Mono#timeout()` with a timeout value smaller than the HTTP
client timeout caused the on request/response end callbacks to be simply
discarded; and the HTTP span was never finished.
  • Loading branch information
Mateusz Rzeszutek authored and dmarkwat committed Oct 22, 2022
1 parent a63bfe5 commit e812b9e
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -58,8 +59,20 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
}

static final class ContextHolder {

private static final AtomicReferenceFieldUpdater<ContextHolder, Context> contextUpdater =
AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context");

volatile Context parentContext;
volatile Context context;

void setContext(Context context) {
contextUpdater.set(this, context);
}

Context getAndRemoveContext() {
return contextUpdater.getAndSet(this, null);
}
}

static final class StartOperation
Expand All @@ -76,23 +89,33 @@ static final class StartOperation
@Override
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
return Mono.defer(
() -> {
Context parentContext = Context.current();
contextHolder.parentContext = parentContext;
if (!instrumenter().shouldStart(parentContext, config)) {
// make context accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
}

Context context = instrumenter().start(parentContext, config);
contextHolder.context = context;
return ContextPropagationOperator.runWithContext(mono, context)
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses the parent context to set the proper context for callbacks
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
});
() -> {
Context parentContext = Context.current();
contextHolder.parentContext = parentContext;
if (!instrumenter().shouldStart(parentContext, config)) {
// make context accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
}

Context context = instrumenter().start(parentContext, config);
contextHolder.setContext(context);
return ContextPropagationOperator.runWithContext(mono, context)
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses the parent context to set the proper context for
// callbacks
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
})
.doOnCancel(
() -> {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, null, null);
});
}
}

Expand Down Expand Up @@ -134,7 +157,7 @@ static final class EndOperationWithRequestError

@Override
public void accept(HttpClientRequest httpClientRequest, Throwable error) {
Context context = contextHolder.context;
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
Expand All @@ -155,7 +178,7 @@ static final class EndOperationWithResponseError

@Override
public void accept(HttpClientResponse response, Throwable error) {
Context context = contextHolder.context;
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
Expand All @@ -175,7 +198,7 @@ static final class EndOperationWithSuccess implements BiConsumer<HttpClientRespo

@Override
public void accept(HttpClientResponse response, Connection connection) {
Context context = contextHolder.context;
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.catchThrowable;

Expand All @@ -17,6 +18,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
Expand All @@ -27,6 +29,7 @@
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -260,6 +263,55 @@ void shouldNotLeakConnections() {
assertThat(uniqueChannelHashes).hasSize(1);
}

@Test
void shouldEndSpanOnMonoTimeout() {
HttpClient httpClient = createHttpClient();

URI uri = resolveAddress("/read-timeout");
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the
// span.
return content.map(unused -> resp);
})
// apply Mono timeout that is way shorter than HTTP request timeout
.timeout(Duration.ofSeconds(1))
.block()));

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasName("HTTP GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri.toString()),
equalTo(SemanticAttributes.HTTP_USER_AGENT, USER_AGENT),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, uri.getPort())),
span ->
span.hasName("test-http-server")
.hasKind(SpanKind.SERVER)
.hasParent(trace.getSpan(1))));
}

private static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
SpanContext expectedSpanContext = expected.getSpanContext();
SpanContext actualSpanContext = actual.get().getSpanContext();
Expand Down

0 comments on commit e812b9e

Please sign in to comment.