Support chain dag in example_app #267

Closed
wants to merge 18 commits
52 changes: 39 additions & 13 deletions prototype/examples/example_app.py
@@ -19,6 +19,15 @@

import time_estimators

file_mounts = {
'~/application_default_credentials.json': '~/.config/gcloud/application_default_credentials.json',
'~/.aws': '~/.aws',
}
setup = """
echo "export GOOGLE_APPLICATION_CREDENTIALS=~/application_default_credentials.json" >> ~/.bashrc
echo "export GOOGLE_CLOUD_PROJECT=intercloud-320520" >> ~/.bashrc
"""


def make_application():
"""A simple application: train_op -> infer_op."""
@@ -27,17 +36,27 @@ def make_application():
# Train.
train_op = sky.Task(
'train_op',
run='python train.py --data_dir=INPUTS[0] --model_dir=OUTPUTS[0]')
# run='python train.py --data_dir=INPUTS[0] --model_dir=OUTPUTS[0]')
run="""\
echo "Training on INPUTS[0]"; ls INPUTS[0]
mkdir -p OUTPUTS[0]
echo $(hostname) >> OUTPUTS[0]/model-1.pt; echo "Generated model in OUTPUTS[0]"
""",
setup=setup)

train_op.set_inputs(
's3://my-imagenet-data',
# estimated_size_gigabytes=150,
# 's3://my-imagenet-data',
# TODO: Change this bucket to your own bucket.
's3://sky-example-test',
estimated_size_gigabytes=150,
# estimated_size_gigabytes=1500,
estimated_size_gigabytes=600,
# estimated_size_gigabytes=600,
)

# 'CLOUD': saves to the cloud this op ends up executing on.
train_op.set_outputs('CLOUD://my-model', estimated_size_gigabytes=0.1)
# TODO: This bucket should be globally unique and available to you.
train_op.set_outputs('CLOUD://sky-my-model',
estimated_size_gigabytes=0.1)

train_op.set_resources({
sky.Resources(sky.AWS(), 'p3.2xlarge'), # 1 V100, EC2.
@@ -46,27 +65,34 @@
sky.Resources(sky.GCP(), 'n1-standard-8', 'tpu-v3-8'),
})

train_op.set_file_mounts(file_mounts)

train_op.set_time_estimator(time_estimators.resnet50_estimate_runtime)

# Infer.
infer_op = sky.Task('infer_op',
run='python infer.py --model_dir=INPUTS[0]')
infer_op = sky.Task(
'infer_op',
# run='python infer.py --model_dir=INPUTS[0]')
run='echo "Inferring on INPUTS[0]"; ls INPUTS[0]/model-1.pt',
setup=setup)

# Data dependency.
# FIXME: make the system know this is from train_op's outputs.
infer_op.set_inputs(train_op.get_outputs(),
estimated_size_gigabytes=0.1)

infer_op.set_resources({
sky.Resources(sky.AWS(), 'inf1.2xlarge'),
sky.Resources(sky.AWS(), 'p3.2xlarge'),
sky.Resources(sky.GCP(), 'n1-standard-4', 'T4'),
sky.Resources(sky.GCP(), 'n1-standard-8', 'T4'),
sky.Resources(sky.AWS(), 'inf1.2xlarge', use_spot=True),
sky.Resources(sky.AWS(), 'p3.2xlarge', use_spot=True),
sky.Resources(sky.GCP(), 'n1-standard-4', 'T4', use_spot=True),
sky.Resources(sky.GCP(), 'n1-standard-8', 'T4', use_spot=True),
})

infer_op.set_time_estimator(
time_estimators.resnet50_infer_estimate_runtime)

infer_op.set_file_mounts(file_mounts)

# Chain the sky.tasks (Airflow syntax).
# The dependency represents data flow.
train_op >> infer_op
@@ -75,5 +101,5 @@ def make_application():


dag = make_application()
sky.optimize(dag, minimize=sky.OptimizeTarget.COST)
# sky.optimize(dag, minimize=OptimizeTarget.TIME)
# sky.launch_chain(dag, optimize_target=sky.OptimizeTarget.COST)
sky.launch_chain(dag, optimize_target=sky.OptimizeTarget.TIME)
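
For reference, sky.launch_chain itself lives in prototype/sky/execution.py (exported via the __init__.py change below) and is not part of this excerpt. A minimal sketch of what a chain launcher could do with the new Dag.get_sorted_tasks() helper, assuming a hypothetical per-task launch_one() entry point:

import sky

def launch_chain_sketch(dag, optimize_target=sky.OptimizeTarget.COST):
    # Pick resources/placement for the whole chain up front, as the
    # example above does via sky.optimize.
    sky.optimize(dag, minimize=optimize_target)
    # Launch tasks in dependency (topological) order so each task's
    # outputs exist before its downstream consumer starts.
    for task in dag.get_sorted_tasks():
        launch_one(task)  # hypothetical per-task provisioning + run

def launch_one(task):
    raise NotImplementedError('placeholder; not part of this PR excerpt')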
3 changes: 2 additions & 1 deletion prototype/sky/__init__.py
@@ -6,7 +6,7 @@
from sky import clouds
from sky.clouds.service_catalog import list_accelerators
from sky.dag import Dag, DagContext
from sky.execution import launch, exec # pylint: disable=redefined-builtin
from sky.execution import launch, launch_chain, exec # pylint: disable=redefined-builtin
from sky.resources import Resources
from sky.task import Task
from sky.registry import fill_in_launchable_resources
@@ -33,6 +33,7 @@
'Task',
'backends',
'launch',
'launch_chain',
'exec',
'fill_in_launchable_resources',
'list_accelerators',
38 changes: 22 additions & 16 deletions prototype/sky/backends/cloud_vm_ray_backend.py
@@ -101,6 +101,19 @@ def _remove_cluster_from_ssh_config(cluster_ip: str,
backend_utils.SSHConfigHelper.remove_cluster(cluster_ip, auth_config)


def _create_gpu_dict_from_accelerators(
accelerator_dict: Dict[str, int]) -> Dict[str, int]:
acc_name = list(accelerator_dict.keys())[0]
acc_count = list(accelerator_dict.values())[0]
gpu_dict = {'GPU': acc_count}
# gpu_dict should be empty when the accelerator is not GPU.
# FIXME: This is a hack to make sure that we do not reserve
# GPU when requesting TPU.
if any(acc in acc_name.lower() for acc in ['tpu', 'inferentia']):
gpu_dict = dict()
return gpu_dict


class RayCodeGen:
"""Code generator of a Ray program that executes a sky.Task.

@@ -197,14 +210,7 @@ def add_gang_scheduling_placement_group(
self._ip_to_bundle_index = {ip: i for i, ip in enumerate(ip_list)}

if accelerator_dict is not None:
acc_name = list(accelerator_dict.keys())[0]
acc_count = list(accelerator_dict.values())[0]
gpu_dict = {'GPU': acc_count}
# gpu_dict should be empty when the accelerator is not GPU.
# FIXME: This is a hack to make sure that we do not reserve
# GPU when requesting TPU.
if 'tpu' in acc_name.lower():
gpu_dict = dict()
gpu_dict = _create_gpu_dict_from_accelerators(accelerator_dict)
for bundle in bundles:
bundle.update({
**accelerator_dict,
@@ -261,14 +267,14 @@ def add_ray_task(
f' Found: {ray_resources_dict}.')
resources_str = f', resources={json.dumps(ray_resources_dict)}'

# Passing this ensures that the Ray remote task gets
# CUDA_VISIBLE_DEVICES set correctly. If not passed, that flag
# would be force-set to empty by Ray.
num_gpus_str = f', num_gpus={list(ray_resources_dict.values())[0]}'
# `num_gpus` should be empty when the accelerator is not GPU.
# FIXME: use a set of GPU types.
resources_key = list(ray_resources_dict.keys())[0]
if 'tpu' in resources_key.lower():
gpu_dict = _create_gpu_dict_from_accelerators(ray_resources_dict)
if len(gpu_dict) > 0:
# Passing this ensures that the Ray remote task gets
# CUDA_VISIBLE_DEVICES set correctly. If not passed, that flag
# would be force-set to empty by Ray.
num_gpus_str = f', num_gpus={list(gpu_dict.values())[0]}'
else:
# `num_gpus` should be empty when the accelerator is not GPU.
num_gpus_str = ''

bundle_index = self._ip_to_bundle_index[gang_scheduling_ip]
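
To make the extracted helper's behavior concrete, here is a standalone copy of _create_gpu_dict_from_accelerators with illustrative inputs drawn from the accelerators used in example_app.py (V100 GPUs, TPU v3-8, Inferentia):

from typing import Dict

def _create_gpu_dict_from_accelerators(
        accelerator_dict: Dict[str, int]) -> Dict[str, int]:
    # Copied from the hunk above for illustration.
    acc_name = list(accelerator_dict.keys())[0]
    acc_count = list(accelerator_dict.values())[0]
    gpu_dict = {'GPU': acc_count}
    # Leave the dict empty for non-GPU accelerators so Ray does not
    # reserve GPU slots when a TPU or Inferentia chip is requested.
    if any(acc in acc_name.lower() for acc in ['tpu', 'inferentia']):
        gpu_dict = dict()
    return gpu_dict

assert _create_gpu_dict_from_accelerators({'V100': 1}) == {'GPU': 1}
assert _create_gpu_dict_from_accelerators({'tpu-v3-8': 1}) == {}
assert _create_gpu_dict_from_accelerators({'Inferentia': 2}) == {}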
23 changes: 14 additions & 9 deletions prototype/sky/cli.py
@@ -461,6 +461,7 @@ def launch(entrypoint: str, cluster: Optional[str], dryrun: bool,
new_resources = copy.deepcopy(list(task.resources)[0])

if cloud is not None:
cloud = cloud.lower()
if cloud not in clouds.CLOUD_REGISTRY:
raise click.UsageError(
f'Cloud \'{cloud}\' is not supported. '
@@ -1109,10 +1110,12 @@ def gpunode(cluster: str, port_forward: Optional[List[int]],
gpus is None and spot is None)
default_resources = _INTERACTIVE_NODE_DEFAULT_RESOURCES['gpunode']
cloud_provider = clouds.CLOUD_REGISTRY.get(cloud, default_resources.cloud)
if cloud is not None and cloud not in clouds.CLOUD_REGISTRY:
raise click.UsageError(
f'Cloud \'{cloud}\' is not supported. '
f'Supported clouds: {list(clouds.CLOUD_REGISTRY.keys())}')
if cloud is not None:
cloud = cloud.lower()
if cloud not in clouds.CLOUD_REGISTRY:
raise click.UsageError(
f'Cloud \'{cloud}\' is not supported. '
f'Supported clouds: {list(clouds.CLOUD_REGISTRY.keys())}')
if gpus is not None:
gpus = _parse_accelerator_options(gpus)
elif instance_type is None:
@@ -1184,11 +1187,13 @@ def cpunode(cluster: str, port_forward: Optional[List[int]],
spot is None)
default_resources = _INTERACTIVE_NODE_DEFAULT_RESOURCES['cpunode']
cloud_provider = clouds.CLOUD_REGISTRY.get(cloud, None)
if cloud is not None and cloud not in clouds.CLOUD_REGISTRY:
raise click.UsageError(
f'Cloud \'{cloud}\' is not supported. ' + \
f'Supported clouds: {list(clouds.CLOUD_REGISTRY.keys())}'
)
if cloud is not None:
cloud = cloud.lower()
if cloud not in clouds.CLOUD_REGISTRY:
raise click.UsageError(
f'Cloud \'{cloud}\' is not supported. ' + \
f'Supported clouds: {list(clouds.CLOUD_REGISTRY.keys())}'
)
if instance_type is None:
instance_type = default_resources.instance_type
if spot is None:
3 changes: 3 additions & 0 deletions prototype/sky/dag.py
@@ -70,3 +70,6 @@ def __repr__(self):

def get_graph(self):
return self.graph

def get_sorted_tasks(self):
return nx.topological_sort(self.graph)
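
networkx.topological_sort returns a generator of nodes in dependency order, so callers typically iterate it directly or wrap it in list(); for the two-task chain in example_app.py it yields train_op before infer_op. A self-contained illustration with a bare networkx graph (not the sky.Dag wrapper):

import networkx as nx

g = nx.DiGraph()
g.add_edge('train_op', 'infer_op')  # mirrors train_op >> infer_op
assert list(nx.topological_sort(g)) == ['train_op', 'infer_op']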
4 changes: 1 addition & 3 deletions prototype/sky/data/data_transfer.py
@@ -47,9 +47,7 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
session = aws.session()
aws_credentials = session.get_credentials().get_frozen_credentials()

with open(os.environ['GOOGLE_APPLICATION_CREDENTIALS'], 'r') as fp:
gcp_credentials = json.load(fp)
project_id = gcp_credentials['project_id']
project_id = os.environ['GCLOUD_PROJECT']

# Update cloud bucket IAM role to allow for data transfer
storage_account = storagetransfer.googleServiceAccounts().get(
22 changes: 19 additions & 3 deletions prototype/sky/data/storage.py
@@ -5,6 +5,7 @@
from multiprocessing import pool
from typing import Any, Dict, Optional, Tuple

from sky import clouds
from sky.data import data_utils, data_transfer
from sky import sky_logging
from sky.cloud_adaptors import aws, gcp
@@ -16,9 +17,24 @@


class StorageType(enum.Enum):
S3 = 0
GCS = 1
AZURE = 2
S3 = 's3://'
GCS = 'gs://'
AZURE = 'azure://'


def get_storage_type_from_cloud(cloud: clouds.Cloud) -> StorageType:
if isinstance(cloud, clouds.AWS):
return StorageType.S3
if isinstance(cloud, clouds.GCP):
return StorageType.GCS
raise NotImplementedError(f'Unknown cloud: {cloud}')


def get_storage_name(storage_path: str, default_name: Optional[str]):
if '://' in storage_path:
return storage_path.split('://')[-1].split('/')[0]
assert default_name is not None
return default_name


class AbstractStore:
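
To illustrate the new storage helpers, here is a standalone copy of the enum and get_storage_name applied to the bucket URIs from example_app.py (get_storage_type_from_cloud is omitted because it needs the sky clouds module):

import enum
from typing import Optional

class StorageType(enum.Enum):
    # After this change the enum values double as URI prefixes.
    S3 = 's3://'
    GCS = 'gs://'
    AZURE = 'azure://'

def get_storage_name(storage_path: str, default_name: Optional[str]):
    # Copied from the hunk above for illustration.
    if '://' in storage_path:
        return storage_path.split('://')[-1].split('/')[0]
    assert default_name is not None
    return default_name

# The bucket name is taken from the URI when one is given,
# otherwise the caller-supplied default is used:
assert get_storage_name('s3://sky-example-test/train', None) == 'sky-example-test'
assert get_storage_name('my-model', default_name='sky-my-model') == 'sky-my-model'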