From 3c3ef8affb604db118b5363f220b50c0406f1e22 Mon Sep 17 00:00:00 2001 From: Josh Schlenker Date: Wed, 26 Jun 2024 20:41:12 +0000 Subject: [PATCH 1/5] end consume span when consumer is closed --- .../confluent_kafka/__init__.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 30181d39c2..a6d976ce57 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -144,6 +144,10 @@ def consume( ): # pylint: disable=useless-super-delegation return super().consume(*args, **kwargs) + # This method is deliberately implemented in order to allow wrapt to wrap this function + def close(self): # pylint: disable=useless-super-delegation + return super().close() + class ProxiedProducer(Producer): def __init__(self, producer: Producer, tracer: Tracer): @@ -178,6 +182,12 @@ def __init__(self, consumer: Consumer, tracer: Tracer): self._current_consume_span = None self._current_context_token = None + def close(self): + return ConfluentKafkaInstrumentor.wrap_close( + self._consumer.close, + self + ) + def committed(self, partitions, timeout=-1): return self._consumer.committed(partitions, timeout) @@ -300,6 +310,11 @@ def _inner_wrap_consume(func, instance, args, kwargs): func, instance, self._tracer, args, kwargs ) + def _inner_wrap_close(func, instance): + return ConfluentKafkaInstrumentor.wrap_close( + func, instance + ) + wrapt.wrap_function_wrapper( AutoInstrumentedProducer, "produce", @@ -318,6 +333,12 @@ def _inner_wrap_consume(func, instance, args, kwargs): _inner_wrap_consume, ) + wrapt.wrap_function_wrapper( + AutoInstrumentedConsumer, + "close", + _inner_wrap_close, + ) + def _uninstrument(self, **kwargs): confluent_kafka.Producer = self._original_kafka_producer confluent_kafka.Consumer = self._original_kafka_consumer @@ -400,3 +421,9 @@ def wrap_consume(func, instance, tracer, args, kwargs): ) return records + + @staticmethod + def wrap_close(func, instance): + if instance._current_consume_span: + _end_current_consume_span(instance) + func() From 084dce16a29b7dfc55d6831139280dd493b1926e Mon Sep 17 00:00:00 2001 From: Josh Schlenker Date: Wed, 26 Jun 2024 20:41:26 +0000 Subject: [PATCH 2/5] add close unit test --- .../tests/test_instrumentation.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 205de27733..4afb79bef4 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -237,7 +237,44 @@ def test_consume(self) -> None: span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) + def test_close(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + mocked_messages = [ + MockedMessage("topic-a", 0, 0, []), + ] + expected_spans = [ + {"name": "recv", "attributes": {}}, + { + "name": "topic-a process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-a", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", + }, + } + ] + + consumer = MockConsumer( + mocked_messages, + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() + consumer.close() + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + def _compare_spans(self, spans, expected_spans): + self.assertEqual(len(spans), len(expected_spans)) for span, expected_span in zip(spans, expected_spans): self.assertEqual(expected_span["name"], span.name) for attribute_key, expected_attribute_value in expected_span[ From cfe571b4cf09f9f1994d412940d0ca62ec5d4f37 Mon Sep 17 00:00:00 2001 From: Josh Schlenker Date: Wed, 26 Jun 2024 20:52:04 +0000 Subject: [PATCH 3/5] fix lint issues --- .../instrumentation/confluent_kafka/__init__.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index a6d976ce57..931ff5edfb 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -184,8 +184,7 @@ def __init__(self, consumer: Consumer, tracer: Tracer): def close(self): return ConfluentKafkaInstrumentor.wrap_close( - self._consumer.close, - self + self._consumer.close, self ) def committed(self, partitions, timeout=-1): @@ -311,9 +310,7 @@ def _inner_wrap_consume(func, instance, args, kwargs): ) def _inner_wrap_close(func, instance): - return ConfluentKafkaInstrumentor.wrap_close( - func, instance - ) + return ConfluentKafkaInstrumentor.wrap_close(func, instance) wrapt.wrap_function_wrapper( AutoInstrumentedProducer, From 28484119504f57dbcb361bb12068b767528cbe84 Mon Sep 17 00:00:00 2001 From: Josh Schlenker Date: Wed, 26 Jun 2024 20:52:14 +0000 Subject: [PATCH 4/5] add unit tests for close --- .../tests/test_instrumentation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 4afb79bef4..27653d6777 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -254,7 +254,7 @@ def test_close(self) -> None: SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", }, - } + }, ] consumer = MockConsumer( From 052346cdeb57cc79f9a72677a2ba3f40c0ac9e92 Mon Sep 17 00:00:00 2001 From: Josh Schlenker Date: Wed, 26 Jun 2024 21:03:18 +0000 Subject: [PATCH 5/5] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ddb3f488b..38e5a89aa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2590](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2590)) - Reference symbols from generated semantic conventions ([#2611](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2611)) +- `opentelemetry-instrumentation-confluent-kafka` Confluent Kafka: Ensure consume span is ended when consumer is closed + ([#2640](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2640)) ## Version 1.25.0/0.46b0 (2024-05-31)