From a8b9829f7613e96f4994b48343f4675727fca387 Mon Sep 17 00:00:00 2001 From: oxeye-dorkolog <89912797+oxeye-dorkolog@users.noreply.github.com> Date: Fri, 3 Jun 2022 16:26:40 +0300 Subject: [PATCH] Support confluent kafka (#1111) * add kafka instrumentation * add confluent kafka instrumentation * fix tests * change documentation * lint fix * fix lint Co-authored-by: Nikolay Sokolik <81902191+oxeye-nikolay@users.noreply.github.com> --- .github/component_owners.yml | 4 + CHANGELOG.md | 5 + instrumentation/README.md | 1 + .../README.rst | 23 ++ .../setup.cfg | 57 +++ .../setup.py | 99 +++++ .../confluent_kafka/__init__.py | 360 ++++++++++++++++++ .../confluent_kafka/package.py | 16 + .../instrumentation/confluent_kafka/utils.py | 109 ++++++ .../confluent_kafka/version.py | 15 + .../tests/__init__.py | 0 .../tests/test_instrumentation.py | 60 +++ .../setup.cfg | 1 + .../instrumentation/bootstrap_gen.py | 4 + 14 files changed, 754 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/package.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/version.py create mode 100644 instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/__init__.py create mode 100644 
instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 44f9ce2367..81a0f9da79 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -15,6 +15,10 @@ components: - ben-natan - machine424 + instrumentation/opentelemetry-instrumentation-confluent-kafka: + - oxeye-dorkolog + - dorkolog + propagator/opentelemetry-propagator-aws-xray: - NathanielRN diff --git a/CHANGELOG.md b/CHANGELOG.md index e4d5c7b007..74105aa672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-remoulade` Initial release ([#1082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1082)) +### Added +- Added `opentelemetry-instrumentation-confluent-kafka` + ([#1111](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1111)) + + ## [1.12.0rc1-0.31b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc1-0.31b0) - 2022-05-17 ### Fixed diff --git a/instrumentation/README.md b/instrumentation/README.md index 08e25c89c3..ffb2c8bad7 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -10,6 +10,7 @@ | [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | | [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | +| [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka ~= 1.8.2 | | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | | [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | | 
[opentelemetry-instrumentation-elasticsearch](./opentelemetry-instrumentation-elasticsearch) | elasticsearch >= 2.0 | diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst b/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst new file mode 100644 index 0000000000..163c2a4393 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst @@ -0,0 +1,23 @@ +OpenTelemetry confluent-kafka Instrumentation +============================================= + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-confluent-kafka.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-confluent-kafka/ + +This library allows tracing requests made by the confluent-kafka library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-confluent-kafka + + +References +---------- + +* `OpenTelemetry confluent-kafka/ Tracing `_ +* `OpenTelemetry Project `_ diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg new file mode 100644 index 0000000000..302127afa6 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.cfg @@ -0,0 +1,57 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +[metadata] +name = opentelemetry-instrumentation-confluent-kafka +description = OpenTelemetry Confluent Kafka instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-confluent-kafka +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.3 + wrapt >= 1.0.0, < 2.0.0 + +[options.extras_require] +test = + # add any test dependencies here + confluent-kafka ~= 1.8.2 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + confluent_kafka = opentelemetry.instrumentation.confluent_kafka:ConfluentKafkaInstrumentor diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py new file mode 100644 index 0000000000..03f616369b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/setup.py @@ -0,0 +1,99 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json +import os +from configparser import ConfigParser + +import setuptools + +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extras_require section from setup.cfg. To support extras_require +# section in setup.cfg, we load it here and merge it with the extras_require param. +extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + +BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + +VERSION_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "confluent_kafka", + "version.py", +) +with open(VERSION_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +PACKAGE_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "confluent_kafka", + "package.py", +) +with open(PACKAGE_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. 
extras_require["instruments"] = PACKAGE_INFO["_instruments"]
test_deps = extras_require.get("test", [])
for dep in extras_require["instruments"]:
    test_deps.append(dep)

extras_require["test"] = test_deps


class JSONMetadataCommand(distutils.cmd.Command):
    """``setup.py meta`` command: print the package metadata as JSON.

    The emitted object has the keys ``name`` (from ``setup.cfg``),
    ``version`` and ``instruments`` (both from ``PACKAGE_INFO``).
    """

    # NOTE: this must be a single string.  The original had a comma between
    # the two literals, which silently turned ``description`` into a 2-tuple.
    # This file is generated from templates/instrumentation_setup.py.txt, so
    # the same correction belongs in that template as well.
    description = (
        "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to "
        "auto-generate code in other places"
    )
    user_options = []

    def initialize_options(self):
        """No options to initialise; required by the command interface."""

    def finalize_options(self):
        """No options to finalise; required by the command interface."""

    def run(self):
        """Write the metadata JSON document to stdout."""
        metadata = {
            "name": config["metadata"]["name"],
            "version": PACKAGE_INFO["__version__"],
            "instruments": PACKAGE_INFO["_instruments"],
        }
        print(json.dumps(metadata))


setuptools.setup(
    cmdclass={"meta": JSONMetadataCommand},
    version=PACKAGE_INFO["__version__"],
    extras_require=extras_require,
)
"""
Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages

Usage
-----

.. code:: python

    import sys

    import confluent_kafka
    from confluent_kafka import KafkaError, KafkaException

    from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

    # Instrument kafka.  This replaces ``confluent_kafka.Producer`` and
    # ``confluent_kafka.Consumer``, so look the classes up on the module
    # *after* this call (names imported earlier still point at the
    # uninstrumented classes).
    ConfluentKafkaInstrumentor().instrument()

    # report a span of type producer with the default settings
    conf1 = {'bootstrap.servers': "localhost:9092"}
    producer = confluent_kafka.Producer(conf1)
    producer.produce('my-topic', b'raw_bytes')

    conf2 = {'bootstrap.servers': "localhost:9092",
             'group.id': "foo",
             'auto.offset.reset': 'smallest'}
    # report a span of type consumer with the default settings
    consumer = confluent_kafka.Consumer(conf2)

    def basic_consume_loop(consumer, topics):
        try:
            consumer.subscribe(topics)
            running = True
            while running:
                msg = consumer.poll(timeout=1.0)
                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n")
                    elif msg.error():
                        raise KafkaException(msg.error())
                else:
                    msg_process(msg)
        finally:
            # Close down consumer to commit final offsets.
            consumer.close()

    basic_consume_loop(consumer, ["my-topic"])


The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider

Producers and consumers that already exist can be instrumented one by one
with the static helpers below; each returns a proxy around the object it is
given, and the matching ``uninstrument_*`` helper returns the original:
    def instrument_producer(producer: Producer, tracer_provider=None)
    def instrument_consumer(consumer: Consumer, tracer_provider=None)
for example:

.. code:: python

    import confluent_kafka

    from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

    inst = ConfluentKafkaInstrumentor()

    p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'})
    c = confluent_kafka.Consumer({
        'bootstrap.servers': 'localhost:29092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })

    # wrap the existing producer and consumer (tracer_provider is optional)
    p = inst.instrument_producer(p)
    c = inst.instrument_consumer(c)

    # Using kafka as normal now will automatically generate spans
    p.produce('my-topic', b'raw_bytes')
    msg = c.poll()


API
___
"""
from typing import Collection

import confluent_kafka
import wrapt
from confluent_kafka import Consumer, Producer

from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import Link, SpanKind, Tracer

from .package import _instruments
from .utils import (
    KafkaPropertiesExtractor,
    _enrich_span,
    _get_span_name,
    _kafka_getter,
    _kafka_setter,
)
from .version import __version__


class AutoInstrumentedProducer(Producer):
    """Producer subclass installed as ``confluent_kafka.Producer`` by ``_instrument``."""

    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def produce(
        self, topic, value=None, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
        super().produce(topic, value, *args, **kwargs)


class AutoInstrumentedConsumer(Consumer):
    """Consumer subclass installed as ``confluent_kafka.Consumer`` by ``_instrument``."""

    def __init__(self, config, *args, **kwargs):
        # Extra constructor arguments are forwarded untouched so the subclass
        # accepts whatever the wrapped Consumer constructor accepts.
        super().__init__(config, *args, **kwargs)
        # State of the "process" span opened by the most recent poll();
        # read and reset by ConfluentKafkaInstrumentor.wrap_poll.
        self._current_consume_span = None
        self._current_context_token = None

    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def poll(self, timeout=-1):  # pylint: disable=useless-super-delegation
        return super().poll(timeout)


class ProxiedProducer(Producer):
    """Proxy returned by ``ConfluentKafkaInstrumentor.instrument_producer``.

    The base-class ``__init__`` is intentionally not called: every method
    defined here forwards to the wrapped ``producer``.  Methods that are NOT
    overridden here are inherited from ``Producer`` and therefore do not
    reach the wrapped object.
    """

    def __init__(self, producer: Producer, tracer: Tracer):
        self._producer = producer
        self._tracer = tracer

    def flush(self, timeout=-1):
        # Propagate the wrapped call's result instead of discarding it.
        return self._producer.flush(timeout)

    def poll(self, timeout=-1):
        # Propagate the wrapped call's result instead of discarding it.
        return self._producer.poll(timeout)

    def produce(
        self, topic, value=None, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        # Keep topic/value in their positional slots.  Moving them into
        # kwargs while still forwarding the remaining positionals would shift
        # those positionals into the topic/value slots of the wrapped call.
        return ConfluentKafkaInstrumentor.wrap_produce(
            self._producer.produce,
            self,
            self._tracer,
            (topic, value, *args),
            kwargs.copy(),  # wrap_produce may add "headers"; don't touch the caller's dict
        )

    def original_producer(self):
        """Return the wrapped, uninstrumented producer."""
        return self._producer


class ProxiedConsumer(Consumer):
    """Proxy returned by ``ConfluentKafkaInstrumentor.instrument_consumer``.

    The base-class ``__init__`` is intentionally not called: every method
    defined here forwards to the wrapped ``consumer``.  Methods that are NOT
    overridden here are inherited from ``Consumer`` and therefore do not
    reach the wrapped object.
    """

    def __init__(self, consumer: Consumer, tracer: Tracer):
        self._consumer = consumer
        self._tracer = tracer
        # State of the "process" span opened by the most recent poll().
        self._current_consume_span = None
        self._current_context_token = None

    def close(self, *args, **kwargs):
        # wrap_poll only ends the previous "process" span on the *next* poll,
        # so finish the last one here before closing the wrapped consumer.
        ConfluentKafkaInstrumentor._end_current_consume_span(self)
        return self._consumer.close(*args, **kwargs)

    def commit(self, *args, **kwargs):
        return self._consumer.commit(*args, **kwargs)

    def committed(self, partitions, timeout=-1):
        return self._consumer.committed(partitions, timeout)

    def consume(
        self, num_messages=1, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        # Forwarded as-is; no spans are created for batch consumption.
        return self._consumer.consume(num_messages, *args, **kwargs)

    def get_watermark_offsets(
        self, partition, timeout=-1, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        return self._consumer.get_watermark_offsets(
            partition, timeout, *args, **kwargs
        )

    def offsets_for_times(self, partitions, timeout=-1):
        return self._consumer.offsets_for_times(partitions, timeout)

    def poll(self, timeout=-1):
        return ConfluentKafkaInstrumentor.wrap_poll(
            self._consumer.poll, self, self._tracer, [timeout], {}
        )

    def subscribe(
        self, topics, on_assign=lambda *args: None, *args, **kwargs
    ):  # pylint: disable=keyword-arg-before-vararg
        self._consumer.subscribe(topics, on_assign, *args, **kwargs)

    def original_consumer(self):
        """Return the wrapped, uninstrumented consumer."""
        return self._consumer


class ConfluentKafkaInstrumentor(BaseInstrumentor):
    """An instrumentor for confluent kafka module
    See `BaseInstrumentor`
    """

    # pylint: disable=attribute-defined-outside-init
    @staticmethod
    def instrument_producer(
        producer: Producer, tracer_provider=None
    ) -> ProxiedProducer:
        """Wrap an existing producer in a tracing proxy and return the proxy."""
        tracer = trace.get_tracer(
            __name__, __version__, tracer_provider=tracer_provider
        )

        manual_producer = ProxiedProducer(producer, tracer)

        return manual_producer

    @staticmethod
    def instrument_consumer(
        consumer: Consumer, tracer_provider=None
    ) -> ProxiedConsumer:
        """Wrap an existing consumer in a tracing proxy and return the proxy."""
        tracer = trace.get_tracer(
            __name__, __version__, tracer_provider=tracer_provider
        )

        manual_consumer = ProxiedConsumer(consumer, tracer)

        return manual_consumer

    @staticmethod
    def uninstrument_producer(producer: Producer) -> Producer:
        """Return the producer wrapped by a proxy; other objects are returned unchanged."""
        if isinstance(producer, ProxiedProducer):
            return producer.original_producer()
        return producer

    @staticmethod
    def uninstrument_consumer(consumer: Consumer) -> Consumer:
        """Return the consumer wrapped by a proxy; other objects are returned unchanged."""
        if isinstance(consumer, ProxiedConsumer):
            return consumer.original_consumer()
        return consumer

    def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments

    def _instrument(self, **kwargs):
        """Swap the module-level Producer/Consumer classes for traced subclasses.

        Reads the optional ``tracer_provider`` keyword argument.
        """
        # Remember the originals so _uninstrument can restore them.
        self._original_kafka_producer = confluent_kafka.Producer
        self._original_kafka_consumer = confluent_kafka.Consumer

        confluent_kafka.Producer = AutoInstrumentedProducer
        confluent_kafka.Consumer = AutoInstrumentedConsumer

        tracer_provider = kwargs.get("tracer_provider")
        tracer = trace.get_tracer(
            __name__, __version__, tracer_provider=tracer_provider
        )

        self._tracer = tracer

        def _inner_wrap_produce(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_produce(
                func, instance, self._tracer, args, kwargs
            )

        def _inner_wrap_poll(func, instance, args, kwargs):
            return ConfluentKafkaInstrumentor.wrap_poll(
                func, instance, self._tracer, args, kwargs
            )

        wrapt.wrap_function_wrapper(
            AutoInstrumentedProducer,
            "produce",
            _inner_wrap_produce,
        )

        wrapt.wrap_function_wrapper(
            AutoInstrumentedConsumer,
            "poll",
            _inner_wrap_poll,
        )

    def _uninstrument(self, **kwargs):
        """Restore the original classes and remove the wrapt wrappers."""
        confluent_kafka.Producer = self._original_kafka_producer
        confluent_kafka.Consumer = self._original_kafka_consumer

        unwrap(AutoInstrumentedProducer, "produce")
        unwrap(AutoInstrumentedConsumer, "poll")

    @staticmethod
    def wrap_produce(func, instance, tracer, args, kwargs):
        """Call ``func(*args, **kwargs)`` inside a PRODUCER span.

        The current trace context is injected into the message headers; when
        the caller supplied no headers an empty list is created and passed as
        the ``headers`` keyword.
        """
        # The topic may arrive as a keyword or as the first positional.
        topic = kwargs.get("topic")
        if not topic:
            topic = args[0] if args else "unknown"

        span_name = _get_span_name("send", topic)
        with tracer.start_as_current_span(
            name=span_name, kind=trace.SpanKind.PRODUCER
        ) as span:
            headers = KafkaPropertiesExtractor.extract_produce_headers(
                args, kwargs
            )
            if headers is None:
                headers = []
                kwargs["headers"] = headers

            # Use the topic resolved above.  Re-deriving it from ``args``
            # alone loses a topic that was passed by keyword.
            _enrich_span(
                span,
                topic,
                # TODO: RECEIVE is a placeholder carried over from the
                # original code (it was marked "Replace"); switch to a
                # send/publish operation value when one is available.
                operation=MessagingOperationValues.RECEIVE,
            )
            propagate.inject(
                headers,
                setter=_kafka_setter,
            )
            return func(*args, **kwargs)

    @staticmethod
    def _end_current_consume_span(instance):
        """Detach and end the "process" span left open by the previous poll, if any."""
        span = getattr(instance, "_current_consume_span", None)
        if span is None:
            return

        token = getattr(instance, "_current_context_token", None)
        if token is not None:
            context.detach(token)
        instance._current_context_token = None
        span.end()
        instance._current_consume_span = None

    @staticmethod
    def wrap_poll(func, instance, tracer, args, kwargs):
        """Call ``func(*args, **kwargs)`` inside a "recv" CONSUMER span.

        When a message is returned, a second "<topic> process" span is
        started, linked to the context extracted from the message headers,
        and attached as the current context.  That span stays open until the
        next call on the same ``instance``.
        """
        ConfluentKafkaInstrumentor._end_current_consume_span(instance)

        with tracer.start_as_current_span(
            "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
        ):
            record = func(*args, **kwargs)
            # Compare against None explicitly: confluent_kafka messages
            # support len() (size of the value), so a plain truthiness test
            # would skip messages whose value is empty or null.
            if record is not None:
                links = []
                ctx = propagate.extract(record.headers(), getter=_kafka_getter)
                if ctx:
                    for item in ctx.values():
                        if hasattr(item, "get_span_context"):
                            links.append(Link(context=item.get_span_context()))

                instance._current_consume_span = tracer.start_span(
                    name=f"{record.topic()} process",
                    links=links,
                    kind=SpanKind.CONSUMER,
                )

                _enrich_span(
                    instance._current_consume_span,
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    operation=MessagingOperationValues.PROCESS,
                )
                instance._current_context_token = context.attach(
                    trace.set_span_in_context(instance._current_consume_span)
                )

        return record
"""Helpers for the confluent-kafka instrumentation: argument extraction,
header-based context propagation and span attribute enrichment."""
from logging import getLogger
from typing import List, Optional

from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import (
    MessagingDestinationKindValues,
    MessagingOperationValues,
    SpanAttributes,
)

_LOG = getLogger(__name__)


class KafkaPropertiesExtractor:
    """Static helpers that pull values out of ``produce`` call arguments."""

    @staticmethod
    def extract_bootstrap_servers(instance):
        return instance.config.get("bootstrap_servers")

    @staticmethod
    def _extract_argument(key, position, default_value, args, kwargs):
        """Return ``args[position]`` when present, else ``kwargs[key]``, else the default."""
        if len(args) > position:
            return args[position]
        return kwargs.get(key, default_value)

    @staticmethod
    def extract_produce_topic(args, kwargs=None):
        """extract topic from `produce` method arguments in Producer class

        ``kwargs`` is optional so existing single-argument callers keep
        working; when given, a ``topic`` keyword is used if there is no
        positional topic.  Falls back to ``"unknown"``.
        """
        if len(args) > 0:
            return args[0]
        if kwargs:
            return kwargs.get("topic", "unknown")
        return "unknown"

    @staticmethod
    def extract_produce_headers(args, kwargs):
        """extract headers from `produce` method arguments in Producer class"""
        return KafkaPropertiesExtractor._extract_argument(
            "headers", 6, None, args, kwargs
        )


class KafkaContextGetter(textmap.Getter):
    """Reads propagation fields from headers given as ``(key, value)`` pairs."""

    def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
        """Return ``[value]`` for the first header named ``key`` that has a value, else None."""
        if carrier is None:
            return None
        for item_key, value in carrier:
            if item_key == key:
                if value is not None:
                    # Header values are decoded when they are bytes; a value
                    # that is already text is returned as-is instead of
                    # failing on the missing ``decode`` method.
                    if isinstance(value, (bytes, bytearray)):
                        return [value.decode()]
                    return [value]
        return None

    def keys(self, carrier: textmap.CarrierT) -> List[str]:
        """Return the header names in carrier order ([] for a None carrier)."""
        if carrier is None:
            return []
        return [key for (key, value) in carrier]


class KafkaContextSetter(textmap.Setter):
    """Writes propagation fields into a headers list or dict."""

    def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
        """Store ``key``/``value`` (UTF-8 encoded when non-empty) in ``carrier``."""
        if carrier is None or key is None:
            return

        if value:
            value = value.encode()

        # Headers may be supplied by the caller as a mapping as well as a
        # list of pairs; a mapping has no ``append``.
        if isinstance(carrier, dict):
            carrier[key] = value
        else:
            carrier.append((key, value))


_kafka_getter = KafkaContextGetter()


def _enrich_span(
    span,
    topic,
    partition: Optional[int] = None,
    offset: Optional[int] = None,
    operation: Optional[MessagingOperationValues] = None,
):
    """Set the messaging attributes for ``topic`` on ``span``.

    Does nothing when the span is not recording.  ``partition`` and
    ``offset`` are optional; the message id attribute is only set when topic,
    partition and offset are all known.
    """

    if not span.is_recording():
        return

    span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
    span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)

    # ``is not None`` rather than truthiness: partition 0 is a valid partition.
    if partition is not None:
        span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)

    span.set_attribute(
        SpanAttributes.MESSAGING_DESTINATION_KIND,
        MessagingDestinationKindValues.QUEUE.value,
    )

    if operation:
        span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value)
    else:
        span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)

    # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
    # A message within Kafka is uniquely defined by its topic name, topic partition and offset.
    # Partition 0 and offset 0 are valid, so only None counts as "unknown".
    if partition is not None and offset is not None and topic:
        span.set_attribute(
            SpanAttributes.MESSAGING_MESSAGE_ID,
            f"{topic}.{partition}.{offset}",
        )


_kafka_setter = KafkaContextSetter()


def _get_span_name(operation: str, topic: str):
    """Return the span name in the form ``"<topic> <operation>"``."""
    return f"{topic} {operation}"
+ +__version__ = "0.31b0" diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py new file mode 100644 index 0000000000..e9462d7898 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -0,0 +1,60 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# pylint: disable=no-name-in-module

from unittest import TestCase

from confluent_kafka import Consumer, Producer

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
    ProxiedConsumer,
    ProxiedProducer,
)


class TestConfluentKafka(TestCase):
    """Checks the manual instrument/uninstrument helpers of the instrumentor."""

    def test_instrument_api(self) -> None:
        """Instrumenting yields a proxy; uninstrumenting yields the original type."""
        instrumentor = ConfluentKafkaInstrumentor()

        # The producer round trip is exercised twice, each time with a
        # freshly constructed Producer.
        for _ in range(2):
            proxied_producer = instrumentor.instrument_producer(
                Producer({"bootstrap.servers": "localhost:29092"})
            )
            self.assertEqual(proxied_producer.__class__, ProxiedProducer)

            restored_producer = instrumentor.uninstrument_producer(
                proxied_producer
            )
            self.assertEqual(restored_producer.__class__, Producer)

        consumer_config = {
            "bootstrap.servers": "localhost:29092",
            "group.id": "mygroup",
            "auto.offset.reset": "earliest",
        }
        proxied_consumer = instrumentor.instrument_consumer(
            Consumer(consumer_config)
        )
        self.assertEqual(proxied_consumer.__class__, ProxiedConsumer)

        restored_consumer = instrumentor.uninstrument_consumer(
            proxied_consumer
        )
        self.assertEqual(restored_consumer.__class__, Consumer)
a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 93d06d3295..6796b18c08 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -48,6 +48,10 @@ "library": "celery >= 4.0, < 6.0", "instrumentation": "opentelemetry-instrumentation-celery==0.31b0", }, + "confluent-kafka": { + "library": "confluent-kafka ~= 1.8.2", + "instrumentation": "opentelemetry-instrumentation-confluent-kafka==0.31b0", + }, "django": { "library": "django >= 1.10", "instrumentation": "opentelemetry-instrumentation-django==0.31b0",