From 11debd993eecabda0df90c8eb1c476640d23f818 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 14 Dec 2022 12:21:11 +0100 Subject: [PATCH 1/3] Instrument JMS 3.0 (Jakarta) --- .../jms/jms-1.1/javaagent/build.gradle.kts | 9 +- .../jms/jms-3.0/javaagent/build.gradle.kts | 38 ++ .../jms/v3_0/JakartaDestinationAdapter.java | 65 +++ .../jms/v3_0/JakartaMessageAdapter.java | 73 ++++ .../jms/v3_0/JmsInstrumentationModule.java | 28 ++ .../JmsMessageConsumerInstrumentation.java | 90 +++++ .../JmsMessageListenerInstrumentation.java | 79 ++++ .../JmsMessageProducerInstrumentation.java | 154 +++++++ .../jms/v3_0/JmsSingletons.java | 46 +++ .../jms/v3_0/Jms3InstrumentationTest.java | 375 ++++++++++++++++++ settings.gradle.kts | 1 + ...redMessagingHeadersTestConfigSupplier.java | 2 +- 12 files changed, 956 insertions(+), 4 deletions(-) create mode 100644 instrumentation/jms/jms-3.0/javaagent/build.gradle.kts create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaDestinationAdapter.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaMessageAdapter.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsInstrumentationModule.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageConsumerInstrumentation.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageListenerInstrumentation.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageProducerInstrumentation.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsSingletons.java create mode 100644 instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java diff --git a/instrumentation/jms/jms-1.1/javaagent/build.gradle.kts b/instrumentation/jms/jms-1.1/javaagent/build.gradle.kts index 102ad1426249..cbcc559f280e 100644 --- a/instrumentation/jms/jms-1.1/javaagent/build.gradle.kts +++ b/instrumentation/jms/jms-1.1/javaagent/build.gradle.kts @@ -14,6 +14,12 @@ muzzle { module.set("javax.jms-api") versions.set("(,)") } + pass { + group.set("jakarta.jms") + module.set("jakarta.jms-api") + versions.set("(,3)") + assertInverse.set(true) + } } testSets { @@ -36,9 +42,6 @@ tasks { } dependencies { - compileOnly("com.google.auto.value:auto-value-annotations") - annotationProcessor("com.google.auto.value:auto-value") - implementation(project(":instrumentation:jms:jms-common:javaagent")) compileOnly("javax.jms:jms-api:1.1-rev-1") diff --git a/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts b/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts new file mode 100644 index 000000000000..8a2b2b8f8e37 --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts @@ -0,0 +1,38 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("jakarta.jms") + module.set("jakarta.jms-api") + versions.set("[3.0.0,)") + assertInverse.set(true) + } + fail { + group.set("javax.jms") + module.set("jms-api") + versions.set("(,)") + } + fail { + group.set("javax.jms") + module.set("javax.jms-api") + versions.set("(,)") + } +} + +dependencies { + implementation(project(":instrumentation:jms:jms-common:javaagent")) + + library("jakarta.jms:jakarta.jms-api:3.0.0") + + testImplementation("org.apache.activemq:artemis-jakarta-client:2.27.1") +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaDestinationAdapter.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaDestinationAdapter.java new file mode 100644 index 000000000000..05518c326c59 --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaDestinationAdapter.java @@ -0,0 +1,65 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import io.opentelemetry.javaagent.instrumentation.jms.DestinationAdapter; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Queue; +import jakarta.jms.TemporaryQueue; +import jakarta.jms.TemporaryTopic; +import jakarta.jms.Topic; + +public final class JakartaDestinationAdapter implements DestinationAdapter { + + public static DestinationAdapter create(Destination destination) { + return new JakartaDestinationAdapter(destination); + } + + private final Destination destination; + + private JakartaDestinationAdapter(Destination destination) { + this.destination = destination; + } + + @Override + public boolean isQueue() { + return destination instanceof Queue; + } + + @Override + public boolean isTopic() { + return destination instanceof Topic; + } + + @Override + public String getQueueName() throws JMSException { + if (!(destination instanceof Queue)) { + throw new IllegalStateException( + "This destination is not a Queue; make sure to call isQueue() before"); + } + return ((Queue) destination).getQueueName(); + } + + @Override + public String getTopicName() throws JMSException { + if (!(destination instanceof Topic)) { + throw new IllegalStateException( + "This destination is not a Topic; make sure to call isTopic() before"); + } + return ((Topic) destination).getTopicName(); + } + + @Override + public boolean isTemporaryQueue() { + return destination instanceof TemporaryQueue; + } + + @Override + public boolean isTemporaryTopic() { + return destination instanceof TemporaryTopic; + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaMessageAdapter.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaMessageAdapter.java new file mode 100644 index 000000000000..7484304dde4b --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JakartaMessageAdapter.java @@ -0,0 +1,73 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import io.opentelemetry.javaagent.instrumentation.jms.DestinationAdapter; +import io.opentelemetry.javaagent.instrumentation.jms.MessageAdapter; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +public final class JakartaMessageAdapter implements MessageAdapter { + + public static MessageAdapter create(Message message) { + return new JakartaMessageAdapter(message); + } + + private final Message message; + + private JakartaMessageAdapter(Message message) { + this.message = message; + } + + @Nullable + @Override + public DestinationAdapter getJmsDestination() throws JMSException { + Destination destination = message.getJMSDestination(); + if (destination == null) { + return null; + } + return JakartaDestinationAdapter.create(destination); + } + + @Override + @SuppressWarnings("unchecked") + public List getPropertyNames() throws JMSException { + return Collections.list(message.getPropertyNames()); + } + + @Nullable + @Override + public Object getObjectProperty(String key) throws JMSException { + return message.getObjectProperty(key); + } + + @Nullable + @Override + public String getStringProperty(String key) throws JMSException { + return message.getStringProperty(key); + } + + @Override + public void setStringProperty(String key, String value) throws JMSException { + message.setStringProperty(key, value); + } + + @Nullable + @Override + public String getJmsCorrelationId() throws JMSException { + return message.getJMSCorrelationID(); + } + + @Nullable + @Override + public String getJmsMessageId() throws JMSException { + return message.getJMSMessageID(); + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsInstrumentationModule.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsInstrumentationModule.java new file mode 100644 index 000000000000..6be04f983186 --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsInstrumentationModule.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class JmsInstrumentationModule extends InstrumentationModule { + public JmsInstrumentationModule() { + super("jms", "jms-3.0"); + } + + @Override + public List typeInstrumentations() { + return asList( + new JmsMessageConsumerInstrumentation(), + new JmsMessageListenerInstrumentation(), + new JmsMessageProducerInstrumentation()); + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageConsumerInstrumentation.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageConsumerInstrumentation.java new file mode 100644 index 000000000000..420c6c80eb1e --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageConsumerInstrumentation.java @@ -0,0 +1,90 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.jms.v3_0.JmsSingletons.consumerReceiveInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination; +import io.opentelemetry.javaagent.instrumentation.jms.Timer; +import jakarta.jms.Message; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class JmsMessageConsumerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("jakarta.jms.MessageConsumer"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("jakarta.jms.MessageConsumer")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("receive") + .and(takesArguments(0).or(takesArguments(1))) + .and(returns(named("jakarta.jms.Message"))) + .and(isPublic()), + JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice"); + transformer.applyAdviceToMethod( + named("receiveNoWait") + .and(takesArguments(0)) + .and(returns(named("jakarta.jms.Message"))) + .and(isPublic()), + JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice"); + } + + @SuppressWarnings("unused") + public static class ConsumerAdvice { + + @Advice.OnMethodEnter + public static Timer onEnter() { + return Timer.start(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter Timer timer, + @Advice.Return Message message, + @Advice.Thrown Throwable throwable) { + if (message == null) { + // Do not create span when no message is received + return; + } + + Context parentContext = Java8BytecodeBridge.currentContext(); + MessageWithDestination request = + MessageWithDestination.create(JakartaMessageAdapter.create(message), null); + + if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) { + InstrumenterUtil.startAndEnd( + consumerReceiveInstrumenter(), + parentContext, + request, + null, + throwable, + timer.startTime(), + timer.now()); + } + } + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageListenerInstrumentation.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageListenerInstrumentation.java new file mode 100644 index 000000000000..762a94189751 --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageListenerInstrumentation.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.jms.v3_0.JmsSingletons.consumerProcessInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination; +import jakarta.jms.Message; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class JmsMessageListenerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("jakarta.jms.MessageListener"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("jakarta.jms.MessageListener")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("onMessage").and(takesArgument(0, named("jakarta.jms.Message"))).and(isPublic()), + JmsMessageListenerInstrumentation.class.getName() + "$MessageListenerAdvice"); + } + + @SuppressWarnings("unused") + public static class MessageListenerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) Message message, + @Advice.Local("otelRequest") MessageWithDestination request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + + Context parentContext = Java8BytecodeBridge.currentContext(); + request = MessageWithDestination.create(JakartaMessageAdapter.create(message), null); + + if (!consumerProcessInstrumenter().shouldStart(parentContext, request)) { + return; + } + + context = consumerProcessInstrumenter().start(parentContext, request); + scope = context.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Local("otelRequest") MessageWithDestination request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (scope == null) { + return; + } + scope.close(); + consumerProcessInstrumenter().end(context, request, null, throwable); + } + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageProducerInstrumentation.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageProducerInstrumentation.java new file mode 100644 index 000000000000..cffb5d4735d7 --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsMessageProducerInstrumentation.java @@ -0,0 +1,154 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.jms.v3_0.JmsSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.bootstrap.CallDepth; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageProducer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class JmsMessageProducerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("jakarta.jms.MessageProducer"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("jakarta.jms.MessageProducer")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("send").and(takesArgument(0, named("jakarta.jms.Message"))).and(isPublic()), + JmsMessageProducerInstrumentation.class.getName() + "$ProducerAdvice"); + transformer.applyAdviceToMethod( + named("send") + .and(takesArgument(0, named("jakarta.jms.Destination"))) + .and(takesArgument(1, named("jakarta.jms.Message"))) + .and(isPublic()), + JmsMessageProducerInstrumentation.class.getName() + "$ProducerWithDestinationAdvice"); + } + + @SuppressWarnings("unused") + public static class ProducerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) Message message, + @Advice.This MessageProducer producer, + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") MessageWithDestination request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + callDepth = CallDepth.forClass(MessageProducer.class); + if (callDepth.getAndIncrement() > 0) { + return; + } + + Destination defaultDestination; + try { + defaultDestination = producer.getDestination(); + } catch (JMSException e) { + defaultDestination = null; + } + + Context parentContext = Java8BytecodeBridge.currentContext(); + request = + MessageWithDestination.create( + JakartaMessageAdapter.create(message), + JakartaDestinationAdapter.create(defaultDestination)); + if (!producerInstrumenter().shouldStart(parentContext, request)) { + return; + } + + context = producerInstrumenter().start(parentContext, request); + scope = context.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") MessageWithDestination request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (callDepth.decrementAndGet() > 0) { + return; + } + + if (scope != null) { + scope.close(); + producerInstrumenter().end(context, request, null, throwable); + } + } + } + + @SuppressWarnings("unused") + public static class ProducerWithDestinationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) Destination destination, + @Advice.Argument(1) Message message, + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") MessageWithDestination request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + callDepth = CallDepth.forClass(MessageProducer.class); + if (callDepth.getAndIncrement() > 0) { + return; + } + + Context parentContext = Java8BytecodeBridge.currentContext(); + request = + MessageWithDestination.create( + JakartaMessageAdapter.create(message), JakartaDestinationAdapter.create(destination)); + if (!producerInstrumenter().shouldStart(parentContext, request)) { + return; + } + + context = producerInstrumenter().start(parentContext, request); + scope = context.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") MessageWithDestination request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (callDepth.decrementAndGet() > 0) { + return; + } + + if (scope != null) { + scope.close(); + producerInstrumenter().end(context, request, null, throwable); + } + } + } +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsSingletons.java b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsSingletons.java new file mode 100644 index 000000000000..179befc13ee3 --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/JmsSingletons.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import io.opentelemetry.javaagent.instrumentation.jms.JmsInstrumenterFactory; +import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination; + +public final class JmsSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.jms-3.0"; + + private static final Instrumenter PRODUCER_INSTRUMENTER; + private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER; + private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; + + static { + JmsInstrumenterFactory factory = + new JmsInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .setMessagingReceiveInstrumentationEnabled( + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()); + + PRODUCER_INSTRUMENTER = factory.createProducerInstrumenter(); + CONSUMER_RECEIVE_INSTRUMENTER = factory.createConsumerReceiveInstrumenter(); + CONSUMER_PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter(); + } + + public static Instrumenter producerInstrumenter() { + return PRODUCER_INSTRUMENTER; + } + + public static Instrumenter consumerReceiveInstrumenter() { + return CONSUMER_RECEIVE_INSTRUMENTER; + } + + public static Instrumenter consumerProcessInstrumenter() { + return CONSUMER_PROCESS_INSTRUMENTER; + } + + private JmsSingletons() {} +} diff --git a/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java b/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java new file mode 100644 index 000000000000..06871da7efda --- /dev/null +++ b/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java @@ -0,0 +1,375 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v3_0; + +import static io.opentelemetry.api.common.AttributeKey.stringArrayKey; +import static io.opentelemetry.api.trace.SpanKind.CONSUMER; +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import jakarta.jms.Connection; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.assertj.core.api.AbstractAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +class Jms3InstrumentationTest { + + static final Logger logger = LoggerFactory.getLogger(Jms3InstrumentationTest.class); + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + static GenericContainer broker; + static ActiveMQConnectionFactory connectionFactory; + static Connection connection; + static Session session; + + @BeforeAll + static void setUp() throws JMSException { + broker = + new GenericContainer<>("quay.io/artemiscloud/activemq-artemis-broker:artemis.2.27.0") + .withEnv("AMQ_USER", "test") + .withEnv("AMQ_PASSWORD", "test") + .withExposedPorts(61616, 8161) + .withLogConsumer(new Slf4jLogConsumer(logger)); + broker.start(); + + connectionFactory = + new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616)); + connectionFactory.setUser("test"); + connectionFactory.setPassword("test"); + + connection = connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @AfterAll + static void tearDown() throws JMSException { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + if (connectionFactory != null) { + connectionFactory.close(); + } + if (broker != null) { + broker.close(); + } + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void testMessageConsumer( + DestinationFactory destinationFactory, String destinationKind, boolean isTemporary) + throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("hello there"); + + MessageProducer producer = session.createProducer(destination); + cleanup.deferCleanup(producer); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + // when + testing.runWithSpan("producer parent", () -> producer.send(sentMessage)); + + TextMessage receivedMessage = + testing.runWithSpan("consumer parent", () -> (TextMessage) consumer.receive()); + + // then + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String actualDestinationName = ((ActiveMQDestination) destination).getName(); + // artemis consumers don't know whether the destination is temporary or not + String producerDestinationName = isTemporary ? "(temporary)" : actualDestinationName; + String messageId = receivedMessage.getJMSMessageID(); + + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(producerDestinationName + " send") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, producerDestinationName), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("consumer parent").hasNoParent(), + span -> + span.hasName(actualDestinationName + " receive") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, actualDestinationName), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId)))); + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void testMessageListener( + DestinationFactory destinationFactory, String destinationKind, boolean isTemporary) + throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("hello there"); + + MessageProducer producer = session.createProducer(null); + cleanup.deferCleanup(producer); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + AtomicReference receivedMessage = new AtomicReference<>(); + consumer.setMessageListener( + message -> + testing.runWithSpan("consumer", () -> receivedMessage.set((TextMessage) message))); + + // when + testing.runWithSpan("parent", () -> producer.send(destination, sentMessage)); + + // then + testing.waitForTraces(1); + + assertThat(receivedMessage.get().getText()).isEqualTo(sentMessage.getText()); + + String actualDestinationName = ((ActiveMQDestination) destination).getName(); + // artemis consumers don't know whether the destination is temporary or not + String producerDestinationName = isTemporary ? "(temporary)" : actualDestinationName; + String messageId = receivedMessage.get().getJMSMessageID(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName(producerDestinationName + " send") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, producerDestinationName), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)), + span -> + span.hasName(actualDestinationName + " process") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, actualDestinationName), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @ArgumentsSource(EmptyReceiveArgumentsProvider.class) + @ParameterizedTest + void shouldNotEmitTelemetryOnEmptyReceive( + DestinationFactory destinationFactory, MessageReceiver receiver) throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + // when + Message message = receiver.receive(consumer); + + // then + assertThat(message).isNull(); + + testing.waitForTraces(0); + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void shouldCaptureMessageHeaders( + DestinationFactory destinationFactory, String destinationKind, boolean isTemporary) + throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("hello there"); + sentMessage.setStringProperty("test_message_header", "test"); + sentMessage.setIntProperty("test_message_int_header", 1234); + + MessageProducer producer = session.createProducer(destination); + cleanup.deferCleanup(producer); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + AtomicReference receivedMessage = new AtomicReference<>(); + consumer.setMessageListener( + message -> + testing.runWithSpan("consumer", () -> receivedMessage.set((TextMessage) message))); + + // when + testing.runWithSpan("parent", () -> producer.send(sentMessage)); + + // then + testing.waitForTraces(1); + + assertThat(receivedMessage.get().getText()).isEqualTo(sentMessage.getText()); + + String actualDestinationName = ((ActiveMQDestination) destination).getName(); + // artemis consumers don't know whether the destination is temporary or not + String producerDestinationName = isTemporary ? "(temporary)" : actualDestinationName; + String messageId = receivedMessage.get().getJMSMessageID(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName(producerDestinationName + " send") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, producerDestinationName), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary), + equalTo( + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), + equalTo( + stringArrayKey("messaging.header.test_message_int_header"), + singletonList("1234"))), + span -> + span.hasName(actualDestinationName + " process") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, actualDestinationName), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + equalTo( + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), + equalTo( + stringArrayKey("messaging.header.test_message_int_header"), + singletonList("1234"))), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + private static AttributeAssertion messagingTempDestination(boolean isTemporary) { + return isTemporary + ? equalTo(SemanticAttributes.MESSAGING_TEMP_DESTINATION, true) + : satisfies(SemanticAttributes.MESSAGING_TEMP_DESTINATION, AbstractAssert::isNull); + } + + static final class EmptyReceiveArgumentsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + DestinationFactory topic = session -> session.createTopic("someTopic"); + DestinationFactory queue = session -> session.createQueue("someQueue"); + MessageReceiver receive = consumer -> consumer.receive(100); + MessageReceiver receiveNoWait = MessageConsumer::receiveNoWait; + + return Stream.of( + arguments(topic, receive), + arguments(queue, receive), + arguments(topic, receiveNoWait), + arguments(queue, receiveNoWait)); + } + } + + static final class DestinationsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + DestinationFactory topic = session -> session.createTopic("someTopic"); + DestinationFactory queue = session -> session.createQueue("someQueue"); + DestinationFactory tempTopic = Session::createTemporaryTopic; + DestinationFactory tempQueue = Session::createTemporaryQueue; + + return Stream.of( + arguments(topic, "topic", false), + arguments(queue, "queue", false), + arguments(tempTopic, "topic", true), + arguments(tempQueue, "queue", true)); + } + } + + @FunctionalInterface + interface DestinationFactory { + + Destination create(Session session) throws JMSException; + } + + @FunctionalInterface + interface MessageReceiver { + + Message receive(MessageConsumer consumer) throws JMSException; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 575f6c0940d3..6883a995cc76 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -291,6 +291,7 @@ hideFromDependabot(":instrumentation:jetty-httpclient:jetty-httpclient-9.2:javaa hideFromDependabot(":instrumentation:jetty-httpclient:jetty-httpclient-9.2:library") hideFromDependabot(":instrumentation:jetty-httpclient:jetty-httpclient-9.2:testing") hideFromDependabot(":instrumentation:jms:jms-1.1:javaagent") +hideFromDependabot(":instrumentation:jms:jms-3.0:javaagent") hideFromDependabot(":instrumentation:jms:jms-common:javaagent") hideFromDependabot(":instrumentation:jms:jms-common:javaagent-unit-tests") hideFromDependabot(":instrumentation:jmx-metrics:javaagent") diff --git a/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSupplier.java b/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSupplier.java index 7c0865aee601..231c9cca9168 100644 --- a/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSupplier.java +++ b/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSupplier.java @@ -25,7 +25,7 @@ private static Map getTestProperties() { Map testConfig = new HashMap<>(); testConfig.put( "otel.instrumentation.messaging.experimental.capture-headers", - // most tests use "test-message-header", "test_message_header" is used for JMS2 because + // most tests use "test-message-header", "test_message_header" is used for JMS2+ because // '-' is not allowed in a JMS property name. JMS property name should be a valid java // identifier. "test-message-header, test-message-int-header, test_message_header, test_message_int_header"); From 9cc677c27664c4a6bff704cde96d3dd7c6a83524 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 14 Dec 2022 13:49:22 +0100 Subject: [PATCH 2/3] jakarta jms requires java 11 --- instrumentation/jms/jms-3.0/javaagent/build.gradle.kts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts b/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts index 8a2b2b8f8e37..f80e344c8fbb 100644 --- a/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts +++ b/instrumentation/jms/jms-3.0/javaagent/build.gradle.kts @@ -29,6 +29,10 @@ dependencies { testImplementation("org.apache.activemq:artemis-jakarta-client:2.27.1") } +otelJava { + minJavaVersionSupported.set(JavaVersion.VERSION_11) +} + tasks { test { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) From 31205a71d81d2ed8204387058b28fd1a09a637db Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 14 Dec 2022 14:15:50 +0100 Subject: [PATCH 3/3] fix randomly failing tests --- .../jms/v3_0/Jms3InstrumentationTest.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java b/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java index 06871da7efda..416fb5457f0e 100644 --- a/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java +++ b/instrumentation/jms/jms-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v3_0/Jms3InstrumentationTest.java @@ -29,6 +29,8 @@ import jakarta.jms.MessageProducer; import jakarta.jms.Session; import jakarta.jms.TextMessage; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -167,7 +169,7 @@ void testMessageConsumer( @ParameterizedTest void testMessageListener( DestinationFactory destinationFactory, String destinationKind, boolean isTemporary) - throws JMSException { + throws Exception { // given Destination destination = destinationFactory.create(session); @@ -178,23 +180,23 @@ void testMessageListener( MessageConsumer consumer = session.createConsumer(destination); cleanup.deferCleanup(consumer); - AtomicReference receivedMessage = new AtomicReference<>(); + CompletableFuture receivedMessageFuture = new CompletableFuture<>(); consumer.setMessageListener( message -> - testing.runWithSpan("consumer", () -> receivedMessage.set((TextMessage) message))); + testing.runWithSpan( + "consumer", () -> receivedMessageFuture.complete((TextMessage) message))); // when testing.runWithSpan("parent", () -> producer.send(destination, sentMessage)); // then - testing.waitForTraces(1); - - assertThat(receivedMessage.get().getText()).isEqualTo(sentMessage.getText()); + TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS); + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); String actualDestinationName = ((ActiveMQDestination) destination).getName(); // artemis consumers don't know whether the destination is temporary or not String producerDestinationName = isTemporary ? "(temporary)" : actualDestinationName; - String messageId = receivedMessage.get().getJMSMessageID(); + String messageId = receivedMessage.getJMSMessageID(); testing.waitAndAssertTraces( trace -> @@ -249,7 +251,7 @@ void shouldNotEmitTelemetryOnEmptyReceive( @ParameterizedTest void shouldCaptureMessageHeaders( DestinationFactory destinationFactory, String destinationKind, boolean isTemporary) - throws JMSException { + throws Exception { // given Destination destination = destinationFactory.create(session); @@ -262,23 +264,23 @@ void shouldCaptureMessageHeaders( MessageConsumer consumer = session.createConsumer(destination); cleanup.deferCleanup(consumer); - AtomicReference receivedMessage = new AtomicReference<>(); + CompletableFuture receivedMessageFuture = new CompletableFuture<>(); consumer.setMessageListener( message -> - testing.runWithSpan("consumer", () -> receivedMessage.set((TextMessage) message))); + testing.runWithSpan( + "consumer", () -> receivedMessageFuture.complete((TextMessage) message))); // when testing.runWithSpan("parent", () -> producer.send(sentMessage)); // then - testing.waitForTraces(1); - - assertThat(receivedMessage.get().getText()).isEqualTo(sentMessage.getText()); + TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS); + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); String actualDestinationName = ((ActiveMQDestination) destination).getName(); // artemis consumers don't know whether the destination is temporary or not String producerDestinationName = isTemporary ? "(temporary)" : actualDestinationName; - String messageId = receivedMessage.get().getJMSMessageID(); + String messageId = receivedMessage.getJMSMessageID(); testing.waitAndAssertTraces( trace ->