Skip to content

Commit

Permalink
Fix ending span in Ktor plugin (#11726)
Browse files Browse the repository at this point in the history
  • Loading branch information
marychatte committed Aug 15, 2024
1 parent b00ccd7 commit c9a9f57
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

class KtorClientTracing internal constructor(
Expand Down Expand Up @@ -83,23 +86,21 @@ class KtorClientTracing internal constructor(
}
}

@OptIn(InternalCoroutinesApi::class)
private fun installSpanEnd(plugin: KtorClientTracing, scope: HttpClient) {
val endSpanPhase = PipelinePhase("OpenTelemetryEndSpan")
scope.receivePipeline.insertPhaseBefore(HttpReceivePipeline.State, endSpanPhase)

scope.receivePipeline.intercept(endSpanPhase) {
val openTelemetryContext = it.call.attributes.getOrNull(openTelemetryContextKey)
openTelemetryContext ?: return@intercept

if (openTelemetryContext != null) {
try {
withContext(openTelemetryContext.asContextElement()) { proceed() }
plugin.endSpan(openTelemetryContext, it.call, null)
} catch (e: Throwable) {
plugin.endSpan(openTelemetryContext, it.call, e)
throw e
}
} else {
proceed()
scope.launch {
val job = it.call.coroutineContext.job
job.join()
val cause = job.getCancellationException()

plugin.endSpan(openTelemetryContext, it.call, cause)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat
import io.opentelemetry.sdk.testing.assertj.TraceAssert
import io.opentelemetry.semconv.NetworkAttributes
import kotlinx.coroutines.*
import org.junit.jupiter.api.Test
import java.net.URI
import java.util.function.Consumer

abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBuilder>() {

Expand Down Expand Up @@ -71,4 +76,24 @@ abstract class AbstractKtorHttpClientTest : AbstractHttpClientTest<HttpRequestBu
}
}
}

@Test
fun checkSpanEndsAfterBodyReceived() {
val method = "GET"
val path = "/long-request"
val uri = resolveAddress(path)
val responseCode = doRequest(method, uri)

assertThat(responseCode).isEqualTo(200)

testing.waitAndAssertTraces(
Consumer { trace: TraceAssert ->
val span = trace.getSpan(0)
assertThat(span).hasKind(SpanKind.CLIENT)
assertThat(span.endEpochNanos - span.startEpochNanos >= 1_000_000_000)
.describedAs("Span duration should be at least 1000ms")
.isTrue()
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ static SpanDataAssert assertServerSpan(SpanDataAssert span) {
return span.hasName("test-http-server").hasKind(SpanKind.SERVER);
}

private int doRequest(String method, URI uri) throws Exception {
protected int doRequest(String method, URI uri) throws Exception {
return doRequest(method, uri, Collections.emptyMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.instrumentation.test.server.http.RequestContextGetter;
import io.opentelemetry.testing.internal.armeria.common.HttpData;
import io.opentelemetry.testing.internal.armeria.common.HttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpResponseWriter;
import io.opentelemetry.testing.internal.armeria.common.HttpStatus;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeaders;
import io.opentelemetry.testing.internal.armeria.common.ResponseHeadersBuilder;
Expand All @@ -29,6 +30,7 @@
import java.nio.file.Paths;
import java.security.KeyStore;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;

public final class HttpClientTestServer extends ServerExtension {
Expand Down Expand Up @@ -99,6 +101,24 @@ protected void configure(ServerBuilder sb) throws Exception {
"/read-timeout",
(ctx, req) ->
HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofSeconds(20)))
.service(
"/long-request",
(ctx, req) -> {
HttpResponseWriter writer = HttpResponse.streaming();
writer.write(ResponseHeaders.of(HttpStatus.OK));
writer.write(HttpData.ofUtf8("Hello"));

ctx.eventLoop()
.schedule(
() -> {
writer.write(HttpData.ofUtf8("World"));
writer.close();
},
1,
TimeUnit.SECONDS);

return writer;
})
.decorator(
(delegate, ctx, req) -> {
for (String field : openTelemetry.getPropagators().getTextMapPropagator().fields()) {
Expand Down

0 comments on commit c9a9f57

Please sign in to comment.