Skip to content

Commit

Permalink
Added an alternate fix for MemoryObjectReceiveStream.receive()` on …
Browse files Browse the repository at this point in the history
…asyncio (#595)

Co-authored-by: Ganden Schaffner <gschaffner@pm.me>
  • Loading branch information
agronholm and gschaffner committed Jul 25, 2023
1 parent 6307392 commit 6b0a1f3
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
tasks were spawned and an outer cancellation scope had been cancelled before
- Ensured that exiting a ``TaskGroup`` always hits a yield point, regardless of
whether there are running child tasks to be waited on
- On asyncio, cancel scopes will defer cancelling tasks that are scheduled to resume
with a finished future
- Task groups on all backends now raise a single cancellation exception when an outer
cancel scope is cancelled, and no exceptions other than cancellation exceptions are
raised in the group
Expand All @@ -40,6 +42,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
scope suppressed a cancellation exception
- Fixed ``fail_after()`` raising an unwarranted ``TimeoutError`` when the cancel scope
was cancelled before reaching its deadline
- Fixed ``MemoryObjectReceiveStream.receive()`` causing the receiving task on asyncio to
remain in a cancelled state if the operation was cancelled after an item was queued to
be received by the task (but before the task could actually receive the item)
- Removed unnecessary extra waiting cycle in ``Event.wait()`` on asyncio in the case
where the event was not yet set

Expand Down
6 changes: 4 additions & 2 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,10 @@ def _deliver_cancellation(self) -> None:
if task is not current and (
task is self._host_task or _task_started(task)
):
self._cancel_calls += 1
task.cancel()
waiter = task._fut_waiter # type: ignore[attr-defined]
if not isinstance(waiter, asyncio.Future) or not waiter.done():
self._cancel_calls += 1
task.cancel()

# Schedule another callback if there are still tasks left
if should_retry:
Expand Down
6 changes: 0 additions & 6 deletions src/anyio/streams/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
ClosedResourceError,
EndOfStream,
WouldBlock,
get_cancelled_exc_class,
)
from ..abc import Event, ObjectReceiveStream, ObjectSendStream
from ..lowlevel import checkpoint
Expand Down Expand Up @@ -104,11 +103,6 @@ async def receive(self) -> T_co:

try:
await receive_event.wait()
except get_cancelled_exc_class():
# Ignore the immediate cancellation if we already received an item, so
# as not to lose it
if not container:
raise
finally:
self._state.waiting_receivers.pop(receive_event, None)

Expand Down
19 changes: 19 additions & 0 deletions tests/test_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,25 @@ async def task() -> None:
assert task_started
assert not event_set

async def test_event_wait_before_set_before_cancel(self) -> None:
setter_started = waiter_woke = False

async def setter() -> None:
nonlocal setter_started
setter_started = True
assert not event.is_set()
event.set()
tg.cancel_scope.cancel()

event = Event()
async with create_task_group() as tg:
tg.start_soon(setter)
await event.wait()
waiter_woke = True

assert setter_started
assert waiter_woke

async def test_statistics(self) -> None:
async def waiter() -> None:
await event.wait()
Expand Down