Skip to content

Commit

Permalink
ext/asyncpg: Shouldn't capture query parameters by default (#854)
Browse files Browse the repository at this point in the history
* Update CHANGELOG.md
Co-authored-by: alrex <aboten@lightstep.com>
  • Loading branch information
thomasdesr committed Jul 28, 2020
1 parent 090fb7c commit 7f74a89
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 95 deletions.
3 changes: 3 additions & 0 deletions ext/opentelemetry-ext-asyncpg/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Shouldn't capture query parameters by default
([#854](https://github.com/open-telemetry/opentelemetry-python/pull/854))

## Version 0.10b0

Released 2020-06-23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,11 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict:
return span_attributes


async def _do_execute(func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(instance, args[0], args[1:])
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result


class AsyncPGInstrumentor(BaseInstrumentor):
def __init__(self, capture_parameters=False):
super().__init__()
self.capture_parameters = capture_parameters

def _instrument(self, **kwargs):
tracer_provider = kwargs.get(
"tracer_provider", trace.get_tracer_provider()
Expand All @@ -113,6 +88,7 @@ def _instrument(self, **kwargs):
_APPLIED,
tracer_provider.get_tracer("asyncpg", __version__),
)

for method in [
"Connection.execute",
"Connection.executemany",
Expand All @@ -121,7 +97,7 @@ def _instrument(self, **kwargs):
"Connection.fetchrow",
]:
wrapt.wrap_function_wrapper(
"asyncpg.connection", method, _do_execute
"asyncpg.connection", method, self._do_execute
)

def _uninstrument(self, **__):
Expand All @@ -134,3 +110,33 @@ def _uninstrument(self, **__):
"fetchrow",
]:
unwrap(asyncpg.Connection, method)

async def _do_execute(self, func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(
instance, args[0], args[1:] if self.capture_parameters else None,
)
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def _await(coro):
return loop.run_until_complete(coro)


class TestFunctionalPsycopg(TestBase):
class TestFunctionalAsyncPG(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
Expand Down Expand Up @@ -58,24 +58,6 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
},
)

def test_instrumented_execute_method_with_arguments(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_fetch_method_without_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -90,52 +72,6 @@ def test_instrumented_fetch_method_without_arguments(self, *_, **__):
},
)

def test_instrumented_fetch_method_with_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_executemany_method_with_arguments(self, *_, **__):
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
)

def test_instrumented_execute_interface_error_method(self, *_, **__):
with self.assertRaises(asyncpg.InterfaceError):
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
)

def test_instrumented_transaction_method(self, *_, **__):
async def _transaction_execute():
async with self._connection.transaction():
Expand Down Expand Up @@ -229,3 +165,113 @@ async def _transaction_execute():
self.assertEqual(
StatusCanonicalCode.OK, spans[2].status.canonical_code
)

def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
# This shouldn't be set because we don't capture parameters by
# default
#
# "db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)


class TestFunctionalAsyncPG_CaptureParameters(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
AsyncPGInstrumentor(capture_parameters=True).instrument(
tracer_provider=cls.tracer_provider
)
cls._connection = _await(
asyncpg.connect(
database=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
)

@classmethod
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()

def test_instrumented_execute_method_with_arguments(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_fetch_method_with_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_executemany_method_with_arguments(self, *_, **__):
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
)

def test_instrumented_execute_interface_error_method(self, *_, **__):
with self.assertRaises(asyncpg.InterfaceError):
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
)

0 comments on commit 7f74a89

Please sign in to comment.