From 7fd27e6adfb5665d9f51da35b3b34f529c1a212f Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 28 Jul 2022 12:44:53 +0200 Subject: [PATCH] Add Kafka instrumentation to the Spring Boot starter (#6371) --- .../build.gradle.kts | 5 + ...ListenerContainerFactoryPostProcessor.java | 35 +++++ ...KafkaInstrumentationAutoConfiguration.java | 37 +++++ .../kafka/KafkaInstrumentationProperties.java | 22 +++ .../main/resources/META-INF/spring.factories | 1 + .../kafka/KafkaIntegrationTest.java | 141 ++++++++++++++++++ 6 files changed, 241 insertions(+) create mode 100644 instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java create mode 100644 instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationAutoConfiguration.java create mode 100644 instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationProperties.java create mode 100644 instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java diff --git a/instrumentation/spring/spring-boot-autoconfigure/build.gradle.kts b/instrumentation/spring/spring-boot-autoconfigure/build.gradle.kts index 5e5381c8ac51..cffab0f58048 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/build.gradle.kts +++ b/instrumentation/spring/spring-boot-autoconfigure/build.gradle.kts @@ -14,6 +14,8 @@ dependencies { implementation("javax.validation:validation-api:2.0.1.Final") implementation(project(":instrumentation-annotations-support")) + implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library")) + implementation(project(":instrumentation:spring:spring-kafka-2.7:library")) implementation(project(":instrumentation:spring:spring-web-3.1:library")) implementation(project(":instrumentation:spring:spring-webmvc-3.1:library")) implementation(project(":instrumentation:spring:spring-webflux-5.0:library")) @@ -22,6 +24,7 @@ dependencies { exclude("io.micrometer", "micrometer-core") } + compileOnly("org.springframework.kafka:spring-kafka:2.7.1") compileOnly("org.springframework.boot:spring-boot-starter-actuator:$springBootVersion") compileOnly("org.springframework.boot:spring-boot-starter-aop:$springBootVersion") compileOnly("org.springframework.boot:spring-boot-starter-web:$springBootVersion") @@ -38,6 +41,7 @@ dependencies { compileOnly("io.opentelemetry:opentelemetry-exporter-zipkin") compileOnly(project(":instrumentation-annotations")) + testImplementation("org.springframework.kafka:spring-kafka:2.7.1") testImplementation("org.springframework.boot:spring-boot-starter-actuator:$springBootVersion") testImplementation("org.springframework.boot:spring-boot-starter-aop:$springBootVersion") testImplementation("org.springframework.boot:spring-boot-starter-webflux:$springBootVersion") @@ -45,6 +49,7 @@ dependencies { testImplementation("org.springframework.boot:spring-boot-starter-test:$springBootVersion") { exclude("org.junit.vintage", "junit-vintage-engine") } + testImplementation("org.testcontainers:kafka") testImplementation(project(":testing-common")) testImplementation("io.opentelemetry:opentelemetry-sdk") diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java new file mode 100644 index 000000000000..4a39a5d296ef --- /dev/null +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.autoconfigure.kafka; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; + +class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor { + + private final OpenTelemetry openTelemetry; + + ConcurrentKafkaListenerContainerFactoryPostProcessor(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) { + if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) { + return bean; + } + + ConcurrentKafkaListenerContainerFactory listenerContainerFactory = + (ConcurrentKafkaListenerContainerFactory) bean; + SpringKafkaTelemetry springKafkaTelemetry = SpringKafkaTelemetry.create(openTelemetry); + listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor()); + listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor()); + + return listenerContainerFactory; + } +} diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationAutoConfiguration.java b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationAutoConfiguration.java new file mode 100644 index 000000000000..ace3b7a4c7de --- /dev/null +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationAutoConfiguration.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.autoconfigure.kafka; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.KafkaTemplate; + +@Configuration +@EnableConfigurationProperties(KafkaInstrumentationProperties.class) +@ConditionalOnProperty(name = "otel.springboot.kafka.enabled", matchIfMissing = true) +@ConditionalOnClass({KafkaTemplate.class, ConcurrentKafkaListenerContainerFactory.class}) +public class KafkaInstrumentationAutoConfiguration { + + @Bean + public DefaultKafkaProducerFactoryCustomizer producerInstrumentation( + OpenTelemetry openTelemetry) { + KafkaTelemetry kafkaTelemetry = KafkaTelemetry.create(openTelemetry); + return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap); + } + + @Bean + public ConcurrentKafkaListenerContainerFactoryPostProcessor + concurrentKafkaListenerContainerFactoryPostProcessor(OpenTelemetry openTelemetry) { + return new ConcurrentKafkaListenerContainerFactoryPostProcessor(openTelemetry); + } +} diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationProperties.java b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationProperties.java new file mode 100644 index 000000000000..73eac23125bb --- /dev/null +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaInstrumentationProperties.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.autoconfigure.kafka; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "otel.springboot.kafka") +public class KafkaInstrumentationProperties { + + private boolean enabled = true; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } +} diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories b/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories index 0c92cbbab63e..229b05f41c03 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories @@ -8,6 +8,7 @@ io.opentelemetry.instrumentation.spring.autoconfigure.exporters.otlp.OtlpSpanExp io.opentelemetry.instrumentation.spring.autoconfigure.exporters.zipkin.ZipkinSpanExporterAutoConfiguration,\ io.opentelemetry.instrumentation.spring.autoconfigure.httpclients.resttemplate.RestTemplateAutoConfiguration,\ io.opentelemetry.instrumentation.spring.autoconfigure.httpclients.webclient.WebClientAutoConfiguration,\ +io.opentelemetry.instrumentation.spring.autoconfigure.kafka.KafkaInstrumentationAutoConfiguration,\ io.opentelemetry.instrumentation.spring.autoconfigure.metrics.MicrometerShimAutoConfiguration,\ io.opentelemetry.instrumentation.spring.autoconfigure.propagators.PropagationAutoConfiguration,\ io.opentelemetry.instrumentation.spring.autoconfigure.resources.OtelResourceAutoConfiguration,\ diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java new file mode 100644 index 000000000000..8cf9084914cd --- /dev/null +++ b/instrumentation/spring/spring-boot-autoconfigure/src/test/java/io/opentelemetry/instrumentation/spring/autoconfigure/kafka/KafkaIntegrationTest.java @@ -0,0 +1,141 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.autoconfigure.kafka; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.time.Duration; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaTemplate; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +class KafkaIntegrationTest { + + @RegisterExtension + static final LibraryInstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + static KafkaContainer kafka; + + private ApplicationContextRunner contextRunner; + + @BeforeAll + static void setUpKafka() { + kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")) + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) + .withStartupTimeout(Duration.ofMinutes(1)); + kafka.start(); + } + + @AfterAll + static void tearDownKafka() { + kafka.stop(); + } + + @BeforeEach + void setUpContext() { + contextRunner = + new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + KafkaAutoConfiguration.class, + KafkaInstrumentationAutoConfiguration.class, + TestConfig.class)) + .withBean("openTelemetry", OpenTelemetry.class, testing::getOpenTelemetry) + .withPropertyValues( + "spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(), + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.linger-ms=10", + "spring.kafka.listener.idle-between-polls=1000", + "spring.kafka.producer.transaction-id-prefix=test-"); + } + + @Test + void shouldInstrumentProducerAndConsumer() { + contextRunner.run(KafkaIntegrationTest::runShouldInstrumentProducerAndConsumer); + } + + @SuppressWarnings("unchecked") + private static void runShouldInstrumentProducerAndConsumer( + ConfigurableApplicationContext applicationContext) { + KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class); + + testing.runWithSpan( + "producer", + () -> { + kafkaTemplate.executeInTransaction( + ops -> { + ops.send("testTopic", "10", "testSpan"); + return 0; + }); + }); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @Configuration + static class TestConfig { + + @Bean + public NewTopic testTopic() { + return TopicBuilder.name("testTopic").partitions(1).replicas(1).build(); + } + + @KafkaListener(id = "testListener", topics = "testTopic") + public void listener(ConsumerRecord record) { + testing.runWithSpan("consumer", () -> {}); + } + } +}