From 6d20bf0a69ff8d53effec439cb4a1cda3942a038 Mon Sep 17 00:00:00 2001
From: Rohin Bhasin
Date: Mon, 15 Apr 2024 14:39:57 -0400
Subject: [PATCH] Clean up an `EnvServlet` from global Runhouse daemon state if it dies unexpectedly.

---
 runhouse/servers/cluster_servlet.py | 10 +++++++++-
 runhouse/servers/obj_store.py       | 29 +++++++++++++++++++++--------
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/runhouse/servers/cluster_servlet.py b/runhouse/servers/cluster_servlet.py
index 4cae0682b..16fd323e5 100644
--- a/runhouse/servers/cluster_servlet.py
+++ b/runhouse/servers/cluster_servlet.py
@@ -197,5 +197,13 @@ async def aclear_key_to_env_servlet_name_dict(self):
     ##############################################
     # Remove Env Servlet
     ##############################################
-    async def aremove_env_servlet_name(self, env_servlet_name: str):
+    async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
         self._initialized_env_servlet_names.remove(env_servlet_name)
+        deleted_keys = [
+            key
+            for key, env in self._key_to_env_servlet_name.items()
+            if env == env_servlet_name
+        ]
+        for key in deleted_keys:
+            self._key_to_env_servlet_name.pop(key)
+        return deleted_keys
diff --git a/runhouse/servers/obj_store.py b/runhouse/servers/obj_store.py
index f3d31ca98..40227169b 100644
--- a/runhouse/servers/obj_store.py
+++ b/runhouse/servers/obj_store.py
@@ -264,7 +264,21 @@ def initialize(
     @staticmethod
     async def acall_env_servlet_method(servlet_name: str, method: str, *args, **kwargs):
         env_servlet = ObjStore.get_env_servlet(servlet_name)
-        return await ObjStore.acall_actor_method(env_servlet, method, *args, **kwargs)
+        try:
+            return await ObjStore.acall_actor_method(
+                env_servlet, method, *args, **kwargs
+            )
+        except (ray.exceptions.RayActorError, ray.exceptions.OutOfMemoryError) as e:
+            if isinstance(
+                e, ray.exceptions.OutOfMemoryError
+            ) or "died unexpectedly before finishing this task" in str(e):
+                # Need to clean up the env_servlet everywhere
+                # Need an actual instance of the ObjStore that has a cluster servlet set up in order to do this
+                from runhouse.globals import obj_store
+
+                await obj_store.adelete_env_contents(servlet_name)
+
+            raise e
 
     @staticmethod
     async def acall_actor_method(
@@ -534,9 +548,11 @@ async def _apop_env_servlet_name_for_key(self, key: Any, *args) -> str:
     ##############################################
     # Remove Env Servlet
     ##############################################
-    async def aremove_env_servlet_name(self, env_servlet_name: str):
+    async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str):
         return await self.acall_actor_method(
-            self.cluster_servlet, "aremove_env_servlet_name", env_servlet_name
+            self.cluster_servlet,
+            "aclear_all_references_to_env_servlet_name",
+            env_servlet_name,
         )
 
     ##############################################
@@ -875,17 +891,14 @@ async def adelete_local(self, key: Any):
     async def adelete_env_contents(self, env_name: Any):
         from runhouse.globals import env_servlets
 
-        # clear keys in the env servlet
-        deleted_keys = await self.akeys_for_env_servlet_name(env_name)
-        await self.aclear_for_env_servlet_name(env_name)
-
         # delete the env servlet actor and remove its references
         if env_name in env_servlets:
             actor = env_servlets[env_name]
             ray.kill(actor)
 
             del env_servlets[env_name]
-        await self.aremove_env_servlet_name(env_name)
+
+        deleted_keys = await self.aclear_all_references_to_env_servlet_name(env_name)
         return deleted_keys