Skip to content

Commit

Permalink
Update the message properties through VirtualField
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Oct 26, 2022
1 parent 1e324bb commit ac018ad
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,27 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;

@Override
public void set(@Nullable PublishingMessageImpl carrier, String key, String value) {
if (carrier == null) {
public void set(@Nullable PublishingMessageImpl message, String key, String value) {
if (message == null) {
return;
}
carrier.setTraceContext(value);
VirtualField<PublishingMessageImpl, Map<String, String>> virtualField =
VirtualField.find(PublishingMessageImpl.class, Map.class);
Map<String, String> extraProperties = virtualField.get(message);
if (extraProperties == null) {
extraProperties = new HashMap<>();
virtualField.set(message, extraProperties);
}
extraProperties.put(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
Expand Down Expand Up @@ -41,6 +42,7 @@ public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("send0"))
.and(isPrivate())
.and(takesArguments(6))
.and(
takesArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,49 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.java.message.MessageImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.message.PublishingMessageImpl");
return namedOneOf(
"org.apache.rocketmq.client.java.message.PublishingMessageImpl",
"org.apache.rocketmq.client.java.message.MessageImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(isPublic())
.and(takesArgument(0, named("org.apache.rocketmq.client.apis.message.Message")))
.and(
takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("getProperties")).and(isPublic()),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
}

@SuppressWarnings("unused")
Expand All @@ -59,4 +70,24 @@ public static void onExit(@Advice.This PublishingMessageImpl message) {
}
}
}

@SuppressWarnings("unused")
public static class GetPropertiesAdvice {
/** Update the message properties to propagate context recorded by {@link MapSetter}. */
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This MessageImpl messageImpl,
@Advice.Return(readOnly = false) Map<String, String> properties) {
if (!(messageImpl instanceof PublishingMessageImpl)) {
return;
}
PublishingMessageImpl message = (PublishingMessageImpl) messageImpl;
VirtualField<PublishingMessageImpl, Map<String, String>> virtualField =
VirtualField.find(PublishingMessageImpl.class, Map.class);
Map<String, String> extraProperties = virtualField.get(message);
if (extraProperties != null) {
properties.putAll(extraProperties);
}
}
}
}

0 comments on commit ac018ad

Please sign in to comment.