From b84c5e62dba9f321bd36b71f5a1d6fc9c950ef34 Mon Sep 17 00:00:00 2001
From: Zhengda Lu
Date: Thu, 19 Oct 2023 18:10:20 -0400
Subject: [PATCH] [DBMON-3030] report postgres check query operation time
 metrics (#16040)

* report postgres check query perf metrics
* fix unit tests
* update changelog
* fix lint
* fix test for postgres 9
* use debug_stats_kwargs to create internal metrics tags
* report query perf metrics under same metric
* add shared util to track query operation time
* conditionally track query operational time
* track dynamic queries operation time for postgres
* remove track_operation_time from context manager
---
 datadog_checks_base/CHANGELOG.md              |  4 ++
 .../datadog_checks/base/utils/db/core.py      | 10 ++++-
 .../datadog_checks/base/utils/db/utils.py     | 31 +++++++++++++
 .../tests/base/utils/db/test_util.py          | 14 ++++++
 postgres/CHANGELOG.md                         |  1 +
 .../datadog_checks/postgres/metrics_cache.py  |  4 ++
 postgres/datadog_checks/postgres/postgres.py  | 43 +++++++++++++------
 .../postgres/relationsmanager.py              |  7 +++
 postgres/datadog_checks/postgres/util.py      |  6 +++
 postgres/tests/common.py                      | 25 +++++++++++
 postgres/tests/conftest.py                    |  2 +-
 postgres/tests/test_pg_integration.py         |  4 ++
 postgres/tests/test_pg_replication.py         |  3 ++
 postgres/tests/test_unit.py                   | 19 ++++++--
 14 files changed, 154 insertions(+), 19 deletions(-)

diff --git a/datadog_checks_base/CHANGELOG.md b/datadog_checks_base/CHANGELOG.md
index d3ea0d6036f3f..054a28030869b 100644
--- a/datadog_checks_base/CHANGELOG.md
+++ b/datadog_checks_base/CHANGELOG.md
@@ -2,6 +2,10 @@

 ## Unreleased

+***Added***:
+
+* Add util to track db query operation time ([#16040](https://github.com/DataDog/integrations-core/pull/16040))
+
 ***Fixed***:

 * Bump the `pyodbc` version to 4.0.39 ([#16021](https://github.com/DataDog/integrations-core/pull/16021))

diff --git a/datadog_checks_base/datadog_checks/base/utils/db/core.py b/datadog_checks_base/datadog_checks/base/utils/db/core.py
index e7e007b46ea4b..5112982ddf32b 100644
--- a/datadog_checks_base/datadog_checks/base/utils/db/core.py
+++ b/datadog_checks_base/datadog_checks/base/utils/db/core.py
@@ -12,7 +12,7 @@
 from ..containers import iter_unique
 from .query import Query
 from .transform import COLUMN_TRANSFORMERS, EXTRA_TRANSFORMERS
-from .utils import SUBMISSION_METHODS, create_submission_transformer
+from .utils import SUBMISSION_METHODS, create_submission_transformer, tracked_query


 class QueryExecutor(object):
@@ -31,6 +31,7 @@ def __init__(
         error_handler=None,  # type: Callable[[str], str]
         hostname=None,  # type: str
         logger=None,
+        track_operation_time=False,  # type: bool
     ):
         # type: (...) -> QueryExecutor
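Usage sketch (not part of the patch): any integration that builds a QueryExecutor can now opt into per-query timing through the new flag. A minimal example, assuming a hypothetical check whose execute_query_raw method runs a query and returns rows:

    from datadog_checks.base.utils.db import QueryExecutor

    # Hypothetical query payload; the 'name' value becomes the operation: tag.
    QUERIES = [
        {
            'name': 'row_count',
            'query': 'SELECT COUNT(*) FROM my_table',
            'columns': [{'name': 'my_table.rows', 'type': 'gauge'}],
        }
    ]

    def make_executor(check):
        # The check is the submitter: both the query metrics and the timing
        # histogram (dd.<check.name>.operation.time) are reported through it.
        executor = QueryExecutor(
            check.execute_query_raw,
            check,
            queries=QUERIES,
            track_operation_time=True,
        )
        executor.compile_queries()
        return executor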
         self.executor = executor  # type: QueriesExecutor
         self.submitter = submitter  # type: QueriesSubmitter
@@ -45,6 +46,7 @@ def __init__(
         self.queries = [Query(payload) for payload in queries or []]  # type: List[Query]
         self.hostname = hostname  # type: str
         self.logger = logger or logging.getLogger(__name__)
+        self.track_operation_time = track_operation_time

     def compile_queries(self):
         """This method compiles every `Query` object."""
@@ -72,7 +74,11 @@ def execute(self, extra_tags=None):
             query_tags = query.base_tags

         try:
-            rows = self.execute_query(query.query)
+            if self.track_operation_time:
+                with tracked_query(check=self.submitter, operation=query_name):
+                    rows = self.execute_query(query.query)
+            else:
+                rows = self.execute_query(query.query)
         except Exception as e:
             if self.error_handler:
                 self.logger.error('Error querying %s: %s', query_name, self.error_handler(str(e)))

diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py
index 8ab9184ef6086..c50226af8e19a 100644
--- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py
+++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -1,6 +1,7 @@
 # (C) Datadog, Inc. 2019-present
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)
+import contextlib
 import datetime
 import decimal
 import functools
@@ -351,3 +352,33 @@ def _run_job_traced(self):

     def run_job(self):
         raise NotImplementedError()
+
+
+@contextlib.contextmanager
+def tracked_query(check, operation, tags=None):
+    """
+    A simple context manager that tracks the time spent on a given query operation.
+
+    The intended use is to wrap the execution of a query, so that the time spent
+    waiting on query execution can be tracked as a metric. For example:
+    '''
+    with tracked_query(check, "my_metric_query", tags):
+        cursor.execute(query)
+    '''
+
+    If `debug_stats_kwargs` is defined on the check instance,
+    it will be called to set additional kwargs when submitting the metric.
+
+    :param check: The check instance
+    :param operation: The name of the query operation being performed.
+    :param tags: A list of tags to apply to the metric.
+    """
+    start_time = time.time()
+    stats_kwargs = {}
+    if hasattr(check, 'debug_stats_kwargs'):
+        stats_kwargs = dict(check.debug_stats_kwargs())
+    stats_kwargs['tags'] = stats_kwargs.get('tags', []) + ["operation:{}".format(operation)] + (tags or [])
+    stats_kwargs['raw'] = True  # always submit as raw to ignore any defined namespace prefix
+    yield
+    elapsed_ms = (time.time() - start_time) * 1000
+    check.histogram("dd.{}.operation.time".format(check.name), elapsed_ms, **stats_kwargs)
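Behavior sketch (not part of the patch): a check can shape the tags and hostname on the timing metric by defining debug_stats_kwargs; tracked_query merges that dict before appending the operation: tag. All names and values below are illustrative:

    from datadog_checks.base import AgentCheck
    from datadog_checks.base.utils.db.utils import tracked_query

    class MyCheck(AgentCheck):
        def debug_stats_kwargs(self, tags=None):
            # Any valid histogram() kwargs work here; these values are made up.
            return {'tags': ['env:test'] + (tags or []), 'hostname': 'db-host-1'}

    def run_query(check, cursor):
        # Emits dd.<check.name>.operation.time (in ms) tagged with env:test,
        # operation:slow_query, and db:main, against hostname db-host-1.
        with tracked_query(check, 'slow_query', tags=['db:main']):
            cursor.execute('SELECT pg_sleep(0.1)')

Note that because the body is not wrapped in try/finally, the histogram is only submitted when the wrapped block completes: an exception raised by cursor.execute propagates past the yield and skips the submission.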
+ """ + start_time = time.time() + stats_kwargs = {} + if hasattr(check, 'debug_stats_kwargs'): + stats_kwargs = dict(check.debug_stats_kwargs()) + stats_kwargs['tags'] = stats_kwargs.get('tags', []) + ["operation:{}".format(operation)] + (tags or []) + stats_kwargs['raw'] = True # always submit as raw to ignore any defined namespace prefix + yield + elapsed_ms = (time.time() - start_time) * 1000 + check.histogram("dd.{}.operation.time".format(check.name), elapsed_ms, **stats_kwargs) diff --git a/datadog_checks_base/tests/base/utils/db/test_util.py b/datadog_checks_base/tests/base/utils/db/test_util.py index 2225c9c6c2eaf..f23a98184a4b5 100644 --- a/datadog_checks_base/tests/base/utils/db/test_util.py +++ b/datadog_checks_base/tests/base/utils/db/test_util.py @@ -20,6 +20,7 @@ default_json_event_encoding, obfuscate_sql_with_metadata, resolve_db_host, + tracked_query, ) from datadog_checks.base.utils.serialization import json @@ -276,3 +277,16 @@ def test_dbm_async_job_inactive_stop(aggregator): def test_default_json_event_encoding(input): # assert that the default json event encoding can handle all defined types without raising TypeError assert json.dumps(input, default=default_json_event_encoding) + + +def test_tracked_query(aggregator): + with mock.patch('time.time', side_effect=[100, 101]): + with tracked_query( + check=AgentCheck(name="testcheck"), + operation="test_query", + tags=["test:tag"], + ): + pass + aggregator.assert_metric( + "dd.testcheck.operation.time", tags=["test:tag", "operation:test_query"], count=1, value=1000.0 + ) diff --git a/postgres/CHANGELOG.md b/postgres/CHANGELOG.md index d165d959aeca8..4784a26ca76ee 100644 --- a/postgres/CHANGELOG.md +++ b/postgres/CHANGELOG.md @@ -6,6 +6,7 @@ * Upgrade `psycopg2-binary` to `v2.9.8` ([#15949](https://github.com/DataDog/integrations-core/pull/15949)) * Add support for reporting SQL obfuscation errors ([#15990](https://github.com/DataDog/integrations-core/pull/15990)) +* Emit postgres metrics queries operation time ([#16040](https://github.com/DataDog/integrations-core/pull/16040)) ***Fixed***: diff --git a/postgres/datadog_checks/postgres/metrics_cache.py b/postgres/datadog_checks/postgres/metrics_cache.py index 617a79b47f85e..68dc6689eb45e 100644 --- a/postgres/datadog_checks/postgres/metrics_cache.py +++ b/postgres/datadog_checks/postgres/metrics_cache.py @@ -95,6 +95,7 @@ def get_instance_metrics(self, version): "FROM pg_stat_database psd " "JOIN pg_database pd ON psd.datname = pd.datname", 'relation': False, + 'name': 'instance_metrics', } res["query"] += " WHERE " + " AND ".join( @@ -128,6 +129,7 @@ def get_bgw_metrics(self, version): 'metrics': self.bgw_metrics, 'query': "select {metrics_columns} FROM pg_stat_bgwriter", 'relation': False, + 'name': 'bgw_metrics', } def get_count_metrics(self): @@ -158,6 +160,7 @@ def get_archiver_metrics(self, version): 'metrics': self.archiver_metrics, 'query': "select {metrics_columns} FROM pg_stat_archiver", 'relation': False, + 'name': 'archiver_metrics', } def get_replication_metrics(self, version, is_aurora): @@ -238,4 +241,5 @@ def get_activity_metrics(self, version): 'metrics': metrics, 'query': query, 'relation': False, + 'name': 'activity_metrics', } diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index b4a89fdcd8226..212c795463a2d 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -14,6 +14,7 @@ from datadog_checks.base.utils.db import QueryExecutor from 
diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py
index b4a89fdcd8226..212c795463a2d 100644
--- a/postgres/datadog_checks/postgres/postgres.py
+++ b/postgres/datadog_checks/postgres/postgres.py
@@ -14,6 +14,7 @@
 from datadog_checks.base.utils.db import QueryExecutor
 from datadog_checks.base.utils.db.utils import (
     default_json_event_encoding,
+    tracked_query,
 )
 from datadog_checks.base.utils.db.utils import resolve_db_host as agent_host_resolver
 from datadog_checks.base.utils.serialization import json
@@ -178,6 +179,7 @@ def _new_query_executor(self, queries):
             queries=queries,
             tags=self.tags_without_db,
             hostname=self.resolved_hostname,
+            track_operation_time=True,
         )

     def execute_query_raw(self, query):
@@ -417,16 +419,17 @@ def _run_query_scope(self, cursor, scope, is_custom_metrics, cols, descriptors):
         is_relations = scope.get('relation') and self._relations_manager.has_relations
         try:
             query = fmt.format(scope['query'], metrics_columns=", ".join(cols))
-            # if this is a relation-specific query, we need to list all relations last
-            if is_relations:
-                schema_field = get_schema_field(descriptors)
-                formatted_query = self._relations_manager.filter_relation_query(query, schema_field)
-                cursor.execute(formatted_query)
-            else:
-                self.log.debug("Running query: %s", str(query))
-                cursor.execute(query.replace(r'%', r'%%'))
-
-            results = cursor.fetchall()
+            with tracked_query(check=self, operation='custom_metrics' if is_custom_metrics else scope['name']):
+                # if this is a relation-specific query, we need to list all relations last
+                if is_relations:
+                    schema_field = get_schema_field(descriptors)
+                    formatted_query = self._relations_manager.filter_relation_query(query, schema_field)
+                    cursor.execute(formatted_query)
+                else:
+                    self.log.debug("Running query: %s", str(query))
+                    cursor.execute(query.replace(r'%', r'%%'))
+
+                results = cursor.fetchall()
         except psycopg2.errors.FeatureNotSupported as e:
             # This happens for example when trying to get replication metrics from readers in Aurora. Let's ignore it.
             log_func(e)
@@ -647,9 +650,13 @@ def _collect_stats(self, instance_tags):
             with conn.cursor() as cursor:
                 self._query_scope(cursor, activity_metrics, instance_tags, False)

-        for scope in list(metric_scope) + self._config.custom_metrics:
+        for scope in list(metric_scope):
             with conn.cursor() as cursor:
-                self._query_scope(cursor, scope, instance_tags, scope in self._config.custom_metrics)
+                self._query_scope(cursor, scope, instance_tags, False)
+
+        for scope in self._config.custom_metrics:
+            with conn.cursor() as cursor:
+                self._query_scope(cursor, scope, instance_tags, True)

         if self.dynamic_queries:
             self.dynamic_queries.execute()
@@ -779,7 +786,10 @@ def _collect_custom_queries(self, tags):
             with conn.cursor() as cursor:
                 try:
                     self.log.debug("Running query: %s", query)
-                    cursor.execute(query)
+                    with tracked_query(
+                        check=self, operation='custom_queries', tags=['metric_prefix:{}'.format(metric_prefix)]
+                    ):
+                        cursor.execute(query)
                 except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
                     self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
                     continue
@@ -884,6 +894,13 @@ def _send_database_instance_metadata(self):
             self._database_instance_emitted[self.resolved_hostname] = event
             self.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))

+    def debug_stats_kwargs(self, tags=None):
+        tags = self.tags + self._get_debug_tags() + (tags or [])
+        return {
+            'tags': tags,
+            'hostname': self.resolved_hostname,
+        }
+
     def check(self, _):
         tags = copy.copy(self.tags)
         # Collect metrics
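Rough expansion (not part of the patch) of what a single timing submission looks like once debug_stats_kwargs is in place; the scope name here is illustrative:

    def submit_operation_time(check, elapsed_ms):
        # Approximates what tracked_query does for the 'bgw_metrics' scope.
        kwargs = dict(check.debug_stats_kwargs())
        kwargs['tags'] = kwargs.get('tags', []) + ['operation:bgw_metrics']
        kwargs['raw'] = True  # keep the literal dd.postgres.* name, skip namespacing
        check.histogram('dd.postgres.operation.time', elapsed_ms, **kwargs)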
diff --git a/postgres/datadog_checks/postgres/relationsmanager.py b/postgres/datadog_checks/postgres/relationsmanager.py
index 64f902e93a14c..cbb29e8c0908c 100644
--- a/postgres/datadog_checks/postgres/relationsmanager.py
+++ b/postgres/datadog_checks/postgres/relationsmanager.py
@@ -53,6 +53,7 @@
 AND pc.relname NOT LIKE 'pg^_%%' ESCAPE '^'
 GROUP BY pd.datname, pc.relname, pn.nspname, locktype, mode""",
     'relation': True,
+    'name': 'lock_metrics',
 }

 # The pg_stat_all_tables contains one row for each table in the current database,
@@ -81,6 +82,7 @@
 FROM pg_stat_user_tables
 WHERE {relations}""",
     'relation': True,
+    'name': 'rel_metrics',
 }


@@ -103,6 +105,7 @@
 FROM pg_stat_user_indexes
 WHERE {relations}""",
     'relation': True,
+    'name': 'idx_metrics',
 }


@@ -190,7 +193,9 @@
 FROM pg_statio_user_tables
 WHERE {relations}""",
     'relation': True,
+    'name': 'statio_metrics',
 }
+
 # adapted from https://wiki.postgresql.org/wiki/Show_database_bloat and https://github.com/bucardo/check_postgres/
 TABLE_BLOAT_QUERY = """
 SELECT
@@ -241,6 +246,7 @@
     },
     'query': TABLE_BLOAT_QUERY,
     'relation': True,
+    'name': 'table_bloat_metrics',
 }


@@ -296,6 +302,7 @@
     },
     'query': INDEX_BLOAT_QUERY,
     'relation': True,
+    'name': 'index_bloat_metrics',
 }

 RELATION_METRICS = [LOCK_METRICS, REL_METRICS, IDX_METRICS, STATIO_METRICS]

diff --git a/postgres/datadog_checks/postgres/util.py b/postgres/datadog_checks/postgres/util.py
index 2e253421cc6bf..cc5195cebe4c9 100644
--- a/postgres/datadog_checks/postgres/util.py
+++ b/postgres/datadog_checks/postgres/util.py
@@ -209,6 +209,7 @@ def payload_pg_version(version):
         ) AS subquery
         GROUP BY schemaname
         """
     ),
+    'name': 'count_metrics',
 }

 q1 = (
@@ -252,6 +253,7 @@ def payload_pg_version(version):
     'query': """
 SELECT {metrics_columns}
 WHERE (SELECT pg_is_in_recovery())""",
+    'name': 'replication_metrics',
 }

 # Requires postgres 10+
@@ -285,6 +287,7 @@ def payload_pg_version(version):
 SELECT application_name, state, sync_state, client_addr, {metrics_columns}
 FROM pg_stat_replication
 """,
+    'name': 'replication_stats_metrics',
 }


@@ -349,6 +352,7 @@ def payload_pg_version(version):
 SELECT {metrics_columns}
 FROM pg_stat_database, max_con
 """,
+    'name': 'connections_metrics',
 }

 SLRU_METRICS = {
@@ -367,6 +371,7 @@ def payload_pg_version(version):
 SELECT name, {metrics_columns}
 FROM pg_stat_slru
 """,
+    'name': 'slru_metrics',
 }

 SNAPSHOT_TXID_METRICS = {
@@ -469,6 +474,7 @@ def payload_pg_version(version):
     ON o.funcname = s.funcname;
 """,
     'relation': False,
+    'name': 'function_metrics',
 }

 # The metrics we retrieve from pg_stat_activity when the postgres version >= 9.6
diff --git a/postgres/tests/common.py b/postgres/tests/common.py
index ccbe7cce86590..09699ec1abe63 100644
--- a/postgres/tests/common.py
+++ b/postgres/tests/common.py
@@ -96,6 +96,17 @@
 CONNECTION_METRICS_DB = ['postgresql.connections']
 COMMON_DBS = ['dogs', 'postgres', 'dogs_nofunc', 'dogs_noschema', DB_NAME]

+CHECK_PERFORMANCE_METRICS = [
+    'archiver_metrics',
+    'bgw_metrics',
+    'connections_metrics',
+    'count_metrics',
+    'instance_metrics',
+    'replication_metrics',
+    'replication_stats_metrics',
+    'slru_metrics',
+]
+
 requires_static_version = pytest.mark.skipif(USING_LATEST, reason='Version `latest` is ever-changing, skipping')

@@ -313,3 +324,17 @@ def check_stat_wal_metrics(aggregator, expected_tags, count=1):

     for metric_name in _iterate_metric_name(STAT_WAL_METRICS):
         aggregator.assert_metric(metric_name, count=count, tags=expected_tags)
+
+
+def check_performance_metrics(aggregator, expected_tags, count=1, is_aurora=False):
+    expected_metrics = set(CHECK_PERFORMANCE_METRICS)
+    if is_aurora:
+        expected_metrics = expected_metrics - {'replication_metrics'}
+    if float(POSTGRES_VERSION) < 13.0:
+        expected_metrics = expected_metrics - {'slru_metrics'}
+    if float(POSTGRES_VERSION) < 10.0:
+        expected_metrics = expected_metrics - {'replication_stats_metrics'}
+    for name in expected_metrics:
+        aggregator.assert_metric(
+            'dd.postgres.operation.time', count=count, tags=expected_tags + ['operation:{}'.format(name)]
+        )

diff --git a/postgres/tests/conftest.py b/postgres/tests/conftest.py
index bfba18101ae41..3218c02221883 100644
--- a/postgres/tests/conftest.py
+++ b/postgres/tests/conftest.py
@@ -119,7 +119,7 @@ def cursor_execute(query, second_arg=""):
         data.appendleft(['app1', 'streaming', 'async', '1.1.1.1', 12, 12, 12, 12])
         data.appendleft(['app2', 'backup', 'sync', '1.1.1.1', 13, 13, 13, 13])
     elif query == 'SHOW SERVER_VERSION;':
-        data.appendleft(['10.15'])
+        data.appendleft([POSTGRES_VERSION])

     def cursor_fetchall():
         while data:

diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py
index 38457241ed2c4..71dd58f2a234a 100644
--- a/postgres/tests/test_pg_integration.py
+++ b/postgres/tests/test_pg_integration.py
@@ -29,6 +29,7 @@
     check_db_count,
     check_file_wal_metrics,
     check_logical_replication_slots,
+    check_performance_metrics,
     check_physical_replication_slots,
     check_slru_metrics,
     check_snapshot_txid_metrics,
@@ -72,6 +73,9 @@ def test_common_metrics(aggregator, integration_check, pg_instance, is_aurora):
     check_logical_replication_slots(aggregator, expected_tags)
     check_physical_replication_slots(aggregator, expected_tags)
     check_snapshot_txid_metrics(aggregator, expected_tags=expected_tags)
+
+    check_performance_metrics(aggregator, expected_tags=check.debug_stats_kwargs()['tags'], is_aurora=is_aurora)
+
     aggregator.assert_all_metrics_covered()

diff --git a/postgres/tests/test_pg_replication.py b/postgres/tests/test_pg_replication.py
index 36a740669ca2d..342a82daea46a 100644
--- a/postgres/tests/test_pg_replication.py
+++ b/postgres/tests/test_pg_replication.py
@@ -16,6 +16,7 @@
     check_control_metrics,
     check_db_count,
     check_file_wal_metrics,
+    check_performance_metrics,
     check_replication_delay,
     check_slru_metrics,
     check_snapshot_txid_metrics,
@@ -50,6 +51,8 @@ def test_common_replica_metrics(aggregator, integration_check, metrics_cache_rep
     check_stat_wal_metrics(aggregator, expected_tags=expected_tags)
     check_file_wal_metrics(aggregator, expected_tags=expected_tags)

+    check_performance_metrics(aggregator, expected_tags=check.debug_stats_kwargs()['tags'])
+
     aggregator.assert_all_metrics_covered()
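Test sketch (not part of the patch) showing how the new helper composes with the existing fixtures; integration_check and pg_instance come from postgres/tests:

    def test_operation_time_metrics(aggregator, integration_check, pg_instance):
        check = integration_check(pg_instance)
        check.check(pg_instance)
        # The expected tags must match what tracked_query submits, i.e. the
        # check's debug stats tags.
        check_performance_metrics(aggregator, expected_tags=check.debug_stats_kwargs()['tags'])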
diff --git a/postgres/tests/test_unit.py b/postgres/tests/test_unit.py
index c9ba2313ab888..52f5e3abac3f4 100644
--- a/postgres/tests/test_unit.py
+++ b/postgres/tests/test_unit.py
@@ -14,7 +14,8 @@
 from datadog_checks.postgres import PostgreSql, util

-from .common import PORT
+from .common import PORT, check_performance_metrics
+from .utils import requires_over_10

 pytestmark = pytest.mark.unit

@@ -240,18 +241,28 @@ def test_resolved_hostname_metadata(check, test_case):
         m.assert_any_call('test:123', 'resolved_hostname', test_case)


+@requires_over_10
 @pytest.mark.usefixtures('mock_cursor_for_replica_stats')
 def test_replication_stats(aggregator, integration_check, pg_instance):
     check = integration_check(pg_instance)
     check.check(pg_instance)
-    base_tags = ['foo:bar', 'port:5432', 'dd.internal.resource:database_instance:{}'.format(check.resolved_hostname)]
+    base_tags = [
+        'foo:bar',
+        'port:5432',
+        'dd.internal.resource:database_instance:{}'.format(check.resolved_hostname),
+    ]
     app1_tags = base_tags + [
         'wal_sync_state:async',
         'wal_state:streaming',
         'wal_app_name:app1',
         'wal_client_addr:1.1.1.1',
     ]
-    app2_tags = base_tags + ['wal_sync_state:sync', 'wal_state:backup', 'wal_app_name:app2', 'wal_client_addr:1.1.1.1']
+    app2_tags = base_tags + [
+        'wal_sync_state:sync',
+        'wal_state:backup',
+        'wal_app_name:app2',
+        'wal_client_addr:1.1.1.1',
+    ]

     aggregator.assert_metric('postgresql.db.count', 0, base_tags)
     for suffix in ('wal_write_lag', 'wal_flush_lag', 'wal_replay_lag', 'backend_xmin_age'):
@@ -259,6 +270,8 @@ def test_replication_stats(aggregator, integration_check, pg_instance):
         aggregator.assert_metric(metric_name, 12, app1_tags)
         aggregator.assert_metric(metric_name, 13, app2_tags)

+    check_performance_metrics(aggregator, check.debug_stats_kwargs()['tags'])
+
     aggregator.assert_all_metrics_covered()