
Commit 6c6bcf1

Add tests for index creation progress
bonnefoa committed Dec 11, 2023
1 parent 6144fc6 commit 6c6bcf1
Showing 2 changed files with 68 additions and 13 deletions.
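
For context, the new test exercises PostgreSQL's pg_stat_progress_create_index view (available since PostgreSQL 12), which the check reports as postgresql.create_index.* metrics. A minimal sketch of polling that view by hand with psycopg2 while a CREATE INDEX runs in another session; the connection parameters below are placeholders and not part of this commit:

# Sketch: watch index-build progress reported by pg_stat_progress_create_index.
# Connection parameters are placeholders; adjust for your environment.
import time

import psycopg2

conn = psycopg2.connect(host='localhost', dbname='datadog_test', user='postgres', password='postgres')
conn.autocommit = True  # avoid holding a transaction open while polling

with conn.cursor() as cur:
    for _ in range(10):
        cur.execute(
            """SELECT index_relid::regclass, command, phase, lockers_total, lockers_done
               FROM pg_stat_progress_create_index"""
        )
        # One row per backend currently running CREATE INDEX / REINDEX
        for row in cur.fetchall():
            print(row)
        time.sleep(1)

conn.close()
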
42 changes: 42 additions & 0 deletions postgres/tests/test_progress_stats.py
@@ -8,16 +8,19 @@
 from datadog_checks.postgres.util import (
     ANALYZE_PROGRESS_METRICS,
     CLUSTER_VACUUM_PROGRESS_METRICS,
+    INDEX_PROGRESS_METRICS,
     VACUUM_PROGRESS_METRICS,
 )
 
 from .common import DB_NAME, _get_expected_tags, _iterate_metric_name
 from .utils import (
     _wait_for_value,
+    kill_session,
     kill_vacuum,
     lock_table,
     requires_over_12,
     requires_over_13,
+    run_query_thread,
     run_vacuum_thread,
 )
 
@@ -99,6 +102,45 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance):
         aggregator.assert_metric(metric_name, count=1, tags=expected_tags)
 
 
+@requires_over_12
+def test_index_progress(aggregator, integration_check, pg_instance):
+    check = integration_check(pg_instance)
+
+    # Keep test_part1 locked to prevent the CREATE INDEX CONCURRENTLY from finishing
+    conn = lock_table(pg_instance, 'test_part1', 'ROW EXCLUSIVE')
+
+    # Start the CREATE INDEX CONCURRENTLY in a thread
+    thread = run_query_thread(pg_instance, 'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);')
+
+    # Wait for the blocked CREATE INDEX to show up in pg_stat_progress_create_index
+    _wait_for_value(
+        pg_instance,
+        lower_threshold=0,
+        query="select count(*) FROM pg_stat_progress_create_index where lockers_total=1;",
+    )
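+    # The transaction holding ROW EXCLUSIVE on test_part1 is the writer that CREATE INDEX
+    # CONCURRENTLY waits for, so it is reported as a single locker in the progress view.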
+    # Gather metrics
+    check.check(pg_instance)
+
+    # Cancel the blocked CREATE INDEX
+    kill_session(pg_instance, 'CREATE INDEX')
+
+    # Clean up the connection and thread
+    conn.close()
+    thread.join()
+
+    # Check metrics
+    expected_tags = _get_expected_tags(check, pg_instance) + [
+        'command:CREATE INDEX CONCURRENTLY',
+        'index:test_progress_index',
+        'phase:waiting for writers before build',
+        'table:test_part1',
+        f'db:{DB_NAME}',
+    ]
+    for metric_name in _iterate_metric_name(INDEX_PROGRESS_METRICS):
+        aggregator.assert_metric(metric_name, count=1, tags=expected_tags)
+    aggregator.assert_metric('postgresql.create_index.lockers_total', count=1, value=1, tags=expected_tags)
+
+
 @requires_over_12
 def test_cluster_vacuum_progress(aggregator, integration_check, pg_instance):
     check = integration_check(pg_instance)
39 changes: 26 additions & 13 deletions postgres/tests/utils.py
@@ -61,10 +61,18 @@ def lock_table(pg_instance, table, lock_mode):
     return lock_conn
 
 
-def kill_vacuum(pg_instance):
+def kill_session(pg_instance, query_pattern):
     with _get_superconn(pg_instance) as conn:
         with conn.cursor() as cur:
-            cur.execute("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query ~* '^vacuum'")
+            cur.execute(
+                f"""SELECT pg_cancel_backend(pid)
+                FROM pg_stat_activity
+                WHERE query ~* '{query_pattern}' AND pid!=pg_backend_pid()"""
+            )
+
+
+def kill_vacuum(pg_instance):
+    kill_session(pg_instance, '^vacuum')
 
 
 # Wait until the query yielding a single value crosses the provided threshold
@@ -85,25 +93,30 @@ def _wait_for_value(db_instance, lower_threshold, query, attempts=10):
     conn.close()
 
 
-def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'):
-    def run_analyze():
-        conn_vacuum = _get_superconn(pg_instance, application_name)
-        with conn_vacuum.cursor() as cur:
-            cur.execute("set statement_timeout='2s'")
-            cur.execute('set vacuum_cost_delay=100')
-            cur.execute('set vacuum_cost_limit=1')
+def run_query_thread(pg_instance, query, application_name='test', init_statements=None):
+    def run_query():
+        conn = _get_superconn(pg_instance, application_name)
+        with conn.cursor() as cur:
+            if init_statements:
+                for stmt in init_statements:
+                    cur.execute(stmt)
             try:
-                cur.execute(vacuum_query)
+                cur.execute(query)
             except psycopg2.errors.QueryCanceled:
                 pass
-        conn_vacuum.close()
+        conn.close()
 
-    # Start vacuum
-    thread = threading.Thread(target=run_analyze)
+    # Start thread
+    thread = threading.Thread(target=run_query)
     thread.start()
     return thread
 
 
+def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'):
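+    # Throttle the vacuum (vacuum_cost_delay/vacuum_cost_limit) so it stays visible in the
+    # progress view, and bound it with statement_timeout so the worker thread always exits.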
+    init_stmts = ["set statement_timeout='2s'", 'set vacuum_cost_delay=100', 'set vacuum_cost_limit=1']
+    return run_query_thread(pg_instance, vacuum_query, application_name, init_stmts)
+
+
 def run_one_check(check, db_instance, cancel=True):
     """
     Run check and immediately cancel.
