Skip to content

Commit

Permalink
Add RocketMqPublishingMessageImplInstrumentationImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Oct 25, 2022
1 parent 09ce481 commit 4f07902
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 50 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand All @@ -20,6 +20,7 @@ public RocketMqInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new RocketMqProducerInstrumentation());
return asList(
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.ArrayList;
Expand Down Expand Up @@ -67,8 +68,14 @@ public static void onEnter(
for (int i = 0; i < count; i++) {
PublishingMessageImpl message = messages.get(i);

// Try to extract parent context from message.
Context parentContext = ParentContextExtractor.fromMessage(message);
// Try to extract parent context.
VirtualField<PublishingMessageImpl, Context> virtualField =
VirtualField.find(PublishingMessageImpl.class, Context.class);
Context parentContext = virtualField.get(message);
if (parentContext == null) {
parentContext = Context.current();
}

Span span = Span.fromContext(parentContext);
if (!span.getSpanContext().isValid()) {
parentContext = Context.current();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.message.PublishingMessageImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(takesArgument(0, named("org.apache.rocketmq.client.apis.message.Message")))
.and(
takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {
/**
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
* RocketMqProducerInstrumentation}.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This PublishingMessageImpl message) {
VirtualField<PublishingMessageImpl, Context> virtualField =
VirtualField.find(PublishingMessageImpl.class, Context.class);
Context context = virtualField.get(message);
if (context == null) {
context = Context.current();
virtualField.set(message, context);
}
}
}
}

0 comments on commit 4f07902

Please sign in to comment.