diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/library/build.gradle.kts new file mode 100644 index 000000000000..cbd01b7af3b4 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) + compileOnly("org.apache.kafka:kafka-clients:0.11.0.0") + + testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE") + testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE") + testLibrary("org.springframework:spring-core:5.2.9.RELEASE") + testImplementation("javax.xml.bind:jaxb-api:2.2.3") + + latestDepTestLibrary("org.apache.kafka:kafka_2.13:+") +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/ClientSpanNameProvider.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/ClientSpanNameProvider.java new file mode 100644 index 000000000000..63c454546219 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/ClientSpanNameProvider.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017-2020 The OpenTracing Authors + * + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import java.util.function.BiFunction; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +/** + * Copied from OpenTracing. + *

+ * Returns a string to be used as the name of the spans, based on the + * operation preformed and the record the span is based off of. + * + * @author Jordan J Lopez + */ +public class ClientSpanNameProvider { + + // Operation Name as Span Name + public static BiFunction consumerOperationName = + (operationName, consumerRecord) -> replaceIfNull(operationName); + + public static BiFunction producerOperationName = + (operationName, producerRecord) -> replaceIfNull(operationName); + + private static String replaceIfNull(String input) { + return (input == null) ? "unknown" : input; + } + +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/SpanDecorator.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/SpanDecorator.java new file mode 100644 index 000000000000..306acf30e145 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/SpanDecorator.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import io.opentelemetry.api.trace.Span; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +public interface SpanDecorator { + + /** + * Method called before record is sent by producer. + */ + void onSend(ProducerRecord record, Span span); + + /** + * Method called when record is received in consumer. + */ + void onResponse(ConsumerRecord record, Span span); + + /** + * Method called when an error occurs. + */ + void onError(Exception exception, Span span); + + /** + * Gives a SpanDecorator with the standard tags. + */ + SpanDecorator STANDARD_TAGS = new StandardSpanDecorator(); +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/StandardSpanDecorator.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/StandardSpanDecorator.java new file mode 100644 index 000000000000..c89f65008e35 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/StandardSpanDecorator.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.io.PrintWriter; +import java.io.StringWriter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +class StandardSpanDecorator implements SpanDecorator { + + static final String COMPONENT_NAME = "java-kafka"; + static final String KAFKA_SERVICE = "kafka"; + + @Override + public void onSend(ProducerRecord record, Span span) { + setCommonTags(span); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()); + if (record.partition() != null) { + span.setAttribute("partition", record.partition()); + } + } + + @Override + public void onResponse(ConsumerRecord record, Span span) { + setCommonTags(span); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()); + span.setAttribute("partition", record.partition()); + span.setAttribute("offset", record.offset()); + } + + @Override + public void onError(Exception exception, Span span) { + span.setAttribute(SemanticAttributes.EXCEPTION_ESCAPED, Boolean.TRUE); + span.setAllAttributes(errorLogs(exception)); + } + + private static Attributes errorLogs(Throwable throwable) { + AttributesBuilder errorLogs = Attributes.builder(); + errorLogs.put("event", SemanticAttributes.EXCEPTION_TYPE.getKey()); + errorLogs.put("error.kind", throwable.getClass().getName()); + errorLogs.put("error.object", throwable.toString()); + errorLogs.put("message", throwable.getMessage()); + + StringWriter sw = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sw)); + errorLogs.put("stack", sw.toString()); + + return errorLogs.build(); + } + + private static void setCommonTags(Span span) { + span.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingConsumerInterceptor.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingConsumerInterceptor.java new file mode 100644 index 000000000000..1a3a1d64af33 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingConsumerInterceptor.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +public class TracingConsumerInterceptor implements ConsumerInterceptor { + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + for (ConsumerRecord record : records) { + TracingKafkaUtils.buildAndFinishChildSpan(record); + } + + return records; + } + + @Override + public void onCommit(Map offsets) { + + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingKafkaUtils.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingKafkaUtils.java new file mode 100644 index 000000000000..3004766b986a --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingKafkaUtils.java @@ -0,0 +1,183 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersRefGetter; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.function.BiFunction; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("rawtypes") +public class TracingKafkaUtils { + + private static final Logger logger = LoggerFactory.getLogger(TracingKafkaUtils.class); + public static final String TO_PREFIX = "To_"; + public static final String FROM_PREFIX = "From_"; + + private static final TextMapGetter HEADERS_TEXT_MAP_GETTER = + new KafkaHeadersRefGetter(); + + private static final TextMapSetter HEADERS_TEXT_MAP_SETTER = + (carrier, key, value) -> carrier.add(key, value.getBytes(StandardCharsets.UTF_8)); + + static Tracer getTracer() { + return GlobalOpenTelemetry.getTracer(StandardSpanDecorator.COMPONENT_NAME); + } + + private static TextMapPropagator propagator() { + return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + } + + /** + * Extract Context from record headers. + * + * @param headers record headers + * @return span context + */ + public static Context extractSpanContext(Headers headers) { + return propagator().extract(Context.current(), headers, HEADERS_TEXT_MAP_GETTER); + } + + /** + * Inject current Context to record headers. + * + * @param headers record headers + */ + public static void inject(Headers headers) { + inject(Context.current(), headers); + } + + /** + * Inject Context to record headers. + * + * @param context the context + * @param headers record headers + */ + public static void inject(Context context, Headers headers) { + propagator().inject(context, headers, HEADERS_TEXT_MAP_SETTER); + } + + public static Span buildAndInjectSpan(ProducerRecord record) { + return buildAndInjectSpan(record, getTracer()); + } + + public static Span buildAndInjectSpan(ProducerRecord record, Tracer tracer) { + return buildAndInjectSpan(record, tracer, ClientSpanNameProvider.producerOperationName, null, + Collections.singletonList(SpanDecorator.STANDARD_TAGS)); + } + + public static Span buildAndInjectSpan(ProducerRecord record, Tracer tracer, + BiFunction producerSpanNameProvider, + Context parent) { + return buildAndInjectSpan(record, tracer, producerSpanNameProvider, parent, + Collections.singletonList(SpanDecorator.STANDARD_TAGS)); + } + + public static Span buildAndInjectSpan(ProducerRecord record, Tracer tracer, + BiFunction producerSpanNameProvider, + Context parent, Collection spanDecorators) { + + String producerOper = + TO_PREFIX + record.topic(); // <======== It provides better readability in the UI + SpanBuilder spanBuilder = tracer + .spanBuilder(producerSpanNameProvider.apply(producerOper, record)) + .setSpanKind(SpanKind.PRODUCER); + + Context spanContext = TracingKafkaUtils.extractSpanContext(record.headers()); + + if (spanContext != null) { + spanBuilder.setParent(spanContext); + } else if (parent != null) { + spanBuilder.setParent(parent); + } + + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + + for (SpanDecorator decorator : spanDecorators) { + decorator.onSend(record, span); + } + + try { + inject(record.headers()); + } catch (Throwable t) { + // it can happen if headers are read only (when record is sent second time) + logger.error("failed to inject span context. sending record second time?", t); + } + + } + + return span; + } + + public static void buildAndFinishChildSpan(ConsumerRecord record) { + buildAndFinishChildSpan(record, getTracer()); + } + + public static void buildAndFinishChildSpan(ConsumerRecord record, Tracer tracer) { + buildAndFinishChildSpan(record, tracer, ClientSpanNameProvider.consumerOperationName, + Collections.singletonList(SpanDecorator.STANDARD_TAGS)); + } + + public static void buildAndFinishChildSpan(ConsumerRecord record, Tracer tracer, + BiFunction consumerSpanNameProvider) { + buildAndFinishChildSpan(record, tracer, consumerSpanNameProvider, + Collections.singletonList(SpanDecorator.STANDARD_TAGS)); + } + + public static void buildAndFinishChildSpan(ConsumerRecord record, Tracer tracer, + BiFunction consumerSpanNameProvider, + Collection spanDecorators) { + + Context parentContext = TracingKafkaUtils.extractSpanContext(record.headers()); + String consumerOper = + FROM_PREFIX + record.topic(); // <====== It provides better readability in the UI + + SpanBuilder spanBuilder = tracer + .spanBuilder(consumerSpanNameProvider.apply(consumerOper, record)) + .setSpanKind(SpanKind.CONSUMER); + + if (parentContext != null) { + Span parentSpan = Span.fromContext(parentContext); + SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null; + if (psc != null) { + spanBuilder.addLink(psc); // TODO OK? + } + } + + Context context; + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + context = Context.current(); // grab span's context ... OK? + + for (SpanDecorator decorator : spanDecorators) { + decorator.onResponse(record, span); + } + + } + span.end(); + + // Inject created span context into record headers for extraction by client to continue span chain + inject(context, record.headers()); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingProducerInterceptor.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingProducerInterceptor.java new file mode 100644 index 000000000000..3e5bc1f9ebae --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingProducerInterceptor.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +public class TracingProducerInterceptor implements ProducerInterceptor { + @Override + public ProducerRecord onSend(ProducerRecord producerRecord) { + TracingKafkaUtils.buildAndInjectSpan(producerRecord).end(); + return producerRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { + + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} \ No newline at end of file diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafka/InterceptorsTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafka/InterceptorsTest.groovy new file mode 100644 index 000000000000..0f6cbe6800a2 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafka/InterceptorsTest.groovy @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka + +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.test.rule.EmbeddedKafkaRule +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Unroll + +import java.nio.charset.StandardCharsets +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import java.util.function.Consumer + +class InterceptorsTest extends InstrumentationSpecification implements LibraryTestTrait { + + private static final TOPIC = "xtopic" + + @Rule + EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC) + + @Unroll + def "test interceptors"() throws Exception { + setup: + def senderProps = KafkaTestUtils.producerProps( + embeddedKafka.getEmbeddedKafka().getBrokersAsString()) + senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()) + + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("xgroup", "true", embeddedKafka.getEmbeddedKafka()) + consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = new ContainerProperties(TOPIC); + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic()) + + when: + String message = "Testing 123" + def produced = kafkaTemplate.send(TOPIC, message).get().producerRecord + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + // TODO -- add asserts ... + println "XXX------------------------------" + println "produced = " + printHeader(produced.headers()) + println "received = " + printHeader(received.headers()) + + cleanup: + producerFactory.destroy() + container?.stop() + } + + def printHeader(Headers headers) { + def output = new StringBuilder() + headers.forEach(new Consumer

() { + @Override + void accept(Header header) { + output + .append(header.key()) + .append("=") + .append(new String(header.value(), StandardCharsets.UTF_8)) + .append(",") + } + }) + return output + } +} + diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/resources/log4j.properties b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/resources/log4j.properties new file mode 100644 index 000000000000..5cb4ed0e4283 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/resources/log4j.properties @@ -0,0 +1,5 @@ +# Set root logger level to INFO and its only appender to C1. +log4j.rootLogger=INFO, A1 + +# C1 is set to be a ConsoleAppender. +log4j.appender.C1=org.apache.log4j.ConsoleAppender \ No newline at end of file diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersRefGetter.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersRefGetter.java new file mode 100644 index 000000000000..ea37fac6d02d --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersRefGetter.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import io.opentelemetry.context.propagation.TextMapGetter; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class KafkaHeadersRefGetter implements TextMapGetter { + @Override + public Iterable keys(Headers carrier) { + return StreamSupport.stream(carrier.spliterator(), false) + .map(Header::key) + .collect(Collectors.toList()); + } + + @Nullable + @Override + public String get(@Nullable Headers carrier, String key) { + Header header = carrier.lastHeader(key); + if (header == null) { + return null; + } + byte[] value = header.value(); + if (value == null) { + return null; + } + return new String(value, StandardCharsets.UTF_8); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 59baa53cde98..c8af673b9a5c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -211,6 +211,7 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent") include(":instrumentation:jsf:myfaces-1.2:javaagent") include(":instrumentation:jsp-2.3:javaagent") include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent") +include(":instrumentation:kafka-clients:kafka-clients-0.11:library") include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing") include(":instrumentation:kafka-clients:kafka-clients-common:javaagent") include(":instrumentation:kafka-streams-0.11:javaagent")