[DBMON-3030] report postgres check query operation time metrics (#16040)
* report postgres check query perf metrics

* fix unit tests

* update changelog

* fix lint

* fix test for postgres 9

* use debug_stats_kwargs to create internal metrics tags

* report query perf metrics under same metric

* add shared util to track query operation time

* conditionally track query operational time

* track dynamic queries operation time for postgres

* remove track_operation_time from context manager
lu-zhengda committed Oct 19, 2023
1 parent 69937f5 commit b84c5e6
Showing 14 changed files with 154 additions and 19 deletions.
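At the core of the change is a small context manager, `tracked_query`, added to the shared db utils: it times whatever runs inside it and submits the elapsed time as a histogram named `dd.<check_name>.operation.time`, tagged with the operation name. A minimal sketch of the usage pattern (the check, cursor, and operation name below are placeholders, not taken from the diff):

```python
from datadog_checks.base.utils.db.utils import tracked_query


def collect_activity(check, cursor):
    # `check` is an AgentCheck instance and `cursor` a DB-API cursor; both are
    # illustrative stand-ins for whatever the calling check already has.
    with tracked_query(check=check, operation='activity_metrics', tags=['db:dogs']):
        cursor.execute('SELECT count(*) FROM pg_stat_activity')
    # Exiting the context manager submits a histogram named
    # "dd.<check.name>.operation.time" tagged with "operation:activity_metrics".
    return cursor.fetchall()
```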
4 changes: 4 additions & 0 deletions datadog_checks_base/CHANGELOG.md
@@ -2,6 +2,10 @@

## Unreleased

***Added***:

* Add util to track db query operation time ([#16040](https://github.com/DataDog/integrations-core/pull/16040))

***Fixed***:

* Bump the `pyodbc` version to 4.0.39 ([#16021](https://github.com/DataDog/integrations-core/pull/16021))
10 changes: 8 additions & 2 deletions datadog_checks_base/datadog_checks/base/utils/db/core.py
@@ -12,7 +12,7 @@
from ..containers import iter_unique
from .query import Query
from .transform import COLUMN_TRANSFORMERS, EXTRA_TRANSFORMERS
from .utils import SUBMISSION_METHODS, create_submission_transformer
from .utils import SUBMISSION_METHODS, create_submission_transformer, tracked_query


class QueryExecutor(object):
@@ -31,6 +31,7 @@ def __init__(
error_handler=None, # type: Callable[[str], str]
hostname=None, # type: str
logger=None,
track_operation_time=False, # type: bool
): # type: (...) -> QueryExecutor
self.executor = executor # type: QueriesExecutor
self.submitter = submitter # type: QueriesSubmitter
@@ -45,6 +46,7 @@ def __init__(
self.queries = [Query(payload) for payload in queries or []] # type: List[Query]
self.hostname = hostname # type: str
self.logger = logger or logging.getLogger(__name__)
self.track_operation_time = track_operation_time

def compile_queries(self):
"""This method compiles every `Query` object."""
@@ -72,7 +74,11 @@ def execute(self, extra_tags=None):
query_tags = query.base_tags

try:
rows = self.execute_query(query.query)
if self.track_operation_time:
with tracked_query(check=self.submitter, operation=query_name):
rows = self.execute_query(query.query)
else:
rows = self.execute_query(query.query)
except Exception as e:
if self.error_handler:
self.logger.error('Error querying %s: %s', query_name, self.error_handler(str(e)))
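Checks that run their declarative queries through `QueryExecutor` do not call `tracked_query` directly; they opt in with the new `track_operation_time` flag, which is what the postgres hunk further down does in `_new_query_executor`. A rough sketch of the opt-in (the check attributes used as arguments are assumptions for illustration):

```python
from datadog_checks.base.utils.db import QueryExecutor


def build_timed_executor(check, queries):
    # `check` is an AgentCheck exposing `execute_query_raw`, `tags` and
    # `resolved_hostname` (assumed here for illustration); `queries` is a list
    # of query payload dicts as accepted by QueryExecutor.
    executor = QueryExecutor(
        check.execute_query_raw,    # callable that runs the SQL and yields rows
        check,                      # submitter: metrics are reported through this check
        queries=queries,
        tags=check.tags,
        hostname=check.resolved_hostname,
        track_operation_time=True,  # wrap each query in tracked_query()
    )
    executor.compile_queries()
    return executor
```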
31 changes: 31 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -1,6 +1,7 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import contextlib
import datetime
import decimal
import functools
@@ -351,3 +352,33 @@ def _run_job_traced(self):

def run_job(self):
raise NotImplementedError()


@contextlib.contextmanager
def tracked_query(check, operation, tags=None):
    """
    A simple context manager that tracks the time spent in a given query operation.
    The intended use is to wrap the execution of a query, so that the time spent waiting
    for the query to execute can be tracked as a metric. For example:
    '''
    with tracked_query(check, "my_metric_query", tags):
        cursor.execute(query)
    '''
    If debug_stats_kwargs is defined on the check instance,
    it will be called to set additional kwargs when submitting the metric.
    :param check: The check instance
    :param operation: The name of the query operation being performed.
    :param tags: A list of tags to apply to the metric.
    """
    start_time = time.time()
    stats_kwargs = {}
    if hasattr(check, 'debug_stats_kwargs'):
        stats_kwargs = dict(check.debug_stats_kwargs())
    stats_kwargs['tags'] = stats_kwargs.get('tags', []) + ["operation:{}".format(operation)] + (tags or [])
    stats_kwargs['raw'] = True  # always submit as raw to ignore any defined namespace prefix
    yield
    elapsed_ms = (time.time() - start_time) * 1000
    check.histogram("dd.{}.operation.time".format(check.name), elapsed_ms, **stats_kwargs)
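If the check defines `debug_stats_kwargs`, the helper merges whatever it returns (for example extra tags or a hostname) into the histogram submission, which is how the postgres check attaches its own tags later in this diff. A hedged sketch of a check opting in, with made-up class and tag names:

```python
import time

from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.db.utils import tracked_query


class MyDbCheck(AgentCheck):
    def debug_stats_kwargs(self, tags=None):
        # Extra submission kwargs picked up by tracked_query() via hasattr().
        return {'tags': ['team:db'] + (tags or []), 'hostname': 'db-host-1'}

    def check(self, _):
        with tracked_query(check=self, operation='slow_report'):
            time.sleep(0.1)  # stands in for cursor.execute(...)
        # Result: a histogram "dd.<self.name>.operation.time" tagged with
        # "operation:slow_report" plus the tags from debug_stats_kwargs().
```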
14 changes: 14 additions & 0 deletions datadog_checks_base/tests/base/utils/db/test_util.py
@@ -20,6 +20,7 @@
default_json_event_encoding,
obfuscate_sql_with_metadata,
resolve_db_host,
tracked_query,
)
from datadog_checks.base.utils.serialization import json

@@ -276,3 +277,16 @@ def test_dbm_async_job_inactive_stop(aggregator):
def test_default_json_event_encoding(input):
# assert that the default json event encoding can handle all defined types without raising TypeError
assert json.dumps(input, default=default_json_event_encoding)


def test_tracked_query(aggregator):
    with mock.patch('time.time', side_effect=[100, 101]):
        with tracked_query(
            check=AgentCheck(name="testcheck"),
            operation="test_query",
            tags=["test:tag"],
        ):
            pass
    aggregator.assert_metric(
        "dd.testcheck.operation.time", tags=["test:tag", "operation:test_query"], count=1, value=1000.0
    )
1 change: 1 addition & 0 deletions postgres/CHANGELOG.md
@@ -6,6 +6,7 @@

* Upgrade `psycopg2-binary` to `v2.9.8` ([#15949](https://github.com/DataDog/integrations-core/pull/15949))
* Add support for reporting SQL obfuscation errors ([#15990](https://github.com/DataDog/integrations-core/pull/15990))
* Emit operation time metrics for postgres metrics queries ([#16040](https://github.com/DataDog/integrations-core/pull/16040))

***Fixed***:

4 changes: 4 additions & 0 deletions postgres/datadog_checks/postgres/metrics_cache.py
@@ -95,6 +95,7 @@ def get_instance_metrics(self, version):
"FROM pg_stat_database psd "
"JOIN pg_database pd ON psd.datname = pd.datname",
'relation': False,
'name': 'instance_metrics',
}

res["query"] += " WHERE " + " AND ".join(
@@ -128,6 +129,7 @@ def get_bgw_metrics(self, version):
'metrics': self.bgw_metrics,
'query': "select {metrics_columns} FROM pg_stat_bgwriter",
'relation': False,
'name': 'bgw_metrics',
}

def get_count_metrics(self):
@@ -158,6 +160,7 @@ def get_archiver_metrics(self, version):
'metrics': self.archiver_metrics,
'query': "select {metrics_columns} FROM pg_stat_archiver",
'relation': False,
'name': 'archiver_metrics',
}

def get_replication_metrics(self, version, is_aurora):
@@ -238,4 +241,5 @@ def get_activity_metrics(self, version):
'metrics': metrics,
'query': query,
'relation': False,
'name': 'activity_metrics',
}
43 changes: 30 additions & 13 deletions postgres/datadog_checks/postgres/postgres.py
@@ -14,6 +14,7 @@
from datadog_checks.base.utils.db import QueryExecutor
from datadog_checks.base.utils.db.utils import (
default_json_event_encoding,
tracked_query,
)
from datadog_checks.base.utils.db.utils import resolve_db_host as agent_host_resolver
from datadog_checks.base.utils.serialization import json
@@ -178,6 +179,7 @@ def _new_query_executor(self, queries):
queries=queries,
tags=self.tags_without_db,
hostname=self.resolved_hostname,
track_operation_time=True,
)

def execute_query_raw(self, query):
@@ -417,16 +419,17 @@ def _run_query_scope(self, cursor, scope, is_custom_metrics, cols, descriptors):
is_relations = scope.get('relation') and self._relations_manager.has_relations
try:
query = fmt.format(scope['query'], metrics_columns=", ".join(cols))
# if this is a relation-specific query, we need to list all relations last
if is_relations:
schema_field = get_schema_field(descriptors)
formatted_query = self._relations_manager.filter_relation_query(query, schema_field)
cursor.execute(formatted_query)
else:
self.log.debug("Running query: %s", str(query))
cursor.execute(query.replace(r'%', r'%%'))

results = cursor.fetchall()
with tracked_query(check=self, operation='custom_metrics' if is_custom_metrics else scope['name']):
# if this is a relation-specific query, we need to list all relations last
if is_relations:
schema_field = get_schema_field(descriptors)
formatted_query = self._relations_manager.filter_relation_query(query, schema_field)
cursor.execute(formatted_query)
else:
self.log.debug("Running query: %s", str(query))
cursor.execute(query.replace(r'%', r'%%'))

results = cursor.fetchall()
except psycopg2.errors.FeatureNotSupported as e:
# This happens for example when trying to get replication metrics from readers in Aurora. Let's ignore it.
log_func(e)
@@ -647,9 +650,13 @@ def _collect_stats(self, instance_tags):
with conn.cursor() as cursor:
self._query_scope(cursor, activity_metrics, instance_tags, False)

for scope in list(metric_scope) + self._config.custom_metrics:
for scope in list(metric_scope):
with conn.cursor() as cursor:
self._query_scope(cursor, scope, instance_tags, scope in self._config.custom_metrics)
self._query_scope(cursor, scope, instance_tags, False)

for scope in self._config.custom_metrics:
with conn.cursor() as cursor:
self._query_scope(cursor, scope, instance_tags, True)

if self.dynamic_queries:
self.dynamic_queries.execute()
@@ -779,7 +786,10 @@ def _collect_custom_queries(self, tags):
with conn.cursor() as cursor:
try:
self.log.debug("Running query: %s", query)
cursor.execute(query)
with tracked_query(
check=self, operation='custom_queries', tags=['metric_prefix:{}'.format(metric_prefix)]
):
cursor.execute(query)
except (psycopg2.ProgrammingError, psycopg2.errors.QueryCanceled) as e:
self.log.error("Error executing query for metric_prefix %s: %s", metric_prefix, str(e))
continue
@@ -884,6 +894,13 @@ def _send_database_instance_metadata(self):
self._database_instance_emitted[self.resolved_hostname] = event
self.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))

    def debug_stats_kwargs(self, tags=None):
        tags = self.tags + self._get_debug_tags() + (tags or [])
        return {
            'tags': tags,
            'hostname': self.resolved_hostname,
        }

def check(self, _):
tags = copy.copy(self.tags)
# Collect metrics
7 changes: 7 additions & 0 deletions postgres/datadog_checks/postgres/relationsmanager.py
@@ -53,6 +53,7 @@
AND pc.relname NOT LIKE 'pg^_%%' ESCAPE '^'
GROUP BY pd.datname, pc.relname, pn.nspname, locktype, mode""",
'relation': True,
'name': 'lock_metrics',
}

# The pg_stat_all_tables contain one row for each table in the current database,
@@ -81,6 +82,7 @@
FROM pg_stat_user_tables
WHERE {relations}""",
'relation': True,
'name': 'rel_metrics',
}


@@ -103,6 +105,7 @@
FROM pg_stat_user_indexes
WHERE {relations}""",
'relation': True,
'name': 'idx_metrics',
}


@@ -190,7 +193,9 @@
FROM pg_statio_user_tables
WHERE {relations}""",
'relation': True,
'name': 'statio_metrics',
}

# adapted from https://wiki.postgresql.org/wiki/Show_database_bloat and https://github.com/bucardo/check_postgres/
TABLE_BLOAT_QUERY = """
SELECT
@@ -241,6 +246,7 @@
},
'query': TABLE_BLOAT_QUERY,
'relation': True,
'name': 'table_bloat_metrics',
}


@@ -296,6 +302,7 @@
},
'query': INDEX_BLOAT_QUERY,
'relation': True,
'name': 'index_bloat_metrics',
}

RELATION_METRICS = [LOCK_METRICS, REL_METRICS, IDX_METRICS, STATIO_METRICS]
6 changes: 6 additions & 0 deletions postgres/datadog_checks/postgres/util.py
@@ -209,6 +209,7 @@ def payload_pg_version(version):
) AS subquery GROUP BY schemaname
"""
),
'name': 'count_metrics',
}

q1 = (
@@ -252,6 +253,7 @@ def payload_pg_version(version):
'query': """
SELECT {metrics_columns}
WHERE (SELECT pg_is_in_recovery())""",
'name': 'replication_metrics',
}

# Requires postgres 10+
@@ -285,6 +287,7 @@ def payload_pg_version(version):
SELECT application_name, state, sync_state, client_addr, {metrics_columns}
FROM pg_stat_replication
""",
'name': 'replication_stats_metrics',
}


@@ -349,6 +352,7 @@ def payload_pg_version(version):
SELECT {metrics_columns}
FROM pg_stat_database, max_con
""",
'name': 'connections_metrics',
}

SLRU_METRICS = {
@@ -367,6 +371,7 @@ def payload_pg_version(version):
SELECT name, {metrics_columns}
FROM pg_stat_slru
""",
'name': 'slru_metrics',
}

SNAPSHOT_TXID_METRICS = {
@@ -469,6 +474,7 @@ def payload_pg_version(version):
ON o.funcname = s.funcname;
""",
'relation': False,
'name': 'function_metrics',
}

# The metrics we retrieve from pg_stat_activity when the postgres version >= 9.6
25 changes: 25 additions & 0 deletions postgres/tests/common.py
@@ -96,6 +96,17 @@
CONNECTION_METRICS_DB = ['postgresql.connections']
COMMON_DBS = ['dogs', 'postgres', 'dogs_nofunc', 'dogs_noschema', DB_NAME]

CHECK_PERFORMANCE_METRICS = [
'archiver_metrics',
'bgw_metrics',
'connections_metrics',
'count_metrics',
'instance_metrics',
'replication_metrics',
'replication_stats_metrics',
'slru_metrics',
]

requires_static_version = pytest.mark.skipif(USING_LATEST, reason='Version `latest` is ever-changing, skipping')


@@ -313,3 +324,17 @@ def check_stat_wal_metrics(aggregator, expected_tags, count=1):

for metric_name in _iterate_metric_name(STAT_WAL_METRICS):
aggregator.assert_metric(metric_name, count=count, tags=expected_tags)


def check_performance_metrics(aggregator, expected_tags, count=1, is_aurora=False):
    expected_metrics = set(CHECK_PERFORMANCE_METRICS)
    if is_aurora:
        expected_metrics = expected_metrics - {'replication_metrics'}
    if float(POSTGRES_VERSION) < 13.0:
        expected_metrics = expected_metrics - {'slru_metrics'}
    if float(POSTGRES_VERSION) < 10.0:
        expected_metrics = expected_metrics - {'replication_stats_metrics'}
    for name in expected_metrics:
        aggregator.assert_metric(
            'dd.postgres.operation.time', count=count, tags=expected_tags + ['operation:{}'.format(name)]
        )
2 changes: 1 addition & 1 deletion postgres/tests/conftest.py
@@ -119,7 +119,7 @@ def cursor_execute(query, second_arg=""):
data.appendleft(['app1', 'streaming', 'async', '1.1.1.1', 12, 12, 12, 12])
data.appendleft(['app2', 'backup', 'sync', '1.1.1.1', 13, 13, 13, 13])
elif query == 'SHOW SERVER_VERSION;':
data.appendleft(['10.15'])
data.appendleft([POSTGRES_VERSION])

def cursor_fetchall():
while data: