Skip to content

Commit

Permalink
[Spot] Add eager failover strategy (#2234)
Browse files Browse the repository at this point in the history
* Add aggressive failover strategy

* remove unnecessary code

* reset blocked resources

* reset blocked_resources to None

* optimize the case where the region is specified

* Use the new eager failover by default

* address comments

* format

* address comment
  • Loading branch information
Michaelvll committed Aug 3, 2023
1 parent 6f52a0b commit ca2a092
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 12 deletions.
21 changes: 17 additions & 4 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,19 @@ class GangSchedulingStatus(enum.Enum):
GANG_FAILED = 1
HEAD_FAILED = 2

def __init__(self, log_dir: str, dag: 'dag.Dag',
def __init__(self,
log_dir: str,
dag: 'dag.Dag',
optimize_target: 'optimizer.OptimizeTarget',
requested_features: Set[clouds.CloudImplementationFeatures],
local_wheel_path: pathlib.Path, wheel_hash: str):
local_wheel_path: pathlib.Path,
wheel_hash: str,
blocked_resources: Optional[Iterable[
resources_lib.Resources]] = None):
self._blocked_resources: Set[resources_lib.Resources] = set()
if blocked_resources:
# blocked_resources is not None and not empty.
self._blocked_resources.update(blocked_resources)

self.log_dir = os.path.expanduser(log_dir)
self._dag = dag
Expand Down Expand Up @@ -2565,8 +2573,13 @@ def _provision(
# of optimization infinitely.
try:
provisioner = RetryingVmProvisioner(
self.log_dir, self._dag, self._optimize_target,
self._requested_features, local_wheel_path, wheel_hash)
self.log_dir,
self._dag,
self._optimize_target,
self._requested_features,
local_wheel_path,
wheel_hash,
blocked_resources=task.blocked_resources)
config_dict = provisioner.provision_with_retries(
task, to_provision_config, dryrun, stream_logs)
break
Expand Down
94 changes: 87 additions & 7 deletions sky/spot/recovery_strategy.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
"""The strategy to handle launching/recovery/termination of spot clusters."""
"""The strategy to handle launching/recovery/termination of spot clusters.
In the YAML file, the user can specify the strategy to use for spot jobs.
resources:
spot_recovery: EAGER_NEXT_REGION
"""
import time
import traceback
import typing
from typing import Optional, Tuple

import sky
from sky import backends
from sky import exceptions
from sky import global_user_state
from sky import sky_logging
from sky import status_lib
from sky import backends
from sky.backends import backend_utils
from sky.skylet import job_lib
from sky.spot import spot_utils
Expand Down Expand Up @@ -357,7 +363,8 @@ def _launch(self,
time.sleep(gap_seconds)


class FailoverStrategyExecutor(StrategyExecutor, name='FAILOVER', default=True):
class FailoverStrategyExecutor(StrategyExecutor, name='FAILOVER',
default=False):
"""Failover strategy: wait in same region and failover after timout."""

_MAX_RETRY_CNT = 240 # Retry for 4 hours.
Expand Down Expand Up @@ -385,6 +392,8 @@ def _launch(self,
launched_resources = handle.launched_resources
self._launched_cloud_region = (launched_resources.cloud,
launched_resources.region)
else:
self._launched_cloud_region = None
return job_submitted_at

def recover(self) -> float:
Expand All @@ -397,9 +406,6 @@ def recover(self) -> float:
# Step 1
self._try_cancel_all_jobs()

# Retry the entire block until the cluster is up, so that the ratio of
# the time spent in the current region and the time spent in the other
# region is consistent during the retry.
while True:
# Add region constraint to the task, to retry on the same region
# first (if valid).
Expand All @@ -422,7 +428,6 @@ def recover(self) -> float:
# Step 2
logger.debug('Terminating unhealthy spot cluster and '
'reset cloud region.')
self._launched_cloud_region = None
terminate_cluster(self.cluster_name)

# Step 3
Expand All @@ -445,3 +450,78 @@ def recover(self) -> float:
f'{self._MAX_RETRY_CNT} times.')

return job_submitted_at


class EagerFailoverStrategyExecutor(FailoverStrategyExecutor,
                                    name='EAGER_NEXT_REGION',
                                    default=True):
    """Eager failover strategy.

    An extension of the FAILOVER strategy: rather than waiting in the
    same region after a preemption, it tears the cluster down right away
    and relaunches in a different region. Rationale: a region that just
    preempted us is likely to preempt again soon, so trying the other
    regions first tends to yield a longer-lived cluster.
    """

    def recover(self) -> float:
        """Relaunch the spot cluster after a preemption.

        Plan:
          1. Terminate the current (unhealthy) cluster.
          2. Relaunch once with the previously used cloud/region blocked,
             so the failover sweeps the rest of the search space first.
          3. If that fails, retry over the full search space (defined by
             the original task.resources), looping forever when
             retry_until_up is set.

        Returns the job submission timestamp.

        Raises exceptions.ResourcesUnavailableError when all retries are
        exhausted and retry_until_up is not set.
        """
        # Step 1: get rid of the preempted cluster.
        logger.debug('Terminating unhealthy spot cluster and '
                     'reset cloud region.')
        terminate_cluster(self.cluster_name)

        # Step 2: one attempt that skips the prior cloud/region.
        logger.debug('Relaunch the cluster skipping the previously launched '
                     'cloud/region.')
        if self._launched_cloud_region is not None:
            task = self.dag.tasks[0]
            requested_resources = list(task.resources)[0]
            no_location_pinned = (requested_resources.region is None and
                                  requested_resources.zone is None)
            if no_location_pinned:
                # Only block the prior region when the request leaves the
                # region/zone open; otherwise blocking the sole allowed
                # location would just waste a failover pass.
                prior_cloud, prior_region = self._launched_cloud_region
                task.blocked_resources = {
                    requested_resources.copy(cloud=prior_cloud,
                                             region=prior_region)
                }
            # _launch (not self.launch) avoids the retry-until-up logic.
            job_submitted_at = self._launch(raise_on_failure=False)
            task.blocked_resources = None
            if job_submitted_at is not None:
                return job_submitted_at

        # Step 3: unconstrained relaunch, repeated as needed.
        while True:
            logger.debug('Relaunch the cluster without constraining to prior '
                         'cloud/region.')
            # _launch (not self.launch) avoids the retry-until-up logic.
            job_submitted_at = self._launch(max_retry=self._MAX_RETRY_CNT,
                                            raise_on_failure=False)
            if job_submitted_at is not None:
                return job_submitted_at
            # Launch failed: either sleep and loop, or give up.
            if not self.retry_until_up:
                with ux_utils.print_exception_no_traceback():
                    raise exceptions.ResourcesUnavailableError(
                        f'Failed to recover the spot cluster after retrying '
                        f'{self._MAX_RETRY_CNT} times.')
            gap_seconds = self.RETRY_INIT_GAP_SECONDS
            logger.info('Retrying to recover the spot cluster in '
                        f'{gap_seconds:.1f} seconds.')
            time.sleep(gap_seconds)
7 changes: 6 additions & 1 deletion sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import re
import typing
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

import yaml

Expand Down Expand Up @@ -118,6 +118,7 @@ def __init__(
# Advanced:
docker_image: Optional[str] = None,
event_callback: Optional[str] = None,
blocked_resources: Optional[Iterable['resources_lib.Resources']] = None,
):
"""Initializes a Task.
Expand Down Expand Up @@ -172,6 +173,7 @@ def __init__(
docker_image: (EXPERIMENTAL: Only in effect when LocalDockerBackend
is used.) The base docker image that this Task will be built on.
Defaults to 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'.
blocked_resources: A set of resources that this task cannot run on.
"""
self.name = name
self.run = run
Expand All @@ -194,6 +196,9 @@ def __init__(
self.estimated_outputs_size_gigabytes = None
# Default to CPUNode
self.resources = {sky.Resources()}
# Resources that this task cannot run on.
self.blocked_resources = blocked_resources

self.time_estimator_func: Optional[Callable[['sky.Resources'],
int]] = None
self.file_mounts: Optional[Dict[str, str]] = None
Expand Down

0 comments on commit ca2a092

Please sign in to comment.