diff --git a/CHANGELOG.md b/CHANGELOG.md index dff32bafca..28e8a85c26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2418](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2418)) - Use sqlalchemy version in sqlalchemy commenter instead of opentelemetry library version ([#2404](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2404)) +- `opentelemetry-instrumentation-asyncio` Check for cancelledException in the future + ([#2461](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2461)) - Remove SDK dependency from opentelemetry-instrumentation-grpc ([#2474](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2474)) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py index 68e3d0839f..72aa5fd2aa 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py @@ -116,21 +116,11 @@ class AsyncioInstrumentor(BaseInstrumentor): "run_coroutine_threadsafe", ] - def __init__(self): - super().__init__() - self.process_duration_histogram = None - self.process_created_counter = None - - self._tracer = None - self._meter = None - self._coros_name_to_trace: set = set() - self._to_thread_name_to_trace: set = set() - self._future_active_enabled: bool = False - def instrumentation_dependencies(self) -> Collection[str]: return _instruments def _instrument(self, **kwargs): + # pylint: disable=attribute-defined-outside-init self._tracer = get_tracer( __name__, __version__, kwargs.get("tracer_provider") ) @@ -307,13 +297,17 @@ def trace_future(self, future): ) def callback(f): - exception = f.exception() attr = { "type": "future", + "state": ( + "cancelled" + if f.cancelled() + else determine_state(f.exception()) + ), } - state = determine_state(exception) - attr["state"] = state - self.record_process(start, attr, span, exception) + self.record_process( + start, attr, span, None if f.cancelled() else f.exception() + ) future.add_done_callback(callback) return future diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_future_cancellation.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_future_cancellation.py new file mode 100644 index 0000000000..f8f4e5f230 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_future_cancellation.py @@ -0,0 +1,60 @@ +import asyncio +from unittest.mock import patch + +from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.instrumentation.asyncio.environment_variables import ( + OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED, +) +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import get_tracer + + +class TestTraceFuture(TestBase): + @patch.dict( + "os.environ", {OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED: "true"} + ) + def setUp(self): + super().setUp() + self._tracer = get_tracer( + __name__, + ) + self.instrumentor = AsyncioInstrumentor() + self.instrumentor.instrument() + + def tearDown(self): + super().tearDown() + self.instrumentor.uninstrument() + + def test_trace_future_cancelled(self): + async def future_cancelled(): + with self._tracer.start_as_current_span("root"): + future = asyncio.Future() + future = self.instrumentor.trace_future(future) + future.cancel() + + try: + asyncio.run(future_cancelled()) + except asyncio.CancelledError as exc: + self.assertEqual(isinstance(exc, asyncio.CancelledError), True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + self.assertEqual(spans[0].name, "root") + self.assertEqual(spans[1].name, "asyncio future") + + metrics = ( + self.memory_metrics_reader.get_metrics_data() + .resource_metrics[0] + .scope_metrics[0] + .metrics + ) + self.assertEqual(len(metrics), 2) + + self.assertEqual(metrics[0].name, "asyncio.process.duration") + self.assertEqual( + metrics[0].data.data_points[0].attributes["state"], "cancelled" + ) + + self.assertEqual(metrics[1].name, "asyncio.process.created") + self.assertEqual( + metrics[1].data.data_points[0].attributes["state"], "cancelled" + )