Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Confluent Kafka: Ensure consume span is ended when consumer is closed #2640

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2590](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2590))
- Reference symbols from generated semantic conventions
([#2611](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2611))
- `opentelemetry-instrumentation-confluent-kafka` Confluent Kafka: Ensure consume span is ended when consumer is closed
([#2640](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2640))

## Version 1.25.0/0.46b0 (2024-05-31)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def consume(
): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def close(self): # pylint: disable=useless-super-delegation
return super().close()


class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
Expand Down Expand Up @@ -181,6 +185,11 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
self._current_consume_span = None
self._current_context_token = None

def close(self):
return ConfluentKafkaInstrumentor.wrap_close(
self._consumer.close, self
)

def committed(self, partitions, timeout=-1):
return self._consumer.committed(partitions, timeout)

Expand Down Expand Up @@ -303,6 +312,9 @@ def _inner_wrap_consume(func, instance, args, kwargs):
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_close(func, instance):
return ConfluentKafkaInstrumentor.wrap_close(func, instance)

wrapt.wrap_function_wrapper(
AutoInstrumentedProducer,
"produce",
Expand All @@ -321,6 +333,12 @@ def _inner_wrap_consume(func, instance, args, kwargs):
_inner_wrap_consume,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"close",
_inner_wrap_close,
)

def _uninstrument(self, **kwargs):
confluent_kafka.Producer = self._original_kafka_producer
confluent_kafka.Consumer = self._original_kafka_consumer
Expand Down Expand Up @@ -403,3 +421,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):
)

return records

@staticmethod
def wrap_close(func, instance):
if instance._current_consume_span:
_end_current_consume_span(instance)
func()
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,44 @@ def test_consume(self) -> None:
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_close(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-a", 0, 0, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-a process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-a",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0",
},
},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.close()

lzchen marked this conversation as resolved.
Show resolved Hide resolved
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
for attribute_key, expected_attribute_value in expected_span[
Expand Down