Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental option to suppress messaging receive spans #4187

Merged
merged 7 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public boolean suppressControllerSpans() {
/**
 * Returns {@code true} if the user asked to suppress experimental "view" spans.
 *
 * <p>Backed by the {@code otel.instrumentation.common.experimental.suppress-view-spans}
 * configuration property; defaults to {@code false} when the property is absent.
 */
public boolean suppressViewSpans() {
  String propertyName = "otel.instrumentation.common.experimental.suppress-view-spans";
  return config.getBoolean(propertyName, false);
}

/**
 * Returns {@code true} if the user asked to suppress messaging "receive" spans, in which
 * case consumer "process" spans are parented directly to the producer span.
 *
 * <p>Backed by the
 * {@code otel.instrumentation.common.experimental.suppress-messaging-receive-spans}
 * configuration property; defaults to {@code false} when the property is absent.
 */
public boolean suppressMessagingReceiveSpans() {
  String propertyName =
      "otel.instrumentation.common.experimental.suppress-messaging-receive-spans";
  return config.getBoolean(propertyName, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,21 @@ tasks {
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
}

named<Test>("test") {
// Dedicated Test task that runs ONLY KafkaClientSuppressReceiveSpansTest, with the
// experimental suppress-messaging-receive-spans flag turned on via a JVM system property.
val testReceiveSpansDisabled by registering(Test::class) {
filter {
includeTestsMatching("KafkaClientSuppressReceiveSpansTest")
// don't fail the build if the matching test class is absent in this source set
isFailOnNoMatchingTests = false
}
include("**/KafkaClientSuppressReceiveSpansTest.*")
jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
}

// Default test task: runs everything except the tests that need special JVM flags;
// those run in their own Test tasks, wired in via dependsOn so they still execute.
test {
dependsOn(testPropagationDisabled)
dependsOn(testReceiveSpansDisabled)
filter {
// excluded here because they run under the dedicated tasks above
excludeTestsMatching("KafkaClientPropagationDisabledTest")
excludeTestsMatching("KafkaClientSuppressReceiveSpansTest")
isFailOnNoMatchingTests = false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
Expand Down Expand Up @@ -52,6 +53,7 @@ private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumen
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
.setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
trask marked this conversation as resolved.
Show resolved Hide resolved
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand All @@ -69,12 +71,19 @@ private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumen
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}

if (KafkaPropagation.isPropagationEnabled()) {
trask marked this conversation as resolved.
Show resolved Hide resolved
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
} else {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
class KafkaClientDefaultTest extends KafkaClientBaseTest {

def "test kafka produce and consume"() {
when:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

import java.time.Duration

import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

// Verifies kafka-clients tracing when the experimental
// suppress-messaging-receive-spans option is enabled (see build.gradle.kts:
// this class runs in its own Test task with the flag set). With receive spans
// suppressed, the consumer "process" span is parented directly to the
// PRODUCER "send" span, so produce and consume land in a single trace.
class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {

  def "test kafka produce and consume"() {
    when:
    String greeting = "Hello Kafka!"
    runWithSpan("parent") {
      producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
        if (ex == null) {
          runWithSpan("producer callback") {}
        } else {
          runWithSpan("producer exception: " + ex) {}
        }
      }
    }

    then:
    // check that the message was received
    def records = consumer.poll(Duration.ofSeconds(5).toMillis())
    for (record in records) {
      runWithSpan("processing") {
        assert record.value() == greeting
        assert record.key() == null
      }
    }

    // single trace: parent -> send -> process -> processing, plus the
    // producer callback; no standalone "receive" span is expected
    assertTraces(1) {
      trace(0, 5) {
        span(0) {
          name "parent"
          kind INTERNAL
          hasNoParent()
        }
        span(1) {
          name SHARED_TOPIC + " send"
          kind PRODUCER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
          }
        }
        span(2) {
          // "process" span is a direct child of "send" because receive spans
          // are suppressed in this configuration
          name SHARED_TOPIC + " process"
          kind CONSUMER
          childOf span(1)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "kafka.offset" Long
            "kafka.record.queue_time_ms" { it >= 0 }
          }
        }
        span(3) {
          name "processing"
          childOf span(2)
        }
        span(4) {
          name "producer callback"
          kind INTERNAL
          childOf span(0)
        }
      }
    }
  }

  def "test pass through tombstone"() {
    when:
    producer.send(new ProducerRecord<>(SHARED_TOPIC, null))

    then:
    // check that the message was received
    def records = consumer.poll(Duration.ofSeconds(5).toMillis())
    for (record in records) {
      assert record.value() == null
      assert record.key() == null
    }

    assertTraces(1) {
      trace(0, 2) {
        span(0) {
          name SHARED_TOPIC + " send"
          kind PRODUCER
          hasNoParent()
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
          }
        }
        span(1) {
          name SHARED_TOPIC + " process"
          kind CONSUMER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
            "kafka.offset" Long
            "kafka.record.queue_time_ms" { it >= 0 }
          }
        }
      }
    }
  }

  def "test records(TopicPartition) kafka consume"() {
    setup:
    def partition = 0

    when: "send message"
    def greeting = "Hello from MockConsumer!"
    producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))

    then: "wait for PRODUCER span"
    waitForTraces(1)

    when: "receive messages"
    def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
    def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
    for (record in recordsInPartition) {
      assert record.value() == greeting
      assert record.key() == null
    }

    then:
    assertTraces(1) {
      trace(0, 2) {
        span(0) {
          name SHARED_TOPIC + " send"
          kind PRODUCER
          hasNoParent()
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
          }
        }
        span(1) {
          name SHARED_TOPIC + " process"
          kind CONSUMER
          childOf span(0)
          attributes {
            "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
            "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
            "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
            "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
            "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
            "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
            "kafka.offset" Long
            "kafka.record.queue_time_ms" { it >= 0 }
          }
        }
      }
    }
  }
}
19 changes: 18 additions & 1 deletion instrumentation/kafka-streams-0.11/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,27 @@ dependencies {
}

tasks {
test {
// Shared config for every Test task in this project (including the extra
// registered ones): Testcontainers build service + experimental-attributes flag.
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
}

// Dedicated Test task that runs ONLY KafkaStreamsSuppressReceiveSpansTest, with the
// experimental suppress-messaging-receive-spans flag turned on via a JVM system property.
val testReceiveSpansDisabled by registering(Test::class) {
filter {
includeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
// don't fail the build if the matching test class is absent in this source set
isFailOnNoMatchingTests = false
}
include("**/KafkaStreamsSuppressReceiveSpansTest.*")
jvmArgs("-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true")
}

// Default test task: excludes the suppress-receive-spans test, which runs under
// its own Test task (wired in via dependsOn so it still executes with the build).
test {
dependsOn(testReceiveSpansDisabled)
filter {
excludeTestsMatching("KafkaStreamsSuppressReceiveSpansTest")
isFailOnNoMatchingTests = false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
Expand Down Expand Up @@ -40,12 +41,19 @@ public final class KafkaStreamsSingletons {
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}

if (KafkaPropagation.isPropagationEnabled()) {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
} else {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
Expand Down
Loading