
Commit

[Fetch Migration] Adding orchestration script for Fetch Migration steps (opensearch-project#294)

This change adds an "orchestration" Python script that runs the Fetch Migration steps in order:

1) The metadata migration script is run first to compare the indices on the source and target clusters. A human-readable report is printed, and an updated Data Prepper pipeline YAML file is written to the Data Prepper `pipelines` directory.

2) Next, Data Prepper is kicked off as a [sub-process](https://docs.python.org/3/library/subprocess.html) to migrate data based on the output pipeline YAML file.

3) Once the Data Prepper process is running, the monitoring script polls its Prometheus metrics endpoint to determine when the migration is complete and the process can be shut down.

The orchestrator script takes multiple command-line inputs to enable execution of these steps. The Dockerfile definition has also been updated to use the orchestrator script as the entrypoint.
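The flow these steps describe can be condensed into a short sketch (the `orchestrate` wrapper below is illustrative; the actual entry point is the `run` function in the new `fetch_orchestrator.py` included in this diff):

```python
import subprocess

import metadata_migration
import migration_monitor
from metadata_migration_params import MetadataMigrationParams
from migration_monitor_params import MigrationMonitorParams


def orchestrate(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> None:
    # Step 1: compare source/target indices, print a report, write the updated pipeline YAML
    output_file = dp_base_path + "/pipelines/pipeline.yaml"
    result = metadata_migration.run(MetadataMigrationParams(dp_config_file, output_file, report=True))
    if result.created_indices:
        # Step 2: kick off Data Prepper as a sub-process using the generated pipeline
        proc = subprocess.Popen(dp_base_path + "/bin/data-prepper")
        # Step 3: poll the Prometheus metrics endpoint until the migration completes
        migration_monitor.run(MigrationMonitorParams(result.target_doc_count, dp_endpoint))
        # Terminate the sub-process if it has not already exited (data-prepper#3141 workaround)
        if proc.returncode is None:
            proc.terminate()
```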

* Adding logging to migration_monitor.py

Add logging to track the progress of the migration. This commit also moves the sleep to the start of the monitoring loop, allowing an initial 30 seconds to pass so the Data Prepper process can start up. Otherwise, we run the risk of the GET call failing.
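A minimal sketch of that reordering, assuming the monitor's default 30-second poll interval (the helper name and the hard-coded `('admin', 'admin')` auth are illustrative; the full loop lives in `migration_monitor.py` below):

```python
import logging
import time

import requests

POLL_INTERVAL_SECONDS = 30


def poll_until_complete(metrics_url: str, is_complete) -> None:
    terminal = False
    while not terminal:
        # Sleeping at the top of the loop gives Data Prepper ~30 seconds to start
        # before the first GET, instead of sleeping after each poll
        time.sleep(POLL_INTERVAL_SECONDS)
        response = requests.get(metrics_url, auth=("admin", "admin"), verify=False)
        logging.info("Polled %s, status code: %s", metrics_url, response.status_code)
        terminal = is_complete(response.text)
```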

---------

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
kartg committed Oct 9, 2023
1 parent d6ea624 commit 8765da5
Showing 11 changed files with 226 additions and 69 deletions.
8 changes: 4 additions & 4 deletions FetchMigration/Dockerfile
@@ -6,13 +6,13 @@ RUN apk update
RUN apk add --no-cache python3 py-pip
RUN pip install --user -r requirements.txt

ENV ICT_CODE_PATH /code
WORKDIR $ICT_CODE_PATH
ENV FM_CODE_PATH /code
WORKDIR $FM_CODE_PATH
# Copy only source code
COPY python/*.py .

# update PATH
ENV PATH=/root/.local:$PATH

# make sure you include the -u flag to have our stdout logged
ENTRYPOINT python -u ./main.py -r $ICT_CODE_PATH/input.yaml $DATA_PREPPER_PATH/pipelines/pipelines.yaml; $DATA_PREPPER_PATH/bin/data-prepper
# Include the -u flag to have stdout logged
ENTRYPOINT python -u ./fetch_orchestrator.py $DATA_PREPPER_PATH $FM_CODE_PATH/input.yaml https://localhost:4900
2 changes: 1 addition & 1 deletion FetchMigration/README.md
@@ -55,7 +55,7 @@ python -m pip install -r index_configuration_tool/requirements.txt
After [setup](#setup), the tool can be executed using:

```shell
python index_configuration_tool/pre_migration.py <pipeline_yaml_path> <output_file>
python index_configuration_tool/metadata_migration.py <pipeline_yaml_path> <output_file>
```

### Docker
60 changes: 60 additions & 0 deletions FetchMigration/python/fetch_orchestrator.py
@@ -0,0 +1,60 @@
import argparse
import logging
import os
import subprocess

import migration_monitor
import metadata_migration
from migration_monitor_params import MigrationMonitorParams
from metadata_migration_params import MetadataMigrationParams


__DP_EXECUTABLE_SUFFIX = "/bin/data-prepper"
__PIPELINE_OUTPUT_FILE_SUFFIX = "/pipelines/pipeline.yaml"


def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
    dp_exec_path = dp_base_path + __DP_EXECUTABLE_SUFFIX
    output_file = dp_base_path + __PIPELINE_OUTPUT_FILE_SUFFIX
    metadata_migration_params = MetadataMigrationParams(dp_config_file, output_file, report=True)
    logging.info("Running pre-migration steps...\n")
    metadata_migration_result = metadata_migration.run(metadata_migration_params)
    if len(metadata_migration_result.created_indices) > 0:
        # Kick off a subprocess for Data Prepper
        logging.info("Running Data Prepper...\n")
        proc = subprocess.Popen(dp_exec_path)
        # Data Prepper started successfully, run the migration monitor
        migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
        logging.info("Starting migration monitor...\n")
        migration_monitor.run(migration_monitor_params)
        # Migration ended, the following is a workaround for
        # https://github.com/opensearch-project/data-prepper/issues/3141
        if proc.returncode is None:
            proc.terminate()


if __name__ == '__main__':  # pragma no cover
    # Set log level
    logging.basicConfig(level=logging.INFO)
    # Set up parsing for command line arguments
    arg_parser = argparse.ArgumentParser(
        prog="python fetch_orchestrator.py",
        description="Orchestrator script for fetch migration",
        formatter_class=argparse.RawTextHelpFormatter
    )
    # Required positional argument
    arg_parser.add_argument(
        "data_prepper_path",
        help="Path to the base directory where Data Prepper is installed"
    )
    arg_parser.add_argument(
        "config_file_path",
        help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information"
    )
    arg_parser.add_argument(
        "data_prepper_endpoint",
        help="Data Prepper endpoint for monitoring the migration"
    )
    cli_args = arg_parser.parse_args()
    base_path = os.path.expandvars(cli_args.data_prepper_path)
    run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
FetchMigration/python/metadata_migration.py
@@ -7,8 +7,8 @@

# Constants
from endpoint_info import EndpointInfo
from pre_migration_params import PreMigrationParams
from pre_migration_result import PreMigrationResult
from metadata_migration_params import MetadataMigrationParams
from metadata_migration_result import MetadataMigrationResult

SUPPORTED_ENDPOINTS = ["opensearch", "elasticsearch"]
SOURCE_KEY = "source"
@@ -153,7 +153,7 @@ def compute_endpoint_and_fetch_indices(config: dict, key: str) -> tuple[Endpoint
    return endpoint_info, indices


def run(args: PreMigrationParams) -> PreMigrationResult:
def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
    # Sanity check
    if not args.report and len(args.output_file) == 0:
        raise ValueError("No output file specified")
@@ -170,7 +170,7 @@ def run(args: PreMigrationParams) -> PreMigrationResult:
    diff = get_index_differences(source_indices, target_indices)
    # The first element in the tuple is the set of indices to create
    indices_to_create = diff[0]
    result = PreMigrationResult()
    result = MetadataMigrationResult()
    if indices_to_create:
        result.created_indices = indices_to_create
        result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
@@ -193,7 +193,7 @@ def run(args: PreMigrationParams) -> PreMigrationResult:
if __name__ == '__main__':  # pragma no cover
    # Set up parsing for command line arguments
    arg_parser = argparse.ArgumentParser(
        prog="python pre_migration.py",
        prog="python metadata_migration.py",
        description="This tool creates indices on a target cluster based on the contents of a source cluster.\n" +
                    "The first input to the tool is a path to a Data Prepper pipeline YAML file, which is parsed to obtain " +
                    "the source and target cluster endpoints.\nThe second input is an output path to which a modified version " +
@@ -220,4 +220,4 @@ def run(args: PreMigrationParams) -> PreMigrationResult:
    arg_parser.add_argument("--dryrun", action="store_true",
                            help="Skips the actual creation of indices on the target cluster")
    namespace = arg_parser.parse_args()
    run(PreMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))
    run(MetadataMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))
FetchMigration/python/metadata_migration_params.py
@@ -2,7 +2,7 @@


@dataclass
class PreMigrationParams:
class MetadataMigrationParams:
    config_file_path: str
    output_file: str = ""
    report: bool = False
FetchMigration/python/metadata_migration_result.py
@@ -2,6 +2,6 @@


@dataclass
class PreMigrationResult:
class MetadataMigrationResult:
    target_doc_count: int = 0
    created_indices: set = field(default_factory=set)
41 changes: 30 additions & 11 deletions FetchMigration/python/migration_monitor.py
@@ -1,6 +1,8 @@
import argparse
import logging
import time
from typing import Optional, List
import math

import requests
from prometheus_client import Metric
@@ -41,41 +43,58 @@ def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]
    return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_part_count: Optional[int],
                      prev_no_part_count: int, target: int) -> bool:
def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_partition_count: Optional[int],
                      prev_no_partition: int, target: int) -> bool:
    # Check for target doc_count
    # TODO Add a check for partitionsCompleted = indices
    if doc_count is not None and doc_count >= target:
        # Check for idle pipeline
        logging.info("Target doc count reached, checking for idle pipeline...")
        # Debug metrics
        if logging.getLogger().isEnabledFor(logging.DEBUG):  # pragma no cover
            debug_msg_template: str = "Idle pipeline metrics - " + \
                                      "Records in flight: [{0}], " + \
                                      "No-partitions counter: [{1}]" + \
                                      "Previous no-partition value: [{2}]"
            logging.debug(debug_msg_template.format(in_flight, no_partition_count, prev_no_partition))

        if in_flight is not None and in_flight == 0:
            # No-partitions metrics should steadily tick up
            if no_part_count is not None and no_part_count > prev_no_part_count > 0:
            if no_partition_count is not None and no_partition_count > prev_no_partition > 0:
                return True
    return False


def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
    # TODO Remove hardcoded EndpointInfo
    default_auth = ('admin', 'admin')
    endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
    endpoint = EndpointInfo(args.data_prepper_endpoint, default_auth, False)
    target_doc_count: int = args.target_count
    prev_no_partitions_count = 0
    terminal = False
    logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
    while not terminal:
        time.sleep(poll_interval_seconds)
        # If the API call fails, the response is empty
        metrics = fetch_prometheus_metrics(endpoint)
        if metrics is not None:
            success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
            rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
            no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
            if success_docs is not None:  # pragma no cover
                completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
                progress_message: str = "Completed " + str(success_docs) + \
                                        " docs ( " + str(completion_percentage) + "% )"
                logging.info(progress_message)
            else:
                logging.info("Could not fetch metrics from Data Prepper, will retry on next polling cycle...")
            terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
                                         prev_no_partitions_count, args.target_count)
                                         prev_no_partitions_count, target_doc_count)
            if not terminal:
                # Save no_partitions_count
                prev_no_partitions_count = no_partitions_count

        if not terminal:
            time.sleep(wait_seconds)
    # Loop terminated, shut down the Data Prepper pipeline
    logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
    shutdown_pipeline(endpoint)


@@ -90,7 +109,7 @@ def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
    )
    # Required positional arguments
    arg_parser.add_argument(
        "dp_endpoint",
        "data_prepper_endpoint",
        help="URL endpoint for the running Data Prepper process"
    )
    arg_parser.add_argument(
@@ -100,5 +119,5 @@ def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
    )
    namespace = arg_parser.parse_args()
    print("\n##### Starting monitor tool... #####\n")
    run(MigrationMonitorParams(namespace.target_count, namespace.dp_endpoint))
    run(MigrationMonitorParams(namespace.target_count, namespace.data_prepper_endpoint))
    print("\n##### Ending monitor tool... #####\n")
2 changes: 1 addition & 1 deletion FetchMigration/python/migration_monitor_params.py
@@ -4,4 +4,4 @@
@dataclass
class MigrationMonitorParams:
    target_count: int
    dp_endpoint: str = "https://localhost:4900"
    data_prepper_endpoint: str
78 changes: 78 additions & 0 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -0,0 +1,78 @@
import unittest
from unittest.mock import patch, MagicMock, ANY

import fetch_orchestrator as orchestrator
from migration_monitor_params import MigrationMonitorParams
from metadata_migration_params import MetadataMigrationParams
from metadata_migration_result import MetadataMigrationResult


class TestFetchOrchestrator(unittest.TestCase):

    @patch('migration_monitor.run')
    @patch('subprocess.Popen')
    @patch('metadata_migration.run')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
                              mock_monitor: MagicMock):
        test_path = "test_path"
        test_file = "test_file"
        test_host = "test_host"
        # Setup mock pre-migration
        expected_metadata_migration_input = MetadataMigrationParams(test_file, test_path + "/pipelines/pipeline.yaml",
                                                                    report=True)
        test_result = MetadataMigrationResult(10, {"index1", "index2"})
        expected_monitor_input = MigrationMonitorParams(test_result.target_doc_count, test_host)
        mock_metadata_migration.return_value = test_result
        # setup subprocess return value
        mock_subprocess.return_value.returncode = 0
        # Run test
        orchestrator.run(test_path, test_file, test_host)
        mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
        expected_dp_runnable = test_path + "/bin/data-prepper"
        mock_subprocess.assert_called_once_with(expected_dp_runnable)
        mock_monitor.assert_called_once_with(expected_monitor_input)
        mock_subprocess.return_value.terminate.assert_not_called()

    @patch('migration_monitor.run')
    @patch('subprocess.Popen')
    @patch('metadata_migration.run')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_orchestrator_shutdown_workaround(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
                                              mock_monitor: MagicMock):
        test_path = "test_path"
        test_file = "test_file"
        test_host = "test_host"
        # Setup mock pre-migration
        expected_metadata_migration_input = MetadataMigrationParams(test_file, test_path + "/pipelines/pipeline.yaml",
                                                                    report=True)
        test_result = MetadataMigrationResult(10, {"index1", "index2"})
        expected_monitor_input = MigrationMonitorParams(test_result.target_doc_count, test_host)
        mock_metadata_migration.return_value = test_result
        # set subprocess return value to None to simulate a zombie Data Prepper process
        mock_subprocess.return_value.returncode = None
        # Run test
        orchestrator.run(test_path, test_file, test_host)
        mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
        expected_dp_runnable = test_path + "/bin/data-prepper"
        mock_subprocess.assert_called_once_with(expected_dp_runnable)
        mock_monitor.assert_called_once_with(expected_monitor_input)
        mock_subprocess.return_value.terminate.assert_called_once()

    @patch('migration_monitor.run')
    @patch('subprocess.Popen')
    @patch('metadata_migration.run')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_orchestrator_no_migration(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
                                       mock_monitor: MagicMock):
        # Setup empty result from pre-migration
        mock_metadata_migration.return_value = MetadataMigrationResult()
        orchestrator.run("test", "test", "test")
        mock_metadata_migration.assert_called_once_with(ANY)
        # Subsequent steps should not be called
        mock_subprocess.assert_not_called()
        mock_monitor.assert_not_called()


if __name__ == '__main__':
    unittest.main()