Skip to content

Commit

Permalink
Clean up an EnvServlet from global Runhouse daemon state if it dies
Browse files Browse the repository at this point in the history
unexpectedly.
  • Loading branch information
rohinb2 committed Apr 16, 2024
1 parent 39536fd commit f5d79e6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
10 changes: 9 additions & 1 deletion runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,13 @@ async def aclear_key_to_env_servlet_name_dict(self):
##############################################
# Remove Env Servlet
##############################################
async def aremove_env_servlet_name(self, env_servlet_name: str):
async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
self._initialized_env_servlet_names.remove(env_servlet_name)
deleted_keys = [
key
for key, env in self._key_to_env_servlet_name.items()
if env == env_servlet_name
]
for key in deleted_keys:
self._key_to_env_servlet_name.pop(key)
return deleted_keys
25 changes: 16 additions & 9 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,17 @@ async def acall_env_servlet_method(
env_servlet = self.get_env_servlet(
servlet_name, use_env_servlet_cache=use_env_servlet_cache
)
return await ObjStore.acall_actor_method(env_servlet, method, *args, **kwargs)
try:
return await ObjStore.acall_actor_method(
env_servlet, method, *args, **kwargs
)
except (ray.exceptions.RayActorError, ray.exceptions.OutOfMemoryError) as e:
if isinstance(
e, ray.exceptions.OutOfMemoryError
) or "died unexpectedly before finishing this task" in str(e):
await self.adelete_env_contents(servlet_name)

raise e

@staticmethod
async def acall_actor_method(
Expand Down Expand Up @@ -544,9 +554,11 @@ async def _apop_env_servlet_name_for_key(self, key: Any, *args) -> str:
##############################################
# Remove Env Servlet
##############################################
async def aremove_env_servlet_name(self, env_servlet_name: str):
async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
return await self.acall_actor_method(
self.cluster_servlet, "aremove_env_servlet_name", env_servlet_name
self.cluster_servlet,
"aclear_all_references_to_env_servlet_name",
env_servlet_name,
)

##############################################
Expand Down Expand Up @@ -887,18 +899,13 @@ async def adelete_local(self, key: Any):

async def adelete_env_contents(self, env_name: Any):

# clear keys in the env servlet
deleted_keys = await self.akeys_for_env_servlet_name(env_name)
await self.aclear_for_env_servlet_name(env_name)

# delete the env servlet actor and remove its references
if env_name in self.env_servlet_cache:
actor = self.env_servlet_cache[env_name]
ray.kill(actor)

del self.env_servlet_cache[env_name]
await self.aremove_env_servlet_name(env_name)

deleted_keys = await self.aclear_all_references_to_env_servlet_name(env_name)
return deleted_keys

def delete_env_contents(self, env_name: Any):
Expand Down

0 comments on commit f5d79e6

Please sign in to comment.