Skip to content

Commit

Permalink
Improve vertx-sql client context propagation (#9640)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Oct 10, 2023
1 parent 13585c6 commit 6038a87
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public static void onExit(
SqlConnectOptions sqlConnectOptions = virtualField.get(pool);

future = VertxSqlClientSingletons.attachConnectOptions(future, sqlConnectOptions);
future = VertxSqlClientSingletons.wrapContext(future);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
package io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql;

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_PARENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.OTEL_REQUEST_KEY;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.getSqlConnectOptions;
import static io.opentelemetry.javaagent.instrumentation.vertx.v4_0.sql.VertxSqlClientSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
Expand Down Expand Up @@ -98,9 +95,7 @@ public static void onEnter(

context = instrumenter().start(parentContext, otelRequest);
scope = context.makeCurrent();
promiseInternal.context().localContextData().put(OTEL_REQUEST_KEY, otelRequest);
promiseInternal.context().localContextData().put(OTEL_CONTEXT_KEY, context);
promiseInternal.context().localContextData().put(OTEL_PARENT_CONTEXT_KEY, parentContext);
VertxSqlClientSingletons.attachRequest(promiseInternal, otelRequest, context, parentContext);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -40,12 +38,7 @@ public void transform(TypeTransformer transformer) {
public static class CompleteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter(@Advice.FieldValue("handler") Promise<?> promise) {
if (!(promise instanceof PromiseInternal)) {
return null;
}
PromiseInternal<?> promiseInternal = (PromiseInternal<?>) promise;
ContextInternal contextInternal = promiseInternal.context();
return endQuerySpan(contextInternal.localContextData(), null);
return endQuerySpan(promise, null);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -61,12 +54,7 @@ public static class FailAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter(
@Advice.Argument(0) Throwable throwable, @Advice.FieldValue("handler") Promise<?> promise) {
if (!(promise instanceof PromiseInternal)) {
return null;
}
PromiseInternal<?> promiseInternal = (PromiseInternal<?>) promise;
ContextInternal contextInternal = promiseInternal.context();
return endQuerySpan(contextInternal.localContextData(), throwable);
return endQuerySpan(promise, throwable);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.SqlClientBase;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public final class VertxSqlClientSingletons {
public static final String OTEL_REQUEST_KEY = "otel.request";
public static final String OTEL_CONTEXT_KEY = "otel.context";
public static final String OTEL_PARENT_CONTEXT_KEY = "otel.parent-context";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-sql-client-4.0";
private static final Instrumenter<VertxSqlClientRequest, Void> INSTRUMENTER;
private static final ThreadLocal<SqlConnectOptions> connectOptions = new ThreadLocal<>();
Expand Down Expand Up @@ -66,16 +64,33 @@ public static SqlConnectOptions getSqlConnectOptions() {
return connectOptions.get();
}

public static Scope endQuerySpan(Map<Object, Object> contextData, Throwable throwable) {
VertxSqlClientRequest otelRequest =
(VertxSqlClientRequest) contextData.remove(OTEL_REQUEST_KEY);
Context otelContext = (Context) contextData.remove(OTEL_CONTEXT_KEY);
Context otelParentContext = (Context) contextData.remove(OTEL_PARENT_CONTEXT_KEY);
if (otelRequest == null || otelContext == null || otelParentContext == null) {
private static final VirtualField<Promise<?>, RequestData> requestDataField =
VirtualField.find(Promise.class, RequestData.class);

public static void attachRequest(
Promise<?> promise, VertxSqlClientRequest request, Context context, Context parentContext) {
requestDataField.set(promise, new RequestData(request, context, parentContext));
}

public static Scope endQuerySpan(Promise<?> promise, Throwable throwable) {
RequestData requestData = requestDataField.get(promise);
if (requestData == null) {
return null;
}
instrumenter().end(otelContext, otelRequest, null, throwable);
return otelParentContext.makeCurrent();
instrumenter().end(requestData.context, requestData.request, null, throwable);
return requestData.parentContext.makeCurrent();
}

static class RequestData {
final VertxSqlClientRequest request;
final Context context;
final Context parentContext;

RequestData(VertxSqlClientRequest request, Context context, Context parentContext) {
this.request = request;
this.context = context;
this.parentContext = parentContext;
}
}

// this virtual field is also used in SqlClientBase instrumentation
Expand All @@ -93,5 +108,23 @@ public static Future<SqlConnection> attachConnectOptions(
});
}

public static <T> Future<T> wrapContext(Future<T> future) {
Context context = Context.current();
CompletableFuture<T> result = new CompletableFuture<>();
future
.toCompletionStage()
.whenComplete(
(value, throwable) -> {
try (Scope ignore = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});
return Future.fromCompletionStage(result);
}

private VertxSqlClientSingletons() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import static io.opentelemetry.semconv.SemanticAttributes.NET_PEER_PORT;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
Expand All @@ -30,10 +32,16 @@
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Tuple;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -54,6 +62,9 @@ class VertxSqlClientTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@RegisterExtension
private static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();

private static GenericContainer<?> container;
private static Vertx vertx;
private static Pool pool;
Expand Down Expand Up @@ -288,4 +299,127 @@ void testWithConnection() throws Exception {

assertPreparedSelect();
}

@Test
void testManyQueries() throws Exception {
int count = 50;
CountDownLatch latch = new CountDownLatch(count);
List<CompletableFuture<Object>> futureList = new ArrayList<>();
List<CompletableFuture<Object>> resultList = new ArrayList<>();
for (int i = 0; i < count; i++) {
CompletableFuture<Object> future = new CompletableFuture<>();
futureList.add(future);
resultList.add(
future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {})));
}
for (CompletableFuture<Object> future : futureList) {
testing.runWithSpan(
"parent",
() ->
pool.query("select * from test")
.execute(
rowSetAsyncResult -> {
if (rowSetAsyncResult.succeeded()) {
future.complete(rowSetAsyncResult.result());
} else {
future.completeExceptionally(rowSetAsyncResult.cause());
}
latch.countDown();
}));
}
latch.await(30, TimeUnit.SECONDS);
for (CompletableFuture<Object> result : resultList) {
result.get(10, TimeUnit.SECONDS);
}

List<Consumer<TraceAssert>> assertions =
Collections.nCopies(
count,
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName("SELECT tempdb.test")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(DB_NAME, DB),
equalTo(DB_USER, USER_DB),
equalTo(DB_STATEMENT, "select * from test"),
equalTo(DB_OPERATION, "SELECT"),
equalTo(DB_SQL_TABLE, "test"),
equalTo(NET_PEER_NAME, "localhost"),
equalTo(NET_PEER_PORT, port)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
testing.waitAndAssertTraces(assertions);
}

@Test
void testConcurrency() throws Exception {
int count = 50;
CountDownLatch latch = new CountDownLatch(count);
List<CompletableFuture<Object>> futureList = new ArrayList<>();
List<CompletableFuture<Object>> resultList = new ArrayList<>();
for (int i = 0; i < count; i++) {
CompletableFuture<Object> future = new CompletableFuture<>();
futureList.add(future);
resultList.add(
future.whenComplete((rows, throwable) -> testing.runWithSpan("callback", () -> {})));
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
cleanup.deferCleanup(() -> executorService.shutdown());
for (CompletableFuture<Object> future : futureList) {
executorService.submit(
() -> {
testing
.runWithSpan(
"parent",
() ->
pool.withConnection(
conn ->
conn.preparedQuery("select * from test where id = $1")
.execute(Tuple.of(1))))
.onComplete(
rowSetAsyncResult -> {
if (rowSetAsyncResult.succeeded()) {
future.complete(rowSetAsyncResult.result());
} else {
future.completeExceptionally(rowSetAsyncResult.cause());
}
latch.countDown();
});
});
}
latch.await(30, TimeUnit.SECONDS);
for (CompletableFuture<Object> result : resultList) {
result.get(10, TimeUnit.SECONDS);
}

List<Consumer<TraceAssert>> assertions =
Collections.nCopies(
count,
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName("SELECT tempdb.test")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(DB_NAME, DB),
equalTo(DB_USER, USER_DB),
equalTo(DB_STATEMENT, "select * from test where id = $?"),
equalTo(DB_OPERATION, "SELECT"),
equalTo(DB_SQL_TABLE, "test"),
equalTo(NET_PEER_NAME, "localhost"),
equalTo(NET_PEER_PORT, port)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
testing.waitAndAssertTraces(assertions);
}
}

0 comments on commit 6038a87

Please sign in to comment.