
On asyncio, Event.set() sometimes fails to notify all waiting tasks #536

Closed
gschaffner opened this issue Mar 8, 2023 · 6 comments · Fixed by #595

Comments

@gschaffner
Collaborator

hi!

if:

  1. task A is awaiting Event.wait()
  2. Event.set() is called
  3. the wait()'s cancel scope is cancelled before the event loop schedules A

then, on the asyncio backend only, task A's wait() raises cancelled instead of getting notified of the event.

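in contrast, on the trio backend (and on pure trio), Event.set() still notifies all waiters (including task A) in this situation. trio does this by implementing Event.wait via wait_task_rescheduled: set() reschedules each waiting task directly, and once a task has been rescheduled its abort function is no longer consulted, so a cancellation that arrives afterwards cannot undo the wakeup.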

here is a small reproducer: test_event_wait_before_set_before_cancel in 65d74e4. assert wait_woke fails on asyncio only.
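The same race can be reproduced with plain asyncio, without AnyIO. This is a minimal sketch, not the test from 65d74e4, and it assumes that AnyIO's asyncio backend ultimately waits on an asyncio.Event (and so inherits this behavior). set() gives the waiter's internal future its result, but a Task.cancel() issued before the loop resumes the task still throws CancelledError into it:

```python
import asyncio


async def demo() -> tuple[bool, bool, bool]:
    event = asyncio.Event()
    woke = False

    async def waiter() -> None:
        nonlocal woke
        await event.wait()
        woke = True  # only reached if wait() returns normally

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)      # step 1: task A is now blocked in event.wait()
    event.set()                 # step 2: A's future gets its result...
    task.cancel()               # step 3: ...but the cancel lands before A resumes
    await asyncio.wait([task])  # let A run; asyncio.wait() does not re-raise
    return event.is_set(), woke, task.cancelled()


is_set, woke, cancelled = asyncio.run(demo())
print(is_set, woke, cancelled)  # True False True
```

event.is_set() is True, yet the waiter never observed it: its wait() ended in CancelledError instead.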

this problem is also the direct cause of another bug: MemoryObjectSendStream.send(item) can raise cancelled after item has been delivered to a receiver! [1] this MemoryObjectSendStream.send issue is the send_event.wait() counterpart of #146's receive_event.wait() case, i believe; the underlying issue for both seems to be this Event.wait() problem. [2]

a couple relevant comments from njsmith: #146 (comment), #147 (comment).

Footnotes

  1. raising cancelled after a receiver gets the item is a violation of trio's cancellation semantics for API in the trio namespace (https://trio.readthedocs.io/en/stable/reference-core.html#cancellation-and-primitive-operations). since anyio is documented as following trio's cancellation model, i would guess that anyio is also intended to adhere to this (but in the anyio namespace)? if so, i think it would be good to document this at https://anyio.readthedocs.io/en/stable/cancellation.html and to document the couple rare exceptions to this rule (e.g. https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.SendStream.send_all) too.

  2. nit: #146 (Object stream randomly drops items) actually had two underlying issues: MemoryObjectReceiveStream.receive() didn't call checkpoint_if_cancelled() before trying receive_nowait() (on all backends), and this Event.wait() issue (on asyncio only).

@agronholm
Owner

I'm really confused now. If a task is cancelled while blocked on Event.wait(), why should it not raise CancelledError if it's cancelled before the event loop gets around to running it? And why would it affect other tasks waiting on the same event? The subject line itself is misleading because Event.wait() doesn't notify any tasks, it waits for a notification. What am I missing here?

@smurfix
Collaborator

smurfix commented Apr 10, 2023

My rationale for why this is a problem: Assume that the event is associated with reliably transferring some data from task A to task B. For example, a rendezvous object. A sees that somebody (B) is waiting, thus it stores the data, sets the event, and continues on its merry way. B's receive call on this object must now return the data that A stored in it.

If that doesn't happen when B gets cancelled, suddenly nobody is responsible for the data in question. You can't fix this by teaching the send side to wait for the data to have been read because, surprise, again you'd have to wait on an event, and now you run into the exact same problem on A's side: if it gets cancelled, you don't know whether the owner of the object is now A or B.

While it's certainly possible to add higher-level workarounds like an atomic and non-waiting "get the data out of this object if there happens to be any in there" method, that doesn't match every other read or get or receive call in Trio/AnyIO. The basic guarantee should be to either return the data that's been read, or to leave the channel in the state it was in before the call was made. (Same with send/put/write.)

@agronholm
Owner

Don't you think those guarantees go right out the window when tasks start getting cancelled (rather than the memory object streams being closed)? And OP's argument was that, if a task was cancelled after being scheduled, AnyIO should somehow override asyncio's normal cancellation mechanism to allow the task to continue instead of raising a cancellation error like it normally would?

@smurfix
Collaborator

smurfix commented Apr 10, 2023

Well they don't go out the window when you talk to network connections, so why should they do so when using a memory stream? (Besides, well that's just the first example I could think of, I'm sure there are others where this may be an issue.)

If fixing or working around this in the AnyIO wrapper is too difficult, not possible, too costly in performance, too much work for too little gain, …, given asyncio's current cancellation semantics, the alternative would be to (a) document that yes, this is a problem with no generic workaround by (mis)design, and that you should use Trio if you need guaranteed behavior, and (b) teach asyncio sane cancellation semantics so that this gets fixed in the long term.

@agronholm
Owner

> Well they don't go out the window when you talk to network connections, so why should they do so when using a memory stream? (Besides, well that's just the first example I could think of, I'm sure there are others where this may be an issue.)

If you're reading from a socket in a task, and the task is scheduled to be resumed and then gets cancelled, another task could take over and start receiving data from it without anything being lost.

As for memory streams, we already handle this problem (here) by ignoring the first cancellation so as not to lose the data.

@gschaffner gschaffner changed the title On asyncio, Event.wait() sometimes fails to notify all waiting tasks On asyncio, Event.set() sometimes fails to notify all waiting tasks May 9, 2023
@gschaffner
Collaborator Author

gschaffner commented May 9, 2023

> The subject line itself is misleading because Event.wait() doesn't notify any tasks, it waits for a notification.

oops, sorry—that's just a typo in the subject line. fixed now.

> As for memory streams, we already handle this problem (here) by ignoring the first cancellation so as not to lose the data.

sort of—those lines handle half of the problem for memory object streams. as i mentioned briefly in the top post, memory object streams still exhibit the other half of the bug.

in more detail: consider a memory object stream pair with max_buffer_size=0. consider two cases:

  1. item = await receive() is called before send_nowait(item).

    in this case, send_nowait does

    if self._state.waiting_receivers:
        receive_event, container = self._state.waiting_receivers.popitem(last=False)
        container.append(item)
        receive_event.set()

    it then immediately returns, indicating that item was sent successfully. since the buffer size is 0, one of the waiting receivers must receive the item now.

    to ensure this, await receive() must not raise cancelled, even if in a cancelled scope, because the event was already set. as you mention, this is currently handled as a special-case here:

    try:
        await receive_event.wait()
    except get_cancelled_exc_class():
        # Ignore the immediate cancellation if we already received an item, so
        # as not to lose it
        if not container:
            raise

  2. await send(item) is called before receive_nowait().

    in this case, receive_nowait does

    if self._state.waiting_senders:
        # Get the item from the next sender
        send_event, item = self._state.waiting_senders.popitem(last=False)
        self._state.buffer.append(item)
        send_event.set()
    if self._state.buffer:
        return self._state.buffer.popleft()

    and then returns item. since item was received, await send(item) must not raise now, because it did successfully send the item. someone received it and will now process it!

    for example, if there is code like

    # Invariant: I must either (a) send ``item`` or (b) discard ``item`` and call
    # ``remove_state_that_tracks_item`` to clean up after the discarded item.
    try:
        await send(item)  # send_event.wait()
    except BaseException:
        remove_state_that_tracks_item(item)
        raise

    then on asyncio, AnyIO can raise cancelled, causing us to call remove_state_that_tracks_item even though the item was actually sent successfully.
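To make case (2.) concrete, here is a deliberately simplified, hypothetical zero-buffer channel running on plain asyncio. ToyZeroBufferChannel is not AnyIO's class; it only mirrors the shape of the send()/receive_nowait() excerpts above. The receiver takes the item and sets the sender's event, then the sender is cancelled before it resumes:

```python
import asyncio
from collections import OrderedDict


class ToyZeroBufferChannel:
    """Hypothetical model of a max_buffer_size=0 stream (not AnyIO's code)."""

    def __init__(self) -> None:
        # send_event -> item, in arrival order (like waiting_senders above)
        self.waiting_senders: OrderedDict = OrderedDict()

    async def send(self, item: object) -> None:
        send_event = asyncio.Event()
        self.waiting_senders[send_event] = item
        try:
            await send_event.wait()
        except asyncio.CancelledError:
            # Withdraw the item only if no receiver has taken it yet
            self.waiting_senders.pop(send_event, None)
            raise

    def receive_nowait(self) -> object:
        # Take the item from the oldest waiting sender and wake that sender
        send_event, item = self.waiting_senders.popitem(last=False)
        send_event.set()
        return item


async def demo() -> tuple[object, bool]:
    chan = ToyZeroBufferChannel()
    sender = asyncio.create_task(chan.send("payload"))
    await asyncio.sleep(0)            # sender is parked in send_event.wait()
    received = chan.receive_nowait()  # a receiver takes the item...
    sender.cancel()                   # ...then the cancel lands before send() resumes
    await asyncio.wait([sender])
    return received, sender.cancelled()


received, send_was_cancelled = asyncio.run(demo())
print(received, send_was_cancelled)  # payload True
```

send() ends up cancelled even though the receiver already holds the item, which is exactly the situation in which a caller like the one above would wrongly run remove_state_that_tracks_item.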

what i mean to point out is that the current code that handles the special case of (1.) is a symptom of a lower-level problem. one should not have to patch Event.wait with a special-case band-aid every time they need Event.wait to have the same cancellation semantics on asyncio that it does on Trio. Event should just behave the same way on the asyncio backend that it does on the Trio backend.

with case (2.) in particular, the problem is within AnyIO so one can't apply a band-aid patch to it without forcing users to install a patched version of the library.

it may be clearer if you have time to peek at #537. in that proposed changeset, i essentially remove the asyncio-only band-aid from MemoryObjectReceiveStream.receive and instead fix the underlying problem in _backends.asyncio.Event.
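One way to get trio-like behavior from an asyncio-based Event is to give each waiter its own future and let the notification win whenever set() resolved that future before the cancellation was processed. This is a minimal sketch under that assumption; NotifyWinsEvent is a hypothetical name, and this is not the code from #537 or #595. It generalizes the `if not container: raise` band-aid from receive() into the event itself, while a cancellation that arrives before set() still raises. Caveat: on plain asyncio, returning normally swallows the cancel request (and on Python 3.11+ Task.cancelling() stays elevated), so a real backend fix would also need to make sure the cancellation is still delivered at the task's next checkpoint.

```python
import asyncio


class NotifyWinsEvent:
    """Hypothetical Event sketch: if set() resolved this waiter's future
    before a cancellation was processed, wait() returns normally."""

    def __init__(self) -> None:
        self._flag = False
        self._waiters: list[asyncio.Future[None]] = []

    def is_set(self) -> bool:
        return self._flag

    def set(self) -> None:
        if self._flag:
            return
        self._flag = True
        for fut in self._waiters:
            if not fut.done():  # skip waiters whose wait was already cancelled
                fut.set_result(None)
        self._waiters.clear()

    async def wait(self) -> None:
        if self._flag:
            await asyncio.sleep(0)  # already set: just yield to the loop once
            return
        fut: asyncio.Future[None] = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
        except asyncio.CancelledError:
            if fut.done() and not fut.cancelled():
                # set() notified this waiter before the cancellation was
                # processed: report success, as trio's Event.wait() would.
                # NOTE: this swallows the cancel request on plain asyncio.
                return
            if fut in self._waiters:
                self._waiters.remove(fut)
            raise


async def race(cancel_first: bool) -> bool:
    """Run the set()/cancel() race from the issue; return True if A woke."""
    event = NotifyWinsEvent()
    woke = False

    async def waiter() -> None:
        nonlocal woke
        await event.wait()
        woke = True

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # task A is now blocked in event.wait()
    if cancel_first:
        task.cancel()
        event.set()
    else:
        event.set()
        task.cancel()
    await asyncio.wait([task])
    return woke


notified_when_set_first = asyncio.run(race(cancel_first=False))
notified_when_cancelled_first = asyncio.run(race(cancel_first=True))
print(notified_when_set_first, notified_when_cancelled_first)  # True False
```

The design choice is per-waiter rather than a global is_set() check: if the event is set after this waiter's cancellation has already been processed, the waiter still sees CancelledError, which keeps the "either it happened or it was cancelled" guarantee symmetric.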
