Make dbm async job cancel non-blocking (#16028)
* Make dbm async job cancel non-blocking

* add changelog

* reorder changelog entry

* use run_one_check to ensure check is properly cancelled

* remove aggregator reset

* remove logging _last_check_run because logging the timestamp isn't helpful
lu-zhengda committed Oct 17, 2023
1 parent 8ce334b commit caf9bd6
Showing 8 changed files with 34 additions and 25 deletions.
1 change: 1 addition & 0 deletions datadog_checks_base/CHANGELOG.md
@@ -4,6 +4,7 @@
 
 ***Fixed***:
 
+* Fix check cancellation timeout due to `DBMAsyncJob` cancellation being blocked ([#16028](https://github.com/DataDog/integrations-core/pull/16028))
 * Bump the `pyodbc` version to 4.0.39 ([#16021](https://github.com/DataDog/integrations-core/pull/16021))
 
 ## 34.0.0 / 2023-09-29
3 changes: 0 additions & 3 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -264,9 +264,6 @@ def cancel(self):
         Send a signal to cancel the job loop asynchronously.
         """
         self._cancel_event.set()
-        # after setting cancel event, wait for job loop to fully shutdown
-        if self._job_loop_future:
-            self._job_loop_future.result()
 
     def run_job_loop(self, tags):
         """
4 changes: 4 additions & 0 deletions postgres/CHANGELOG.md
@@ -7,6 +7,10 @@
 * Upgrade `psycopg2-binary` to `v2.9.8` ([#15949](https://github.com/DataDog/integrations-core/pull/15949))
 * Add support for reporting SQL obfuscation errors ([#15990](https://github.com/DataDog/integrations-core/pull/15990))
 
+***Fixed***:
+
+* Fix check cancellation timeout due to `DBMAsyncJob` cancellation being blocked ([#16028](https://github.com/DataDog/integrations-core/pull/16028))
+
 ## 15.1.0 / 2023-10-06
 
 ***Added***:
12 changes: 8 additions & 4 deletions postgres/datadog_checks/postgres/postgres.py
@@ -289,11 +289,15 @@ def dynamic_queries(self):
 
     def cancel(self):
         """
-        Cancels and waits for all threads to stop.
+        Cancels and sends cancel signal to all threads.
         """
-        self.statement_samples.cancel()
-        self.statement_metrics.cancel()
-        self.metadata_samples.cancel()
+        if self._config.dbm_enabled:
+            self.statement_samples.cancel()
+            self.statement_metrics.cancel()
+            self.metadata_samples.cancel()
+        self._close_db_pool()
+        if self._db:
+            self._db.close()
 
     def _clean_state(self):
         self.log.debug("Cleaning state")
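
Since `cancel()` no longer joins the job threads, an embedder or test that needs to block until shutdown completes must wait on the futures itself. A hedged usage sketch of the new contract — the instance config values are illustrative, not the suite's exact code:

```python
from datadog_checks.postgres import PostgreSql

# Illustrative instance config; real values depend on the deployment.
pg_instance = {'host': 'localhost', 'port': 5432, 'username': 'datadog', 'password': 'datadog', 'dbm': True}

check = PostgreSql('postgres', {}, [pg_instance])
check.check(pg_instance)

# cancel() now signals the DBM jobs (when dbm is enabled) and closes the
# connection pool and main connection, but returns without joining threads.
check.cancel()

# Callers that need a synchronous shutdown wait on the futures themselves:
for job in (check.statement_samples, check.statement_metrics, check.metadata_samples):
    if job._job_loop_future is not None:
        job._job_loop_future.result()
```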
20 changes: 11 additions & 9 deletions postgres/datadog_checks/postgres/version_utils.py
@@ -35,17 +35,19 @@ def get_raw_version(db):
     def is_aurora(self, db):
         if self._seen_aurora_exception:
             return False
-        try:
-            with db as conn:
-                with conn.cursor() as cursor:
-                    # This query will pollute PG logs in non aurora versions,
-                    # but is the only reliable way to detect aurora
+        with db as conn:
+            with conn.cursor() as cursor:
+                # This query will pollute PG logs in non aurora versions,
+                # but is the only reliable way to detect aurora
+                try:
                     cursor.execute('select AURORA_VERSION();')
                     return True
-        except Exception as e:
-            self.log.debug("Captured exception %s while determining if the DB is aurora. Assuming is not", str(e))
-            self._seen_aurora_exception = True
-            return False
+                except Exception as e:
+                    self.log.debug(
+                        "Captured exception %s while determining if the DB is aurora. Assuming is not", str(e)
+                    )
+                    self._seen_aurora_exception = True
+                    return False
 
     @staticmethod
     def parse_version(raw_version):
6 changes: 3 additions & 3 deletions postgres/tests/test_pg_integration.py
@@ -38,7 +38,7 @@
     check_wal_receiver_metrics,
     requires_static_version,
 )
-from .utils import _get_conn, _get_superconn, requires_over_10, requires_over_14
+from .utils import _get_conn, _get_superconn, requires_over_10, requires_over_14, run_one_check
 
 CONNECTION_METRICS = ['postgresql.max_connections', 'postgresql.percent_usage_connections']
 
@@ -663,7 +663,7 @@ def test_database_instance_metadata(aggregator, dd_run_check, pg_instance, dbm_e
     expected_host = reported_hostname if reported_hostname else 'stubbed.hostname'
     expected_tags = pg_instance['tags'] + ['port:{}'.format(pg_instance['port'])]
     check = PostgreSql('test_instance', {}, [pg_instance])
-    dd_run_check(check)
+    run_one_check(check, pg_instance)
 
     dbm_metadata = aggregator.get_event_platform_events("dbm-metadata")
     event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None)
@@ -680,7 +680,7 @@ def test_database_instance_metadata(aggregator, dd_run_check, pg_instance, dbm_e
 
     # Run a second time and expect the metadata to not be emitted again because of the cache TTL
     aggregator.reset()
-    dd_run_check(check)
+    run_one_check(check, pg_instance)
 
     dbm_metadata = aggregator.get_event_platform_events("dbm-metadata")
     event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None)
8 changes: 4 additions & 4 deletions postgres/tests/test_statements.py
@@ -143,16 +143,16 @@ def _run_queries():
 
     check = integration_check(dbm_instance)
     check._connect()
-    run_one_check(check, dbm_instance)
+    run_one_check(check, dbm_instance, cancel=False)
 
     # We can't change track_io_timing at runtime, but we can change what the integration thinks the runtime value is
     # This must be done after the first check since postgres settings are loaded from the database then
     check.pg_settings["track_io_timing"] = "on" if track_io_timing_enabled else "off"
 
     _run_queries()
-    run_one_check(check, dbm_instance)
+    run_one_check(check, dbm_instance, cancel=False)
     _run_queries()
-    run_one_check(check, dbm_instance)
+    run_one_check(check, dbm_instance, cancel=False)
 
     def _should_catch_query(dbname):
         # we can always catch it if the query originates in the same DB
@@ -370,7 +370,7 @@ def obfuscate_sql(query, options=None):
         mock_agent.side_effect = obfuscate_sql
         cursor.execute(query, (['app1', 'app2'],))
         cursor.execute(query, (['app1', 'app2', 'app3'],))
-        run_one_check(check, dbm_instance)
+        check.check(dbm_instance)
 
         cursor.execute(query, (['app1', 'app2'],))
         cursor.execute(query, (['app1', 'app2', 'app3'],))
5 changes: 3 additions & 2 deletions postgres/tests/utils.py
@@ -55,13 +55,14 @@ def _wait_for_value(db_instance, lower_threshold, query):
         time.sleep(0.1)
 
 
-def run_one_check(check, db_instance):
+def run_one_check(check, db_instance, cancel=True):
     """
     Run check and immediately cancel.
     Waits for all threads to close before continuing.
     """
     check.check(db_instance)
-    check.cancel()
+    if cancel:
+        check.cancel()
     if check.statement_samples._job_loop_future is not None:
         check.statement_samples._job_loop_future.result()
     if check.statement_metrics._job_loop_future is not None:
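
The new `cancel` flag lets a test run the check several times while keeping the async job threads alive, deferring cancellation, as `test_statements.py` now does. A short usage sketch, assuming the suite's `integration_check` fixture and a `dbm_instance` config:

```python
check = integration_check(dbm_instance)  # fixtures assumed from the test suite
check._connect()

# Intermediate runs leave the async jobs running between checks...
run_one_check(check, dbm_instance, cancel=False)
run_one_check(check, dbm_instance, cancel=False)

# ...and a final run with the default cancel=True signals the jobs
# and joins their job-loop futures before the test moves on.
run_one_check(check, dbm_instance)
```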