Skip to content

Commit

Permalink
Clean up an EnvServlet from global Runhouse daemon state if it dies unexpectedly.
Browse files Browse the repository at this point in the history
  • Loading branch information
rohinb2 committed Apr 16, 2024
1 parent 908fa1a commit 6d20bf0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
10 changes: 9 additions & 1 deletion runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,5 +197,13 @@ async def aclear_key_to_env_servlet_name_dict(self):
##############################################
# Remove Env Servlet
##############################################
async def aremove_env_servlet_name(self, env_servlet_name: str):
async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
self._initialized_env_servlet_names.remove(env_servlet_name)
deleted_keys = [
key
for key, env in self._key_to_env_servlet_name.items()
if env == env_servlet_name
]
for key in deleted_keys:
self._key_to_env_servlet_name.pop(key)
return deleted_keys
29 changes: 21 additions & 8 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,21 @@ def initialize(
@staticmethod
async def acall_env_servlet_method(servlet_name: str, method: str, *args, **kwargs):
env_servlet = ObjStore.get_env_servlet(servlet_name)
return await ObjStore.acall_actor_method(env_servlet, method, *args, **kwargs)
try:
return await ObjStore.acall_actor_method(
env_servlet, method, *args, **kwargs
)
except (ray.exceptions.RayActorError, ray.exceptions.OutOfMemoryError) as e:
if isinstance(
e, ray.exceptions.OutOfMemoryError
) or "died unexpectedly before finishing this task" in str(e):
# Need to clean up the env_servlet everywhere
# Need an actual instance of the ObjStore that has a cluster servlet set up in order to do this
from runhouse.globals import obj_store

await obj_store.adelete_env_contents(servlet_name)

raise e

@staticmethod
async def acall_actor_method(
Expand Down Expand Up @@ -534,9 +548,11 @@ async def _apop_env_servlet_name_for_key(self, key: Any, *args) -> str:
##############################################
# Remove Env Servlet
##############################################
async def aremove_env_servlet_name(self, env_servlet_name: str):
async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
return await self.acall_actor_method(
self.cluster_servlet, "aremove_env_servlet_name", env_servlet_name
self.cluster_servlet,
"aclear_all_references_to_env_servlet_name",
env_servlet_name,
)

##############################################
Expand Down Expand Up @@ -875,17 +891,14 @@ async def adelete_local(self, key: Any):
async def adelete_env_contents(self, env_name: Any):
from runhouse.globals import env_servlets

# clear keys in the env servlet
deleted_keys = await self.akeys_for_env_servlet_name(env_name)
await self.aclear_for_env_servlet_name(env_name)

# delete the env servlet actor and remove its references
if env_name in env_servlets:
actor = env_servlets[env_name]
ray.kill(actor)

del env_servlets[env_name]
await self.aremove_env_servlet_name(env_name)

deleted_keys = await self.aclear_all_references_to_env_servlet_name(env_name)

return deleted_keys

Expand Down

0 comments on commit 6d20bf0

Please sign in to comment.