Skip to content

Commit

Permalink
Clean up an EnvServlet from global Runhouse daemon state if it dies unexpectedly.
Browse files Browse the repository at this point in the history
  • Loading branch information
rohinb2 committed Apr 16, 2024
1 parent 109a475 commit 21db95c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
10 changes: 9 additions & 1 deletion runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,13 @@ async def aclear_key_to_env_servlet_name_dict(self):
##############################################
# Remove Env Servlet
##############################################
async def aremove_env_servlet_name(self, env_servlet_name: str):
async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
self._initialized_env_servlet_names.remove(env_servlet_name)
deleted_keys = [
key
for key, env in self._key_to_env_servlet_name.items()
if env == env_servlet_name
]
for key in deleted_keys:
self._key_to_env_servlet_name.pop(key)
return deleted_keys
34 changes: 18 additions & 16 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,20 @@ def initialize(
# Generic helpers
##############################################
async def acall_env_servlet_method(
self,
servlet_name: str,
method: str,
*args,
use_env_servlet_cache: bool = True,
**kwargs,
self, servlet_name: str, method: str, *args, **kwargs
):
env_servlet = self.get_env_servlet(servlet_name, use_env_servlet_cache)
return await ObjStore.acall_actor_method(env_servlet, method, *args, **kwargs)
env_servlet = self.get_env_servlet(servlet_name)
try:
return await ObjStore.acall_actor_method(
env_servlet, method, *args, **kwargs
)
except (ray.exceptions.RayActorError, ray.exceptions.OutOfMemoryError) as e:
if isinstance(
e, ray.exceptions.OutOfMemoryError
) or "died unexpectedly before finishing this task" in str(e):
await self.adelete_env_contents(servlet_name)

raise e

@staticmethod
async def acall_actor_method(
Expand Down Expand Up @@ -542,9 +547,11 @@ async def _apop_env_servlet_name_for_key(self, key: Any, *args) -> str:
##############################################
# Remove Env Servlet
##############################################
async def aremove_env_servlet_name(self, env_servlet_name: str):
async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
return await self.acall_actor_method(
self.cluster_servlet, "aremove_env_servlet_name", env_servlet_name
self.cluster_servlet,
"aclear_all_references_to_env_servlet_name",
env_servlet_name,
)

##############################################
Expand Down Expand Up @@ -885,18 +892,13 @@ async def adelete_local(self, key: Any):

async def adelete_env_contents(self, env_name: Any):

# clear keys in the env servlet
deleted_keys = await self.akeys_for_env_servlet_name(env_name)
await self.aclear_for_env_servlet_name(env_name)

# delete the env servlet actor and remove its references
if env_name in self.env_servlet_cache:
actor = self.env_servlet_cache[env_name]
ray.kill(actor)

del self.env_servlet_cache[env_name]
await self.aremove_env_servlet_name(env_name)

deleted_keys = await self.aclear_all_references_to_env_servlet_name(env_name)
return deleted_keys

def delete_env_contents(self, env_name: Any):
Expand Down

0 comments on commit 21db95c

Please sign in to comment.