Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture messaging header value as span attribute #6454

Merged
merged 2 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.instrumenter.messaging;

import static java.util.Collections.unmodifiableList;

import io.opentelemetry.api.common.AttributeKey;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

final class CapturedMessageHeadersUtil {

private static final ConcurrentMap<String, AttributeKey<List<String>>> attributeKeysCache =
new ConcurrentHashMap<>();

static List<String> lowercase(List<String> names) {
return unmodifiableList(
names.stream().map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toList()));
}

static AttributeKey<List<String>> attributeKey(String headerName) {
return attributeKeysCache.computeIfAbsent(headerName, n -> createKey(n));
}

private static AttributeKey<List<String>> createKey(String headerName) {
// headerName is always lowercase, see MessagingAttributesExtractor
String key = "messaging.header." + headerName.replace('-', '_');
return AttributeKey.stringArrayKey(key);
}

private CapturedMessageHeadersUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.api.instrumenter.messaging;

import static io.opentelemetry.instrumentation.api.instrumenter.messaging.CapturedMessageHeadersUtil.attributeKey;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.CapturedMessageHeadersUtil.lowercase;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.PROCESS;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE;
import static io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil.internalSet;
Expand All @@ -15,6 +17,7 @@
import io.opentelemetry.instrumentation.api.internal.SpanKey;
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import javax.annotation.Nullable;

/**
Expand All @@ -31,20 +34,34 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
static final String TEMP_DESTINATION_NAME = "(temporary)";

/**
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}.
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}
* with default configuration.
*/
public static <REQUEST, RESPONSE> MessagingAttributesExtractor<REQUEST, RESPONSE> create(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return new MessagingAttributesExtractor<>(getter, operation);
return builder(getter, operation).build();
}

/**
* Returns a new {@link MessagingAttributesExtractorBuilder} for the given {@link MessageOperation
* operation} that can be used to configure the messaging attributes extractor.
*/
public static <REQUEST, RESPONSE> MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> builder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return new MessagingAttributesExtractorBuilder<>(getter, operation);
}

private final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
private final MessageOperation operation;
private final List<String> capturedHeaders;

private MessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
MessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter,
MessageOperation operation,
List<String> capturedHeaders) {
this.getter = getter;
this.operation = operation;
this.capturedHeaders = lowercase(capturedHeaders);
}

@SuppressWarnings("deprecation") // operationName
Expand Down Expand Up @@ -89,6 +106,13 @@ public void onEnd(
@Nullable Throwable error) {
internalSet(
attributes, SemanticAttributes.MESSAGING_MESSAGE_ID, getter.messageId(request, response));

for (String name : capturedHeaders) {
List<String> values = getter.header(request, name);
if (!values.isEmpty()) {
internalSet(attributes, attributeKey(name), values);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.instrumenter.messaging;

import static java.util.Collections.emptyList;

import java.util.List;

/** A builder of {@link MessagingAttributesExtractor}. */
public final class MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> {

final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
final MessageOperation operation;
List<String> capturedHeaders = emptyList();

MessagingAttributesExtractorBuilder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
this.getter = getter;
this.operation = operation;
}

/**
* Configures the messaging headers that will be captured as span attributes.
*
* <p>The messaging header values will be captured under the {@code messaging.header.<name>}
* attribute key. The {@code <name>} part in the attribute key is the normalized header name:
* lowercase, with dashes replaced by underscores.
*
* @param capturedHeaders A list of messaging header names.
*/
public MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> setCapturedHeaders(
List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}

/**
* Returns a new {@link MessagingAttributesExtractor} with the settings of this {@link
* MessagingAttributesExtractorBuilder}.
*/
public MessagingAttributesExtractor<REQUEST, RESPONSE> build() {
return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.api.instrumenter.messaging;

import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -47,4 +49,15 @@ public interface MessagingAttributesGetter<REQUEST, RESPONSE> {

@Nullable
String messageId(REQUEST request, @Nullable RESPONSE response);

/**
* Extracts all values of header named {@code name} from the request, or an empty list if there
* were none.
*
* <p>Implementations of this method <b>must not</b> return a null value; an empty list should be
* returned instead.
*/
default List<String> header(REQUEST request, String name) {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static java.util.logging.Level.FINE;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.jms.JMSException;
Expand Down Expand Up @@ -86,9 +88,22 @@ public Long messagePayloadCompressedSize(MessageWithDestination messageWithDesti
public String messageId(MessageWithDestination messageWithDestination, Void unused) {
try {
return messageWithDestination.message().getJMSMessageID();
} catch (JMSException e) {
logger.log(FINE, "Failure getting JMS message id", e);
} catch (JMSException exception) {
logger.log(FINE, "Failure getting JMS message id", exception);
return null;
}
}

@Override
public List<String> header(MessageWithDestination messageWithDestination, String name) {
try {
String value = messageWithDestination.message().getStringProperty(name);
if (value != null) {
return Collections.singletonList(value);
}
} catch (JMSException exception) {
logger.log(FINE, "Failure getting JMS message header", exception);
}
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;

Expand All @@ -31,7 +32,7 @@ private static Instrumenter<MessageWithDestination, Void> buildProducerInstrumen
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.buildProducerInstrumenter(MessagePropertySetter.INSTANCE);
}

Expand All @@ -44,7 +45,7 @@ private static Instrumenter<MessageWithDestination, Void> buildConsumerInstrumen
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
Expand All @@ -57,10 +58,19 @@ private static Instrumenter<MessageWithDestination, Void> buildListenerInstrumen
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.buildConsumerInstrumenter(MessagePropertyGetter.INSTANCE);
}

private static MessagingAttributesExtractor<MessageWithDestination, Void>
buildMessagingAttributesExtractor(
MessagingAttributesGetter<MessageWithDestination, Void> getter,
MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build();
}

public static Instrumenter<MessageWithDestination, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,39 @@ class Jms1Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}

static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
def "capture message header as span attribute"() {
setup:
def destinationName = "someQueue"
def destinationType = "queue"
def destination = session.createQueue(destinationName)
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)

def message = session.createTextMessage(messageText)
message.setStringProperty("test-message-header", "test")
message.setIntProperty("test-message-int-header", 1234)
producer.send(message)

TextMessage receivedMessage = consumer.receive()
String messageId = receivedMessage.getJMSMessageID()

expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName, true)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive", true)
}
}

cleanup:
producer.close()
consumer.close()
}

static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
Expand All @@ -280,14 +312,18 @@ class Jms1Test extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}

// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
Expand All @@ -308,6 +344,10 @@ class Jms1Test extends AgentInstrumentationSpecification {
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,28 @@ public static void onExit(

Context parentContext = currentContext();
if (consumerReceiveInstrumenter().shouldStart(parentContext, records)) {
Context context =
InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter(),
parentContext,
records,
null,
error,
timer.startTime(),
timer.now());

// we're storing the context of the receive span so that process spans can use it as parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);

// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
Context context =
InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter(),
parentContext,
records,
null,
error,
timer.startTime(),
timer.now());

// we're storing the context of the receive span so that process spans can use it as
// parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);

VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public final class KafkaSingletons {
static {
KafkaInstrumenterFactory instrumenterFactory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
Expand Down
Loading