Skip to content

Commit

Permalink
Experimental option to suppress messaging receive spans (#4187)
Browse files Browse the repository at this point in the history
* Experimental option to suppress messaging receive spans

* Kafka streams too

* Better conditionals

* Remove oops

* Extract base class for kafka streams tests

* Spotless
  • Loading branch information
trask committed Sep 23, 2021
1 parent 0f3d0cb commit 3ce9405
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public boolean suppressControllerSpans() {
public boolean suppressViewSpans() {
return config.getBoolean("otel.instrumentation.common.experimental.suppress-view-spans", false);
}

public boolean suppressMessagingReceiveSpans() {
return config.getBoolean(
"otel.instrumentation.common.experimental.suppress-messaging-receive-spans", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,21 @@ tasks {
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
}

named<Test>("test") {
val testReceiveSpansDisabled by registering(Test::class) {
filter {
includeTestsMatching("KafkaClientSuppressReceiveSpansTest")
isFailOnNoMatchingTests = false
}
include("**/KafkaClientSuppressReceiveSpansTest.*")
jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
}

test {
dependsOn(testPropagationDisabled)
dependsOn(testReceiveSpansDisabled)
filter {
excludeTestsMatching("KafkaClientPropagationDisabledTest")
excludeTestsMatching("KafkaClientSuppressReceiveSpansTest")
isFailOnNoMatchingTests = false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
Expand Down Expand Up @@ -52,6 +53,7 @@ private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumen
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
.setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand All @@ -69,12 +71,17 @@ private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumen
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
if (KafkaPropagation.isPropagationEnabled()) {

if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
class KafkaClientDefaultTest extends KafkaClientBaseTest {

def "test kafka produce and consume"() {
when:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

import java.time.Duration

import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {

def "test kafka produce and consume"() {
when:
String greeting = "Hello Kafka!"
runWithSpan("parent") {
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}
}

then:
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
runWithSpan("processing") {
assert record.value() == greeting
assert record.key() == null
}
}

assertTraces(1) {
trace(0, 5) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
span(2) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(1)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
span(3) {
name "processing"
childOf span(2)
}
span(4) {
name "producer callback"
kind INTERNAL
childOf span(0)
}
}
}
}

def "test pass through tombstone"() {
when:
producer.send(new ProducerRecord<>(SHARED_TOPIC, null))

then:
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
assert record.value() == null
assert record.key() == null
}

assertTraces(1) {
trace(0, 2) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
}
}

def "test records(TopicPartition) kafka consume"() {
setup:
def partition = 0
when: "send message"
def greeting = "Hello from MockConsumer!"
producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))
then: "wait for PRODUCER span"
waitForTraces(1)
when: "receive messages"
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
for (record in recordsInPartition) {
assert record.value() == greeting
assert record.key() == null
}
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
}
}
}
19 changes: 18 additions & 1 deletion instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,27 @@ dependencies {
}

tasks {
test {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
}

val testReceiveSpansDisabled by registering(Test::class) {
filter {
includeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
isFailOnNoMatchingTests = false
}
include("**/KafkaStreamsSuppressReceiveSpansTest.*")
jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
}

test {
dependsOn(testReceiveSpansDisabled)
filter {
excludeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
isFailOnNoMatchingTests = false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
Expand Down Expand Up @@ -40,12 +41,17 @@ public final class KafkaStreamsSingletons {
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
if (KafkaPropagation.isPropagationEnabled()) {

if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
Expand Down
Loading

0 comments on commit 3ce9405

Please sign in to comment.