Object stream randomly drops items #146

Closed
mjwestcott opened this issue Aug 13, 2020 · 11 comments · Fixed by #147 or #595
Labels
bug Something isn't working

@mjwestcott
Contributor

This issue concerns master@23803be

Memory object streams appear to randomly drop items in the following scenario (across all backends):

import anyio


async def receiver(r):
    while True:
        async with anyio.move_on_after(0.1):
            print(await r.receive())


async def main():
    s, r = anyio.create_memory_object_stream()

    async with anyio.create_task_group() as tg:
        await tg.spawn(receiver, r)

        for i in range(10):
            await anyio.sleep(0.2)
            await s.send(i)
        await tg.cancel_scope.cancel()


anyio.run(main)

Whereas the equivalent Trio code works as expected (every item is received and printed):

import trio


async def receiver(r):
    while True:
        with trio.move_on_after(0.1):
            print(await r.receive())


async def main():
    s, r = trio.open_memory_channel(max_buffer_size=0)

    async with trio.open_nursery() as n:
        n.start_soon(receiver, r)

        for i in range(10):
            await trio.sleep(0.2)
            await s.send(i)
        n.cancel_scope.cancel()


trio.run(main)

@mjwestcott
Contributor Author

diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py
index 9b8e8ee..693caba 100644
--- a/src/anyio/streams/memory.py
+++ b/src/anyio/streams/memory.py
@@ -70,6 +70,8 @@ class MemoryObjectReceiveStream(Generic[T_Item], ObjectReceiveStream[T_Item]):
             try:
                 await receive_event.wait()
             except BaseException:
+                if container:
+                    self._state.buffer.append(container[0])
                 self._state.waiting_receivers.pop(receive_event, None)
                 raise

The basic problem: when receive() is cancelled, a sender may already have handed it the item, so the item is lost when the cancellation exception propagates. This patch just illustrates one avenue to explore.
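A minimal model of this race in plain asyncio, not AnyIO's actual classes. ToyChannel, send_nowait and put_back_on_cancel are hypothetical names that only loosely mirror the waiting_receivers/container structure in the patch above. The sender hands the item to the parked receiver and sets its event, but a cancellation requested before the receiver resumes wins. Without the put-back, the item is gone.

```python
import asyncio
from collections import OrderedDict, deque


class ToyChannel:
    """Hypothetical toy channel, loosely modelled on the patch above."""

    def __init__(self, put_back_on_cancel):
        self.buffer = deque()
        # Each waiting receiver's event -> the list a sender fills in.
        self.waiting_receivers = OrderedDict()
        self.put_back_on_cancel = put_back_on_cancel

    def send_nowait(self, item):
        if self.waiting_receivers:
            # Hand the item straight to the oldest waiting receiver.
            event, container = self.waiting_receivers.popitem(last=False)
            container.append(item)
            event.set()
        else:
            self.buffer.append(item)

    async def receive(self):
        if self.buffer:
            return self.buffer.popleft()
        event, container = asyncio.Event(), []
        self.waiting_receivers[event] = container
        try:
            await event.wait()
        except asyncio.CancelledError:
            self.waiting_receivers.pop(event, None)
            if container and self.put_back_on_cancel:
                # The item was already handed to us: put it back at the
                # front of the buffer so the next receive() gets it.
                self.buffer.appendleft(container[0])
            raise
        return container[0]


async def demo(put_back_on_cancel):
    chan = ToyChannel(put_back_on_cancel)
    task = asyncio.create_task(chan.receive())
    await asyncio.sleep(0)     # let the receiver park itself
    chan.send_nowait("item")   # the item is handed over...
    task.cancel()              # ...but cancellation is requested before it resumes
    try:
        await task
    except asyncio.CancelledError:
        pass
    return list(chan.buffer)   # items still available to a later receive()


print(asyncio.run(demo(put_back_on_cancel=False)))  # []
print(asyncio.run(demo(put_back_on_cancel=True)))   # ['item']
```

Because the event was already set when task.cancel() ran, asyncio cannot cancel the awaited future. It instead throws CancelledError into the coroutine on its next step, which is the window in which the item is lost.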

@agronholm agronholm added the bug Something isn't working label Aug 13, 2020
@agronholm agronholm added this to the 2.0.0 milestone Aug 13, 2020
@agronholm
Owner

Thanks for the report, the accurate analysis and solution! The only change I would make is to use .appendleft() instead.
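The following shows why the front of the buffer matters. It assumes that a later send() has already buffered another item by the time the cancelled receiver puts its item back:

```python
from collections import deque

reclaimed = 1  # handed to a receiver that was then cancelled
# Item 2 was buffered by a later send() before item 1 was put back.
with_append = deque([2])
with_append.append(reclaimed)
with_appendleft = deque([2])
with_appendleft.appendleft(reclaimed)

print(list(with_append))      # [2, 1]  (FIFO order broken)
print(list(with_appendleft))  # [1, 2]  (FIFO order preserved)
```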

@njsmith
Collaborator

njsmith commented Aug 13, 2020 via email

@agronholm
Owner

I'm not sure how to fix it then, but I will look at trio's implementation for inspiration.

@agronholm
Owner

agronholm commented Aug 13, 2020

I don't understand how trio circumvents the same problem. If a task is waiting to receive from a memory channel and gets cancelled, and then a sender task gets scheduled, won't it try to reschedule that cancelled task with the value to be sent?

@njsmith
Collaborator

njsmith commented Aug 13, 2020

@agronholm when wait_task_rescheduled is cancelled, it runs the abort_fn immediately, without waiting for the task to be scheduled again. And in this case, the abort_fn removes the cancelled task from the data structure that's tracking receivers, so a sender will never try to pass an object to a receiver that was cancelled.
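A toy, synchronous model of the ordering described here. ToyReceivers and eager_abort are hypothetical and are not Trio's or AnyIO's API. The model follows the scenario from the question above: a receiver is cancelled, and then a sender runs before the receiver resumes. With an eager abort, the receiver is unregistered at cancellation time, so the sender buffers the item. With lazy cleanup, the sender still picks the cancelled receiver and the item is lost.

```python
from collections import OrderedDict


class ToyReceivers:
    """Hypothetical bookkeeping for receivers parked on a channel."""

    def __init__(self, eager_abort):
        self.eager_abort = eager_abort
        self.waiting = OrderedDict()  # receiver name -> slot a sender fills
        self.buffer = []              # items no receiver has taken
        self.lost = []                # items handed to cancelled receivers

    def park(self, name):
        self.waiting[name] = []

    def cancel(self, name):
        if self.eager_abort:
            # Trio-style: an abort callback unregisters the receiver at
            # cancellation time, before any sender can pick it.
            self.waiting.pop(name, None)
        # Otherwise the receiver stays registered until it next runs.

    def send(self, item):
        if self.waiting:
            _, slot = self.waiting.popitem(last=False)
            slot.append(item)
            return slot
        self.buffer.append(item)
        return None


def scenario(eager_abort):
    r = ToyReceivers(eager_abort)
    r.park("receiver")
    r.cancel("receiver")  # cancellation comes first...
    slot = r.send(0)      # ...then a sender runs before the receiver resumes
    if slot:              # the cancelled receiver raises, dropping its slot
        r.lost.extend(slot)
    return r.buffer, r.lost


print(scenario(eager_abort=True))   # ([0], [])
print(scenario(eager_abort=False))  # ([], [0])
```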

@agronholm
Owner

I see. But since AnyIO does not have the same low level facilities at its disposal, I will have to think of something else.

@njsmith
Collaborator

njsmith commented Aug 13, 2020

Maybe you could detect when you've been simultaneously awoken by a cancellation + receiving an object to return, and "undo" the cancellation so the operation completes successfully?

@mjwestcott
Contributor Author

@njsmith I thought about that, but I wasn't sure that it's correct to ignore the cancellation. I guess if your 'simultaneously awoken by two things' interpretation is correct, then simply picking one of them is legit. Here's an implementation:

diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py
index 9b8e8ee..6dc08f1 100644
--- a/src/anyio/streams/memory.py
+++ b/src/anyio/streams/memory.py
@@ -69,7 +69,11 @@ class MemoryObjectReceiveStream(Generic[T_Item], ObjectReceiveStream[T_Item]):

             try:
                 await receive_event.wait()
-            except BaseException:
+            except BaseException as e:
+                if type(e) == anyio.get_cancelled_exc_class() and container:
+                    # We were simultaneously awoken by a cancellation and receiving an
+                    # item. We choose the latter to ensure that items are not dropped.
+                    return container[0]
                 self._state.waiting_receivers.pop(receive_event, None)
                 raise

It seems to work on the asyncio and Trio backends, but not Curio.
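A self-contained asyncio sketch of the "prefer the delivered item" idea from the patch above. The receive() helper and its waiting/container arguments are hypothetical stand-ins for the stream's internals. When the receiver is woken by both a delivery and a cancellation, it returns the item instead of re-raising. Here plain asyncio simply lets the task finish with the item. In Trio, cancellation is level-triggered, so the next checkpoint inside a still-cancelled scope raises again, which is why choosing the item does not ignore the cancellation for good.

```python
import asyncio


async def receive(waiting, container):
    """Hypothetical receive(): prefer an already-delivered item over cancellation."""
    event = asyncio.Event()
    waiting.append((event, container))
    try:
        await event.wait()
    except asyncio.CancelledError:
        if container:
            # Woken by both a delivery and a cancellation: keep the item.
            return container[0]
        waiting.remove((event, container))
        raise
    return container[0]


async def main():
    waiting = []
    task = asyncio.create_task(receive(waiting, []))
    await asyncio.sleep(0)            # receiver is now parked
    event, container = waiting.pop(0)
    container.append("item")          # a sender hands over the item...
    event.set()
    task.cancel()                     # ...and cancellation arrives too
    return await task


print(asyncio.run(main()))  # item
```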

@mjwestcott
Contributor Author

Curio is raising TaskTimeout instead of TaskCancelled. Hopefully that's the only thing wrong with the patch.
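An exact-type check such as type(e) == X never matches subclasses of X, whereas isinstance() does. This is sketched below with hypothetical classes. Whether Curio's TaskTimeout actually derives from the class AnyIO reports as its cancellation exception is an assumption to verify. If it does not, isinstance() would not help either.

```python
class Cancelled(BaseException):
    """Hypothetical backend cancellation base class."""


class Timeout(Cancelled):
    """Hypothetical subclass raised when a deadline expires."""


def is_cancellation_exact(exc):
    return type(exc) == Cancelled       # the style of check used in the patch


def is_cancellation(exc):
    return isinstance(exc, Cancelled)   # also matches subclasses


print(is_cancellation_exact(Timeout()))  # False
print(is_cancellation(Timeout()))        # True
```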

@agronholm
Owner

That really rubs me the wrong way. I've opened a PR with an alternative fix.
