From 3ce940548c06c1bcb13f31bd12d080552d0ad714 Mon Sep 17 00:00:00 2001
From: Trask Stalnaker
Date: Wed, 22 Sep 2021 21:50:05 -0700
Subject: [PATCH] Experimental option to suppress messaging receive spans (#4187)

* Experimental option to suppress messaging receive spans

* Kafka streams too

* Better conditionals

* Remove oops

* Extract base class for kafka streams tests

* Spotless
---
 .../api/config/ExperimentalConfig.java        |   5 +
 .../javaagent/build.gradle.kts                |  13 +-
 .../kafkaclients/KafkaSingletons.java         |  11 +-
 ...t.groovy => KafkaClientDefaultTest.groovy} |   2 +-
 ...KafkaClientSuppressReceiveSpansTest.groovy | 182 ++++++++++++++++++
 .../javaagent/build.gradle.kts                |  19 +-
 .../kafkastreams/KafkaStreamsSingletons.java  |  10 +-
 .../test/groovy/KafkaStreamsBaseTest.groovy   | 111 +++++++++++
 ....groovy => KafkaStreamsDefaultTest.groovy} | 104 +---------
 ...afkaStreamsSuppressReceiveSpansTest.groovy | 171 ++++++++++++++++
 10 files changed, 518 insertions(+), 110 deletions(-)
 rename instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/{KafkaClientPropagationEnabledTest.groovy => KafkaClientDefaultTest.groovy} (99%)
 create mode 100644 instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientSuppressReceiveSpansTest.groovy
 create mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy
 rename instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/{KafkaStreamsTest.groovy => KafkaStreamsDefaultTest.groovy} (68%)
 create mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy

diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/config/ExperimentalConfig.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/config/ExperimentalConfig.java
index 062acbfb7306..aa50ed302a12 100644
--- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/config/ExperimentalConfig.java
+++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/config/ExperimentalConfig.java
@@ -28,4 +28,9 @@ public boolean suppressControllerSpans() {
   public boolean suppressViewSpans() {
     return config.getBoolean("otel.instrumentation.common.experimental.suppress-view-spans", false);
   }
+
+  public boolean suppressMessagingReceiveSpans() {
+    return config.getBoolean(
+        "otel.instrumentation.common.experimental.suppress-messaging-receive-spans", false);
+  }
 }
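The getter above is the entire public surface of the feature: one boolean property, defaulting to false. A minimal, self-contained sketch of the flag's plumbing follows — illustrative only, since the agent's real Config also normalizes environment variables (presumably OTEL_INSTRUMENTATION_COMMON_EXPERIMENTAL_SUPPRESS_MESSAGING_RECEIVE_SPANS), which a plain System.getProperty lookup does not:

// Sketch only: approximates what the agent reads at startup.
// Enable the option when attaching the agent, e.g.:
//   java -javaagent:opentelemetry-javaagent.jar \
//        -Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true \
//        -jar app.jar
public class SuppressReceiveSpansFlagDemo {
  static boolean suppressMessagingReceiveSpans() {
    return Boolean.parseBoolean(System.getProperty(
        "otel.instrumentation.common.experimental.suppress-messaging-receive-spans", "false"));
  }

  public static void main(String[] args) {
    System.out.println("suppress receive spans: " + suppressMessagingReceiveSpans());
  }
}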
diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts
index 63336e72314a..e83069f3d73f 100644
--- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts
+++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts
@@ -39,10 +39,21 @@ tasks {
     jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
   }
 
-  named<Test>("test") {
+  val testReceiveSpansDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("KafkaClientSuppressReceiveSpansTest")
+      isFailOnNoMatchingTests = false
+    }
+    include("**/KafkaClientSuppressReceiveSpansTest.*")
+    jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
+  }
+
+  test {
     dependsOn(testPropagationDisabled)
+    dependsOn(testReceiveSpansDisabled)
    filter {
       excludeTestsMatching("KafkaClientPropagationDisabledTest")
+      excludeTestsMatching("KafkaClientSuppressReceiveSpansTest")
       isFailOnNoMatchingTests = false
     }
   }

diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java
index f0d33e7bbdae..7b40fad81d36 100644
--- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java
+++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java
@@ -6,6 +6,7 @@
 package io.opentelemetry.javaagent.instrumentation.kafkaclients;
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
@@ -52,6 +53,7 @@ private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrume
             GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
         .addAttributesExtractor(attributesExtractor)
         .setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
+        .setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
         .newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
@@ -69,12 +71,17 @@ private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrume
     if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
       builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
     }
-    if (KafkaPropagation.isPropagationEnabled()) {
+
+    if (!KafkaPropagation.isPropagationEnabled()) {
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
+    } else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
+      return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
+    } else {
       builder.addSpanLinksExtractor(
           SpanLinksExtractor.fromUpstreamRequest(
               GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
     }
-    return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
   public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
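The heart of the change is the three-way conditional in the consumer process instrumenter above. A condensed restatement of the branching — the enum and method below are illustrative, not part of the codebase:

// Illustrative restatement of the new selection logic, chosen once at startup.
public class ProcessInstrumenterChoice {
  enum Shape {
    PLAIN,             // propagation disabled: consumer spans carry no upstream context
    PROPAGATED_PARENT, // receive spans suppressed: "process" becomes a child of "send"
    LINKED             // default: "receive" span kept; "process" links to the upstream "send"
  }

  static Shape choose(boolean propagationEnabled, boolean suppressReceiveSpans) {
    if (!propagationEnabled) {
      return Shape.PLAIN;
    } else if (suppressReceiveSpans) {
      return Shape.PROPAGATED_PARENT;
    } else {
      return Shape.LINKED;
    }
  }

  public static void main(String[] args) {
    System.out.println(choose(true, true)); // PROPAGATED_PARENT
  }
}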
diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientDefaultTest.groovy
similarity index 99%
rename from instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
rename to instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientDefaultTest.groovy
index 2590b703c214..1af6ccf5ac73 100644
--- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientPropagationEnabledTest.groovy
+++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientDefaultTest.groovy
@@ -15,7 +15,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
 import static io.opentelemetry.api.trace.SpanKind.INTERNAL
 import static io.opentelemetry.api.trace.SpanKind.PRODUCER
 
-class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
+class KafkaClientDefaultTest extends KafkaClientBaseTest {
 
   def "test kafka produce and consume"() {
     when:

diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientSuppressReceiveSpansTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientSuppressReceiveSpansTest.groovy
new file mode 100644
index 000000000000..0b60b16a9128
--- /dev/null
+++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/KafkaClientSuppressReceiveSpansTest.groovy
@@ -0,0 +1,182 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+
+import java.time.Duration
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER
+import static io.opentelemetry.api.trace.SpanKind.INTERNAL
+import static io.opentelemetry.api.trace.SpanKind.PRODUCER
+
+class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
+
+  def "test kafka produce and consume"() {
+    when:
+    String greeting = "Hello Kafka!"
+    runWithSpan("parent") {
+      producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
+        if (ex == null) {
+          runWithSpan("producer callback") {}
+        } else {
+          runWithSpan("producer exception: " + ex) {}
+        }
+      }
+    }
+
+    then:
+    // check that the message was received
+    def records = consumer.poll(Duration.ofSeconds(5).toMillis())
+    for (record in records) {
+      runWithSpan("processing") {
+        assert record.value() == greeting
+        assert record.key() == null
+      }
+    }
+
+    assertTraces(1) {
+      trace(0, 5) {
+        span(0) {
+          name "parent"
+          kind INTERNAL
+          hasNoParent()
+        }
+        span(1) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        span(2) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(1)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" Long
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+        span(3) {
+          name "processing"
+          childOf span(2)
+        }
+        span(4) {
+          name "producer callback"
+          kind INTERNAL
+          childOf span(0)
+        }
+      }
+    }
+  }
+
+  def "test pass through tombstone"() {
+    when:
+    producer.send(new ProducerRecord<>(SHARED_TOPIC, null))
+
+    then:
+    // check that the message was received
+    def records = consumer.poll(Duration.ofSeconds(5).toMillis())
+    for (record in records) {
+      assert record.value() == null
+      assert record.key() == null
+    }
+
+    assertTraces(1) {
+      trace(0, 2) {
+        span(0) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
+          }
+        }
+        span(1) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
+            "kafka.offset" Long
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+      }
+    }
+  }
+
+  def "test records(TopicPartition) kafka consume"() {
+    setup:
+    def partition = 0
+
+    when: "send message"
+    def greeting = "Hello from MockConsumer!"
+    producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))
+
+    then: "wait for PRODUCER span"
+    waitForTraces(1)
+
+    when: "receive messages"
+    def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
+    def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
+    for (record in recordsInPartition) {
+      assert record.value() == greeting
+      assert record.key() == null
+    }
+
+    then:
+    assertTraces(1) {
+      trace(0, 2) {
+        span(0) {
+          name SHARED_TOPIC + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+          }
+        }
+        span(1) {
+          name SHARED_TOPIC + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" Long
+            "kafka.record.queue_time_ms" { it >= 0 }
+          }
+        }
+      }
+    }
+  }
+}
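What the new test asserts, seen from an application's point of view: with the flag on, the "process" CONSUMER span is parented directly to the "send" PRODUCER span, and no "receive" span appears. A runnable sketch, assuming a broker on localhost:9092, an existing "demo" topic, the kafka-clients 2.x poll(Duration) API, and the agent attached with the flag shown earlier:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SuppressedReceiveSpanDemo {
  public static void main(String[] args) {
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      producer.send(new ProducerRecord<>("demo", "hello")); // agent creates "demo send"
    }

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "demo");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
      consumer.subscribe(Collections.singletonList("demo"));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<String, String> record : records) {
        // iteration is instrumented as "demo process", now a direct child of "demo send"
        System.out.println(record.value());
      }
    }
  }
}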
diff --git a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
index c9caa2af0c17..d29de9126d96 100644
--- a/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
+++ b/instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
@@ -25,10 +25,27 @@ dependencies {
 }
 
 tasks {
-  test {
+  withType<Test>().configureEach {
     usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
 
     // TODO run tests both with and without experimental span attributes
     jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
   }
+
+  val testReceiveSpansDisabled by registering(Test::class) {
+    filter {
+      includeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
+      isFailOnNoMatchingTests = false
+    }
+    include("**/KafkaStreamsSuppressReceiveSpansTest.*")
+    jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
+  }
+
+  test {
+    dependsOn(testReceiveSpansDisabled)
+    filter {
+      excludeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
+      isFailOnNoMatchingTests = false
+    }
+  }
 }

diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java
index 26ca1bb6b755..aa7845d9b760 100644
--- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java
+++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java
@@ -6,6 +6,7 @@
 package io.opentelemetry.javaagent.instrumentation.kafkastreams;
 
 import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
@@ -40,12 +41,17 @@ public final class KafkaStreamsSingletons {
     if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
       builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
     }
-    if (KafkaPropagation.isPropagationEnabled()) {
+
+    if (!KafkaPropagation.isPropagationEnabled()) {
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
+    } else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
+      return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
+    } else {
       builder.addSpanLinksExtractor(
           SpanLinksExtractor.fromUpstreamRequest(
               GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
+      return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
     }
-    return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
   public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy
new file mode 100644
index 000000000000..e2c68bb5c7a0
--- /dev/null
+++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy
@@ -0,0 +1,111 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.Producer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.IntegerDeserializer
+import org.apache.kafka.common.serialization.IntegerSerializer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.testcontainers.containers.KafkaContainer
+import spock.lang.Shared
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+class KafkaStreamsBaseTest extends AgentInstrumentationSpecification {
+
+  protected static final STREAM_PENDING = "test.pending"
+  protected static final STREAM_PROCESSED = "test.processed"
+
+  @Shared
+  static KafkaContainer kafka
+  @Shared
+  static Producer<Integer, String> producer
+  @Shared
+  static Consumer<Integer, String> consumer
+  @Shared
+  static CountDownLatch consumerReady = new CountDownLatch(1)
+
+  def setupSpec() {
+    kafka = new KafkaContainer()
+    kafka.start()
+
+    // create test topic
+    AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
+      admin.createTopics([
+        new NewTopic(STREAM_PENDING, 1, (short) 1),
+        new NewTopic(STREAM_PROCESSED, 1, (short) 1),
+      ]).all().get(10, TimeUnit.SECONDS)
+    }
+
+    producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers))
+
+    def consumerProps = [
+      "bootstrap.servers"      : kafka.bootstrapServers,
+      "group.id"               : "test",
+      "enable.auto.commit"     : "true",
+      "auto.commit.interval.ms": "10",
+      "session.timeout.ms"     : "30000",
+      "key.deserializer"       : IntegerDeserializer,
+      "value.deserializer"     : StringDeserializer
+    ]
+    consumer = new KafkaConsumer<>(consumerProps)
+
+    consumer.subscribe([STREAM_PROCESSED], new ConsumerRebalanceListener() {
+      @Override
+      void onPartitionsRevoked(Collection<TopicPartition> collection) {
+      }
+
+      @Override
+      void onPartitionsAssigned(Collection<TopicPartition> collection) {
+        consumerReady.countDown()
+      }
+    })
+  }
+
+  def cleanupSpec() {
+    consumer?.close()
+    producer?.close()
+    kafka.stop()
+  }
+
+  static Map<String, Object> producerProps(String servers) {
+    // values copied from spring's KafkaTestUtils
+    return [
+      "bootstrap.servers": servers,
+      "retries"          : 0,
+      "batch.size"       : "16384",
+      "linger.ms"        : 1,
+      "buffer.memory"    : "33554432",
+      "key.serializer"   : IntegerSerializer,
+      "value.serializer" : StringSerializer
+    ]
+  }
+
+  // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition
+  static void awaitUntilConsumerIsReady() {
+    if (consumerReady.await(0, TimeUnit.SECONDS)) {
+      return
+    }
+    for (i in 0..<10) {
+      consumer.poll(0)
+      if (consumerReady.await(1, TimeUnit.SECONDS)) {
+        break
+      }
+    }
+    if (consumerReady.getCount() != 0) {
+      throw new AssertionError("Consumer wasn't assigned any partitions!")
+    }
+    consumer.seekToBeginning([])
+  }
+}
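The base class's awaitUntilConsumerIsReady encodes a generally useful trick: rebalance callbacks only fire inside poll(), so the test polls in a loop until a latch armed by a ConsumerRebalanceListener opens. The same pattern in plain Java — broker address, topic name, and the 2.x poll(Duration) API are assumptions for illustration:

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AwaitAssignmentDemo {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "demo");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    CountDownLatch assigned = new CountDownLatch(1);
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("demo"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          assigned.countDown(); // fires from within poll() once assignment happens
        }
      });
      for (int i = 0; i < 10; i++) {
        consumer.poll(Duration.ofMillis(100)); // drives the rebalance protocol
        if (assigned.await(1, TimeUnit.SECONDS)) {
          break;
        }
      }
      if (assigned.getCount() != 0) {
        throw new IllegalStateException("consumer wasn't assigned any partitions");
      }
      consumer.seekToBeginning(Collections.emptyList());
    }
  }
}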
diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy
similarity index 68%
rename from instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy
rename to instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy
index 2e18a354e78b..3d371a976b4b 100644
--- a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy
+++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy
@@ -7,94 +7,22 @@ import io.opentelemetry.api.trace.Span
 import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
 import io.opentelemetry.context.Context
 import io.opentelemetry.context.propagation.TextMapGetter
-import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
 import io.opentelemetry.sdk.trace.data.SpanData
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
-import org.apache.kafka.common.serialization.IntegerDeserializer
-import org.apache.kafka.common.serialization.IntegerSerializer
 import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.streams.KafkaStreams
 import org.apache.kafka.streams.StreamsConfig
 import org.apache.kafka.streams.kstream.KStream
 import org.apache.kafka.streams.kstream.ValueMapper
-import org.testcontainers.containers.KafkaContainer
-import spock.lang.Shared
 
 import java.time.Duration
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
 
 import static io.opentelemetry.api.trace.SpanKind.CONSUMER
 import static io.opentelemetry.api.trace.SpanKind.PRODUCER
 
-class KafkaStreamsTest extends AgentInstrumentationSpecification {
-
-  static final STREAM_PENDING = "test.pending"
-  static final STREAM_PROCESSED = "test.processed"
-
-  @Shared
-  static KafkaContainer kafka
-  @Shared
-  static Producer<Integer, String> producer
-  @Shared
-  static Consumer<Integer, String> consumer
-  @Shared
-  static CountDownLatch consumerReady = new CountDownLatch(1)
-
-  def setupSpec() {
-    kafka = new KafkaContainer()
-    kafka.start()
-
-    // create test topic
-    AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
-      admin.createTopics([
-        new NewTopic(STREAM_PENDING, 1, (short) 1),
-        new NewTopic(STREAM_PROCESSED, 1, (short) 1),
-      ]).all().get(10, TimeUnit.SECONDS)
-    }
-
-    producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers))
-
-    def consumerProps = [
-      "bootstrap.servers"      : kafka.bootstrapServers,
-      "group.id"               : "test",
-      "enable.auto.commit"     : "true",
-      "auto.commit.interval.ms": "10",
-      "session.timeout.ms"     : "30000",
-      "key.deserializer"       : IntegerDeserializer,
-      "value.deserializer"     : StringDeserializer
-    ]
-    consumer = new KafkaConsumer<>(consumerProps)
-
-    consumer.subscribe([STREAM_PROCESSED], new ConsumerRebalanceListener() {
-      @Override
-      void onPartitionsRevoked(Collection<TopicPartition> collection) {
-      }
-
-      @Override
-      void onPartitionsAssigned(Collection<TopicPartition> collection) {
-        consumerReady.countDown()
-      }
-    })
-  }
-
-  def cleanupSpec() {
-    consumer?.close()
-    producer?.close()
-    kafka.stop()
-  }
+class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
 
   def "test kafka produce and consume with streams in-between"() {
     setup:
@@ -278,34 +206,4 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
     spanContext.traceId == streamSendSpan.traceId
     spanContext.spanId == streamSendSpan.spanId
   }
-
-  private static Map<String, Object> producerProps(String servers) {
-    // values copied from spring's KafkaTestUtils
-    return [
-      "bootstrap.servers": servers,
-      "retries"          : 0,
-      "batch.size"       : "16384",
-      "linger.ms"        : 1,
-      "buffer.memory"    : "33554432",
-      "key.serializer"   : IntegerSerializer,
-      "value.serializer" : StringSerializer
-    ]
-  }
-
-  // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition
-  static void awaitUntilConsumerIsReady() {
-    if (consumerReady.await(0, TimeUnit.SECONDS)) {
-      return
-    }
-    for (i in 0..<10) {
-      consumer.poll(0)
-      if (consumerReady.await(1, TimeUnit.SECONDS)) {
-        break
-      }
-    }
-    if (consumerReady.getCount() != 0) {
-      throw new AssertionError("Consumer wasn't assigned any partitions!")
-    }
-    consumer.seekToBeginning([])
-  }
 }
diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy
new file mode 100644
index 000000000000..c36e606e2458
--- /dev/null
+++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy
@@ -0,0 +1,171 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import io.opentelemetry.api.trace.Span
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
+import io.opentelemetry.context.Context
+import io.opentelemetry.context.propagation.TextMapGetter
+import io.opentelemetry.sdk.trace.data.SpanData
+import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.KafkaStreams
+import org.apache.kafka.streams.StreamsConfig
+import org.apache.kafka.streams.kstream.KStream
+import org.apache.kafka.streams.kstream.ValueMapper
+
+import java.time.Duration
+
+import static io.opentelemetry.api.trace.SpanKind.CONSUMER
+import static io.opentelemetry.api.trace.SpanKind.PRODUCER
+
+class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
+
+  def "test kafka produce and consume with streams in-between"() {
+    setup:
+    def config = new Properties()
+    config.putAll(producerProps(kafka.bootstrapServers))
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
+    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName())
+    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+
+    // CONFIGURE PROCESSOR
+    def builder
+    try {
+      // Different class names for test and latestDepTest.
+      builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
+    }
+    KStream<Integer, String> textLines = builder.stream(STREAM_PENDING)
+    def values = textLines
+      .mapValues(new ValueMapper<String, String>() {
+        @Override
+        String apply(String textLine) {
+          Span.current().setAttribute("asdf", "testing")
+          return textLine.toLowerCase()
+        }
+      })
+
+    KafkaStreams streams
+    try {
+      // Different api for test and latestDepTest.
+      values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
+      streams = new KafkaStreams(builder, config)
+    } catch (MissingMethodException e) {
+      def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
+        .with(Serdes.String(), Serdes.String())
+      values.to(STREAM_PROCESSED, producer)
+      streams = new KafkaStreams(builder.build(), config)
+    }
+    streams.start()
+
+    when:
+    String greeting = "TESTING TESTING 123!"
+    producer.send(new ProducerRecord<>(STREAM_PENDING, greeting))
+
+    then:
+    // check that the message was received
+    def records = consumer.poll(Duration.ofSeconds(10).toMillis())
+    Headers receivedHeaders = null
+    for (record in records) {
+      Span.current().setAttribute("testing", 123)
+
+      assert record.value() == greeting.toLowerCase()
+      assert record.key() == null
+
+      if (receivedHeaders == null) {
+        receivedHeaders = record.headers()
+      }
+    }
+
+    SpanData streamSendSpan
+
+    assertTraces(1) {
+      trace(0, 4) {
+        // kafka-clients PRODUCER
+        span(0) {
+          name STREAM_PENDING + " send"
+          kind PRODUCER
+          hasNoParent()
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        // kafka-stream CONSUMER
+        span(1) {
+          name STREAM_PENDING + " process"
+          kind CONSUMER
+          childOf span(0)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+            "asdf" "testing"
+          }
+        }
+
+        streamSendSpan = span(2)
+
+        // kafka-clients PRODUCER
+        span(2) {
+          name STREAM_PROCESSED + " send"
+          kind PRODUCER
+          childOf span(1)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+          }
+        }
+        // kafka-clients CONSUMER process
+        span(3) {
+          name STREAM_PROCESSED + " process"
+          kind CONSUMER
+          childOf span(2)
+          attributes {
+            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+            "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
+            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
+            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
+            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
+            "kafka.offset" 0
+            "kafka.record.queue_time_ms" { it >= 0 }
+            "testing" 123
+          }
+        }
+      }
+    }
+
+    receivedHeaders.iterator().hasNext()
+    def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value())
+    Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
+      @Override
+      Iterable<String> keys(String carrier) {
+        return Collections.singleton("traceparent")
+      }
+
+      @Override
+      String get(String carrier, String key) {
+        if (key == "traceparent") {
+          return traceparent
+        }
+        return null
+      }
+    })
+    def spanContext = Span.fromContext(context).getSpanContext()
+    spanContext.traceId == streamSendSpan.traceId
+    spanContext.spanId == streamSendSpan.spanId
+  }
+}
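The closing assertions read the traceparent header that the instrumented producer injected into the record and check that it round-trips to the STREAM_PROCESSED send span. The same extraction technique as a standalone Java sketch — the class name and the header value below are synthetic, for illustration only:

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Collections;
import java.util.Map;

public class TraceparentExtractionDemo {
  private static final TextMapGetter<Map<String, String>> GETTER =
      new TextMapGetter<Map<String, String>>() {
        @Override
        public Iterable<String> keys(Map<String, String> carrier) {
          return carrier.keySet();
        }

        @Override
        public String get(Map<String, String> carrier, String key) {
          return carrier.get(key);
        }
      };

  public static void main(String[] args) {
    // In the test, this value comes from record.headers().headers("traceparent").
    Map<String, String> headers = Collections.singletonMap(
        "traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");

    Context extracted =
        W3CTraceContextPropagator.getInstance().extract(Context.root(), headers, GETTER);
    SpanContext spanContext = Span.fromContext(extracted).getSpanContext();
    System.out.println(spanContext.getTraceId() + " / " + spanContext.getSpanId());
  }
}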