Postgres: Add postgres.snapshot.{xmin,xmax,xip_count} metrics (#13777)
* Add snapshot.{xmin,xmax,xip_count} metrics

* Add doc on new snapshot metrics

* Add tests on snapshot metrics

---------

Co-authored-by: Alexandre Normand <alex.normand@datadoghq.com>
bonnefoa and alexandre-normand committed Jun 6, 2023
1 parent 4409cb0 commit 4b31386
Showing 6 changed files with 125 additions and 10 deletions.
7 changes: 7 additions & 0 deletions postgres/datadog_checks/postgres/postgres.py
@@ -33,6 +33,8 @@
QUERY_PG_UPTIME,
REPLICATION_METRICS,
SLRU_METRICS,
SNAPSHOT_TXID_METRICS,
SNAPSHOT_TXID_METRICS_LT_13,
DatabaseConfigurationError, # noqa: F401
fmt,
get_schema_field,
@@ -172,6 +174,11 @@ def dynamic_queries(self):
queries.append(QUERY_PG_STAT_WAL_RECEIVER)
queries.append(QUERY_PG_REPLICATION_SLOTS)

if self.version >= V13:
queries.append(SNAPSHOT_TXID_METRICS)
if self.version < V13:
queries.append(SNAPSHOT_TXID_METRICS_LT_13)

if not queries:
self.log.debug("no dynamic queries defined")
return None
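The two version checks above are mutually exclusive, so exactly one of the snapshot queries is registered per run. A minimal sketch of that selection (not the shipped check code; it assumes V13 is exported by version_utils like the check's other version constants):

from datadog_checks.postgres.util import SNAPSHOT_TXID_METRICS, SNAPSHOT_TXID_METRICS_LT_13
from datadog_checks.postgres.version_utils import V13

def snapshot_query_for(version):
    # pg_current_snapshot() only exists on PostgreSQL 13+; older servers fall
    # back to the txid_* variant, which reports the same three values.
    if version >= V13:
        return SNAPSHOT_TXID_METRICS
    return SNAPSHOT_TXID_METRICS_LT_13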
37 changes: 37 additions & 0 deletions postgres/datadog_checks/postgres/util.py
@@ -347,6 +347,43 @@ def get_schema_field(descriptors):
""",
}

SNAPSHOT_TXID_METRICS = {
'name': 'pg_snapshot',
# Use a CTE so pg_current_snapshot() is called only once
# FROM LATERAL is needed because pg_snapshot_xip() returns a setof xid8
'query': """
WITH snap AS (
SELECT * FROM pg_current_snapshot()
), xip_count AS (
SELECT COUNT(xip_list) FROM LATERAL (SELECT pg_snapshot_xip(pg_current_snapshot) FROM snap) AS xip_list
)
SELECT pg_snapshot_xmin(pg_current_snapshot), pg_snapshot_xmax(pg_current_snapshot), count FROM snap, xip_count;
""",
'columns': [
{'name': 'postgresql.snapshot.xmin', 'type': 'gauge'},
{'name': 'postgresql.snapshot.xmax', 'type': 'gauge'},
{'name': 'postgresql.snapshot.xip_count', 'type': 'gauge'},
],
}
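The query can be tried standalone to see what the check collects. Below is a simplified sketch for PostgreSQL 13+, not the shipped SQL: the connection parameters are placeholders, and it leans on pg_current_snapshot() being STABLE (all calls within one statement see the same snapshot), which the CTE above avoids relying on:

import psycopg2

with psycopg2.connect(host="localhost", dbname="postgres", user="postgres") as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pg_snapshot_xmin(pg_current_snapshot()),"
            " pg_snapshot_xmax(pg_current_snapshot()),"
            " (SELECT count(*) FROM pg_snapshot_xip(pg_current_snapshot()))"
        )
        xmin, xmax, xip_count = cur.fetchone()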

# Use txid_current_snapshot for PG < 13
SNAPSHOT_TXID_METRICS_LT_13 = {
'name': 'pg_snapshot_lt_13',
'query': """
WITH snap AS (
SELECT * FROM txid_current_snapshot()
), xip_count AS (
SELECT COUNT(xip_list) FROM LATERAL (SELECT txid_snapshot_xip(txid_current_snapshot) FROM snap) AS xip_list
)
SELECT txid_snapshot_xmin(txid_current_snapshot), txid_snapshot_xmax(txid_current_snapshot), count FROM snap, xip_count;
""",
'columns': [
{'name': 'postgresql.snapshot.xmin', 'type': 'gauge'},
{'name': 'postgresql.snapshot.xmax', 'type': 'gauge'},
{'name': 'postgresql.snapshot.xip_count', 'type': 'gauge'},
],
}
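Both variants emit the same three metric names, so dashboards and monitors behave identically across versions. As a general PostgreSQL note (not stated in this diff): the txid_* family returns 64-bit values that include an epoch counter, so these gauges keep increasing across transaction ID wraparound. A quick manual check on a pre-13 server, with the same placeholder connection as above:

with psycopg2.connect(host="localhost", dbname="postgres", user="postgres") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT txid_snapshot_xmin(txid_current_snapshot())")
        print(cur.fetchone()[0])  # 64-bit, epoch-qualified txid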

FUNCTION_METRICS = {
'descriptors': [('schemaname', 'schema'), ('funcname', 'function')],
'metrics': {
3 changes: 3 additions & 0 deletions postgres/metadata.csv
@@ -70,6 +70,9 @@ postgresql.toast_index_blocks_read,gauge,,block,second,The number of disk blocks
postgresql.toast_index_blocks_hit,gauge,,block,second,The number of buffer hits in this table's TOAST table index.,0,postgres,toast idx blks hit,
postgresql.transactions.open,gauge,,transaction,,The number of open transactions in this database.,0,postgres,transactions open,
postgresql.transactions.idle_in_transaction,gauge,,transaction,,The number of 'idle in transaction' transactions in this database.,0,postgres,transactions idle_in_transaction,
postgresql.snapshot.xmin,gauge,,,,"Report the lowest transaction ID still active based on pg_snapshot_xmin(pg_current_snapshot()). All transaction IDs less than xmin are either committed and visible, or rolled back and dead.",0,postgres,snapshot xmin,
postgresql.snapshot.xmax,gauge,,,,"Report the next transaction ID that will be assigned based on pg_snapshot_xmax(pg_current_snapshot()).",0,postgres,snapshot xmax,
postgresql.snapshot.xip_count,gauge,,,,"Report the number of active transactions based on pg_snapshot_xip(pg_current_snapshot()).",0,postgres,snapshot xip,
postgresql.before_xid_wraparound,gauge,,transaction,,The number of transactions that can occur until a transaction wraparound.,0,postgres,tx before xid wraparound,
postgresql.activity.backend_xmin_age,gauge,,transaction,,The age of the oldest backend's xmin horizon relative to latest stable xid.,-1,postgres,activity xmin,
postgresql.activity.backend_xid_age,gauge,,transaction,,The age of the oldest backend's xid relative to latest stable xid.,-1,postgres,activity xid,
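The three snapshot gauges added above describe a single snapshot: every in-progress transaction ID satisfies xmin <= xid < xmax, and xip_count is the length of the in-progress list. A worked example using the snapshot's documented text form ('xmin:xmax:xip_list'), with hypothetical values:

# Hypothetical snapshot: transactions 100 and 103 are still in progress.
snapshot = '100:105:100,103'
xmin, xmax, xip_list = snapshot.split(':')
assert (int(xmin), int(xmax)) == (100, 105)
assert len(xip_list.split(',')) == 2  # postgresql.snapshot.xip_count would be 2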
29 changes: 19 additions & 10 deletions postgres/tests/common.py
@@ -16,6 +16,7 @@
QUERY_PG_UPTIME,
REPLICATION_STATS_METRICS,
SLRU_METRICS,
SNAPSHOT_TXID_METRICS,
)
from datadog_checks.postgres.version_utils import VersionUtils

@@ -95,6 +96,13 @@
requires_static_version = pytest.mark.skipif(USING_LATEST, reason='Version `latest` is ever-changing, skipping')


def _iterate_metric_name(columns):
for column in columns:
if column['type'] == 'tag':
continue
yield column['name']


def assert_metric_at_least(aggregator, metric_name, lower_bound=None, higher_bound=None, count=None, tags=None):
found_values = 0
expected_tags = normalize_tags(tags, sort=True)
@@ -186,10 +194,8 @@ def check_wal_receiver_metrics(aggregator, expected_tags, count=1, connected=1):
'postgresql.wal_receiver.connected', count=count, value=1, tags=expected_tags + ['status:disconnected']
)
return
-for column in QUERY_PG_STAT_WAL_RECEIVER['columns']:
-    if column['type'] == 'tag':
-        continue
-    aggregator.assert_metric(column['name'], count=count, tags=expected_tags)
+for metric_name in _iterate_metric_name(QUERY_PG_STAT_WAL_RECEIVER['columns']):
+    aggregator.assert_metric(metric_name, count=count, tags=expected_tags)


def check_physical_replication_slots(aggregator, expected_tags):
@@ -215,19 +221,17 @@ def check_logical_replication_slots(aggregator, expected_tags):
def check_replication_slots(aggregator, expected_tags, count=1):
if float(POSTGRES_VERSION) < 10.0:
return
-for column in QUERY_PG_REPLICATION_SLOTS['columns']:
-    if column['type'] == 'tag':
-        continue
-    if 'slot_type:physical' in expected_tags and column['name'] in [
+for metric_name in _iterate_metric_name(QUERY_PG_REPLICATION_SLOTS['columns']):
+    if 'slot_type:physical' in expected_tags and metric_name in [
        'postgresql.replication_slot.confirmed_flush_delay_bytes',
    ]:
        continue
-    if 'slot_type:logical' in expected_tags and column['name'] in [
+    if 'slot_type:logical' in expected_tags and metric_name in [
        'postgresql.replication_slot.restart_delay_bytes',
        'postgresql.replication_slot.xmin_age',
    ]:
        continue
-    aggregator.assert_metric(column['name'], count=count, tags=expected_tags)
+    aggregator.assert_metric(metric_name, count=count, tags=expected_tags)
aggregator.assert_metric(metric_name, count=count, tags=expected_tags)


def check_replication_delay(aggregator, metrics_cache, expected_tags, count=1):
@@ -267,3 +271,8 @@ def check_slru_metrics(aggregator, expected_tags, count=1):
for (metric_name, _) in SLRU_METRICS['metrics'].values():
for slru_cache in slru_caches:
aggregator.assert_metric(metric_name, count=count, tags=expected_tags + ['slru_name:{}'.format(slru_cache)])


def check_snapshot_txid_metrics(aggregator, expected_tags, count=1):
for metric_name in _iterate_metric_name(SNAPSHOT_TXID_METRICS['columns']):
aggregator.assert_metric(metric_name, count=count, tags=expected_tags)
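check_snapshot_txid_metrics iterates SNAPSHOT_TXID_METRICS['columns'], and the pre-13 query declares the same metric names, so this one helper covers both server versions. A usage sketch of _iterate_metric_name with hypothetical columns:

columns = [
    {'name': 'slot_name', 'type': 'tag'},
    {'name': 'postgresql.snapshot.xmin', 'type': 'gauge'},
]
assert list(_iterate_metric_name(columns)) == ['postgresql.snapshot.xmin']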
57 changes: 57 additions & 0 deletions postgres/tests/test_pg_integration.py
@@ -29,6 +29,7 @@
check_logical_replication_slots,
check_physical_replication_slots,
check_slru_metrics,
check_snapshot_txid_metrics,
check_stat_replication,
check_uptime_metrics,
check_wal_receiver_metrics,
@@ -65,10 +66,66 @@ def test_common_metrics(aggregator, integration_check, pg_instance, is_aurora):

check_logical_replication_slots(aggregator, expected_tags)
check_physical_replication_slots(aggregator, expected_tags)
check_snapshot_txid_metrics(aggregator, expected_tags=expected_tags)

aggregator.assert_all_metrics_covered()


def test_snapshot_xmin(aggregator, integration_check, pg_instance):
with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn:
with conn.cursor() as cur:
cur.execute('select txid_snapshot_xmin(txid_current_snapshot());')
xmin = float(cur.fetchall()[0][0])
check = integration_check(pg_instance)
check.check(pg_instance)

expected_tags = pg_instance['tags'] + [
'port:{}'.format(PORT),
'dd.internal.resource:database_instance:{}'.format(check.resolved_hostname),
]
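# With no other transactions in progress on an idle test database, the
# snapshot's xmin (oldest still-active txid) equals its xmax (next txid
# to be assigned), hence the identical expected values below.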
aggregator.assert_metric('postgresql.snapshot.xmin', value=xmin, count=1, tags=expected_tags)
aggregator.assert_metric('postgresql.snapshot.xmax', value=xmin, count=1, tags=expected_tags)

with psycopg2.connect(host=HOST, dbname=DB_NAME, user="postgres", password="datad0g") as conn:
# Force autocommit
conn.set_session(autocommit=True)
with conn.cursor() as cur:
# Force txid increases
cur.execute('select txid_current();')
cur.execute('select txid_current();')

check = integration_check(pg_instance)
check.check(pg_instance)
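# The two txid_current() calls above consumed two txids, so both snapshot
# bounds should have advanced by exactly 2.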
aggregator.assert_metric('postgresql.snapshot.xmin', value=xmin + 2, count=1, tags=expected_tags)
aggregator.assert_metric('postgresql.snapshot.xmax', value=xmin + 2, count=1, tags=expected_tags)


def test_snapshot_xip(aggregator, integration_check, pg_instance):
conn1 = _get_conn(pg_instance)
cur = conn1.cursor()

# Start a transaction
cur.execute('BEGIN;')
# Force assignment of a txid and keep the transaction open
cur.execute('select txid_current();')
# Fetch the result to make sure the transaction has started before we proceed
cur.fetchall()

conn2 = _get_conn(pg_instance)
conn2.set_session(autocommit=True)
with conn2.cursor() as cur2:
# Force a txid increase
cur2.execute('select txid_current();')

check = integration_check(pg_instance)
check.check(pg_instance)
expected_tags = pg_instance['tags'] + [
'port:{}'.format(PORT),
'dd.internal.resource:database_instance:{}'.format(check.resolved_hostname),
]
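# The transaction opened on conn1 is still running, so it is the single
# in-progress transaction expected in the snapshot's xip list.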
aggregator.assert_metric('postgresql.snapshot.xip_count', value=1, count=1, tags=expected_tags)


def test_common_metrics_without_size(aggregator, integration_check, pg_instance):
pg_instance['collect_database_size_metrics'] = False
check = integration_check(pg_instance)
2 changes: 2 additions & 0 deletions postgres/tests/test_pg_replication.py
@@ -14,6 +14,7 @@
check_db_count,
check_replication_delay,
check_slru_metrics,
check_snapshot_txid_metrics,
check_uptime_metrics,
check_wal_receiver_metrics,
)
@@ -40,6 +41,7 @@ def test_common_replica_metrics(aggregator, integration_check, metrics_cache_rep
check_wal_receiver_metrics(aggregator, expected_tags=expected_tags + ['status:streaming'])
check_conflict_metrics(aggregator, expected_tags=expected_tags)
check_uptime_metrics(aggregator, expected_tags=expected_tags)
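# Snapshot functions are read-only and usable during recovery, so these
# metrics are expected on a replica as well.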
check_snapshot_txid_metrics(aggregator, expected_tags=expected_tags)

aggregator.assert_all_metrics_covered()
