Skip to content

Commit

Permalink
Implement producer part of RocketMQ new client instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Oct 21, 2022
1 parent b25283d commit 36e4537
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Settings for the Apache RocketMQ gRPC/protobuf-based client instrumentation

| System property | Type | Default | Description |
|---|---|---|---|
| `otel.instrumentation.rocketmq-client-java.propagation` | Boolean | `true` | Enables remote context propagation via RocketMQ message property. |
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.rocketmq")
module.set("rocketmq-client-java")
versions.set("[5.0.0,)")
assertInverse.set(true)
}
}

dependencies {
library("org.apache.rocketmq:rocketmq-client-java:5.0.0")

testImplementation(project(":instrumentation:rocketmq:rocketmq-client-java:rocketmq-client-java-5.0:testing"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;

@Override
public void set(@Nullable PublishingMessageImpl carrier, String key, String value) {
if (null == carrier) {
return;
}
carrier.setTraceContext(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

public final class ParentContextExtractor {
private ParentContextExtractor() {}

public static Context fromMessage(PublishingMessageImpl message) {
Optional<String> parentTraceContext = message.getParentTraceContext();
if (!parentTraceContext.isPresent()) {
return Context.root();
}

// Use W3C standard propagator to propagate trace context.
return W3CTraceContextPropagator.getInstance()
.extract(
Context.root(),
Collections.singletonMap("traceparent", parentTraceContext.get()),
MapGetter.INSTANCE);
}

private enum MapGetter implements TextMapGetter<Map<String, String>> {
INSTANCE;

@Override
public Iterable<String> keys(Map<String, String> carrier) {
return carrier.keySet();
}

@Override
public String get(Map<String, String> carrier, String key) {
assert carrier != null;
return carrier.get(key);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Collections;
import java.util.List;

@AutoService(InstrumentationModule.class)
public final class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client-java", "rocketmq-client-java-5.0");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new RocketMqProducerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.List;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-java-5.0";

private RocketMqInstrumenterFactory() {}

public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders, boolean propagationEnabled) {

RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.SEND;

AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> attributesExtractor =
buildMessagingAttributesExtractor(getter, operation, capturedHeaders);

InstrumenterBuilder<PublishingMessageImpl, SendReceiptImpl> instrumenterBuilder =
Instrumenter.<PublishingMessageImpl, SendReceiptImpl>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE)
.setSpanStatusExtractor(
(spanStatusBuilder, message, sendReceipt, error) -> {
if (null != error) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
}
});

if (propagationEnabled) {
return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE);
}
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysProducer());
}

private static <T, R> MessagingAttributesExtractor<T, R> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, R> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.TRANSACTION;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum RocketMqProducerAttributeExtractor
implements AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) {
message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys()));
switch (message.getMessageType()) {
case FIFO:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO);
break;
case DELAY:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY);
break;
case TRANSACTION:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, TRANSACTION);
break;
default:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL);
}
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
PublishingMessageImpl message,
@Nullable SendReceiptImpl sendReceipt,
@Nullable Throwable error) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclientjava.v5_0;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum RocketMqProducerAttributeGetter
implements MessagingAttributesGetter<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;

@Nullable
@Override
public String system(PublishingMessageImpl message) {
return "rocketmq";
}

@Nullable
@Override
public String destinationKind(PublishingMessageImpl message) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Nullable
@Override
public String destination(PublishingMessageImpl message) {
return message.getTopic();
}

@Override
public boolean temporaryDestination(PublishingMessageImpl message) {
return false;
}

@Nullable
@Override
public String protocol(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String protocolVersion(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String url(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String conversationId(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public Long messagePayloadSize(PublishingMessageImpl message) {
return (long) message.getBody().remaining();
}

@Nullable
@Override
public Long messagePayloadCompressedSize(PublishingMessageImpl message) {
return null;
}

@Nullable
@Override
public String messageId(PublishingMessageImpl message, @Nullable SendReceiptImpl sendReceipt) {
return message.getMessageId().toString();
}

@Override
public List<String> header(PublishingMessageImpl message, String name) {
String value = message.getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}
Loading

0 comments on commit 36e4537

Please sign in to comment.