diff --git a/ddtrace/contrib/aiomysql/patch.py b/ddtrace/contrib/aiomysql/patch.py index bbea1c51535..d15295683f6 100644 --- a/ddtrace/contrib/aiomysql/patch.py +++ b/ddtrace/contrib/aiomysql/patch.py @@ -6,6 +6,7 @@ from ddtrace.constants import SPAN_KIND from ddtrace.constants import SPAN_MEASURED_KEY from ddtrace.contrib import dbapi +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.internal.schema import schematize_database_operation from ddtrace.internal.utils.wrappers import unwrap @@ -16,12 +17,16 @@ from ...ext import db from ...ext import net from ...internal.schema import schematize_service_name +from ...propagation._database_monitoring import _DBM_Propagator from .. import trace_utils config._add( "aiomysql", - dict(_default_service=schematize_service_name("mysql")), + dict( + _default_service=schematize_service_name("mysql"), + _dbm_propagator=_DBM_Propagator(0, "query"), + ), ) @@ -43,7 +48,7 @@ async def patched_connect(connect_func, _, args, kwargs): tags = {} for tag, attr in CONN_ATTR_BY_TAG.items(): if hasattr(conn, attr): - tags[tag] = getattr(conn, attr) + tags[tag] = trace_utils._convert_to_string(getattr(conn, attr, None)) tags[db.SYSTEM] = "mysql" c = AIOTracedConnection(conn) @@ -83,6 +88,11 @@ async def _trace_method(self, method, resource, extra_tags, *args, **kwargs): # set analytics sample rate s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.aiomysql.get_analytics_sample_rate()) + # dispatch DBM + result = core.dispatch_with_results("aiomysql.execute", (config.aiomysql, s, args, kwargs)).result + if result: + s, args, kwargs = result.value + try: result = await method(*args, **kwargs) return result diff --git a/ddtrace/contrib/asyncpg/patch.py b/ddtrace/contrib/asyncpg/patch.py index c7686a05eed..21c1db723a5 100644 --- a/ddtrace/contrib/asyncpg/patch.py +++ b/ddtrace/contrib/asyncpg/patch.py @@ -4,6 +4,7 @@ from ddtrace import Pin from ddtrace import config +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.vendor import wrapt @@ -17,6 +18,7 @@ from ...internal.schema import schematize_database_operation from ...internal.schema import schematize_service_name from ...internal.utils import get_argument_value +from ...propagation._database_monitoring import _DBM_Propagator from ..trace_utils import ext_service from ..trace_utils import unwrap from ..trace_utils import wrap @@ -37,6 +39,7 @@ "asyncpg", dict( _default_service=schematize_service_name("postgres"), + _dbm_propagator=_DBM_Propagator(0, "query"), ), ) @@ -113,16 +116,21 @@ async def _traced_query(pin, method, query, args, kwargs): # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) - span.set_tag(SPAN_MEASURED_KEY) span.set_tags(pin.tags) + + # dispatch DBM + result = core.dispatch_with_results("asyncpg.execute", (config.asyncpg, method, span, args, kwargs)).result + if result: + span, args, kwargs = result.value + return await method(*args, **kwargs) @with_traced_module async def _traced_protocol_execute(asyncpg, pin, func, instance, args, kwargs): state = get_argument_value(args, kwargs, 0, "state") # type: Union[str, PreparedStatement] - query = state if isinstance(state, str) else state.query + query = state if isinstance(state, str) or isinstance(state, bytes) else state.query return await _traced_query(pin, func, query, args, kwargs) diff --git a/ddtrace/contrib/dbapi/__init__.py b/ddtrace/contrib/dbapi/__init__.py index 92d5efe07fc..3fb5d6cbe1c 100644 --- a/ddtrace/contrib/dbapi/__init__.py +++ b/ddtrace/contrib/dbapi/__init__.py @@ -3,6 +3,7 @@ """ from ddtrace import config from ddtrace.appsec._iast._utils import _is_iast_enabled +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ...appsec._constants import IAST_SPAN_TAGS @@ -118,8 +119,14 @@ def _trace_method(self, method, name, resource, extra_tags, dbm_propagator, *arg if not isinstance(self, FetchTracedCursor): s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, self._self_config.get_analytics_sample_rate()) + # dispatch DBM if dbm_propagator: - args, kwargs = dbm_propagator.inject(s, args, kwargs) + # this check is necessary to prevent fetch methods from trying to add dbm propagation + result = core.dispatch_with_results( + f"{self._self_config.integration_name}.execute", (self._self_config, s, args, kwargs) + ).result + if result: + s, args, kwargs = result.value try: return method(*args, **kwargs) diff --git a/ddtrace/contrib/dbapi_async/__init__.py b/ddtrace/contrib/dbapi_async/__init__.py index af204633d09..c37638fdf67 100644 --- a/ddtrace/contrib/dbapi_async/__init__.py +++ b/ddtrace/contrib/dbapi_async/__init__.py @@ -1,5 +1,6 @@ from ddtrace import config from ddtrace.appsec._iast._utils import _is_iast_enabled +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ...appsec._constants import IAST_SPAN_TAGS @@ -88,8 +89,14 @@ async def _trace_method(self, method, name, resource, extra_tags, dbm_propagator if not isinstance(self, FetchTracedAsyncCursor): s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, self._self_config.get_analytics_sample_rate()) + # dispatch DBM if dbm_propagator: - args, kwargs = dbm_propagator.inject(s, args, kwargs) + # this check is necessary to prevent fetch methods from trying to add dbm propagation + result = core.dispatch_with_results( + f"{self._self_config.integration_name}.execute", [self._self_config, s, args, kwargs] + ).result + if result: + s, args, kwargs = result.value try: return await method(*args, **kwargs) diff --git a/ddtrace/contrib/mysql/patch.py b/ddtrace/contrib/mysql/patch.py index 03a4f922bdc..cfb1f17a7da 100644 --- a/ddtrace/contrib/mysql/patch.py +++ b/ddtrace/contrib/mysql/patch.py @@ -12,6 +12,8 @@ from ...internal.schema import schematize_database_operation from ...internal.schema import schematize_service_name from ...internal.utils.formats import asbool +from ...propagation._database_monitoring import _DBM_Propagator +from ..trace_utils import _convert_to_string config._add( @@ -21,6 +23,7 @@ _dbapi_span_name_prefix="mysql", _dbapi_span_operation_name=schematize_database_operation("mysql.query", database_provider="mysql"), trace_fetch_methods=asbool(os.getenv("DD_MYSQL_TRACE_FETCH_METHODS", default=False)), + _dbm_propagator=_DBM_Propagator(0, "query"), ), ) @@ -58,7 +61,9 @@ def _connect(func, instance, args, kwargs): def patch_conn(conn): - tags = {t: getattr(conn, a) for t, a in CONN_ATTR_BY_TAG.items() if getattr(conn, a, "") != ""} + tags = { + t: _convert_to_string(getattr(conn, a, None)) for t, a in CONN_ATTR_BY_TAG.items() if getattr(conn, a, "") != "" + } tags[db.SYSTEM] = "mysql" pin = Pin(tags=tags) diff --git a/ddtrace/contrib/mysqldb/patch.py b/ddtrace/contrib/mysqldb/patch.py index 878280d16a0..a7a0d2b1ff8 100644 --- a/ddtrace/contrib/mysqldb/patch.py +++ b/ddtrace/contrib/mysqldb/patch.py @@ -19,6 +19,8 @@ from ...internal.schema import schematize_service_name from ...internal.utils.formats import asbool from ...internal.utils.wrappers import unwrap as _u +from ...propagation._database_monitoring import _DBM_Propagator +from ..trace_utils import _convert_to_string config._add( @@ -29,6 +31,7 @@ _dbapi_span_operation_name=schematize_database_operation("mysql.query", database_provider="mysql"), trace_fetch_methods=asbool(os.getenv("DD_MYSQLDB_TRACE_FETCH_METHODS", default=False)), trace_connect=asbool(os.getenv("DD_MYSQLDB_TRACE_CONNECT", default=False)), + _dbm_propagator=_DBM_Propagator(0, "query"), ), ) @@ -99,7 +102,9 @@ def _connect(func, instance, args, kwargs): def patch_conn(conn, *args, **kwargs): tags = { - t: kwargs[k] if k in kwargs else args[p] for t, (k, p) in KWPOS_BY_TAG.items() if k in kwargs or len(args) > p + t: _convert_to_string(kwargs[k]) if k in kwargs else _convert_to_string(args[p]) + for t, (k, p) in KWPOS_BY_TAG.items() + if k in kwargs or len(args) > p } tags[db.SYSTEM] = "mysql" tags[net.TARGET_PORT] = conn.port diff --git a/ddtrace/contrib/pymysql/patch.py b/ddtrace/contrib/pymysql/patch.py index fe2fdcec843..ad83f73a134 100644 --- a/ddtrace/contrib/pymysql/patch.py +++ b/ddtrace/contrib/pymysql/patch.py @@ -12,6 +12,8 @@ from ...internal.schema import schematize_database_operation from ...internal.schema import schematize_service_name from ...internal.utils.formats import asbool +from ...propagation._database_monitoring import _DBM_Propagator +from ..trace_utils import _convert_to_string config._add( @@ -21,6 +23,7 @@ _dbapi_span_name_prefix="pymysql", _dbapi_span_operation_name=schematize_database_operation("pymysql.query", database_provider="mysql"), trace_fetch_methods=asbool(os.getenv("DD_PYMYSQL_TRACE_FETCH_METHODS", default=False)), + _dbm_propagator=_DBM_Propagator(0, "query"), ), ) @@ -53,7 +56,7 @@ def _connect(func, instance, args, kwargs): def patch_conn(conn): - tags = {t: getattr(conn, a, "") for t, a in CONN_ATTR_BY_TAG.items()} + tags = {t: _convert_to_string(getattr(conn, a)) for t, a in CONN_ATTR_BY_TAG.items() if getattr(conn, a, "") != ""} tags[db.SYSTEM] = "mysql" pin = Pin(tags=tags) diff --git a/ddtrace/contrib/trace_utils.py b/ddtrace/contrib/trace_utils.py index e0ae0128ebe..ace93ec06f2 100644 --- a/ddtrace/contrib/trace_utils.py +++ b/ddtrace/contrib/trace_utils.py @@ -23,6 +23,7 @@ from ddtrace.ext import net from ddtrace.ext import user from ddtrace.internal import core +from ddtrace.internal.compat import ensure_text from ddtrace.internal.compat import ip_is_global from ddtrace.internal.compat import parse from ddtrace.internal.logger import get_logger @@ -671,3 +672,13 @@ def extract_netloc_and_query_info_from_url(url): class InterruptException(Exception): pass + + +def _convert_to_string(attr): + # ensures attribute is converted to a string + if attr: + if isinstance(attr, int) or isinstance(attr, float): + return str(attr) + else: + return ensure_text(attr) + return attr diff --git a/ddtrace/propagation/_database_monitoring.py b/ddtrace/propagation/_database_monitoring.py index bdeb12adc82..4210e4cbec6 100644 --- a/ddtrace/propagation/_database_monitoring.py +++ b/ddtrace/propagation/_database_monitoring.py @@ -2,6 +2,7 @@ from typing import Union # noqa:F401 import ddtrace +from ddtrace.internal import core from ddtrace.internal.logger import get_logger from ddtrace.settings.peer_service import PeerServiceConfig from ddtrace.vendor.sqlcommenter import generate_sql_comment as _generate_sql_comment @@ -122,3 +123,28 @@ def _get_dbm_comment(self, db_span): # replace leading whitespace with trailing whitespace return sql_comment.strip() + " " return "" + + +def handle_dbm_injection(int_config, span, args, kwargs): + dbm_propagator = getattr(int_config, "_dbm_propagator", None) + if dbm_propagator: + args, kwargs = dbm_propagator.inject(span, args, kwargs) + + return span, args, kwargs + + +def handle_dbm_injection_asyncpg(int_config, method, span, args, kwargs): + # bind_execute_many uses prepared statements which we want to avoid injection for + if method.__name__ != "bind_execute_many": + return handle_dbm_injection(int_config, span, args, kwargs) + return span, args, kwargs + + +if dbm_config.propagation_mode in ["full", "service"]: + core.on("aiomysql.execute", handle_dbm_injection, "result") + core.on("asyncpg.execute", handle_dbm_injection_asyncpg, "result") + core.on("dbapi.execute", handle_dbm_injection, "result") + core.on("mysql.execute", handle_dbm_injection, "result") + core.on("mysqldb.execute", handle_dbm_injection, "result") + core.on("psycopg.execute", handle_dbm_injection, "result") + core.on("pymysql.execute", handle_dbm_injection, "result") diff --git a/releasenotes/notes/add-dbm-for-mysql-and-postgres-integrations-8558939c45f0dbd7.yaml b/releasenotes/notes/add-dbm-for-mysql-and-postgres-integrations-8558939c45f0dbd7.yaml new file mode 100644 index 00000000000..65355868164 --- /dev/null +++ b/releasenotes/notes/add-dbm-for-mysql-and-postgres-integrations-8558939c45f0dbd7.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + aiomysql, asyncpg, mysql, mysqldb, pymysql: Add Database Monitoring (DBM) for remaining + mysql and postgres integrations lacking support. diff --git a/tests/.suitespec.json b/tests/.suitespec.json index b1cf43c7817..f20f33f57e6 100644 --- a/tests/.suitespec.json +++ b/tests/.suitespec.json @@ -388,6 +388,9 @@ "django": [ "ddtrace/contrib/django/*" ], + "aiopg": [ + "ddtrace/contrib/aiopg/*" + ], "pytest": [ "ddtrace/contrib/pytest/*", "ddtrace/contrib/pytest_bdd/*", @@ -731,14 +734,16 @@ "@contrib", "@tracing", "@dbapi", - "@appsec" + "@appsec", + "tests/contrib/shared_tests.py" ], "dbapi_async": [ "@bootstrap", "@core", "@contrib", "@tracing", - "@dbapi" + "@dbapi", + "tests/contrib/shared_tests.py" ], "asyncpg": [ "@bootstrap", @@ -747,7 +752,8 @@ "@tracing", "@pg", "tests/contrib/asyncpg/*", - "tests/snapshots/tests.contrib.{suite}.*" + "tests/snapshots/tests.contrib.{suite}.*", + "tests/contrib/shared_tests.py" ], "aiohttp": [ "@bootstrap", @@ -1000,7 +1006,8 @@ "@dbapi", "@mysql", "tests/contrib/mysqldb/*", - "tests/contrib/mysql/*" + "tests/contrib/mysql/*", + "tests/contrib/shared_tests.py" ], "pymysql": [ "@bootstrap", @@ -1010,7 +1017,8 @@ "@dbapi", "@mysql", "tests/contrib/pymysql/*", - "tests/contrib/mysql/*" + "tests/contrib/mysql/*", + "tests/contrib/shared_tests.py" ], "pylibmc": [ "@bootstrap", @@ -1170,7 +1178,8 @@ "@dbapi", "@pg", "tests/contrib/psycopg/*", - "tests/snapshots/tests.contrib.psycopg.*" + "tests/snapshots/tests.contrib.psycopg.*", + "tests/contrib/shared_tests.py" ], "psycopg2": [ "@bootstrap", @@ -1180,7 +1189,8 @@ "@tracing", "@pg", "tests/contrib/psycopg2/*", - "tests/snapshots/tests.contrib.psycopg2.*" + "tests/snapshots/tests.contrib.psycopg2.*", + "tests/contrib/shared_tests.py" ], "aiobotocore": [ "@bootstrap", @@ -1198,7 +1208,8 @@ "@dbapi", "@mysql", "tests/contrib/aiomysql/*", - "tests/snapshots/tests.contrib.{suite}.*" + "tests/snapshots/tests.contrib.{suite}.*", + "tests/contrib/shared_tests.py" ], "aiopg": [ "@bootstrap", @@ -1207,7 +1218,9 @@ "@tracing", "@dbapi", "@pg", - "tests/contrib/aiopg/*" + "@aiopg", + "tests/contrib/aiopg/*", + "tests/contrib/shared_tests.py" ], "aredis": [ "@bootstrap", diff --git a/tests/contrib/aiomysql/test_aiomysql.py b/tests/contrib/aiomysql/test_aiomysql.py index dbdb94c2790..35e0a7e09c6 100644 --- a/tests/contrib/aiomysql/test_aiomysql.py +++ b/tests/contrib/aiomysql/test_aiomysql.py @@ -1,6 +1,7 @@ import os import aiomysql +import mock import pymysql import pytest @@ -9,6 +10,7 @@ from ddtrace.contrib.aiomysql import patch from ddtrace.contrib.aiomysql import unpatch from ddtrace.internal.schema import DEFAULT_SPAN_SERVICE_NAME +from tests.contrib import shared_tests from tests.contrib.asyncio.utils import AsyncioTestCase from tests.contrib.asyncio.utils import mark_asyncio from tests.contrib.config import MYSQL_CONFIG @@ -340,3 +342,90 @@ async def test_span_name_v1_schema(self): assert len(spans) == 1 span = spans[0] assert span.name == "mysql.query" + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess(env_overrides=dict(DD_DBM_PROPAGATION_MODE="full")) + async def test_aiomysql_dbm_propagation_enabled(self): + conn, tracer = await self._get_conn_tracer() + cursor = await conn.cursor() + + await shared_tests._test_dbm_propagation_enabled(tracer, cursor, "mysql") + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + ) + ) + async def test_aiomysql_dbm_propagation_comment_with_global_service_name_configured(self): + """tests if dbm comment is set in mysql""" + conn, tracer = await self._get_conn_tracer() + cursor = await conn.cursor() + cursor.__wrapped__ = mock.AsyncMock() + + await shared_tests._test_dbm_propagation_comment_with_global_service_name_configured( + config=AIOMYSQL_CONFIG, db_system="mysql", cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + async def test_aiomysql_dbm_propagation_comment_integration_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = await self._get_conn_tracer() + cursor = await conn.cursor() + cursor.__wrapped__ = mock.AsyncMock() + + await shared_tests._test_dbm_propagation_comment_integration_service_name_override( + config=AIOMYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + async def test_aiomysql_dbm_propagation_comment_pin_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = await self._get_conn_tracer() + cursor = await conn.cursor() + cursor.__wrapped__ = mock.AsyncMock() + + await shared_tests._test_dbm_propagation_comment_pin_service_name_override( + config=AIOMYSQL_CONFIG, cursor=cursor, conn=conn, tracer=tracer, wrapped_instance=cursor.__wrapped__ + ) + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED="True", + ) + ) + async def test_aiomysql_dbm_propagation_comment_peer_service_enabled(self): + """tests if dbm comment is set in mysql""" + conn, tracer = await self._get_conn_tracer() + cursor = await conn.cursor() + cursor.__wrapped__ = mock.AsyncMock() + + await shared_tests._test_dbm_propagation_comment_peer_service_enabled( + config=AIOMYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) diff --git a/tests/contrib/asyncpg/test_asyncpg.py b/tests/contrib/asyncpg/test_asyncpg.py index 1e6af1b84bd..5cec27dbd47 100644 --- a/tests/contrib/asyncpg/test_asyncpg.py +++ b/tests/contrib/asyncpg/test_asyncpg.py @@ -2,6 +2,7 @@ from typing import Generator # noqa:F401 import asyncpg +import mock import pytest from ddtrace import Pin @@ -9,6 +10,8 @@ from ddtrace.contrib.asyncpg import patch from ddtrace.contrib.asyncpg import unpatch from ddtrace.contrib.trace_utils import iswrapped +from tests.contrib.asyncio.utils import AsyncioTestCase +from tests.contrib.asyncio.utils import mark_asyncio from tests.contrib.config import POSTGRES_CONFIG from tests.utils import flaky @@ -325,3 +328,202 @@ def test_patch_unpatch_asyncpg(): assert not iswrapped(asyncpg.protocol.Protocol.bind_execute) assert not iswrapped(asyncpg.protocol.Protocol.query) assert not iswrapped(asyncpg.protocol.Protocol.bind_execute_many) + + +class AsyncPgTestCase(AsyncioTestCase): + # default service + TEST_SERVICE = "mysql" + conn = None + + async def _get_conn_tracer(self): + if not self.conn: + self.conn = await asyncpg.connect( + host=POSTGRES_CONFIG["host"], + port=POSTGRES_CONFIG["port"], + user=POSTGRES_CONFIG["user"], + database=POSTGRES_CONFIG["dbname"], + password=POSTGRES_CONFIG["password"], + ) + assert not self.conn.is_closed() + # Ensure that the default pin is there, with its default value + pin = Pin.get_from(self.conn) + assert pin + # Customize the service + # we have to apply it on the existing one since new one won't inherit `app` + pin.clone(tracer=self.tracer).onto(self.conn) + + return self.conn, self.tracer + + def setUp(self): + super().setUp() + self.conn = None + patch() + + async def tearDown(self): + super().tearDown() + if self.conn and not self.conn.is_closed(): + await self.conn.close() + + unpatch() + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess(env_overrides=dict(DD_DBM_PROPAGATION_MODE="full")) + async def test_asyncpg_dbm_propagation_enabled(self): + conn, tracer = await self._get_conn_tracer() + + await conn.execute("SELECT 1") + spans = tracer.get_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "postgres.query" + + assert span.get_tag("_dd.dbm_trace_injected") == "true" + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + ) + ) + async def test_asyncpg_dbm_propagation_comment_with_global_service_name_configured(self): + """tests if dbm comment is set in postgres""" + db_name = POSTGRES_CONFIG["dbname"] + conn, tracer = await self._get_conn_tracer() + + def mock_func(args, kwargs, sql_pos, sql_kw, sql_with_dbm_tags): + return args, kwargs + + with mock.patch( + "ddtrace.propagation._database_monitoring.set_argument_value", side_effect=mock_func + ) as patched: + # test string queries + create_table_query = """ + CREATE TABLE IF NOT EXISTS my_table( + my_column text PRIMARY KEY + ) + """ + + await conn.execute(create_table_query) + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='postgres',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + assert ( + patched.call_args_list[0][0][4] == dbm_comment + create_table_query + ), f"Expected: {dbm_comment + create_table_query},\nActual: {patched.call_args_list[0][0][4]}" + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_ASYNCPG_SERVICE="service-name-override", + ) + ) + async def test_asyncpg_dbm_propagation_comment_integration_service_name_override(self): + """tests if dbm comment is set in postgres""" + db_name = POSTGRES_CONFIG["dbname"] + conn, tracer = await self._get_conn_tracer() + + def mock_func(args, kwargs, sql_pos, sql_kw, sql_with_dbm_tags): + return args, kwargs + + with mock.patch( + "ddtrace.propagation._database_monitoring.set_argument_value", side_effect=mock_func + ) as patched: + # test string queries + create_table_query = """ + CREATE TABLE IF NOT EXISTS my_table( + my_column text PRIMARY KEY + ) + """ + + await conn.execute(create_table_query) + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='service-name-override',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + assert ( + patched.call_args_list[0][0][4] == dbm_comment + create_table_query + ), f"Expected: {dbm_comment + create_table_query},\nActual: {patched.call_args_list[0][0][4]}" + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_ASYNCPG_SERVICE="service-name-override", + ) + ) + async def test_asyncpg_dbm_propagation_comment_pin_service_name_override(self): + """tests if dbm comment is set in postgres""" + db_name = POSTGRES_CONFIG["dbname"] + conn, tracer = await self._get_conn_tracer() + + Pin.override(conn, service="pin-service-name-override", tracer=tracer) + + def mock_func(args, kwargs, sql_pos, sql_kw, sql_with_dbm_tags): + return args, kwargs + + with mock.patch( + "ddtrace.propagation._database_monitoring.set_argument_value", side_effect=mock_func + ) as patched: + # test string queries + create_table_query = """ + CREATE TABLE IF NOT EXISTS my_table( + my_column text PRIMARY KEY + ) + """ + + await conn.execute(create_table_query) + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='pin-service-name-override',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + assert ( + patched.call_args_list[0][0][4] == dbm_comment + create_table_query + ), f"Expected: {dbm_comment + create_table_query},\nActual: {patched.call_args_list[0][0][4]}" + + @mark_asyncio + @AsyncioTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED="True", + ) + ) + async def test_asyncpg_dbm_propagation_comment_peer_service_enabled(self): + """tests if dbm comment is set in postgres""" + db_name = POSTGRES_CONFIG["dbname"] + conn, tracer = await self._get_conn_tracer() + + def mock_func(args, kwargs, sql_pos, sql_kw, sql_with_dbm_tags): + return args, kwargs + + with mock.patch( + "ddtrace.propagation._database_monitoring.set_argument_value", side_effect=mock_func + ) as patched: + # test string queries + create_table_query = """ + CREATE TABLE IF NOT EXISTS my_table( + my_column text PRIMARY KEY + ) + """ + + await conn.execute(create_table_query) + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='{db_name}',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + assert ( + patched.call_args_list[0][0][4] == dbm_comment + create_table_query + ), f"Expected: {dbm_comment + create_table_query},\nActual: {patched.call_args_list[0][0][4]}" diff --git a/tests/contrib/mysql/test_mysql.py b/tests/contrib/mysql/test_mysql.py index c7c89d566c9..353641661ba 100644 --- a/tests/contrib/mysql/test_mysql.py +++ b/tests/contrib/mysql/test_mysql.py @@ -1,9 +1,11 @@ +import mock import mysql from ddtrace import Pin from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY from ddtrace.contrib.mysql.patch import patch from ddtrace.contrib.mysql.patch import unpatch +from tests.contrib import shared_tests from tests.contrib.config import MYSQL_CONFIG from tests.opentracer.utils import init_tracer from tests.utils import TracerTestCase @@ -11,6 +13,9 @@ from tests.utils import assert_is_measured +MYSQL_CONFIG["db"] = MYSQL_CONFIG["database"] + + class MySQLCore(object): """Base test case for MySQL drivers""" @@ -23,7 +28,9 @@ def tearDown(self): if self.conn: try: self.conn.ping() - except mysql.InterfaceError: + except mysql.connector.errors.InternalError: + pass + except mysql.connector.errors.InterfaceError: pass else: self.conn.close() @@ -552,3 +559,85 @@ def test_operation_name_v1_schema(self): spans = tracer.pop() assert spans[0].name == "mysql.query" + + @TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DBM_PROPAGATION_MODE="full")) + def test_mysql_dbm_propagation_enabled(self): + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + + shared_tests._test_dbm_propagation_enabled(tracer, cursor, "mysql") + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + ) + ) + def test_mysql_dbm_propagation_comment_with_global_service_name_configured(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_with_global_service_name_configured( + config=MYSQL_CONFIG, db_system="mysql", cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + def test_mysql_dbm_propagation_comment_integration_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_integration_service_name_override( + config=MYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + def test_mysql_dbm_propagation_comment_pin_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_pin_service_name_override( + config=MYSQL_CONFIG, cursor=cursor, conn=conn, tracer=tracer, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED="True", + ) + ) + def test_mysql_dbm_propagation_comment_peer_service_enabled(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_peer_service_enabled( + config=MYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) diff --git a/tests/contrib/mysqldb/test_mysqldb.py b/tests/contrib/mysqldb/test_mysqldb.py index cdf0fe7421c..92aaff81708 100644 --- a/tests/contrib/mysqldb/test_mysqldb.py +++ b/tests/contrib/mysqldb/test_mysqldb.py @@ -1,3 +1,4 @@ +import mock import MySQLdb import pytest @@ -6,6 +7,7 @@ from ddtrace.contrib.mysqldb.patch import patch from ddtrace.contrib.mysqldb.patch import unpatch from ddtrace.internal.schema import DEFAULT_SPAN_SERVICE_NAME +from tests.contrib import shared_tests from tests.opentracer.utils import init_tracer from tests.utils import TracerTestCase from tests.utils import assert_dict_issuperset @@ -14,6 +16,9 @@ from ..config import MYSQL_CONFIG +MYSQL_CONFIG["db"] = MYSQL_CONFIG["database"] + + class MySQLCore(object): """Base test case for MySQL drivers""" @@ -759,3 +764,85 @@ def test_span_name_schema_v1(self): span = spans[0] assert span.name == "mysql.query" + + @TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DBM_PROPAGATION_MODE="full")) + def test_mysql_dbm_propagation_enabled(self): + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + + shared_tests._test_dbm_propagation_enabled(tracer, cursor, "mysql") + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + ) + ) + def test_mysql_dbm_propagation_comment_with_global_service_name_configured(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_with_global_service_name_configured( + config=MYSQL_CONFIG, db_system="mysql", cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + def test_mysql_dbm_propagation_comment_integration_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_integration_service_name_override( + config=MYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + def test_mysql_dbm_propagation_comment_pin_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_pin_service_name_override( + config=MYSQL_CONFIG, cursor=cursor, conn=conn, tracer=tracer, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED="True", + ) + ) + def test_mysql_dbm_propagation_comment_peer_service_enabled(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_peer_service_enabled( + config=MYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) diff --git a/tests/contrib/pymysql/test_pymysql.py b/tests/contrib/pymysql/test_pymysql.py index 0f4e8ac7b38..a4df306cd0e 100644 --- a/tests/contrib/pymysql/test_pymysql.py +++ b/tests/contrib/pymysql/test_pymysql.py @@ -1,3 +1,4 @@ +import mock import pymysql from ddtrace import Pin @@ -5,6 +6,7 @@ from ddtrace.contrib.pymysql.patch import patch from ddtrace.contrib.pymysql.patch import unpatch from ddtrace.internal.schema import DEFAULT_SPAN_SERVICE_NAME +from tests.contrib import shared_tests from tests.opentracer.utils import init_tracer from tests.utils import TracerTestCase from tests.utils import assert_dict_issuperset @@ -13,6 +15,9 @@ from ...contrib.config import MYSQL_CONFIG +MYSQL_CONFIG["db"] = MYSQL_CONFIG["database"] + + class PyMySQLCore(object): """PyMySQL test case reuses the connection across tests""" @@ -24,8 +29,8 @@ class PyMySQLCore(object): } DB_INFO.update( { - "db.user": str(bytes(MYSQL_CONFIG.get("user"), encoding="utf-8")), - "db.name": str(bytes(MYSQL_CONFIG.get("database"), encoding="utf-8")), + "db.user": str(MYSQL_CONFIG.get("user")), + "db.name": str(MYSQL_CONFIG.get("database")), } ) @@ -529,3 +534,85 @@ def test_span_name_v1_schema(self): assert len(spans) == 1 span = spans[0] assert span.name == "mysql.query" + + @TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DBM_PROPAGATION_MODE="full")) + def test_pymysql_dbm_propagation_enabled(self): + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + + shared_tests._test_dbm_propagation_enabled(tracer, cursor, "mysql") + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + ) + ) + def test_pymysql_dbm_propagation_comment_with_global_service_name_configured(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_with_global_service_name_configured( + config=MYSQL_CONFIG, db_system="mysql", cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + def test_pymysql_dbm_propagation_comment_integration_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_integration_service_name_override( + config=MYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_AIOMYSQL_SERVICE="service-name-override", + ) + ) + def test_pymysql_dbm_propagation_comment_pin_service_name_override(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_pin_service_name_override( + config=MYSQL_CONFIG, cursor=cursor, conn=conn, tracer=tracer, wrapped_instance=cursor.__wrapped__ + ) + + @TracerTestCase.run_in_subprocess( + env_overrides=dict( + DD_DBM_PROPAGATION_MODE="service", + DD_SERVICE="orders-app", + DD_ENV="staging", + DD_VERSION="v7343437-d7ac743", + DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED="True", + ) + ) + def test_pymysql_dbm_propagation_comment_peer_service_enabled(self): + """tests if dbm comment is set in mysql""" + conn, tracer = self._get_conn_tracer() + cursor = conn.cursor() + cursor.__wrapped__ = mock.Mock() + + shared_tests._test_dbm_propagation_comment_peer_service_enabled( + config=MYSQL_CONFIG, cursor=cursor, wrapped_instance=cursor.__wrapped__ + ) diff --git a/tests/contrib/shared_tests.py b/tests/contrib/shared_tests.py new file mode 100644 index 00000000000..2ccb319551f --- /dev/null +++ b/tests/contrib/shared_tests.py @@ -0,0 +1,96 @@ +from ddtrace import Pin + + +# DBM Shared Tests +async def _test_execute(dbm_comment, cursor, wrapped_instance): + # test string queries + await cursor.execute("select 'blah'") + wrapped_instance.execute.assert_called_once_with(dbm_comment + "select 'blah'") + wrapped_instance.reset_mock() + + # test byte string queries + await cursor.execute(b"select 'blah'") + wrapped_instance.execute.assert_called_once_with(dbm_comment.encode() + b"select 'blah'") + wrapped_instance.reset_mock() + + +async def _test_execute_many(dbm_comment, cursor, wrapped_instance): + # test string queries + await cursor.executemany("select %s", (("foo",), ("bar",))) + wrapped_instance.executemany.assert_called_once_with(dbm_comment + "select %s", (("foo",), ("bar",))) + wrapped_instance.reset_mock() + + # test byte string queries + await cursor.executemany(b"select %s", ((b"foo",), (b"bar",))) + wrapped_instance.executemany.assert_called_once_with(dbm_comment.encode() + b"select %s", ((b"foo",), (b"bar",))) + wrapped_instance.reset_mock() + + +async def _test_dbm_propagation_enabled(tracer, cursor, service): + await cursor.execute("SELECT 1") + spans = tracer.pop() + assert len(spans) == 1 + span = spans[0] + assert span.name == f"{service}.query" + + assert span.get_tag("_dd.dbm_trace_injected") == "true" + + +async def _test_dbm_propagation_comment_with_global_service_name_configured( + config, db_system, cursor, wrapped_instance, execute_many=True +): + """tests if dbm comment is set in given db system""" + db_name = config["db"] + + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='{db_system}',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + await _test_execute(dbm_comment, cursor, wrapped_instance) + if execute_many: + await _test_execute_many(dbm_comment, cursor, wrapped_instance) + + +async def _test_dbm_propagation_comment_integration_service_name_override( + config, cursor, wrapped_instance, execute_many=True +): + """tests if dbm comment is set in mysql""" + db_name = config["db"] + + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='service-name-override',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + await _test_execute(dbm_comment, cursor, wrapped_instance) + if execute_many: + await _test_execute_many(dbm_comment, cursor, wrapped_instance) + + +async def _test_dbm_propagation_comment_pin_service_name_override( + config, cursor, conn, tracer, wrapped_instance, execute_many=True +): + """tests if dbm comment is set in mysql""" + db_name = config["db"] + + Pin.override(conn, service="pin-service-name-override", tracer=tracer) + Pin.override(cursor, service="pin-service-name-override", tracer=tracer) + + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='pin-service-name-override',dde='staging',ddh='127.0.0.1',ddps='orders-app'," + "ddpv='v7343437-d7ac743'*/ " + ) + await _test_execute(dbm_comment, cursor, wrapped_instance) + if execute_many: + await _test_execute_many(dbm_comment, cursor, wrapped_instance) + + +async def _test_dbm_propagation_comment_peer_service_enabled(config, cursor, wrapped_instance, execute_many=True): + """tests if dbm comment is set in mysql""" + db_name = config["db"] + + dbm_comment = ( + f"/*dddb='{db_name}',dddbs='test',dde='staging',ddh='127.0.0.1',ddps='orders-app'," "ddpv='v7343437-d7ac743'*/ " + ) + await _test_execute(dbm_comment, cursor, wrapped_instance) + if execute_many: + await _test_execute_many(dbm_comment, cursor, wrapped_instance)