Skip to content

Latest commit

 

History

History
203 lines (146 loc) · 9.25 KB

PYPIDOC.md

File metadata and controls

203 lines (146 loc) · 9.25 KB

MWAA Disaster Recovery

MWAA Python Black CodeCoverage PyPI version

The mwaa-dr PyPi package is a part of the larger MWAA DR solution for the use case where exporting and importing metadata store needs to be performed independent to the broader DR solution. As of the time of writing, access to the MWAA metadata store is only available through DAGs. This solution simplifies the process of creating backup and restore DAGs, respectively, by providing a reusable python library.

Installation

You can install the mwaa-dr package by including the latest version in your MWAA requirements.txt file.

Simple Use Case

Let's look at creating a metadata backup and restore dags, respectively, as follows:

Metadata Backup DAG

Let's assume your environment version is 2.8.1. You can create a metadata backup dag by creating a python file in your MWAA dags folder as follows:

backup_metadata.py:

# Importing DAG is necessary for DAG detection
from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

factory = DRFactory_2_8(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)

# Assigning the returned dag to a global variable is necessary for DAG detection
dag: DAG = factory.create_backup_dag()

For running backup and restore on your Amazon MWAA environment on AWS, you need to do the following:

  1. Ensure you have an S3 bucket created to store the backup.
  2. Ensure that your MWAA execution role has read and write permissions on the bucket.
  3. Create an Airflow variable with the key named DR_BACKUP_BUCKET and the value containing the name (not ARN) of the S3 bucket.
  4. You are all set to manually trigger the backup and restore DAGs at any point. The metadata backup will be stored in <backup S3 bucket>/<path_prefix>.

If you want to use the solution with aws-mwaa-local-runner, change the storage_type argument from S3 to LOCAL_FS. The backup will be located in the dags/data folder or more generally at the dags/<path_prefix> folder of the local runner project.

Here is a sample run of the backup workflow:

Backup Workflow

Metadata Restore DAG

You can create a metadata restore dag by creating a python file in your MWAA dags folder as follows:

restore_metadata.py:

from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

factory = DRFactory_2_8(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)

dag:DAG = factory.create_restore_dag()

Here is a sample run of the restore workflow:

Restore Workflow

Please note that variable and connection tables are handled specially during the restore process. You can specify a restore strategy to be applied for these two tables by setting DR_VARIABLE_RESTORE_STRATEGY and DR_CONNECTION_RESTORE_STRATEGY Airflow variables. These variables can take on of the following values:

  1. DO_NOTHING: As the name suggests, this strategy will not restore the variable and connection tables from the backup. This strategy is particularly useful if your MWAA environments have been configured to use AWS Secrets Manager for storing variables and connections.

  2. APPEND: With this strategy, the restore workflow will not overwrite existing entries of the variable and connection tables and only add missing entries from the backup.

  3. REPLACE: This strategy will overwrite existing variable and connections from backup.

Note that these two Airflow variables are treated specially and are unaffected by the restore process of the variable table. In the absence of these variables, the default value of APPEND is used for both variable and connection restores.

Note that you will need an empty database for restore to work. To cleanup the database before restore, please use the cleanup_metadata DAG discussed next.

Metadata Cleanup DAG

You can create a metadata cleanup dag by creating a python file in your MWAA dags folder as follows:

cleanup_metadata.py:

from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

factory = DRFactory_2_8(
    dag_id='cleanup',
    path_prefix='data',
    storage_type='S3'
)

dag:DAG = factory.create_cleanup_dag()

Advance Use Case

You may have some advance use cases, such as, a need to exclude or include additional tables from your backup, update SQL scripts for specific tables, and others. Note that, by default, the solution backs up only variable, connection, slot_pool, log, job, dag_run, trigger, task_instance, and task_fail tables. Majority of other tables are auto-generated by scheduler or by the webserver and thus, excluded from the list of tables to be backed up.

To add/remove tables from the backup or customize any aspects of the solution, you will derive from an appropriate factory class and override its methods. To see this more concretely, let's assume you want to exclude variable and connection tables from the backup and restore operations. Please follow the subsequent implementation guidelines.

First, find the appropriate factory class from the supported versions in the mwaa_dr package. Let's assume your environment version is 2.7.2. So, you will pick DRFactory_2_7 as your base class for inheritance and override its setup_tables method.

Note that majority of the functionality for the DR framework has either been implemented in BaseDRFactory or in DRFactory_2_5. The other factories just implement a chain of inheritance from these classes and override specific methods to implement differences.

Here is a sample implementation of your derived class that you will need to create in your dags folder, let's name the file custom_dr_factory_2_7.py:

custom_dr_factory_2_7.py:

from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_7.dr_factory import DRFactory_2_7

class CustomDRFactory_2_7(DRFactory_2_7):
    def setup_tables(self, model: DependencyModel[BaseTable]) -> list[BaseTable]:
        # Create needed tables, all extend from the BaseTable class
        active_dag = self.active_dag(model)

        # Comment out variable and connection from the previous implementation
        # variable = self.variable(model)
        # connection = self.connection(model)
        slot_pool = self.slot_pool(model)

        log = self.log(model)
        job = self.job(model)
        dag_run = self.dag_run(model)
        trigger = self.trigger(model)

        task_instance = self.task_instance(model)
        task_fail = self.task_fail(model)

        # Specify dependencies
        task_instance << [job, trigger, dag_run]
        task_fail << [task_instance, dag_run]
        active_dag << [
            # variable,
            # connection,
            slot_pool,
            log,
            job,
            dag_run,
            trigger,
            task_instance,
            task_fail,
        ]

        # Return the list of tables to be included in backup and restore
        return [
            # variable,
            # connection,
            slot_pool,
            log,
            job,
            dag_run,
            trigger,
            task_instance,
            task_fail,
            active_dag,
        ]

Here is your metadata backup dag that will use your custom factory (also in the dags folder):

backup.py:

from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7

factory = CustomDRFactory_2_7(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)

dag:DAG = factory.create_backup_dag()

And finally, here is your metadata restore dag (also in the dags folder):

restore.py:

from airflow import DAG
from custom_dr_factory_2_7 import CustomDRFactory_2_7

factory = CustomDRFactory_2_7(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_restore_dag()

For additional details, please visit the project homepage.