Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Psycopg3 sync and async instrumentation #2146

Merged
merged 27 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cf404fa
* psycopg3 instrumentation including asynchronous instrumentation
reiktar Jan 31, 2024
dc78394
* Updating documents
reiktar Jan 31, 2024
b2d490d
* adding Pullrequest link to changelog
reiktar Jan 31, 2024
0b12d9b
* adding github workflow for psycopg3 instrumentation testing
reiktar Jan 31, 2024
f8c349c
* updates to pass PR checks
reiktar Feb 2, 2024
f4abf01
Merge branch 'main' into psycopg3-instrumentation
reiktar Feb 5, 2024
a897b6c
Merge branch 'main' into psycopg3-instrumentation
reiktar Feb 20, 2024
057504a
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 1, 2024
953a113
* refactor on package name psycopg insteead of psycopg3
reiktar Mar 1, 2024
dafc587
* updating Changlog to reflect the new PR scope
reiktar Mar 1, 2024
8731ff9
* Cleaning up as per comments in #2146
reiktar Mar 4, 2024
9098939
* WRAPT_DISABLE_EXTENSIONS is nolonger required
reiktar Mar 4, 2024
9d2f79b
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 4, 2024
02d2295
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 11, 2024
b812e01
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 13, 2024
d38532a
* move changelog entry to unreleased section
reiktar Mar 14, 2024
f1f1780
* move changelog entry to unreleased section
reiktar Mar 14, 2024
f577330
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 14, 2024
8c95c88
* remove lingering newline
reiktar Mar 14, 2024
09890a4
* lingering refrerence to Psycopg3Instrumentor (hold over from the or…
reiktar Mar 14, 2024
dca2adb
Merge branch 'main' into psycopg3-instrumentation
lzchen Mar 14, 2024
6025de9
* linting + black
reiktar Mar 15, 2024
08e0c50
* Contribute.MD should point out that manually running black must use…
reiktar Mar 15, 2024
55b6f55
* isort and pylint disagrees on where to have the pylint options. Thi…
reiktar Mar 18, 2024
3543b7b
Merge branch 'main' into psycopg3-instrumentation
reiktar Mar 18, 2024
ee377d6
Merge branch 'main' into psycopg3-instrumentation
ocelotl Mar 18, 2024
9c450f4
Merge branch 'main' into psycopg3-instrumentation
lzchen Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-instrumentation-psycopg` Async Instrumentation for psycopg 3.x
([#2146](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2146))

### Fixed
- `opentelemetry-instrumentation-celery` Allow Celery instrumentation to be installed multiple times
([#2342](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2342))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
from typing import Collection

import psycopg
from psycopg import AsyncCursor as pg_async_cursor
from psycopg import Cursor as pg_cursor # pylint: disable=no-name-in-module
from psycopg.sql import Composed # pylint: disable=no-name-in-module

Expand Down Expand Up @@ -151,9 +152,36 @@ def _instrument(self, **kwargs):
commenter_options=commenter_options,
)

dbapi.wrap_connect(
__name__,
psycopg.Connection,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
)
dbapi.wrap_connect(
__name__,
psycopg.AsyncConnection,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiAsyncIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
)

def _uninstrument(self, **kwargs):
""" "Disable Psycopg instrumentation"""
dbapi.unwrap_connect(psycopg, "connect")
dbapi.unwrap_connect(psycopg.Connection, "connect")
dbapi.unwrap_connect(psycopg.AsyncConnection, "connect")

# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
@staticmethod
Expand Down Expand Up @@ -204,6 +232,26 @@ def wrapped_connection(
return connection


class DatabaseApiAsyncIntegration(dbapi.DatabaseApiIntegration):
async def wrapped_connection(
self,
connect_method: typing.Callable[..., typing.Any],
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
"""Add object proxy to connection object."""
base_cursor_factory = kwargs.pop("cursor_factory", None)
new_factory_kwargs = {"db_api": self}
if base_cursor_factory:
new_factory_kwargs["base_factory"] = base_cursor_factory
kwargs["cursor_factory"] = _new_cursor_async_factory(
**new_factory_kwargs
)
connection = await connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return connection


class CursorTracer(dbapi.CursorTracer):
def get_operation_name(self, cursor, args):
if not args:
Expand Down Expand Up @@ -259,3 +307,36 @@ def callproc(self, *args, **kwargs):
)

return TracedCursorFactory


def _new_cursor_async_factory(
db_api=None, base_factory=None, tracer_provider=None
):
if not db_api:
db_api = DatabaseApiAsyncIntegration(
__name__,
PsycopgInstrumentor._DATABASE_SYSTEM,
connection_attributes=PsycopgInstrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
base_factory = base_factory or pg_async_cursor
_cursor_tracer = CursorTracer(db_api)

class TracedCursorAsyncFactory(base_factory):
async def execute(self, *args, **kwargs):
return await _cursor_tracer.traced_execution(
self, super().execute, *args, **kwargs
)

async def executemany(self, *args, **kwargs):
return await _cursor_tracer.traced_execution(
self, super().executemany, *args, **kwargs
)

async def callproc(self, *args, **kwargs):
return await _cursor_tracer.traced_execution(
self, super().callproc, *args, **kwargs
)

return TracedCursorAsyncFactory
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import types
from unittest import mock

Expand Down Expand Up @@ -45,6 +46,35 @@ def __exit__(self, *args):
return self


class MockAsyncCursor:
def __init__(self, *args, **kwargs):
pass

# pylint: disable=unused-argument, no-self-use
async def execute(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")

# pylint: disable=unused-argument, no-self-use
async def executemany(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")

# pylint: disable=unused-argument, no-self-use
async def callproc(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")

async def __aenter__(self, *args, **kwargs):
return self

async def __aexit__(self, *args, **kwargs):
pass

def close(self):
pass


class MockConnection:
commit = mock.MagicMock(spec=types.MethodType)
commit.__name__ = "commit"
Expand All @@ -64,22 +94,69 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use
return {"dbname": "test"}


class MockAsyncConnection:
commit = mock.MagicMock(spec=types.MethodType)
commit.__name__ = "commit"

rollback = mock.MagicMock(spec=types.MethodType)
rollback.__name__ = "rollback"

def __init__(self, *args, **kwargs):
self.cursor_factory = kwargs.pop("cursor_factory", None)
pass

@classmethod
async def connect(*args, **kwargs):
return MockAsyncConnection(**kwargs)

def cursor(self):
if self.cursor_factory:
cur = self.cursor_factory(self)
return cur
return MockAsyncCursor()

def get_dsn_parameters(self): # pylint: disable=no-self-use
return {"dbname": "test"}

async def __aenter__(self):
return self

async def __aexit__(self, *args):
return mock.MagicMock(spec=types.MethodType)


class TestPostgresqlIntegration(TestBase):
def setUp(self):
super().setUp()
self.cursor_mock = mock.patch(
"opentelemetry.instrumentation.psycopg.pg_cursor", MockCursor
)
self.cursor_async_mock = mock.patch(
"opentelemetry.instrumentation.psycopg.pg_async_cursor",
MockAsyncCursor,
)
self.connection_mock = mock.patch("psycopg.connect", MockConnection)
self.connection_sync_mock = mock.patch(
"psycopg.Connection.connect", MockConnection
)
self.connection_async_mock = mock.patch(
"psycopg.AsyncConnection.connect", MockAsyncConnection.connect
)
reiktar marked this conversation as resolved.
Show resolved Hide resolved

self.cursor_mock.start()
self.cursor_async_mock.start()
self.connection_mock.start()
self.connection_sync_mock.start()
self.connection_async_mock.start()

def tearDown(self):
super().tearDown()
self.memory_exporter.clear()
self.cursor_mock.stop()
self.cursor_async_mock.stop()
self.connection_mock.stop()
self.connection_sync_mock.stop()
self.connection_async_mock.stop()
with self.disable_logging():
PsycopgInstrumentor().uninstrument()

Expand Down Expand Up @@ -114,6 +191,91 @@ def test_instrumentor(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

# pylint: disable=unused-argument
def test_instrumentor_with_connection_class(self):
PsycopgInstrumentor().instrument()

cnx = psycopg.Connection.connect(database="test")

cursor = cnx.cursor()

query = "SELECT * FROM test"
cursor.execute(query)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.psycopg
)

# check that no spans are generated after uninstrument
PsycopgInstrumentor().uninstrument()

cnx = psycopg.Connection.connect(database="test")
cursor = cnx.cursor()
query = "SELECT * FROM test"
cursor.execute(query)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

async def test_wrap_async_connection_class_with_cursor(self):
PsycopgInstrumentor().instrument()

async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
async with cnx.cursor() as cursor:
await cursor.execute("SELECT * FROM test")

asyncio.run(test_async_connection())
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.psycopg
)

# check that no spans are generated after uninstrument
PsycopgInstrumentor().uninstrument()

asyncio.run(test_async_connection())

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

# pylint: disable=unused-argument
async def test_instrumentor_with_async_connection_class(self):
PsycopgInstrumentor().instrument()

async def test_async_connection():
acnx = await psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
await cnx.execute("SELECT * FROM test")

asyncio.run(test_async_connection())

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.psycopg
)

# check that no spans are generated after uninstrument
PsycopgInstrumentor().uninstrument()
asyncio.run(test_async_connection())

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

def test_span_name(self):
PsycopgInstrumentor().instrument()

Expand All @@ -140,6 +302,34 @@ def test_span_name(self):
self.assertEqual(spans_list[4].name, "query")
self.assertEqual(spans_list[5].name, "query")

async def test_span_name_async(self):
PsycopgInstrumentor().instrument()

acnx = psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
async with cnx.cursor() as cursor:
await cursor.execute("Test query", ("param1Value", False))
await cursor.execute(
"""multi
line
query"""
)
await cursor.execute("tab\tseparated query")
await cursor.execute("/* leading comment */ query")
await cursor.execute(
"/* leading comment */ query /* trailing comment */"
)
await cursor.execute("query /* trailing comment */")

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 6)
self.assertEqual(spans_list[0].name, "Test")
self.assertEqual(spans_list[1].name, "multi")
self.assertEqual(spans_list[2].name, "tab")
self.assertEqual(spans_list[3].name, "query")
self.assertEqual(spans_list[4].name, "query")
self.assertEqual(spans_list[5].name, "query")

# pylint: disable=unused-argument
def test_not_recording(self):
mock_tracer = mock.Mock()
Expand All @@ -160,6 +350,27 @@ def test_not_recording(self):

PsycopgInstrumentor().uninstrument()

# pylint: disable=unused-argument
async def test_not_recording_async(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
PsycopgInstrumentor().instrument()
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
acnx = psycopg.AsyncConnection.connect(database="test")
async with acnx as cnx:
async with cnx.cursor() as cursor:
query = "SELECT * FROM test"
cursor.execute(query)
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

PsycopgInstrumentor().uninstrument()

# pylint: disable=unused-argument
def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
Expand Down
Loading