diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts index acf4944cfe3d..9335e62c5380 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts @@ -12,10 +12,7 @@ muzzle { } dependencies { - compileOnly("com.google.auto.value:auto-value-annotations") - annotationProcessor("com.google.auto.value:auto-value") - - implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library")) library("org.apache.kafka:kafka-clients:0.11.0.0") diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java index 5893dc12a42a..f6164aad1232 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ConsumerRecordsInstrumentation.java @@ -13,6 +13,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.instrumentation.kafka.TracingIterable; +import io.opentelemetry.instrumentation.kafka.TracingIterator; +import io.opentelemetry.instrumentation.kafka.TracingList; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.api.ContextStore; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java index 8674438ce77f..669e70d77068 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java @@ -5,9 +5,9 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; +import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.consumerReceiveInstrumenter; import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext; import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; @@ -16,6 +16,8 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafka.ReceivedRecords; +import io.opentelemetry.instrumentation.kafka.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.api.ContextStore; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java index c599b5f450ad..3359436ca317 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter; +import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.producerInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -13,6 +13,8 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.kafka.KafkaPropagation; +import io.opentelemetry.instrumentation.kafka.ProducerCallback; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/library/build.gradle.kts new file mode 100644 index 000000000000..fc7661072cb5 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library")) + compileOnly("org.apache.kafka:kafka-clients:0.11.0.0") + + testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE") + testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE") + testLibrary("org.springframework:spring-core:5.2.9.RELEASE") + testImplementation("javax.xml.bind:jaxb-api:2.2.3") + + latestDepTestLibrary("org.apache.kafka:kafka_2.13:+") +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/SpanDecorator.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/SpanDecorator.java new file mode 100644 index 000000000000..a4b4437f0881 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/SpanDecorator.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import io.opentelemetry.api.trace.Span; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +public interface SpanDecorator { + + /** Method called before record is sent by producer. */ + void onSend(ProducerRecord record, Span span); + + /** Method called when record is received in consumer. */ + void onResponse(ConsumerRecord record, Span span); + + /** Method called when an error occurs. */ + void onError(Exception exception, Span span); + + /** Gives a SpanDecorator with the standard tags. */ + SpanDecorator STANDARD_TAGS = new StandardSpanDecorator(); +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/StandardSpanDecorator.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/StandardSpanDecorator.java new file mode 100644 index 000000000000..bd316eefb1b9 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/StandardSpanDecorator.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.io.PrintWriter; +import java.io.StringWriter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +class StandardSpanDecorator implements SpanDecorator { + + static final String COMPONENT_NAME = "java-kafka"; + static final String KAFKA_SERVICE = "kafka"; + + @Override + public void onSend(ProducerRecord record, Span span) { + setCommonTags(span); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()); + if (record.partition() != null) { + span.setAttribute("partition", record.partition()); + } + } + + @Override + public void onResponse(ConsumerRecord record, Span span) { + setCommonTags(span); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()); + span.setAttribute("partition", record.partition()); + span.setAttribute("offset", record.offset()); + } + + @Override + public void onError(Exception exception, Span span) { + span.setAttribute(SemanticAttributes.EXCEPTION_ESCAPED, Boolean.TRUE); + span.setAllAttributes(errorLogs(exception)); + } + + private static Attributes errorLogs(Throwable throwable) { + AttributesBuilder errorLogs = Attributes.builder(); + errorLogs.put("event", SemanticAttributes.EXCEPTION_TYPE.getKey()); + errorLogs.put("error.kind", throwable.getClass().getName()); + errorLogs.put("error.object", throwable.toString()); + errorLogs.put("message", throwable.getMessage()); + + StringWriter sw = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sw)); + errorLogs.put("stack", sw.toString()); + + return errorLogs.build(); + } + + private static void setCommonTags(Span span) { + span.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java new file mode 100644 index 000000000000..b09e9bf6fd50 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +public class TracingConsumerInterceptor implements ConsumerInterceptor { + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + for (ConsumerRecord record : records) { + TracingKafkaUtils.buildAndFinishChildSpan(record); + } + + return records; + } + + @Override + public void onCommit(Map offsets) {} + + @Override + public void close() {} + + @Override + public void configure(Map configs) {} +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingKafkaUtils.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingKafkaUtils.java new file mode 100644 index 000000000000..212797c4fd40 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingKafkaUtils.java @@ -0,0 +1,128 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.consumerProcessInstrumenter; +import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.producerInstrumenter; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.instrumentation.kafka.KafkaHeadersGetter; +import io.opentelemetry.instrumentation.kafka.KafkaHeadersSetter; +import java.util.Collection; +import java.util.Collections; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TracingKafkaUtils { + + private static final Logger logger = LoggerFactory.getLogger(TracingKafkaUtils.class); + + private static final TextMapGetter GETTER = new KafkaHeadersGetter(); + + private static final TextMapSetter SETTER = new KafkaHeadersSetter(); + + private static TextMapPropagator propagator() { + return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + } + + /** + * Extract Context from record headers. + * + * @param headers record headers + * @return context + */ + public static Context extractContext(Headers headers) { + return propagator().extract(Context.current(), headers, GETTER); + } + + /** + * Inject current Context to record headers. + * + * @param headers record headers + */ + public static void inject(Headers headers) { + inject(Context.current(), headers); + } + + /** + * Inject Context to record headers. + * + * @param context the context + * @param headers record headers + */ + public static void inject(Context context, Headers headers) { + propagator().inject(context, headers, SETTER); + } + + /** + * Build and inject span into record. Return Runnable handle to end the current span. + * + * @param record the producer record to inject span info. + * @return runnable to close the current span + */ + public static Runnable buildAndInjectSpan(ProducerRecord record) { + return buildAndInjectSpan(record, Collections.singletonList(SpanDecorator.STANDARD_TAGS)); + } + + public static Runnable buildAndInjectSpan( + ProducerRecord record, Collection spanDecorators) { + + Context spanContext = extractContext(record.headers()); + + Context current = producerInstrumenter().start(spanContext, record); + try (Scope ignored = current.makeCurrent()) { + + Span span = Span.fromContext(current); + for (SpanDecorator decorator : spanDecorators) { + decorator.onSend(record, span); + } + + try { + inject(record.headers()); + } catch (Throwable t) { + // it can happen if headers are read only (when record is sent second time) + logger.error("failed to inject span context. sending record second time?", t); + } + } + + return () -> producerInstrumenter().end(current, record, null, null); + } + + public static void buildAndFinishChildSpan(ConsumerRecord record) { + buildAndFinishChildSpan(record, Collections.singletonList(SpanDecorator.STANDARD_TAGS)); + } + + public static void buildAndFinishChildSpan( + ConsumerRecord record, Collection spanDecorators) { + + Context parentContext = extractContext(record.headers()); + Context current = consumerProcessInstrumenter().start(parentContext, record); + try (Scope ignored = current.makeCurrent()) { + + Span span = Span.fromContext(current); + for (SpanDecorator decorator : spanDecorators) { + decorator.onResponse(record, span); + } + } catch (RuntimeException e) { + consumerProcessInstrumenter().end(current, record, null, e); + throw e; + } + consumerProcessInstrumenter().end(current, record, null, null); + + // Inject created span context into record headers for extraction by client to continue span + // chain + inject(current, record.headers()); // TODO -- OK? + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java new file mode 100644 index 000000000000..8209c08f297a --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +public class TracingProducerInterceptor implements ProducerInterceptor { + @Override + public ProducerRecord onSend(ProducerRecord producerRecord) { + TracingKafkaUtils.buildAndInjectSpan(producerRecord).run(); + return producerRecord; + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} + + @Override + public void close() {} + + @Override + public void configure(Map map) {} +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy new file mode 100644 index 000000000000..78b86801cb5e --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients + + +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.test.rule.EmbeddedKafkaRule +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Unroll + +import java.nio.charset.StandardCharsets +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import java.util.function.Consumer + +class InterceptorsTest extends InstrumentationSpecification implements LibraryTestTrait { + + private static final TOPIC = "xtopic" + + @Rule + EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC) + + @Unroll + def "test interceptors"() throws Exception { + setup: + def senderProps = KafkaTestUtils.producerProps( + embeddedKafka.getEmbeddedKafka().getBrokersAsString()) + senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.getName()) + + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("xgroup", "true", embeddedKafka.getEmbeddedKafka()) + consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.getName()) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = new ContainerProperties(TOPIC) + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic()) + + when: + String message = "Testing 123" + def produced = kafkaTemplate.send(TOPIC, message).get().producerRecord + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + // TODO -- add asserts ... + println "XXX------------------------------" + println "produced = " + printHeader(produced.headers()) + println "received = " + printHeader(received.headers()) + + cleanup: + producerFactory.destroy() + container?.stop() + } + + def printHeader(Headers headers) { + def output = new StringBuilder() + headers.forEach(new Consumer
() { + @Override + void accept(Header header) { + output + .append(header.key()) + .append("=") + .append(new String(header.value(), StandardCharsets.UTF_8)) + .append(",") + } + }) + return output + } +} + diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/resources/log4j.properties b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/resources/log4j.properties new file mode 100644 index 000000000000..1ea4751367ed --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/resources/log4j.properties @@ -0,0 +1,5 @@ +# Set root logger level to INFO and its only appender to C1. +log4j.rootLogger=INFO, A1 + +# C1 is set to be a ConsoleAppender. +log4j.appender.C1=org.apache.log4j.ConsoleAppender diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-common/javaagent/build.gradle.kts deleted file mode 100644 index c27944c21943..000000000000 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/build.gradle.kts +++ /dev/null @@ -1,7 +0,0 @@ -plugins { - id("otel.javaagent-instrumentation") -} - -dependencies { - compileOnly("org.apache.kafka:kafka-clients:0.11.0.0") -} diff --git a/instrumentation/kafka-clients/kafka-clients-common/library/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-common/library/build.gradle.kts new file mode 100644 index 000000000000..780051761e57 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/library/build.gradle.kts @@ -0,0 +1,9 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + compileOnly("org.apache.kafka:kafka-clients:0.11.0.0") + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") +} diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java similarity index 94% rename from instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java index 9cbb7047cea4..5c53e13c40bb 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerAdditionalAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafka; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerAttributesExtractor.java similarity index 97% rename from instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerAttributesExtractor.java index 5de1a1af9904..b2603a5cc8c0 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafka; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java similarity index 97% rename from instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java index a7c1b768e96c..13fe3eadb9f9 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerExperimentalAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafka; +package io.opentelemetry.instrumentation.kafka; import static io.opentelemetry.api.common.AttributeKey.longKey; diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerIteratorWrapper.java similarity index 88% rename from instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerIteratorWrapper.java index b0f1bbfd626e..cc61022d316c 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerIteratorWrapper.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafka; +package io.opentelemetry.instrumentation.kafka; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerRecordGetter.java similarity index 87% rename from instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerRecordGetter.java index fd57abfaf124..fd28fb8d856a 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaHeadersGetter.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaConsumerRecordGetter.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafka; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.context.propagation.TextMapGetter; import java.nio.charset.StandardCharsets; @@ -13,7 +13,7 @@ import org.apache.kafka.common.header.Header; import org.checkerframework.checker.nullness.qual.Nullable; -public final class KafkaHeadersGetter implements TextMapGetter> { +public final class KafkaConsumerRecordGetter implements TextMapGetter> { @Override public Iterable keys(ConsumerRecord carrier) { return StreamSupport.stream(carrier.headers().spliterator(), false) diff --git a/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaHeadersGetter.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaHeadersGetter.java new file mode 100644 index 000000000000..62244aaaafa4 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaHeadersGetter.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import io.opentelemetry.context.propagation.TextMapGetter; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class KafkaHeadersGetter implements TextMapGetter { + @Override + public Iterable keys(Headers carrier) { + return StreamSupport.stream(carrier.spliterator(), false) + .map(Header::key) + .collect(Collectors.toList()); + } + + @Nullable + @Override + public String get(@Nullable Headers carrier, String key) { + Header header = carrier.lastHeader(key); + if (header == null) { + return null; + } + byte[] value = header.value(); + if (value == null) { + return null; + } + return new String(value, StandardCharsets.UTF_8); + } +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaHeadersSetter.java similarity index 87% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaHeadersSetter.java index c23f1c314ea1..f26c17d87f27 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaHeadersSetter.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaHeadersSetter.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.context.propagation.TextMapSetter; import java.nio.charset.StandardCharsets; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerAdditionalAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaProducerAdditionalAttributesExtractor.java similarity index 94% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerAdditionalAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaProducerAdditionalAttributesExtractor.java index 1a83c7c862ee..fd91c8081b74 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerAdditionalAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaProducerAdditionalAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaProducerAttributesExtractor.java similarity index 96% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaProducerAttributesExtractor.java index 3d3402cde666..bce52c377bee 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaProducerAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaPropagation.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaPropagation.java similarity index 97% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaPropagation.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaPropagation.java index 6b163374518b..7e3e44afe826 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaPropagation.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaPropagation.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaReceiveAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaReceiveAttributesExtractor.java similarity index 97% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaReceiveAttributesExtractor.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaReceiveAttributesExtractor.java index d22b8ca9df33..45a1b7b40664 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaReceiveAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaReceiveAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; diff --git a/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaSingletons.java new file mode 100644 index 000000000000..d75108694d7a --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaSingletons.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka; + +import static io.opentelemetry.instrumentation.kafka.KafkaUtils.buildConsumerProcessInstrumenter; +import static io.opentelemetry.instrumentation.kafka.KafkaUtils.buildConsumerReceiveInstrumenter; +import static io.opentelemetry.instrumentation.kafka.KafkaUtils.buildProducerInstrumenter; + +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +public final class KafkaSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11"; + + private static final Instrumenter, Void> PRODUCER_INSTRUMENTER = + buildProducerInstrumenter(INSTRUMENTATION_NAME); + private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER = + buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME); + private static final Instrumenter, Void> CONSUMER_PROCESS_INSTRUMENTER = + buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME); + + public static Instrumenter, Void> producerInstrumenter() { + return PRODUCER_INSTRUMENTER; + } + + public static Instrumenter consumerReceiveInstrumenter() { + return CONSUMER_RECEIVE_INSTRUMENTER; + } + + public static Instrumenter, Void> consumerProcessInstrumenter() { + return CONSUMER_PROCESS_INSTRUMENTER; + } + + private KafkaSingletons() {} +} diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaUtils.java similarity index 56% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaUtils.java index 12a28b691955..7c531204f86e 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaUtils.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -12,48 +12,39 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; -public final class KafkaSingletons { - private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11"; +public final class KafkaUtils { - private static final Instrumenter, Void> PRODUCER_INSTRUMENTER = - buildProducerInstrumenter(); - private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER = - buildConsumerReceiveInstrumenter(); - private static final Instrumenter, Void> CONSUMER_PROCESS_INSTRUMENTER = - buildConsumerProcessInstrumenter(); - - private static Instrumenter, Void> buildProducerInstrumenter() { + static Instrumenter, Void> buildProducerInstrumenter( + String instrumentationName) { KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor(); SpanNameExtractor> spanNameExtractor = MessagingSpanNameExtractor.create(attributesExtractor); return Instrumenter., Void>newBuilder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + GlobalOpenTelemetry.get(), instrumentationName, spanNameExtractor) .addAttributesExtractor(attributesExtractor) .addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor()) .newInstrumenter(SpanKindExtractor.alwaysProducer()); } - private static Instrumenter buildConsumerReceiveInstrumenter() { + static Instrumenter buildConsumerReceiveInstrumenter( + String instrumentationName) { KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor(); SpanNameExtractor spanNameExtractor = MessagingSpanNameExtractor.create(attributesExtractor); return Instrumenter.newBuilder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + GlobalOpenTelemetry.get(), instrumentationName, spanNameExtractor) .addAttributesExtractor(attributesExtractor) .setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now()) .newInstrumenter(SpanKindExtractor.alwaysConsumer()); } - private static Instrumenter, Void> buildConsumerProcessInstrumenter() { + static Instrumenter, Void> buildConsumerProcessInstrumenter( + String instrumentationName) { KafkaConsumerAttributesExtractor attributesExtractor = new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS); SpanNameExtractor> spanNameExtractor = @@ -61,28 +52,16 @@ private static Instrumenter buildConsumerReceiveInstrumen InstrumenterBuilder, Void> builder = Instrumenter., Void>newBuilder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + GlobalOpenTelemetry.get(), instrumentationName, spanNameExtractor) .addAttributesExtractor(attributesExtractor) .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor()); if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) { builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); } return KafkaPropagation.isPropagationEnabled() - ? builder.newConsumerInstrumenter(new KafkaHeadersGetter()) + ? builder.newConsumerInstrumenter(new KafkaConsumerRecordGetter()) : builder.newInstrumenter(SpanKindExtractor.alwaysConsumer()); } - public static Instrumenter, Void> producerInstrumenter() { - return PRODUCER_INSTRUMENTER; - } - - public static Instrumenter consumerReceiveInstrumenter() { - return CONSUMER_RECEIVE_INSTRUMENTER; - } - - public static Instrumenter, Void> consumerProcessInstrumenter() { - return CONSUMER_PROCESS_INSTRUMENTER; - } - - private KafkaSingletons() {} + private KafkaUtils() {} } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/ProducerCallback.java similarity index 86% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/ProducerCallback.java index a500724b8e6c..35362d847cad 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ProducerCallback.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/ProducerCallback.java @@ -3,9 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter; +import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.producerInstrumenter; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ReceivedRecords.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/ReceivedRecords.java similarity index 90% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ReceivedRecords.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/ReceivedRecords.java index 260457c32d94..8ff29b32414d 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/ReceivedRecords.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/ReceivedRecords.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import com.google.auto.value.AutoValue; import java.time.Instant; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/Timer.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/Timer.java similarity index 90% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/Timer.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/Timer.java index 42b3bc819910..835bb3c65c00 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/Timer.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/Timer.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import java.time.Instant; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingIterable.java similarity index 93% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingIterable.java index 19c5e467e1bc..13612be74832 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingIterable.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingIterator.java similarity index 89% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingIterator.java index d4ca812d50ab..3a17913cc83d 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingIterator.java @@ -3,13 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerProcessInstrumenter; +import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.consumerProcessInstrumenter; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.checkerframework.checker.nullness.qual.Nullable; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingList.java similarity index 98% rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingList.java index 603ae4c53099..357bcc1964bd 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/TracingList.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.kafkaclients; +package io.opentelemetry.instrumentation.kafka; import java.util.Collection; import java.util.List; diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts index 896be8200e1c..3a7a2a4e5c09 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts @@ -17,7 +17,7 @@ dependencies { compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") - implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library")) library("org.springframework.kafka:spring-kafka:2.7.0") diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java index 3896bbabb9e5..d94ac0f06810 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java @@ -9,8 +9,8 @@ import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; -import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; +import io.opentelemetry.instrumentation.kafka.KafkaConsumerIteratorWrapper; +import io.opentelemetry.instrumentation.kafka.KafkaConsumerRecordGetter; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -22,7 +22,7 @@ public class KafkaBatchProcessSpanLinksExtractor public KafkaBatchProcessSpanLinksExtractor(ContextPropagators contextPropagators) { this.singleRecordLinkExtractor = - SpanLinksExtractor.fromUpstreamRequest(contextPropagators, new KafkaHeadersGetter()); + SpanLinksExtractor.fromUpstreamRequest(contextPropagators, new KafkaConsumerRecordGetter()); } @Override diff --git a/settings.gradle.kts b/settings.gradle.kts index 5363573733ff..7acca37388f4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -211,8 +211,9 @@ include(":instrumentation:jsf:mojarra-1.2:javaagent") include(":instrumentation:jsf:myfaces-1.2:javaagent") include(":instrumentation:jsp-2.3:javaagent") include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent") +include(":instrumentation:kafka-clients:kafka-clients-0.11:library") include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing") -include(":instrumentation:kafka-clients:kafka-clients-common:javaagent") +include(":instrumentation:kafka-clients:kafka-clients-common:library") include(":instrumentation:kafka-streams-0.11:javaagent") include(":instrumentation:kotlinx-coroutines:javaagent") include(":instrumentation:kubernetes-client-7.0:javaagent")