
Commit

cluster list filters
Alexandra Belousov authored and committed on Sep 8, 2024
1 parent e41609e commit e150b64
Showing 2 changed files with 207 additions and 70 deletions.
1 change: 1 addition & 0 deletions runhouse/constants.py
@@ -77,6 +77,7 @@
SECOND = 1
MINUTE = 60
HOUR = 3600
DAY = HOUR * 24
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
GPU_COLLECTION_INTERVAL = 5 * SECOND
276 changes: 206 additions & 70 deletions runhouse/main.py
Expand Up @@ -28,8 +28,11 @@
from runhouse import __version__, cluster, Cluster, configs
from runhouse.constants import (
BULLET_UNICODE,
DAY,
DOUBLE_SPACE_UNICODE,
HOUR,
LAST_ACTIVE_AT_TIMEFRAME,
MINUTE,
RAY_KILL_CMD,
RAY_START_CMD,
SERVER_LOGFILE,
@@ -44,6 +47,7 @@
check_for_existing_ray_instance,
kill_actors,
)
from runhouse.resources.hardware.utils import ResourceServerStatus

from runhouse.utils import get_status_color

@@ -445,7 +449,7 @@ def _print_status(status_data: dict, current_cluster: Cluster) -> None:
cluster_cpu_utilization: float = status_data.get("server_cpu_utilization")

server_util_info = (
f"CPU Utilization: {round(cluster_cpu_utilization, 2)}% | GPU Utilization: {round(cluster_gpu_utilization,2)}%"
f"CPU Utilization: {round(cluster_cpu_utilization, 2)}% | GPU Utilization: {round(cluster_gpu_utilization, 2)}%"
if has_cuda
else f"CPU Utilization: {round(cluster_cpu_utilization, 2)}%"
)
@@ -524,10 +528,14 @@ def status(
_print_status(cluster_status, current_cluster)


async def aget_clusters_from_den():
async def aget_clusters_from_den(cluster_filters: dict):
httpx_client = httpx.AsyncClient()

get_clusters_params = {"resource_type": "cluster", "folder": rns_client.username}

if cluster_filters:
get_clusters_params.update(cluster_filters)

clusters_in_den_resp = await httpx_client.get(
f"{rns_client.api_server_url}/resource",
params=get_clusters_params,
@@ -537,52 +545,14 @@ async def aget_clusters_from_den():
return clusters_in_den_resp


def get_clusters_from_den():
return asyncio.run(aget_clusters_from_den())


@cluster_app.command("list")
def cluster_list():
"""Load Runhouse clusters"""
import sky

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
console.print(
f"This feature is available only for Den users. Please run {sky_cli_command_formatted} to get on-demand cluster(s) information or sign-up Den."
)
return

on_demand_clusters_sky = sky.status(refresh=True)

clusters_in_den_resp = get_clusters_from_den()
def get_clusters_from_den(cluster_filters: dict):
return asyncio.run(aget_clusters_from_den(cluster_filters=cluster_filters))

if clusters_in_den_resp.status_code != 200:
logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
clusters_in_den = []
else:
clusters_in_den = clusters_in_den_resp.json().get("data")

clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den]

if not on_demand_clusters_sky and not clusters_in_den:
console.print("No existing clusters.")

if on_demand_clusters_sky:
# getting the on-demand clusters that are not saved in den.
on_demand_clusters_sky = [
cluster
for cluster in on_demand_clusters_sky
if f'/{rns_client.username}/{cluster.get("name")}'
not in clusters_in_den_names
]

running_clusters = []
not_running_clusters = []
def _get_running_and_not_running_clusters(clusters: list):
running_clusters, not_running_clusters = [], []

for den_cluster in clusters_in_den:
for den_cluster in clusters:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
@@ -617,49 +587,72 @@ def cluster_list():
) if cluster_status == "running" else not_running_clusters.append(cluster_info)

# TODO: will be used if we'll need to print not-running clusters
# Sort not-running clusters by the 'Status' column
# not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"])

# Sort clusters by the 'Last Active (UTC)' column
# Sort clusters by the 'Last Active (UTC)' and 'Status' column
not_running_clusters = sorted(
not_running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
not_running_clusters,
key=lambda x: (x["Last Active (UTC)"], x["Status"]),
reverse=True,
)
running_clusters = sorted(
running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
)

# creating the clusters table
total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]"
return running_clusters, not_running_clusters


def _create_output_table(total_clusters: int, running_clusters: int):
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Cluster Type", justify="left", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")

# TODO: will be used if we'll need to print not-running clusters
# all_clusters = running_clusters + not_running_clusters
return table

for rh_cluster in running_clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")

# Print Running clusters that were active it the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
get_status_color(rh_cluster.get("Status")),
str(last_active_at).split("+")[
0
], # The split is required to remove the offset (according to UTC)
)
def _add_cluster_as_table_row(table: Table, rh_cluster: dict):
last_active = rh_cluster.get("Last Active (UTC)")
last_active = last_active if last_active != "1970-01-01 00:00:00" else "Unknown"
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
rh_cluster.get("Status"),
last_active,
)

console.print(table)
return table


def _add_clusters_to_output_table(
table: Table, clusters: list[Dict], filters: dict = None
):
if filters and "all" in filters.keys():
clusters = clusters[: (min(len(clusters), 50))]

live_clusters_not_in_den = len(on_demand_clusters_sky)
for rh_cluster in clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")
last_active_at_no_offset = str(last_active_at).split("+")[
0
]  # The split removes the UTC offset from the timestamp string
rh_cluster["Last Active (UTC)"] = last_active_at_no_offset
rh_cluster["Status"] = get_status_color(rh_cluster.get("Status"))

if filters and "all" in filters.keys():
table = _add_cluster_as_table_row(table, rh_cluster)
elif len(filters) > 0:
table = _add_cluster_as_table_row(table, rh_cluster)
else:
# Default behaviour: Print Running clusters that were active in the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table = _add_cluster_as_table_row(table, rh_cluster)


def _print_msg_live_sky_clusters(live_clusters_not_in_den: int):
if live_clusters_not_in_den > 0:
live_sky_clusters_str = (
f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
@@ -674,6 +667,149 @@ def cluster_list():
)


def _get_unsaved_live_clusters(den_clusters: list[Dict], sky_live_clusters: list):
den_clusters_names = [c.get("name") for c in den_clusters]

# getting the on-demand clusters that are not saved in den.
if sky_live_clusters:
return [
cluster
for cluster in sky_live_clusters
if f'/{rns_client.username}/{cluster.get("name")}' not in den_clusters_names
]
else:
return []


def _parse_time_duration(duration: str):
# A simple parser for duration like "15m", "2h", "3d"
unit = duration[-1]
value = duration[:-1]

if unit == "s":
return int(value)
elif unit == "m":
return int(value) * MINUTE # Convert minutes to seconds
elif unit == "h":
return int(value) * HOUR # Convert hours to seconds
elif unit == "d":
return int(value) * DAY # Convert days to seconds
else:
raise ValueError(
f"Invalid time unit: {unit}. Supported units are 's', 'm', 'h', and 'd'."
)


def _parse_cluster_type(cluster_type: str):
cluster_types_mapping_to_db_schema = {
"on-demand": "OnDemandCluster",
"cluster": "Cluster",
"k8": "kubernetes",
}

return cluster_types_mapping_to_db_schema.get(cluster_type)


def _parse_filters(since: str, cluster_status: str, cluster_subtype: str):
cluster_filters = {}

if since:
last_active_in: int = _parse_time_duration(
duration=since
)  # returns an int representing the "since" filter, in seconds
cluster_filters["last_active"] = last_active_in

if cluster_status:
if cluster_status.lower() not in ResourceServerStatus.__members__.values():
logger.error(f"The provided status ({cluster_status}) is invalid.")
else:
cluster_filters["status"] = cluster_status

if cluster_subtype:
# TODO [sb]: make it not hard-coded.
if cluster_subtype not in ["on-demand", "cluster", "k8"]:
logger.error(
f"The provided cluster type ({cluster_status}) is invalid. Supported types are on-demand, cluster, or k8."
)
else:
cluster_subtype = _parse_cluster_type(cluster_type=cluster_subtype)
cluster_filters["cluster_subtype"] = cluster_subtype

return cluster_filters


@cluster_app.command("list")
def cluster_list(
get_all_clusters: bool = typer.Option(
False,
"--a",
"-a",
help="Get all den clusters. Up to 50 most recently clusters will be returnted",
),
since: Optional[str] = typer.Option(
None, "--since", help="Time duration to filter, e.g. 30s, 15m, 2h, 3d."
),
cluster_status: Optional[str] = typer.Option(
None, "--status", help="Cluster status, e.g. running, terminated, etc."
),
cluster_subtype: Optional[str] = typer.Option(
None, "--type", help="Runhouse cluster type, e.g. cluster, on-demand, k8."
),
):
"""Load Runhouse clusters"""
import sky

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
console.print(
f"This feature is available only for Den users. Please run {sky_cli_command_formatted} to get on-demand cluster(s) information or sign-up Den."
)
return

cluster_filters = _parse_filters(
since=since, cluster_status=cluster_status, cluster_subtype=cluster_subtype
)

# get clusters from den
den_clusters_resp = get_clusters_from_den(cluster_filters=cluster_filters)
if den_clusters_resp.status_code != 200:
logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
den_clusters = []
else:
den_clusters = den_clusters_resp.json().get("data")

# get sky live clusters
sky_live_clusters = sky.status(refresh=True)

if not sky_live_clusters and not den_clusters:
console.print("No existing clusters.")

sky_live_clusters = _get_unsaved_live_clusters(
den_clusters=den_clusters, sky_live_clusters=sky_live_clusters
)

running_clusters, not_running_clusters = _get_running_and_not_running_clusters(
clusters=den_clusters
)
all_clusters = running_clusters + not_running_clusters

# creating the clusters table
table = _create_output_table(
total_clusters=len(den_clusters), running_clusters=len(running_clusters)
)
clusters_to_print = all_clusters if get_all_clusters else running_clusters
filters = {"all": "all"} if get_all_clusters else cluster_filters
_add_clusters_to_output_table(
table=table, clusters=clusters_to_print, filters=filters
)

console.print(table)

_print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))


# Register the 'cluster' command group with the main runhouse application
app.add_typer(cluster_app, name="cluster")
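
For context, a minimal sketch of what the new filter flags translate into. The standalone helper and constant values below are assumptions that mirror _parse_time_duration and _parse_cluster_type in the diff above (they are not the module's own API), and the `runhouse` console-script name in the example invocation is assumed:

# e.g. `runhouse cluster list --since 2h --status running --type on-demand`
SECOND, MINUTE, HOUR = 1, 60, 3600
DAY = HOUR * 24


def parse_time_duration(duration: str) -> int:
    # Mirrors _parse_time_duration: "30s", "15m", "2h", "3d" -> seconds.
    unit, value = duration[-1], int(duration[:-1])
    factors = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
    if unit not in factors:
        raise ValueError(
            f"Invalid time unit: {unit}. Supported units are 's', 'm', 'h', and 'd'."
        )
    return value * factors[unit]


# Roughly the extra request params that get_clusters_from_den would forward to Den,
# on top of {"resource_type": "cluster", "folder": <username>}:
cluster_filters = {
    "last_active": parse_time_duration("2h"),  # 7200 seconds
    "status": "running",
    "cluster_subtype": "OnDemandCluster",  # "on-demand" mapped via _parse_cluster_type
}
print(cluster_filters)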

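And a small illustration of the default display window applied by _add_clusters_to_output_table when neither -a nor any filter is passed. The value of LAST_ACTIVE_AT_TIMEFRAME is not shown in this diff; one day is assumed here based on the surrounding comment:

import datetime

LAST_ACTIVE_AT_TIMEFRAME = 24 * 3600  # assumed value; the real constant lives in runhouse/constants.py

now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
recently_active = now - datetime.timedelta(hours=2)
stale = now - datetime.timedelta(days=3)

# Default behaviour: only clusters active within the window are added to the table.
for last_active_at in (recently_active, stale):
    shown = (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME
    print(last_active_at.isoformat(), "-> shown by default:", shown)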
