diff --git a/ext/opentelemetry-ext-asyncpg/CHANGELOG.md b/ext/opentelemetry-ext-asyncpg/CHANGELOG.md index e81079bfb8..052b66ea4e 100644 --- a/ext/opentelemetry-ext-asyncpg/CHANGELOG.md +++ b/ext/opentelemetry-ext-asyncpg/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Shouldn't capture query parameters by default + ([#854](https://github.com/open-telemetry/opentelemetry-python/pull/854)) + ## Version 0.10b0 Released 2020-06-23 diff --git a/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py b/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py index c373d7194d..4a3a51ac08 100644 --- a/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py +++ b/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py @@ -74,36 +74,11 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict: return span_attributes -async def _do_execute(func, instance, args, kwargs): - span_attributes = _hydrate_span_from_args(instance, args[0], args[1:]) - tracer = getattr(asyncpg, _APPLIED) - - exception = None - - with tracer.start_as_current_span( - "postgresql", kind=SpanKind.CLIENT - ) as span: - - for attribute, value in span_attributes.items(): - span.set_attribute(attribute, value) - - try: - result = await func(*args, **kwargs) - except Exception as exc: # pylint: disable=W0703 - exception = exc - raise - finally: - if exception is not None: - span.set_status( - Status(_exception_to_canonical_code(exception)) - ) - else: - span.set_status(Status(StatusCanonicalCode.OK)) - - return result - - class AsyncPGInstrumentor(BaseInstrumentor): + def __init__(self, capture_parameters=False): + super().__init__() + self.capture_parameters = capture_parameters + def _instrument(self, **kwargs): tracer_provider = kwargs.get( "tracer_provider", trace.get_tracer_provider() @@ -113,6 +88,7 @@ def _instrument(self, **kwargs): _APPLIED, tracer_provider.get_tracer("asyncpg", __version__), ) + for method in [ "Connection.execute", "Connection.executemany", @@ -121,7 +97,7 @@ def _instrument(self, **kwargs): "Connection.fetchrow", ]: wrapt.wrap_function_wrapper( - "asyncpg.connection", method, _do_execute + "asyncpg.connection", method, self._do_execute ) def _uninstrument(self, **__): @@ -134,3 +110,33 @@ def _uninstrument(self, **__): "fetchrow", ]: unwrap(asyncpg.Connection, method) + + async def _do_execute(self, func, instance, args, kwargs): + span_attributes = _hydrate_span_from_args( + instance, args[0], args[1:] if self.capture_parameters else None, + ) + tracer = getattr(asyncpg, _APPLIED) + + exception = None + + with tracer.start_as_current_span( + "postgresql", kind=SpanKind.CLIENT + ) as span: + + for attribute, value in span_attributes.items(): + span.set_attribute(attribute, value) + + try: + result = await func(*args, **kwargs) + except Exception as exc: # pylint: disable=W0703 + exception = exc + raise + finally: + if exception is not None: + span.set_status( + Status(_exception_to_canonical_code(exception)) + ) + else: + span.set_status(Status(StatusCanonicalCode.OK)) + + return result diff --git a/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py index c5f5557438..d3060592a6 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -19,7 +19,7 @@ def _await(coro): return loop.run_until_complete(coro) -class TestFunctionalPsycopg(TestBase): +class TestFunctionalAsyncPG(TestBase): @classmethod def setUpClass(cls): super().setUpClass() @@ -58,24 +58,6 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__): }, ) - def test_instrumented_execute_method_with_arguments(self, *_, **__): - _await(self._connection.execute("SELECT $1;", "1")) - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - self.assertEqual( - StatusCanonicalCode.OK, spans[0].status.canonical_code - ) - self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - "db.statement.parameters": "('1',)", - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT $1;", - }, - ) - def test_instrumented_fetch_method_without_arguments(self, *_, **__): _await(self._connection.fetch("SELECT 42;")) spans = self.memory_exporter.get_finished_spans() @@ -90,52 +72,6 @@ def test_instrumented_fetch_method_without_arguments(self, *_, **__): }, ) - def test_instrumented_fetch_method_with_arguments(self, *_, **__): - _await(self._connection.fetch("SELECT $1;", "1")) - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - "db.statement.parameters": "('1',)", - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT $1;", - }, - ) - - def test_instrumented_executemany_method_with_arguments(self, *_, **__): - _await(self._connection.executemany("SELECT $1;", [["1"], ["2"]])) - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - self.assertEqual( - { - "db.type": "sql", - "db.statement": "SELECT $1;", - "db.statement.parameters": "([['1'], ['2']],)", - "db.user": POSTGRES_USER, - "db.instance": POSTGRES_DB_NAME, - }, - spans[0].attributes, - ) - - def test_instrumented_execute_interface_error_method(self, *_, **__): - with self.assertRaises(asyncpg.InterfaceError): - _await(self._connection.execute("SELECT 42;", 1, 2, 3)) - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.statement.parameters": "(1, 2, 3)", - "db.statement": "SELECT 42;", - }, - ) - def test_instrumented_transaction_method(self, *_, **__): async def _transaction_execute(): async with self._connection.transaction(): @@ -229,3 +165,113 @@ async def _transaction_execute(): self.assertEqual( StatusCanonicalCode.OK, spans[2].status.canonical_code ) + + def test_instrumented_method_doesnt_capture_parameters(self, *_, **__): + _await(self._connection.execute("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + # This shouldn't be set because we don't capture parameters by + # default + # + # "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + +class TestFunctionalAsyncPG_CaptureParameters(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + AsyncPGInstrumentor(capture_parameters=True).instrument( + tracer_provider=cls.tracer_provider + ) + cls._connection = _await( + asyncpg.connect( + database=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + ) + + @classmethod + def tearDownClass(cls): + AsyncPGInstrumentor().uninstrument() + + def test_instrumented_execute_method_with_arguments(self, *_, **__): + _await(self._connection.execute("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + def test_instrumented_fetch_method_with_arguments(self, *_, **__): + _await(self._connection.fetch("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + def test_instrumented_executemany_method_with_arguments(self, *_, **__): + _await(self._connection.executemany("SELECT $1;", [["1"], ["2"]])) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + { + "db.type": "sql", + "db.statement": "SELECT $1;", + "db.statement.parameters": "([['1'], ['2']],)", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + }, + spans[0].attributes, + ) + + def test_instrumented_execute_interface_error_method(self, *_, **__): + with self.assertRaises(asyncpg.InterfaceError): + _await(self._connection.execute("SELECT 42;", 1, 2, 3)) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.statement.parameters": "(1, 2, 3)", + "db.statement": "SELECT 42;", + }, + )