Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ending span in Ktor plugin #11726

Merged
merged 5 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

class KtorClientTracing internal constructor(
Expand Down Expand Up @@ -83,23 +86,21 @@ class KtorClientTracing internal constructor(
}
}

@OptIn(InternalCoroutinesApi::class)
private fun installSpanEnd(plugin: KtorClientTracing, scope: HttpClient) {
val endSpanPhase = PipelinePhase("OpenTelemetryEndSpan")
scope.receivePipeline.insertPhaseBefore(HttpReceivePipeline.State, endSpanPhase)
laurit marked this conversation as resolved.
Show resolved Hide resolved

scope.receivePipeline.intercept(endSpanPhase) {
val openTelemetryContext = it.call.attributes.getOrNull(openTelemetryContextKey)
openTelemetryContext ?: return@intercept

if (openTelemetryContext != null) {
try {
withContext(openTelemetryContext.asContextElement()) { proceed() }
plugin.endSpan(openTelemetryContext, it.call, null)
} catch (e: Throwable) {
plugin.endSpan(openTelemetryContext, it.call, e)
throw e
}
} else {
proceed()
scope.launch {
val job = it.call.coroutineContext.job
job.join()
val cause = job.getCancellationException()

plugin.endSpan(openTelemetryContext, it.call, cause)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat
import io.opentelemetry.sdk.testing.assertj.TraceAssert
import io.opentelemetry.semconv.NetworkAttributes
import kotlinx.coroutines.*
import org.junit.jupiter.api.Test
import java.net.URI
import java.util.function.Consumer

abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBuilder>() {

Expand Down Expand Up @@ -71,4 +76,24 @@ abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBu
}
}
}

@Test
fun checkSpanEndsAfterBodyReceived() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to move this to AbstractHttpClientTest to ensure that other http client instrumentations also behave the same way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure because according to the documentation, it's okay to have both behaviors:

HTTP client spans SHOULD end sometime after the HTTP response headers are fully read (or when they fail to be read). This may or may not include reading the response body. (https://opentelemetry.io/docs/specs/semconv/http/http-spans/#http-client-span-duration)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marychatte can you confirm that the change conforms with this part of the spec also? thanks

If there is any possibility for application code to not fully read the HTTP response (and for the HTTP client library to then have to clean up the HTTP response asynchronously), the HTTP client span SHOULD NOT be ended in this cleanup phase, and instead SHOULD end at some point after the HTTP response headers are fully read (or fail to be read). This avoids the span being ended asynchronously later on at a time which is no longer directly associated with the application code which made the HTTP request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trask, there will be no later from This avoids the span being ended asynchronously later. Span will be ended right after the body is received. It's not the clean-up phase in the usual sense, we are using invokeOnCompletion at the moment of the ending of the body.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, what happens if a user reads the response headers and then decides not to read (ignore) the body?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trask In Ktor, we read the raw body in memory before it, and even if the user does not decide to call response.bodyAsText() or response.receive(), we already have the body in memory, and it will not go through the HttpResponsePipeline and all transformations.
Also, as I understand from tests, the body is ignored in them

val method = "GET"
val path = "/long-request"
val uri = resolveAddress(path)
val responseCode = doRequest(method, uri)

assertThat(responseCode).isEqualTo(200)

testing.waitAndAssertTraces(
Consumer { trace: TraceAssert ->
val span = trace.getSpan(0)
assertThat(span).hasKind(SpanKind.CLIENT)
assertThat(span.endEpochNanos - span.startEpochNanos >= 1_000_000_000)
.describedAs("Span duration should be at least 1000ms")
.isTrue()
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ static SpanDataAssert assertServerSpan(SpanDataAssert span) {
return span.hasName("test-http-server").hasKind(SpanKind.SERVER);
}

private int doRequest(String method, URI uri) throws Exception {
protected int doRequest(String method, URI uri) throws Exception {
return doRequest(method, uri, Collections.emptyMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.instrumentation.test.server.http.RequestContextGetter;
import io.opentelemetry.testing.internal.armeria.common.HttpData;
import io.opentelemetry.testing.internal.armeria.common.HttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpResponseWriter;
import io.opentelemetry.testing.internal.armeria.common.HttpStatus;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeaders;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeadersBuilder;
Expand All @@ -29,6 +30,7 @@
import java.nio.file.Paths;
import java.security.KeyStore;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;

public final class HttpClientTestServer extends ServerExtension {
Expand Down Expand Up @@ -99,6 +101,24 @@ protected void configure(ServerBuilder sb) throws Exception {
"/read-timeout",
(ctx, req) ->
HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofSeconds(20)))
.service(
"/long-request",
(ctx, req) -> {
HttpResponseWriter writer = HttpResponse.streaming();
writer.write(ResponseHeaders.of(HttpStatus.OK));
writer.write(HttpData.ofUtf8("Hello"));

ctx.eventLoop()
.schedule(
() -> {
writer.write(HttpData.ofUtf8("World"));
writer.close();
},
1,
TimeUnit.SECONDS);

return writer;
})
.decorator(
(delegate, ctx, req) -> {
for (String field : openTelemetry.getPropagators().getTextMapPropagator().fields()) {
Expand Down
Loading