diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 21d5bd6f83..b83ccf86b9 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -31,7 +31,7 @@ ) from opentelemetry.test.test_base import TestBase -from .utils import MockConsumer, MockedMessage +from .utils import MockConsumer, MockedMessage, MockedProducer class TestConfluentKafka(TestBase): @@ -246,3 +246,35 @@ def _compare_spans(self, spans, expected_spans): self.assertEqual( expected_attribute_value, span.attributes[attribute_key] ) + + def test_producer_poll(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="key-1", value="value-1") + msg = producer.poll() + self.assertIsNotNone(msg) + + def test_producer_flush(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="key-1", value="value-1") + msg = producer.flush() + self.assertIsNotNone(msg) \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index 798daaeff4..0fc3bf696d 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -1,4 +1,4 @@ -from confluent_kafka import Consumer +from confluent_kafka import Consumer, Producer class MockConsumer(Consumer): @@ -20,7 +20,7 @@ def poll(self, timeout=None): class MockedMessage: - def __init__(self, topic: str, partition: int, offset: int, headers): + def __init__(self, topic: str, partition: int, offset: int, headers, key=None, value=None): self._topic = topic self._partition = partition self._offset = offset @@ -37,3 +37,35 @@ def offset(self): def headers(self): return self._headers + + def key(self): + return self._key + + def value(self): + return self._value + + +class MockedProducer(Producer): + def __init__(self, queue, config): + self._queue = queue + super().__init__(config) + + def produce( + self, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg + self._queue.append( + MockedMessage( + topic=kwargs.get("topic"), + partition=0, + offset=0, + headers=[], + key=kwargs.get("key"), + value=kwargs.get("value") + ) + ) + + def poll(self, timeout=None): + return len(self._queue) + + def flush(self, timeout=None): + return len(self._queue)