diff --git a/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts b/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts index b21b83824dea..a20f7dc2f59d 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts +++ b/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts @@ -32,5 +32,7 @@ dependencies { tasks.withType().configureEach { // TODO run tests both with and without experimental span attributes jvmArgs("-Dotel.instrumentation.rabbitmq.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService()) } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java index 6bb758388d71..43a0a0469613 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java @@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor; +import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import java.util.ArrayList; @@ -29,18 +30,15 @@ public final class RabbitSingletons { InstrumentationConfig.get() .getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false); private static final String instrumentationName = "io.opentelemetry.rabbitmq-2.7"; - private static final Instrumenter channelInstrumenter; - private static final Instrumenter receiveInstrumenter; - private static final Instrumenter deliverInstrumenter; + private static final Instrumenter channelInstrumenter = + createChannelInstrumenter(); + private static final Instrumenter receiveInstrumenter = + createReceiveInstrumenter(); + private static final Instrumenter deliverInstrumenter = + createDeliverInstrumenter(); static final ContextKey CHANNEL_AND_METHOD_CONTEXT_KEY = ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key"); - static { - channelInstrumenter = createChannelInstrumenter(); - receiveInstrumenter = createReceiveInstrumenter(); - deliverInstrumenter = createDeliverInstrumenter(); - } - public static Instrumenter channelInstrumenter() { return channelInstrumenter; } @@ -82,7 +80,12 @@ private static Instrumenter createReceiveInstrument return Instrumenter.builder( GlobalOpenTelemetry.get(), instrumentationName, ReceiveRequest::spanName) .addAttributesExtractors(extractors) - .buildInstrumenter(SpanKindExtractor.alwaysClient()); + .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .addSpanLinksExtractor( + new PropagatorBasedSpanLinksExtractor<>( + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), + ReceiveRequestTextMapGetter.INSTANCE)) + .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } private static Instrumenter createDeliverInstrumenter() { diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequestTextMapGetter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequestTextMapGetter.java new file mode 100644 index 000000000000..1e34a85fc17a --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequestTextMapGetter.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.GetResponse; +import io.opentelemetry.context.propagation.TextMapGetter; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; + +enum ReceiveRequestTextMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(ReceiveRequest carrier) { + return Optional.of(carrier) + .map(ReceiveRequest::getResponse) + .map(GetResponse::getProps) + .map(AMQP.BasicProperties::getHeaders) + .map(Map::keySet) + .orElse(Collections.emptySet()); + } + + @Nullable + @Override + public String get(@Nullable ReceiveRequest carrier, String key) { + return Optional.ofNullable(carrier) + .map(ReceiveRequest::getResponse) + .map(GetResponse::getProps) + .map(AMQP.BasicProperties::getHeaders) + .map(headers -> headers.get(key)) + .map(Object::toString) + .orElse(null); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy index 6a4ea6d5a6d6..456502883b35 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy +++ b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy @@ -53,11 +53,14 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb def "test rabbit publish/get"() { setup: - GetResponse response = runWithSpan("parent") { + String queueName = runWithSpan("producer parent") { channel.exchangeDeclare(exchangeName, "direct", false) String queueName = channel.queueDeclare().getQueue() channel.queueBind(queueName, exchangeName, routingKey) channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) + return queueName + } + GetResponse response = runWithSpan("consumer parent") { return channel.basicGet(queueName, true) } @@ -65,18 +68,28 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb new String(response.getBody()) == "Hello, world!" and: - assertTraces(1) { - trace(0, 6) { + assertTraces(2) { + SpanData producerSpan + + trace(0, 5) { span(0) { - name "parent" - attributes { - } + name "producer parent" + hasNoParent() } rabbitSpan(it, 1, null, null, null, "exchange.declare", span(0)) rabbitSpan(it, 2, null, null, null, "queue.declare", span(0)) rabbitSpan(it, 3, null, null, null, "queue.bind", span(0)) rabbitSpan(it, 4, exchangeName, routingKey, "send", "$exchangeName", span(0)) - rabbitSpan(it, 5, exchangeName, routingKey, "receive", "", span(0)) + + producerSpan = span(4) + } + + trace(1, 2) { + span(0) { + name "consumer parent" + hasNoParent() + } + rabbitSpan(it, 1, exchangeName, routingKey, "receive", "", span(0), producerSpan) } } @@ -87,24 +100,39 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb def "test rabbit publish/get default exchange"() { setup: - String queueName = channel.queueDeclare().getQueue() - channel.basicPublish("", queueName, null, "Hello, world!".getBytes()) - GetResponse response = channel.basicGet(queueName, true) + String queueName = runWithSpan("producer parent") { + String queueName = channel.queueDeclare().getQueue() + channel.basicPublish("", queueName, null, "Hello, world!".getBytes()) + return queueName + } + GetResponse response = runWithSpan("consumer parent") { + return channel.basicGet(queueName, true) + } expect: new String(response.getBody()) == "Hello, world!" and: - assertTraces(3) { - traces.subList(1, 3).sort(orderByRootSpanKind(PRODUCER, CLIENT)) - trace(0, 1) { - rabbitSpan(it, 0, null, null, null, "queue.declare") - } - trace(1, 1) { - rabbitSpan(it, 0, "", null, "send", "") + assertTraces(2) { + SpanData producerSpan + + trace(0, 3) { + span(0) { + name "producer parent" + hasNoParent() + } + rabbitSpan(it, 1, null, null, null, "queue.declare", span(0)) + rabbitSpan(it, 2, "", null, "send", "", span(0)) + + producerSpan = span(2) } - trace(2, 1) { - rabbitSpan(it, 0, "", null, "receive", "", null) + + trace(1, 2) { + span(0) { + name "consumer parent" + hasNoParent() + } + rabbitSpan(it, 1, "", null, "receive", "", span(0), producerSpan) } } } @@ -142,16 +170,16 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb expect: assertTraces(4 + messageCount) { trace(0, 1) { - rabbitSpan(it, null, null, null, "exchange.declare") + rabbitSpan(it, 0, null, null, null, "exchange.declare") } trace(1, 1) { - rabbitSpan(it, null, null, null, "queue.declare") + rabbitSpan(it, 0, null, null, null, "queue.declare") } trace(2, 1) { - rabbitSpan(it, null, null, null, "queue.bind") + rabbitSpan(it, 0, null, null, null, "queue.bind") } trace(3, 1) { - rabbitSpan(it, null, null, null, "basic.consume") + rabbitSpan(it, 0, null, null, null, "basic.consume") } (1..messageCount).each { trace(3 + it, 2) { @@ -197,16 +225,16 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb expect: assertTraces(5) { trace(0, 1) { - rabbitSpan(it, null, null, null, "exchange.declare") + rabbitSpan(it, 0, null, null, null, "exchange.declare") } trace(1, 1) { - rabbitSpan(it, null, null, null, "queue.declare") + rabbitSpan(it, 0, null, null, null, "queue.declare") } trace(2, 1) { - rabbitSpan(it, null, null, null, "queue.bind") + rabbitSpan(it, 0, null, null, null, "queue.bind") } trace(3, 1) { - rabbitSpan(it, null, null, null, "basic.consume") + rabbitSpan(it, 0, null, null, null, "basic.consume") } trace(4, 2) { rabbitSpan(it, 0, exchangeName, null, "send", "$exchangeName") @@ -229,7 +257,7 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb assertTraces(1) { trace(0, 1) { - rabbitSpan(it, null, null, operation, command, null, null, error, errorMsg) + rabbitSpan(it, 0, null, null, operation, command, null, null, error, errorMsg) } } @@ -250,26 +278,41 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb setup: def connectionFactory = new CachingConnectionFactory(connectionFactory) AmqpAdmin admin = new RabbitAdmin(connectionFactory) - def queue = new Queue("some-routing-queue", false, true, true, null) - admin.declareQueue(queue) AmqpTemplate template = new RabbitTemplate(connectionFactory) - template.convertAndSend(queue.name, "foo") - String message = (String) template.receiveAndConvert(queue.name) + + def queue = new Queue("some-routing-queue", false, true, true, null) + runWithSpan("producer parent") { + admin.declareQueue(queue) + template.convertAndSend(queue.name, "foo") + } + String message = runWithSpan("consumer parent") { + return template.receiveAndConvert(queue.name) as String + } expect: message == "foo" and: - assertTraces(3) { - traces.subList(1, 3).sort(orderByRootSpanKind(PRODUCER, CLIENT)) - trace(0, 1) { - rabbitSpan(it, null, null, null, "queue.declare") - } - trace(1, 1) { - rabbitSpan(it, 0, "", "some-routing-queue", "send", "") + assertTraces(2) { + SpanData producerSpan + + trace(0, 3) { + span(0) { + name "producer parent" + hasNoParent() + } + rabbitSpan(it, 1, null, null, null, "queue.declare", span(0)) + rabbitSpan(it, 2, "", "some-routing-queue", "send", "", span(0)) + + producerSpan = span(2) } - trace(2, 1) { - rabbitSpan(it, 0, "", "some-routing-queue", "receive", queue.name, null) + + trace(1, 2) { + span(0) { + name "consumer parent" + hasNoParent() + } + rabbitSpan(it, 1, "", "some-routing-queue", "receive", queue.name, span(0), producerSpan) } } } @@ -303,7 +346,7 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb rabbitSpan(it, 0, null, null, null, "queue.declare") } trace(1, 2) { - rabbitSpan(it, 0, "", null, "send", "", true) + rabbitSpan(it, 0, "", null, "send", "", null, null, null, null, false, true) rabbitSpan(it, 1, "", null, "process", "", span(0), null, null, null, false, true) } trace(2, 1) { @@ -312,21 +355,6 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb } } - def rabbitSpan( - TraceAssert trace, - String exchange, - String routingKey, - String operation, - String resource, - Object parentSpan = null, - Object linkSpan = null, - Throwable exception = null, - String errorMsg = null, - boolean expectTimestamp = false - ) { - rabbitSpan(trace, 0, exchange, routingKey, operation, resource, parentSpan, linkSpan, exception, errorMsg, expectTimestamp) - } - def rabbitSpan( TraceAssert trace, int index, @@ -334,20 +362,8 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb String routingKey, String operation, String resource, - boolean testHeaders - ) { - rabbitSpan(trace, index, exchange, routingKey, operation, resource, null, null, null, null, false, testHeaders) - } - - def rabbitSpan( - TraceAssert trace, - int index, - String exchange, - String routingKey, - String operation, - String resource, - Object parentSpan = null, - Object linkSpan = null, + SpanData parentSpan = null, + SpanData linkSpan = null, Throwable exception = null, String errorMsg = null, boolean expectTimestamp = false, @@ -359,14 +375,14 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb spanName = spanName + " " + operation } + def rabbitCommand = trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command")) + def spanKind - switch (trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))) { + switch (rabbitCommand) { case "basic.publish": spanKind = PRODUCER break - case "basic.get": - spanKind = CLIENT - break + case "basic.get": // fallthrough case "basic.deliver": spanKind = CONSUMER break @@ -378,14 +394,16 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb name spanName kind spanKind - if (parentSpan) { - childOf((SpanData) parentSpan) - } else { + if (parentSpan == null) { hasNoParent() + } else { + childOf(parentSpan) } - if (linkSpan) { - hasLink((SpanData) linkSpan) + if (linkSpan == null) { + hasNoLinks() + } else { + hasLink(linkSpan) } if (exception) { @@ -394,7 +412,8 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb } attributes { - if (spanKind != CONSUMER) { + // listener does not have access to net attributes + if (rabbitCommand != "basic.deliver") { "net.sock.peer.addr" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null } "net.sock.peer.port" Long "net.sock.family" { it == null || it == "inet6" } @@ -415,7 +434,7 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb "messaging.header.test_message_header" { it == ["test"] } } - switch (trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))) { + switch (rabbitCommand) { case "basic.publish": "rabbitmq.command" "basic.publish" "$SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY" {