diff --git a/fastapi_websocket_pubsub/event_broadcaster.py b/fastapi_websocket_pubsub/event_broadcaster.py index 5efec7d..89dadd6 100644 --- a/fastapi_websocket_pubsub/event_broadcaster.py +++ b/fastapi_websocket_pubsub/event_broadcaster.py @@ -157,6 +157,7 @@ def __init__( # If we opt to manage the context directly (i.e. call async with on the event broadcaster itself) self._context_manager = None self._context_manager_lock = asyncio.Lock() + self._tasks = set() async def __broadcast_notifications__(self, subscription: Subscription, data): """ @@ -267,11 +268,18 @@ async def __read_notifications__(self): ) ) # Notify subscribers of message received from broadcast - await self._notifier.notify( - notification.topics, - notification.data, - notifier_id=self._id, + task = asyncio.create_task( + self._notifier.notify( + notification.topics, + notification.data, + notifier_id=self._id, + ) ) + + self._tasks.add(task) + def cleanup(task): + self._tasks.remove(task) + task.add_done_callback(cleanup) except: logger.exception("Failed handling incoming broadcast") logger.info(