Skip to content

Commit

Permalink
will this make the tests happier?
Browse files Browse the repository at this point in the history
  • Loading branch information
dlwh committed Sep 16, 2024
1 parent 3f60b82 commit 6838884
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/levanter/utils/thread_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,41 @@ class AsyncIteratorWrapper(Iterator):
def __init__(self, async_iter):
self.async_iter = async_iter
self.loop = asyncio.new_event_loop()
self.executor = ThreadPoolExecutor(max_workers=1)
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
self._exhausted = False # Flag to indicate if the iterator is exhausted

def _run_loop(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()

def _run_async_task(self, coro):
return asyncio.run_coroutine_threadsafe(coro, self.loop).result()
if not self.loop.is_running() or not self.thread.is_alive():
raise StopIteration # Loop is not running or thread has been joined
try:
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
return future.result()
except (RuntimeError, asyncio.CancelledError):
raise StopIteration # Either the loop was closed or the coroutine was cancelled

def __iter__(self):
return self

def __next__(self):
if self._exhausted:
raise StopIteration
try:
return self._run_async_task(self.async_iter.__anext__())
except StopAsyncIteration:
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
self._exhausted = True # Mark the iterator as exhausted
if self.loop.is_running():
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join() # Ensure the thread is safely joined
raise StopIteration

def close(self):
"""Close the event loop and thread gracefully."""
if self.loop.is_running():
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join() # Join the thread to ensure the loop is fully stopped
self.loop.close() # Explicitly close the loop to avoid dangling tasks

0 comments on commit 6838884

Please sign in to comment.