Skip to content

Commit

Permalink
Add support for Kafka consumer and producer interceptors.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Sep 13, 2021
1 parent 1a994b9 commit 32042f1
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")

testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")
testLibrary("org.springframework:spring-core:5.2.9.RELEASE")
testImplementation("javax.xml.bind:jaxb-api:2.2.3")

latestDepTestLibrary("org.apache.kafka:kafka_2.13:+")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017-2020 The OpenTracing Authors
*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka;

import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* Copied from OpenTracing.
* <p/>
* Returns a string to be used as the name of the spans, based on the
* operation preformed and the record the span is based off of.
*
* @author Jordan J Lopez
*/
public class ClientSpanNameProvider {

// Operation Name as Span Name
public static BiFunction<String, ConsumerRecord, String> consumerOperationName =
(operationName, consumerRecord) -> replaceIfNull(operationName);

public static BiFunction<String, ProducerRecord, String> producerOperationName =
(operationName, producerRecord) -> replaceIfNull(operationName);

private static String replaceIfNull(String input) {
return (input == null) ? "unknown" : input;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka;

import io.opentelemetry.api.trace.Span;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public interface SpanDecorator {

/**
* Method called before record is sent by producer.
*/
<K, V> void onSend(ProducerRecord<K, V> record, Span span);

/**
* Method called when record is received in consumer.
*/
<K, V> void onResponse(ConsumerRecord<K, V> record, Span span);

/**
* Method called when an error occurs.
*/
<K, V> void onError(Exception exception, Span span);

/**
* Gives a SpanDecorator with the standard tags.
*/
SpanDecorator STANDARD_TAGS = new StandardSpanDecorator();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

class StandardSpanDecorator implements SpanDecorator {

static final String COMPONENT_NAME = "java-kafka";
static final String KAFKA_SERVICE = "kafka";

@Override
public <K, V> void onSend(ProducerRecord<K, V> record, Span span) {
setCommonTags(span);
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic());
if (record.partition() != null) {
span.setAttribute("partition", record.partition());
}
}

@Override
public <K, V> void onResponse(ConsumerRecord<K, V> record, Span span) {
setCommonTags(span);
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic());
span.setAttribute("partition", record.partition());
span.setAttribute("offset", record.offset());
}

@Override
public void onError(Exception exception, Span span) {
span.setAttribute(SemanticAttributes.EXCEPTION_ESCAPED, Boolean.TRUE);
span.setAllAttributes(errorLogs(exception));
}

private static Attributes errorLogs(Throwable throwable) {
AttributesBuilder errorLogs = Attributes.builder();
errorLogs.put("event", SemanticAttributes.EXCEPTION_TYPE.getKey());
errorLogs.put("error.kind", throwable.getClass().getName());
errorLogs.put("error.object", throwable.toString());
errorLogs.put("message", throwable.getMessage());

StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
errorLogs.put("stack", sw.toString());

return errorLogs.build();
}

private static void setCommonTags(Span span) {
span.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka;

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
for (ConsumerRecord<K, V> record : records) {
TracingKafkaUtils.buildAndFinishChildSpan(record);
}

return records;
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersRefGetter;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("rawtypes")
public class TracingKafkaUtils {

private static final Logger logger = LoggerFactory.getLogger(TracingKafkaUtils.class);
public static final String TO_PREFIX = "To_";
public static final String FROM_PREFIX = "From_";

private static final TextMapGetter<Headers> HEADERS_TEXT_MAP_GETTER =
new KafkaHeadersRefGetter();

private static final TextMapSetter<Headers> HEADERS_TEXT_MAP_SETTER =
(carrier, key, value) -> carrier.add(key, value.getBytes(StandardCharsets.UTF_8));

static Tracer getTracer() {
return GlobalOpenTelemetry.getTracer(StandardSpanDecorator.COMPONENT_NAME);
}

private static TextMapPropagator propagator() {
return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
}

/**
* Extract Context from record headers.
*
* @param headers record headers
* @return span context
*/
public static Context extractSpanContext(Headers headers) {
return propagator().extract(Context.current(), headers, HEADERS_TEXT_MAP_GETTER);
}

/**
* Inject current Context to record headers.
*
* @param headers record headers
*/
public static void inject(Headers headers) {
inject(Context.current(), headers);
}

/**
* Inject Context to record headers.
*
* @param context the context
* @param headers record headers
*/
public static void inject(Context context, Headers headers) {
propagator().inject(context, headers, HEADERS_TEXT_MAP_SETTER);
}

public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record) {
return buildAndInjectSpan(record, getTracer());
}

public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer) {
return buildAndInjectSpan(record, tracer, ClientSpanNameProvider.producerOperationName, null,
Collections.singletonList(SpanDecorator.STANDARD_TAGS));
}

public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer,
BiFunction<String, ProducerRecord, String> producerSpanNameProvider,
Context parent) {
return buildAndInjectSpan(record, tracer, producerSpanNameProvider, parent,
Collections.singletonList(SpanDecorator.STANDARD_TAGS));
}

public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer,
BiFunction<String, ProducerRecord, String> producerSpanNameProvider,
Context parent, Collection<SpanDecorator> spanDecorators) {

String producerOper =
TO_PREFIX + record.topic(); // <======== It provides better readability in the UI
SpanBuilder spanBuilder = tracer
.spanBuilder(producerSpanNameProvider.apply(producerOper, record))
.setSpanKind(SpanKind.PRODUCER);

Context spanContext = TracingKafkaUtils.extractSpanContext(record.headers());

if (spanContext != null) {
spanBuilder.setParent(spanContext);
} else if (parent != null) {
spanBuilder.setParent(parent);
}

Span span = spanBuilder.startSpan();
try (Scope scope = span.makeCurrent()) {

for (SpanDecorator decorator : spanDecorators) {
decorator.onSend(record, span);
}

try {
inject(record.headers());
} catch (Throwable t) {
// it can happen if headers are read only (when record is sent second time)
logger.error("failed to inject span context. sending record second time?", t);
}

}

return span;
}

public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record) {
buildAndFinishChildSpan(record, getTracer());
}

public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record, Tracer tracer) {
buildAndFinishChildSpan(record, tracer, ClientSpanNameProvider.consumerOperationName,
Collections.singletonList(SpanDecorator.STANDARD_TAGS));
}

public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record, Tracer tracer,
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
buildAndFinishChildSpan(record, tracer, consumerSpanNameProvider,
Collections.singletonList(SpanDecorator.STANDARD_TAGS));
}

public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record, Tracer tracer,
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider,
Collection<SpanDecorator> spanDecorators) {

Context parentContext = TracingKafkaUtils.extractSpanContext(record.headers());
String consumerOper =
FROM_PREFIX + record.topic(); // <====== It provides better readability in the UI

SpanBuilder spanBuilder = tracer
.spanBuilder(consumerSpanNameProvider.apply(consumerOper, record))
.setSpanKind(SpanKind.CONSUMER);

if (parentContext != null) {
Span parentSpan = Span.fromContext(parentContext);
SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null;
if (psc != null) {
spanBuilder.addLink(psc); // TODO OK?
}
}

Context context;
Span span = spanBuilder.startSpan();
try (Scope scope = span.makeCurrent()) {
context = Context.current(); // grab span's context ... OK?

for (SpanDecorator decorator : spanDecorators) {
decorator.onResponse(record, span);
}

}
span.end();

// Inject created span context into record headers for extraction by client to continue span chain
inject(context, record.headers());
}
}
Loading

0 comments on commit 32042f1

Please sign in to comment.