Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kafka consumer and producer interceptors. #4065

Merged
merged 8 commits into from
Oct 2, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))

library("org.apache.kafka:kafka-clients:0.11.0.0")

testImplementation("org.testcontainers:kafka")
testImplementation(project(":instrumentation:kafka-clients:kafka-clients-0.11:testing"))
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import io.opentelemetry.instrumentation.kafka.Timer;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,21 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class KafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
buildProducerInstrumenter();
KafkaInstrumenterBuilder.buildProducerInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
buildConsumerReceiveInstrumenter();
KafkaInstrumenterBuilder.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
buildConsumerProcessInstrumenter();

private static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter() {
KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
SpanNameExtractor<ProducerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

return Instrumenter.<ProducerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor())
.newInstrumenter(SpanKindExtractor.alwaysProducer());
}

private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter() {
KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
SpanNameExtractor<ReceivedRecords> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

return Instrumenter.<ReceivedRecords, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
.setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor());
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}

if (!KafkaPropagation.isPropagationEnabled()) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
} else {
builder.addSpanLinksExtractor(
SpanLinksExtractor.fromUpstreamRequest(
GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
}
KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIterableWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients

import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

import java.time.Duration
import java.util.concurrent.TimeUnit
Expand All @@ -15,7 +16,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientDefaultTest extends KafkaClientBaseTest {
class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {

def "test kafka produce and consume"() {
when:
Expand Down Expand Up @@ -190,7 +191,7 @@ class KafkaClientDefaultTest extends KafkaClientBaseTest {
when: "receive messages"
awaitUntilConsumerIsReady()
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
def recordsInPartition = consumerRecords.records(topicPartition)
recordsInPartition.size() == 1

// iterate over records to generate spans
Expand Down Expand Up @@ -251,4 +252,4 @@ class KafkaClientDefaultTest extends KafkaClientBaseTest {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients

import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord

Expand All @@ -11,7 +13,7 @@ import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest {

def "should not read remote context when consuming messages if propagation is disabled"() {
when: "send message"
Expand Down Expand Up @@ -93,4 +95,4 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients

import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
Expand All @@ -13,7 +15,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest {

def "test kafka produce and consume"() {
when:
Expand All @@ -29,6 +31,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
}

then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
Expand Down Expand Up @@ -88,6 +91,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
producer.send(new ProducerRecord<>(SHARED_TOPIC, null))

then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
Expand Down Expand Up @@ -138,6 +142,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {

then: "wait for PRODUCER span"
waitForTraces(1)
awaitUntilConsumerIsReady()

when: "receive messages"
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
Expand Down Expand Up @@ -179,4 +184,4 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id("otel.library-instrumentation")
}

val versions: Map<String, String> by project

dependencies {
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
library("org.apache.kafka:kafka-clients:2.8.1")

testImplementation(project(":instrumentation:kafka-clients:kafka-clients-0.11:testing"))
alesj marked this conversation as resolved.
Show resolved Hide resolved

testImplementation("org.testcontainers:kafka:${versions["org.testcontainers"]}")
}

tasks {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
}
}
Loading