Skip to content

Commit

Permalink
Ensure progress bar is continuously updated (#231)
Browse files Browse the repository at this point in the history
* refactor to update status

* rename vars

* update reset

* update return types
  • Loading branch information
skarimo committed Apr 9, 2024
1 parent a8b36d6 commit cc145d3
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions datadog_sync/utils/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Copyright 2019 Datadog, Inc.

from __future__ import annotations
from asyncio import Future, Queue, QueueEmpty, Task, gather, get_event_loop, sleep
from asyncio import AbstractEventLoop, Future, Queue, QueueEmpty, Task, gather, get_event_loop, sleep

from dataclasses import dataclass
from traceback import format_exc
Expand All @@ -22,14 +22,16 @@ def __init__(self, config: Configuration) -> None:
self.workers: List[Task] = []
self.work_queue: Queue = Queue()
self.counter: Counter = Counter()
self._shutdown: bool = False
self.pbar: Optional[tqdm] = None
self._running_workers_count: int = 0
self._loop: AbstractEventLoop = get_event_loop()
self._shutdown_workers: bool = False
self._cb: Optional[Awaitable] = None
self._cancel_cb: Callable = self.work_queue.empty
self.pbar: Optional[tqdm] = None

async def init_workers(
self, cb: Awaitable, cancel_cb: Optional[Callable], worker_count: Optional[int], *args, **kwargs
) -> None:
) -> Awaitable[None]:
await self._reset()

max_workers = self.config.max_workers
Expand All @@ -41,13 +43,14 @@ async def init_workers(
self._cancel_cb = cancel_cb
await self._create_workers(max_workers, *args, **kwargs)

async def _create_workers(self, max_workers: Optional[int], *args, **kwargs):
async def _create_workers(self, max_workers: int, *args, **kwargs) -> Awaitable[None]:
for _ in range(max_workers):
self.workers.append(self._worker(*args, **kwargs))
self._running_workers_count = max_workers
self.workers.append(self._cancel_worker())

async def _worker(self, *args, **kwargs) -> None:
while not self._shutdown or (self._shutdown and not self.work_queue.empty()):
async def _worker(self, *args, **kwargs) -> Awaitable[None]:
while not self._shutdown_workers or (self._shutdown_workers and not self.work_queue.empty()):
try:
t = self.work_queue.get_nowait()
try:
Expand All @@ -58,40 +61,43 @@ async def _worker(self, *args, **kwargs) -> None:
finally:
self.work_queue.task_done()
if self.pbar:
self.pbar.update()
await self._loop.run_in_executor(None, self.pbar.update)
except QueueEmpty:
pass
except Exception as e:
self.config.logger.debug(format_exc())
self.config.logger.error(f"Error processing task: {e}")
await sleep(0)
self._running_workers_count -= 1

async def _cancel_worker(self) -> None:
loop = get_event_loop()
while True:
if await loop.run_in_executor(None, self._cancel_cb):
self._shutdown = True
if await self._loop.run_in_executor(None, self._cancel_cb):
self._shutdown_workers = True
break
if self.pbar:
await loop.run_in_executor(None, self.pbar.refresh)
await sleep(0)

async def _reset(self):
async def _reset(self) -> Awaitable[None]:
self.workers.clear()
self.work_queue = Queue()
self.counter.reset_counter()
self._shutdown = False
self._shutdown_workers = False
self.pbar = None
self._running_workers_count = 0

async def _refresh_pbar(self) -> Awaitable[None]:
while self._running_workers_count > 0 and self.pbar:
await self._loop.run_in_executor(None, self.pbar.display)

async def schedule_workers(self, additional_coros: List = []) -> Future:
self._shutdown = False
self._shutdown_workers = False
return await gather(*self.workers, *additional_coros, return_exceptions=True)

async def schedule_workers_with_pbar(self, total, additional_coros: List = []) -> Future:
self.pbar = tqdm(total=total)

self._shutdown = False
self._shutdown_workers = False
with logging_redirect_tqdm():
additional_coros.append(self._refresh_pbar())
await self.schedule_workers(additional_coros)

self.pbar.close()
Expand Down

0 comments on commit cc145d3

Please sign in to comment.