Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ratpack services OpenTelemetry #7477

Merged
merged 5 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public HttpClient instrument(HttpClient httpClient) throws Exception {
httpClientSpec -> {
httpClientSpec.requestIntercept(
requestSpec -> {
Context parentOtelCtx = Context.current();
Context parentOtelCtx =
Execution.current().maybeGet(Context.class).orElse(Context.current());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @jsalinaspolo! is this still needed if you are using RatpackTelemetry.getOpenTelemetryExecInterceptor()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still needed if you are using RatpackTelemetry.getOpenTelemetryExecInterceptor()?

Unforunelty, yes, because the OpenTelemetryExecInterceptor only works within Ratpack handlers
The PR helps when we use the Ratpack HttpClient in Ratpack Services, which runs in a different Execution, so the compute thread is not blocked.

The Ratpack service will add the OpenTelemetry Context to the Execution when Instrumentation is required.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to configure Ratpack Services to use the OpenTelemetryExecInterceptor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unforunelty, yes, because the OpenTelemetryExecInterceptor only works within Ratpack handlers

What I meant is that works because the OpenTelemetryServerHandler adds the OpenTelemetry Context into the Ratpack Context

is it possible to configure Ratpack Services to use the OpenTelemetryExecInterceptor?

We could use something similar to OpenTelemetryServerHandler but I do not think that it is what we would want 🤔

The idea of the OpenTelemetryServerHandler is to add instrumentation into the Handlers (the entry point is an HTTP request) and add the OpenTelemetry Context to the Ratpack Execution, so other instrumentation is propagated within the Handler

Ratpack Services, most of the time, you want to run things in the background using something different than the compute threads (so the compute threads are not blocked and can receive requests). Hence, many times the Service will be running things in backgrounds like Kafka Consumers, Kafka Streams, Rabbit Consumers, or just background processes with loops.

Therefore, might not make sense to have a parent span within the Service (as it does with the Handler) but each of the iterations/events running in the service might want to create a Span with or without a parent (if the context is propagated in the events, and so on)

I think that for Services, it makes sense to have the flexibility to decide when to create the Span that aggregates the "iteration"

if (!instrumenter.shouldStart(parentOtelCtx, requestSpec)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@

package io.opentelemetry.instrumentation.ratpack.client

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.instrumentation.ratpack.RatpackTelemetry
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import ratpack.exec.Execution
import ratpack.exec.Promise
import ratpack.func.Action
import ratpack.guice.Guice
import ratpack.http.client.HttpClient
import ratpack.service.Service
import ratpack.service.StartEvent
import ratpack.test.embed.EmbeddedApp
import spock.lang.Specification
import spock.util.concurrent.PollingConditions
Expand Down Expand Up @@ -232,4 +238,135 @@ class InstrumentedHttpClientTest extends Specification {
attributes[HTTP_STATUS_CODE] == 200L
}
}

def "propagate http trace in ratpack services with compute thread"() {
expect:
def latch = new CountDownLatch(1)

def otherApp = EmbeddedApp.of { spec ->
spec.handlers {
it.get("foo") { ctx -> ctx.render("bar") }
}
}

def app = EmbeddedApp.of { spec ->
spec.registry(
Guice.registry { bindings ->
telemetry.configureServerRegistry(bindings)
bindings.bindInstance(HttpClient, telemetry.instrumentHttpClient(HttpClient.of(Action.noop())))
bindings.bindInstance(new BarService(latch, "${otherApp.address}foo", openTelemetry))
},
)
spec.handlers { chain ->
chain.get("foo") { ctx -> ctx.render("bar") }
}
}

app.address
latch.await()
def spanData = spanExporter.finishedSpanItems.find { it.name == "a-span" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that doing it this way will give you a flaky test that occasionally fails because spans haven't been received yet. Any reason why you are not using assertTraces as all other tests are?

Copy link
Contributor Author

@jsalinaspolo jsalinaspolo Jan 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with the custom DSL and does not looks to be used in groovy tests
AFAIK, it have not been flaky in these tests

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a weird test that doesn't extend InstrumentationSpecification so assertTraces isn't available. I guess you could add new PollingConditions().eventually {} around asserts as other tests in this file do.

Copy link
Contributor Author

@jsalinaspolo jsalinaspolo Jan 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the eventually as I am not familiar with the InMemorySpanExporter but I think that those tests are deterministic with the CountDownLatch

Other tests in the file are using eventually because the HttpClient is async and requires eventually, but in this case, the CountDownLatch does the countDown in the Promise subscription which makes it deterministic.

The only scenario that I can think of is if publishing to InMemorySpanExporter is async. As I am not too familiar with the InMemory, let's leave the eventually?

def trace = spanExporter.finishedSpanItems.findAll { it.traceId == spanData.traceId }

trace.size() == 3
}

def "propagate http trace in ratpack services with fork executions"() {
expect:
def latch = new CountDownLatch(1)

def otherApp = EmbeddedApp.of { spec ->
spec.handlers {
it.get("foo") { ctx -> ctx.render("bar") }
}
}

def app = EmbeddedApp.of { spec ->
spec.registry(
Guice.registry { bindings ->
telemetry.configureServerRegistry(bindings)
bindings.bindInstance(HttpClient, telemetry.instrumentHttpClient(HttpClient.of(Action.noop())))
bindings.bindInstance(new BarForkService(latch, "${otherApp.address}foo", openTelemetry))
},
)
spec.handlers { chain ->
chain.get("foo") { ctx -> ctx.render("bar") }
}
}

app.address
latch.await()
def spanData = spanExporter.finishedSpanItems.find { it.name == "a-span" }
def trace = spanExporter.finishedSpanItems.findAll { it.traceId == spanData.traceId }

trace.size() == 3
}
}

class BarService implements Service {
private final String url
private final CountDownLatch latch
private final OpenTelemetry opentelemetry

BarService(CountDownLatch latch, String url, OpenTelemetry opentelemetry) {
this.latch = latch
this.url = url
this.opentelemetry = opentelemetry
}

private Tracer tracer = opentelemetry.tracerProvider.tracerBuilder("testing").build()

void onStart(StartEvent event) {
def parentContext = Context.current()
def span = tracer.spanBuilder("a-span")
.setParent(parentContext)
.startSpan()

Context otelContext = parentContext.with(span)
otelContext.makeCurrent().withCloseable {
Execution.current().add(Context, otelContext)
def httpClient = event.registry.get(HttpClient)
httpClient.get(new URI(url))
.flatMap { httpClient.get(new URI(url)) }
.then {
span.end()
latch.countDown()
}
}
}
}


class BarForkService implements Service {
private final String url
private final CountDownLatch latch
private final OpenTelemetry opentelemetry

BarForkService(CountDownLatch latch, String url, OpenTelemetry opentelemetry) {
this.latch = latch
this.url = url
this.opentelemetry = opentelemetry
}

private Tracer tracer = opentelemetry.tracerProvider.tracerBuilder("testing").build()

void onStart(StartEvent event) {
Execution.fork().start {
def parentContext = Context.current()
def span = tracer.spanBuilder("a-span")
.setParent(parentContext)
.startSpan()

Context otelContext = parentContext.with(span)
otelContext.makeCurrent().withCloseable {
Execution.current().add(Context, otelContext)
def httpClient = event.registry.get(HttpClient)
httpClient.get(new URI(url))
.flatMap { httpClient.get(new URI(url)) }
.then {
span.end()
latch.countDown()
}
}
}
}
}