Skip to content

Commit

Permalink
Reactor Netty: emit actual HTTP client spans spans on connection errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek committed Jul 27, 2023
1 parent dc5d76a commit 3de3908
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,22 @@ public final class NettyClientInstrumenterFactory {

private final OpenTelemetry openTelemetry;
private final String instrumentationName;
private final boolean connectionTelemetryEnabled;
private final boolean sslTelemetryEnabled;
private final NettyInstrumentationFlag connectionTelemetryState;
private final NettyInstrumentationFlag sslTelemetryState;
private final Map<String, String> peerServiceMapping;
private final boolean emitExperimentalHttpClientMetrics;

public NettyClientInstrumenterFactory(
OpenTelemetry openTelemetry,
String instrumentationName,
boolean connectionTelemetryEnabled,
boolean sslTelemetryEnabled,
NettyInstrumentationFlag connectionTelemetryState,
NettyInstrumentationFlag sslTelemetryState,
Map<String, String> peerServiceMapping,
boolean emitExperimentalHttpClientMetrics) {
this.openTelemetry = openTelemetry;
this.instrumentationName = instrumentationName;
this.connectionTelemetryEnabled = connectionTelemetryEnabled;
this.sslTelemetryEnabled = sslTelemetryEnabled;
this.connectionTelemetryState = connectionTelemetryState;
this.sslTelemetryState = sslTelemetryState;
this.peerServiceMapping = peerServiceMapping;
this.emitExperimentalHttpClientMetrics = emitExperimentalHttpClientMetrics;
}
Expand Down Expand Up @@ -84,6 +84,12 @@ public Instrumenter<HttpRequestAndChannel, HttpResponse> createHttpInstrumenter(
}

public NettyConnectionInstrumenter createConnectionInstrumenter() {
if (connectionTelemetryState == NettyInstrumentationFlag.DISABLED) {
return NoopConnectionInstrumenter.INSTANCE;
}

boolean connectionTelemetryFullyEnabled =
connectionTelemetryState == NettyInstrumentationFlag.ENABLED;
NettyConnectNetAttributesGetter netAttributesGetter = new NettyConnectNetAttributesGetter();

InstrumenterBuilder<NettyConnectionRequest, Channel> instrumenterBuilder =
Expand All @@ -92,7 +98,7 @@ public NettyConnectionInstrumenter createConnectionInstrumenter() {
.addAttributesExtractor(
PeerServiceAttributesExtractor.create(netAttributesGetter, peerServiceMapping));

if (connectionTelemetryEnabled) {
if (connectionTelemetryFullyEnabled) {
// when the connection telemetry is enabled, we do not want these CONNECT spans to be
// suppressed by higher-level HTTP spans - this would happen in the reactor-netty
// instrumentation
Expand All @@ -115,16 +121,21 @@ public NettyConnectionInstrumenter createConnectionInstrumenter() {

Instrumenter<NettyConnectionRequest, Channel> instrumenter =
instrumenterBuilder.buildInstrumenter(
connectionTelemetryEnabled
connectionTelemetryFullyEnabled
? SpanKindExtractor.alwaysInternal()
: SpanKindExtractor.alwaysClient());

return connectionTelemetryEnabled
return connectionTelemetryFullyEnabled
? new NettyConnectionInstrumenterImpl(instrumenter)
: new NettyErrorOnlyConnectionInstrumenter(instrumenter);
}

public NettySslInstrumenter createSslInstrumenter() {
if (sslTelemetryState == NettyInstrumentationFlag.DISABLED) {
return NoopSslInstrumenter.INSTANCE;
}

boolean sslTelemetryFullyEnabled = sslTelemetryState == NettyInstrumentationFlag.ENABLED;
NettySslNetAttributesGetter netAttributesGetter = new NettySslNetAttributesGetter();
Instrumenter<NettySslRequest, Void> instrumenter =
Instrumenter.<NettySslRequest, Void>builder(
Expand All @@ -133,11 +144,11 @@ public NettySslInstrumenter createSslInstrumenter() {
.addAttributesExtractor(
PeerServiceAttributesExtractor.create(netAttributesGetter, peerServiceMapping))
.buildInstrumenter(
sslTelemetryEnabled
sslTelemetryFullyEnabled
? SpanKindExtractor.alwaysInternal()
: SpanKindExtractor.alwaysClient());

return sslTelemetryEnabled
return sslTelemetryFullyEnabled
? new NettySslInstrumenterImpl(instrumenter)
: new NettySslErrorOnlyInstrumenter(instrumenter);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.netty.v4.common.internal.client;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum NettyInstrumentationFlag {
ENABLED,
ERROR_ONLY,
DISABLED;

public static NettyInstrumentationFlag enabledOrErrorOnly(boolean b) {
return b ? ENABLED : ERROR_ONLY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.netty.v4.common.internal.client;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.netty.channel.Channel;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.common.internal.NettyConnectionRequest;
import javax.annotation.Nullable;

enum NoopConnectionInstrumenter implements NettyConnectionInstrumenter {
INSTANCE;

@Override
public boolean shouldStart(Context parentContext, NettyConnectionRequest request) {
return false;
}

@CanIgnoreReturnValue
@Override
public Context start(Context parentContext, NettyConnectionRequest request) {
return parentContext;
}

@Override
public void end(
Context context,
NettyConnectionRequest request,
Channel channel,
@Nullable Throwable error) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.netty.v4.common.internal.client;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.context.Context;
import javax.annotation.Nullable;

enum NoopSslInstrumenter implements NettySslInstrumenter {
INSTANCE;

@Override
public boolean shouldStart(Context parentContext, NettySslRequest request) {
return false;
}

@CanIgnoreReturnValue
@Override
public Context start(Context parentContext, NettySslRequest request) {
return parentContext;
}

@Override
public void end(Context context, NettySslRequest request, @Nullable Throwable error) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;

import static io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyInstrumentationFlag.enabledOrErrorOnly;

import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
Expand Down Expand Up @@ -43,8 +45,8 @@ public final class NettyClientSingletons {
new NettyClientInstrumenterFactory(
GlobalOpenTelemetry.get(),
"io.opentelemetry.netty-4.0",
connectionTelemetryEnabled,
sslTelemetryEnabled,
enabledOrErrorOnly(connectionTelemetryEnabled),
enabledOrErrorOnly(sslTelemetryEnabled),
CommonConfig.get().getPeerServiceMapping(),
CommonConfig.get().shouldEmitExperimentalHttpClientMetrics());
INSTRUMENTER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.javaagent.instrumentation.netty.v4_1;

import static io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyInstrumentationFlag.enabledOrErrorOnly;

import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
Expand Down Expand Up @@ -43,8 +45,8 @@ public final class NettyClientSingletons {
new NettyClientInstrumenterFactory(
GlobalOpenTelemetry.get(),
"io.opentelemetry.netty-4.1",
connectionTelemetryEnabled,
sslTelemetryEnabled,
enabledOrErrorOnly(connectionTelemetryEnabled),
enabledOrErrorOnly(sslTelemetryEnabled),
CommonConfig.get().getPeerServiceMapping(),
CommonConfig.get().shouldEmitExperimentalHttpClientMetrics());
INSTRUMENTER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractorBuilder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyInstrumentationFlag;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -111,8 +112,8 @@ public NettyClientTelemetry build() {
new NettyClientInstrumenterFactory(
openTelemetry,
"io.opentelemetry.netty-4.1",
false,
false,
NettyInstrumentationFlag.DISABLED,
NettyInstrumentationFlag.DISABLED,
Collections.emptyMap(),
emitExperimentalHttpClientMetrics)
.createHttpInstrumenter(extractorConfigurer, additionalAttributesExtractors));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;

final class FailedRequestWithUrlMaker {

static HttpClientRequest create(HttpClientConfig config, HttpClientRequest failedRequest) {
return (HttpClientRequest)
Proxy.newProxyInstance(
FailedRequestWithUrlMaker.class.getClassLoader(),
new Class<?>[] {HttpClientRequest.class},
new HttpRequestInvocationHandler(config, failedRequest));
}

private static final class HttpRequestInvocationHandler implements InvocationHandler {

private final HttpClientConfig config;
private final HttpClientRequest failedRequest;

private HttpRequestInvocationHandler(HttpClientConfig config, HttpClientRequest failedRequest) {
this.config = config;
this.failedRequest = failedRequest;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("resourceUrl".equals(method.getName())) {
return computeUrlFromConfig();
}
try {
return method.invoke(failedRequest, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
}

private String computeUrlFromConfig() {
String uri = config.uri();
if (isAbsolute(uri)) {
return uri;
}

// use the baseUrl if it was configured
String baseUrl = config.baseUrl();
if (baseUrl != null) {
if (baseUrl.endsWith("/") && uri.startsWith("/")) {
baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
}
return baseUrl + uri;
}

// otherwise, use the host+port config to construct the full url
SocketAddress hostAddress = config.remoteAddress().get();
if (hostAddress instanceof InetSocketAddress) {
InetSocketAddress inetHostAddress = (InetSocketAddress) hostAddress;
return (config.isSecure() ? "https://" : "http://")
+ inetHostAddress.getHostString()
+ ":"
+ inetHostAddress.getPort()
+ (uri.startsWith("/") ? "" : "/")
+ uri;
}

return uri;
}

private static boolean isAbsolute(String uri) {
return uri != null && !uri.isEmpty() && !uri.startsWith("/");
}
}

private FailedRequestWithUrlMaker() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

Expand All @@ -31,13 +32,14 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
// implements ResponseReceiver
if (receiver instanceof HttpClient) {
HttpClient client = (HttpClient) receiver;
HttpClientConfig config = client.configuration();

InstrumentationContexts instrumentationContexts = new InstrumentationContexts();

HttpClient modified =
client
.mapConnect(new CaptureParentContext(instrumentationContexts))
.doOnRequestError(new EndOperationWithRequestError(instrumentationContexts))
.doOnRequestError(new EndOperationWithRequestError(config, instrumentationContexts))
.doOnRequest(new StartOperation(instrumentationContexts))
.doOnResponseError(new EndOperationWithResponseError(instrumentationContexts))
.doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts))
Expand Down Expand Up @@ -103,9 +105,12 @@ public void accept(HttpClientRequest request, Connection connection) {
private static final class EndOperationWithRequestError
implements BiConsumer<HttpClientRequest, Throwable> {

private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;

EndOperationWithRequestError(InstrumentationContexts instrumentationContexts) {
EndOperationWithRequestError(
HttpClientConfig config, InstrumentationContexts instrumentationContexts) {
this.config = config;
this.instrumentationContexts = instrumentationContexts;
}

Expand All @@ -114,15 +119,10 @@ public void accept(HttpClientRequest request, Throwable error) {
instrumentationContexts.endClientSpan(null, error);

if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) {
// TODO: emit connection error span

// FIXME: this branch requires lots of changes around the NettyConnectionInstrumenter
// currently it also creates that connection error span (when the connection telemetry is
// turned off), but without HTTP semantics - it does not have access to any HTTP information
// after all
// it should be possible to completely disable it, and just start and end the span here
// this requires lots of refactoring and pretty uninteresting changes in the netty code, so
// I'll do that in a separate PR - for better readability
// request is an instance of FailedHttpClientRequest, which does not implement a correct
// resourceUrl() method -- we have to work around that
request = FailedRequestWithUrlMaker.create(config, request);
instrumentationContexts.startAndEndConnectionErrorSpan(request, error);
}
}
}
Expand Down
Loading

0 comments on commit 3de3908

Please sign in to comment.