Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement capturing message headers for aws1 sqs spans #9824

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package io.opentelemetry.instrumentation.api.internal;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -40,6 +43,21 @@ public static String getString(String propertyName) {
return System.getenv(toEnvVarName(propertyName));
}

public static List<String> getList(String propertyName, List<String> defaultValue) {
String value = getString(propertyName);
if (value == null) {
return defaultValue;
}
return filterBlanksAndNulls(value.split(","));
}

private static List<String> filterBlanksAndNulls(String[] values) {
return Arrays.stream(values)
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}

private static String toEnvVarName(String propertyName) {
return propertyName.toUpperCase(Locale.ROOT).replace('-', '_').replace('.', '_');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TracingRequestHandler extends RequestHandler2 {
.getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build()
.newRequestHandler();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
tasks {
withType<Test>().configureEach {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
}

val testReceiveSpansDisabled by registering(Test::class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.awssdk.v1_11.autoconfigure;

import static java.util.Collections.emptyList;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
Expand All @@ -26,6 +28,9 @@ public class TracingRequestHandler extends RequestHandler2 {
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build()
.newRequestHandler();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.Request;

abstract class AbstractSqsRequest {

public abstract Request<?> getRequest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;

final class AwsSdkInstrumenterFactory {
Expand All @@ -47,48 +49,73 @@ final class AwsSdkInstrumenterFactory {
httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor);
private static final AwsSdkSpanNameExtractor spanName = new AwsSdkSpanNameExtractor();

static Instrumenter<Request<?>, Response<?>> requestInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
private final OpenTelemetry openTelemetry;
private final List<String> capturedHeaders;
private final boolean captureExperimentalSpanAttributes;
private final boolean messagingReceiveInstrumentationEnabled;

AwsSdkInstrumenterFactory(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
this.openTelemetry = openTelemetry;
this.capturedHeaders = capturedHeaders;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
}

Instrumenter<Request<?>, Response<?>> requestInstrumenter() {
return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
spanName,
SpanKindExtractor.alwaysClient(),
attributesExtractors(),
emptyList(),
true);
}

static Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
return sqsInstrumenter(
private List<AttributesExtractor<Request<?>, Response<?>>> attributesExtractors() {
return captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors;
}

private <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> messagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}

Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter() {
MessageOperation operation = MessageOperation.RECEIVE;
SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsReceiveRequest, Response<?>> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);

return createInstrumenter(
openTelemetry,
MessageOperation.RECEIVE,
captureExperimentalSpanAttributes,
MessagingSpanNameExtractor.create(getter, operation),
SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(attributesExtractors(), Function.identity()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled);
}

static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsProcessRequest, Void> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);

InstrumenterBuilder<SqsProcessRequest, Void> builder =
Instrumenter.<SqsProcessRequest, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(
toProcessRequestExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation).build());
.addAttributesExtractors(toSqsRequestExtractors(attributesExtractors(), unused -> null))
.addAttributesExtractor(messagingAttributeExtractor);

if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
Expand All @@ -101,77 +128,68 @@ static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors) {
List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
private static <RESPONSE>
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors,
Function<RESPONSE, Response<?>> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) {
result.add(
new AttributesExtractor<SqsProcessRequest, Void>() {
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
@Override
public void onStart(
AttributesBuilder attributes,
Context parentContext,
SqsProcessRequest sqsProcessRequest) {
extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
AbstractSqsRequest sqsRequest) {
extractor.onStart(attributes, parentContext, sqsRequest.getRequest());
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
SqsProcessRequest sqsProcessRequest,
@Nullable Void unused,
AbstractSqsRequest sqsRequest,
@Nullable RESPONSE response,
@Nullable Throwable error) {
extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
}
});
}
return result;
}

static Instrumenter<Request<?>, Response<?>> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes, true);
}

private static Instrumenter<Request<?>, Response<?>> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
boolean captureExperimentalSpanAttributes,
boolean enabled) {
Instrumenter<Request<?>, Response<?>> producerInstrumenter() {
MessageOperation operation = MessageOperation.PUBLISH;
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();
messagingAttributesExtractor(getter, operation);

return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer(),
SpanKindExtractor.alwaysProducer(),
attributesExtractors(),
singletonList(messagingAttributeExtractor),
enabled);
true);
}

private static Instrumenter<Request<?>, Response<?>> createInstrumenter(
private static <REQUEST, RESPONSE> Instrumenter<REQUEST, RESPONSE> createInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
SpanNameExtractor<Request<?>> spanNameExtractor,
SpanKindExtractor<Request<?>> spanKindExtractor,
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors,
SpanNameExtractor<REQUEST> spanNameExtractor,
SpanKindExtractor<REQUEST> spanKindExtractor,
List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributeExtractors,
List<AttributesExtractor<REQUEST, RESPONSE>> additionalAttributeExtractors,
boolean enabled) {
return Instrumenter.<Request<?>, Response<?>>builder(
return Instrumenter.<REQUEST, RESPONSE>builder(
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors)
.addAttributesExtractors(attributeExtractors)
.addAttributesExtractors(additionalAttributeExtractors)
.setEnabled(enabled)
.buildInstrumenter(spanKindExtractor);
}

private AwsSdkInstrumenterFactory() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;

/**
* Entrypoint for instrumenting AWS SDK v1 clients.
Expand Down Expand Up @@ -45,30 +46,25 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
}

private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;

AwsSdkTelemetry(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
requestInstrumenter =
AwsSdkInstrumenterFactory.requestInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
consumerReceiveInstrumenter =
AwsSdkInstrumenterFactory.consumerReceiveInstrumenter(
AwsSdkInstrumenterFactory instrumenterFactory =
new AwsSdkInstrumenterFactory(
openTelemetry,
capturedHeaders,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
consumerProcessInstrumenter =
AwsSdkInstrumenterFactory.consumerProcessInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
producerInstrumenter =
AwsSdkInstrumenterFactory.producerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
requestInstrumenter = instrumenterFactory.requestInstrumenter();
consumerReceiveInstrumenter = instrumenterFactory.consumerReceiveInstrumenter();
consumerProcessInstrumenter = instrumenterFactory.consumerProcessInstrumenter();
producerInstrumenter = instrumenterFactory.producerInstrumenter();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,36 @@

package io.opentelemetry.instrumentation.awssdk.v1_11;

import static java.util.Collections.emptyList;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import java.util.List;

/** A builder of {@link AwsSdkTelemetry}. */
public class AwsSdkTelemetryBuilder {

private final OpenTelemetry openTelemetry;

private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes;
private boolean messagingReceiveInstrumentationEnabled;

AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}

/**
* Configures the messaging headers that will be captured as span attributes.
*
* @param capturedHeaders A list of messaging header names.
*/
@CanIgnoreReturnValue
public AwsSdkTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}

/**
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
* removed in the future, so only enable this if you know you do not require attributes filled by
Expand Down Expand Up @@ -50,6 +65,9 @@ public AwsSdkTelemetryBuilder setMessagingReceiveInstrumentationEnabled(
*/
public AwsSdkTelemetry build() {
return new AwsSdkTelemetry(
openTelemetry, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled);
openTelemetry,
capturedHeaders,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
}
}
Loading
Loading