Skip to content

Commit

Permalink
[Fetch Migration] Handle idle pipeline when target doc count is never…
Browse files Browse the repository at this point in the history
… reached (#377)

This commit introduces changes to track various Data Prepper metrics (as well as API failure counters) in order to detect an idle pipeline or non-responsive Data Prepper subprocess. Without these logic changes, the monitoring module would only shutdown the Data Prepper pipeline when the target doc count was reached. If this failed to occur for any reason, or if the Data Prepper API was unresponsive, the overall Fetch Migration workflow would never conclude.

A new ProgressMetrics class has been added to track all metrics and encapsulate detection logic. Much of the migration-success logic from the monitoring module has been moved to this class. Unit test updates and improvements are also included.

This PR also refactors/merges the run and monitor_local functions together (since most of their code/logic is identical) for improved unit test coverage.

---------

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg committed Nov 7, 2023
1 parent e5ed0eb commit 400b236
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 142 deletions.
2 changes: 1 addition & 1 deletion FetchMigration/python/fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[in
# Run the migration monitor next
migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
logging.info("Starting migration monitor...\n")
return migration_monitor.monitor_local(migration_monitor_params, proc)
return migration_monitor.run(migration_monitor_params, proc)


if __name__ == '__main__': # pragma no cover
Expand Down
2 changes: 1 addition & 1 deletion FetchMigration/python/metadata_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
# Write output YAML
if len(args.output_file) > 0:
write_output(dp_config, indices_to_create, args.output_file)
if args.report:
if args.report: # pragma no cover
print("Wrote output YAML pipeline to: " + args.output_file)
if not args.dryrun:
index_data = dict()
Expand Down
134 changes: 56 additions & 78 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import argparse
import logging
import math
import subprocess
import time
from subprocess import Popen
Expand All @@ -12,14 +11,20 @@

from endpoint_info import EndpointInfo
from migration_monitor_params import MigrationMonitorParams
from progress_metrics import ProgressMetrics

# Path to the Data Prepper Prometheus metrics API endpoint
# Used to monitor the progress of the migration
__METRICS_API_PATH = "/metrics/prometheus"
__SHUTDOWN_API_PATH = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"
__METRICS_API_PATH: str = "/metrics/prometheus"
__SHUTDOWN_API_PATH: str = "/shutdown"
__DOC_SUCCESS_METRIC: str = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired"
__IDLE_THRESHOLD: int = 5


def is_process_alive(proc: Popen) -> bool:
return proc.returncode is None


# Gracefully shutdown a subprocess
Expand All @@ -29,7 +34,7 @@ def shutdown_process(proc: Popen) -> Optional[int]:
try:
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
if proc.returncode is None:
if is_process_alive(proc):
# Failed to terminate, send SIGKILL
proc.kill()
return proc.returncode
Expand Down Expand Up @@ -60,106 +65,79 @@ def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]
return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_partition_count: Optional[int],
prev_no_partition: int, target: int) -> bool:
# Check for target doc_count
# TODO Add a check for partitionsCompleted = indices
if doc_count is not None and doc_count >= target:
# Check for idle pipeline
logging.info("Target doc count reached, checking for idle pipeline...")
# Debug metrics
if logging.getLogger().isEnabledFor(logging.DEBUG): # pragma no cover
debug_msg_template: str = "Idle pipeline metrics - " + \
"Records in flight: [{0}], " + \
"No-partitions counter: [{1}]" + \
"Previous no-partition value: [{2}]"
logging.debug(debug_msg_template.format(in_flight, no_partition_count, prev_no_partition))

if in_flight is not None and in_flight == 0:
# No-partitions metrics should steadily tick up
if no_partition_count is not None and no_partition_count > prev_no_partition > 0:
return True
return False


def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
tuple[bool, int]:
terminal: bool = False
def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetrics) -> ProgressMetrics:
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint_info)
if metrics is not None:
# Reset API failure counter
progress.reset_metric_api_failure()
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
if success_docs is not None: # pragma no cover
completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress.update_records_in_flight_count(get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC))
progress.update_no_partitions_count(get_metric_value(metrics, __NO_PARTITIONS_METRIC))
if success_docs is not None:
completion_percentage = progress.update_success_doc_count(success_docs)
progress_message: str = "Completed " + str(success_docs) + \
" docs ( " + str(completion_percentage) + "% )"
logging.info(progress_message)
if progress.all_docs_migrated():
logging.info("All documents migrated...")
else:
progress.record_success_doc_value_failure()
logging.warning("Could not fetch progress stats from Data Prepper response, " +
"will retry on next polling cycle...")
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
else:
progress.record_metric_api_failure()
logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
# TODO - Handle idle non-terminal pipeline
return terminal, prev_no_partitions_count
return progress


def __should_continue_monitoring(progress: ProgressMetrics, proc: Optional[Popen] = None) -> bool:
return not progress.is_in_terminal_state() and (proc is None or is_process_alive(proc))


def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
# The "dp_process" parameter is optional, and signifies a local Data Prepper process
def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
# Sets returncode. A value of None means the subprocess has not yet terminated
dp_process.poll()
while dp_process.returncode is None and not is_migration_complete:
try:
dp_process.wait(timeout=poll_interval_seconds)
except subprocess.TimeoutExpired:
pass
if dp_process.returncode is None:
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
while __should_continue_monitoring(progress_metrics, dp_process):
if dp_process is not None:
# Wait on local process
try:
dp_process.wait(timeout=poll_interval_seconds)
except subprocess.TimeoutExpired:
pass
else:
# Thread sleep
time.sleep(poll_interval_seconds)
if dp_process is None or is_process_alive(dp_process):
progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
# Loop terminated
if not is_migration_complete:
if not progress_metrics.is_in_terminal_state():
# This will only happen for a local Data Prepper process
logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
# TODO - Implement rollback
logging.error("Please delete any partially migrated indices before retrying the migration.")
return dp_process.returncode
else:
if progress_metrics.is_migration_complete_success():
logging.info("Migration monitor observed successful migration, shutting down...\n")
elif progress_metrics.is_migration_idle():
logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
elif progress_metrics.is_too_may_api_failures():
logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
# Shut down Data Prepper pipeline via API
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint_info)
if dp_process.returncode is None:
if dp_process is None:
# No local process
return 0
elif is_process_alive(dp_process):
# Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
return shutdown_process(dp_process)
else:
return dp_process.returncode


def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
while not is_migration_complete:
time.sleep(poll_interval_seconds)
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
# Loop terminated, shut down the Data Prepper pipeline
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint_info)


if __name__ == '__main__': # pragma no cover
# Set up parsing for command line arguments
arg_parser = argparse.ArgumentParser(
Expand Down
1 change: 1 addition & 0 deletions FetchMigration/python/migration_monitor_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
class MigrationMonitorParams:
target_count: int
data_prepper_endpoint: str
is_local_process: bool = False
134 changes: 134 additions & 0 deletions FetchMigration/python/progress_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging
import math
from typing import Optional


# Class that tracks metrics on the health and progress of the migration.Specific metric values from the Data Prepper
# metrics API endpoint are retrieved and stored, as well as idle-value tracking via counters that may indicate an
# idle pipeline. Counters are also used to keep track of API failures or missing metric values.
class ProgressMetrics:

# Private constants
__IDLE_VALUE_PREFIX: str = "idle-value-"
_METRIC_API_FAIL_KEY: str = "metric_api_fail"
_SUCCESS_DOCS_KEY = "success_docs"
_REC_IN_FLIGHT_KEY = "records_in_flight"
_NO_PART_KEY = "no_partitions"

target_doc_count: int
idle_threshold: int
current_values_map: dict[str, Optional[int]]
prev_values_map: dict[str, Optional[int]]
counter_map: dict[str, int]

def __init__(self, doc_count, idle_threshold):
self.target_doc_count = doc_count
self.idle_threshold = idle_threshold
self.current_values_map = dict()
self.prev_values_map = dict()
self.counter_map = dict()

def __reset_counter(self, key: str):
if key in self.counter_map:
del self.counter_map[key]

def __increment_counter(self, key: str):
val = self.counter_map.get(key, 0)
self.counter_map[key] = val + 1

def __get_idle_value_key_name(self, key: str) -> str:
return self.__IDLE_VALUE_PREFIX + key

def __get_idle_value_count(self, key: str) -> Optional[int]:
idle_value_key = self.__get_idle_value_key_name(key)
return self.counter_map.get(idle_value_key)

def __record_value(self, key: str, val: Optional[int]):
if key in self.current_values_map:
# Move current value to previous
self.prev_values_map[key] = self.current_values_map[key]
# Track idle value metrics
idle_value_key = self.__get_idle_value_key_name(key)
if self.prev_values_map[key] == val:
self.__increment_counter(idle_value_key)
else:
self.__reset_counter(idle_value_key)
# Store new value
self.current_values_map[key] = val

def __get_current_value(self, key: str) -> Optional[int]:
return self.current_values_map.get(key)

def reset_metric_api_failure(self):
self.__reset_counter(self._METRIC_API_FAIL_KEY)

def record_metric_api_failure(self):
self.__increment_counter(self._METRIC_API_FAIL_KEY)

def __reset_success_doc_value_failure(self):
self.__reset_counter(self._SUCCESS_DOCS_KEY)
# Also reset API falure counter
self.reset_metric_api_failure()

def record_success_doc_value_failure(self):
self.__record_value(self._SUCCESS_DOCS_KEY, None)

def update_success_doc_count(self, doc_count: int) -> int:
self.__reset_success_doc_value_failure()
self.__record_value(self._SUCCESS_DOCS_KEY, doc_count)
return self.get_doc_completion_percentage()

def update_records_in_flight_count(self, rec_in_flight: Optional[int]):
self.__record_value(self._REC_IN_FLIGHT_KEY, rec_in_flight)

def update_no_partitions_count(self, no_part_count: Optional[int]):
self.__record_value(self._NO_PART_KEY, no_part_count)

def get_doc_completion_percentage(self) -> int:
success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
if success_doc_count is None:
success_doc_count = 0
return math.floor((success_doc_count * 100) / self.target_doc_count)

def all_docs_migrated(self) -> bool:
# TODO Add a check for partitionsCompleted = indices
success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
if success_doc_count is None:
success_doc_count = 0
return success_doc_count >= self.target_doc_count

def is_migration_complete_success(self) -> bool:
is_idle_pipeline: bool = False
rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY)
no_partitions_count = self.__get_current_value(self._NO_PART_KEY)
prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0)
# Check for no records in flight
if rec_in_flight is not None and rec_in_flight == 0:
# No-partitions metrics should steadily tick up
if no_partitions_count is not None and no_partitions_count > prev_no_partitions_count > 0:
is_idle_pipeline = True
return is_idle_pipeline and self.all_docs_migrated()

def is_migration_idle(self) -> bool:
keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY]
for key in keys_to_check:
val = self.__get_idle_value_count(key)
if val is not None and val >= self.idle_threshold:
logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " +
str(self.idle_threshold))
return True
# End of loop
return False

def is_too_may_api_failures(self) -> bool:
return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold

def is_in_terminal_state(self) -> bool:
return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures()

def log_idle_pipeline_debug_metrics(self): # pragma no cover
if logging.getLogger().isEnabledFor(logging.DEBUG):
logging.debug("Idle pipeline metrics - " +
f"Records in flight: [{self.__get_current_value(self._REC_IN_FLIGHT_KEY)}], " +
f"No-partitions counter: [{self.__get_current_value(self._NO_PART_KEY)}]" +
f"Previous no-partition value: [{self.prev_values_map.get(self._NO_PART_KEY)}]")
4 changes: 2 additions & 2 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class TestFetchOrchestrator(unittest.TestCase):

@patch('migration_monitor.monitor_local')
@patch('migration_monitor.run')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
Expand All @@ -33,7 +33,7 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc
mock_subprocess.assert_called_once_with(expected_dp_runnable)
mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value)

@patch('migration_monitor.monitor_local')
@patch('migration_monitor.run')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
Expand Down
Loading

0 comments on commit 400b236

Please sign in to comment.