
Assert in _drain_helper when sending websocket message #2934

Closed
willmcgugan opened this issue Apr 12, 2018 · 7 comments
@willmcgugan
Contributor

Long story short

We see bursts of the following exception on our websocket server.

Traceback (most recent call last):
  File "", line 378, in _on_packet
    await self.dispatch(packet)
  File "", line 67, in dispatch
    await method(self, packet)
  File "", line 550, in handle_request_send
    packet.data
  File "", line 216, in route
    packets.Route(route_no, data)
  File "", line 289, in send
    if not await self.send_bytes(packet_bytes):
  File "", line 297, in send_bytes
    await self.ws.send_bytes(packet_bytes)
  File "lib/python3.6/site-packages/aiohttp/web_ws.py", line 263, in 
    await self._writer.send(data, binary=True, compress=compress)
  File "lib/python3.6/site-packages/aiohttp/http_websocket.py", line 607, 
    return await self._send_frame(message, WSMsgType.BINARY, compress)
  File "lib/python3.6/site-packages/aiohttp/http_websocket.py", line 588, 
    await self.protocol._drain_helper()
  File "lib/python3.6/asyncio/streams.py", line 214, in _drain_helper
    assert waiter is None or waiter.cancelled()
AssertionError

(paths censored at client's request)

I'm guessing it's a flow control issue? We're essentially copying data from one websocket to another and rely on back pressure to keep things running.

I can't reproduce it easily, but I'm happy to investigate if it would help.

Your environment

A websocket server running aiohttp==3.1.0 on Linux

@dangusev

dangusev commented Jun 29, 2018

Let's bump this topic. I faced the same problem and found some related issues.
Long story short, the _drain_helper method is not coroutine-safe and may fail if it is called from several coroutines concurrently. The easiest fix is to wrap it with a lock. See more here:
python-websockets/websockets#16
https://bugs.python.org/issue29930
Polyconseil/aioamqp#137
Hope it will help to resolve the issue.
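To make the failure mode concrete, here is a toy reimplementation of the single-waiter drain logic (ToyFlowControl is a made-up class for illustration, not asyncio or aiohttp code): when a second coroutine tries to drain while the first one is already parked on the waiter, it hits exactly the assert from the tracebacks above.

```python
import asyncio

class ToyFlowControl:
    """Toy reimplementation (not asyncio/aiohttp code) of the pre-fix
    single-waiter drain logic: only one coroutine may wait at a time."""

    def __init__(self):
        self._paused = True        # pretend the transport buffer is full
        self._drain_waiter = None  # at most ONE waiter slot

    async def _drain_helper(self):
        if not self._paused:
            return
        waiter = self._drain_waiter
        # The assert from the tracebacks above: it only holds if a
        # single coroutine drains at a time.
        assert waiter is None or waiter.cancelled()
        waiter = asyncio.get_running_loop().create_future()
        self._drain_waiter = waiter
        await waiter

async def main():
    proto = ToyFlowControl()
    first = asyncio.ensure_future(proto._drain_helper())
    await asyncio.sleep(0)            # let the first drainer park its waiter
    try:
        await proto._drain_helper()   # second concurrent drainer
    except AssertionError:
        print("second concurrent drain hit the assert")
    first.cancel()
    try:
        await first
    except asyncio.CancelledError:
        pass

asyncio.run(main())  # prints: second concurrent drain hit the assert
```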

@crusaderky

I'm getting exactly the same on a websocket client using aiohttp 3.6.1.

    await ws.send_bytes(payload)
  File "site-packages/aiohttp/client_ws.py", line 155, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "site-packages/aiohttp/http_websocket.py", line 644, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "site-packages/aiohttp/http_websocket.py", line 623, in _send_frame
    await self.protocol._drain_helper()
  File "site-packages/aiohttp/base_protocol.py", line 78, in _drain_helper
    assert waiter is None or waiter.cancelled()

The issue is triggered by having 50+ concurrent asyncio tasks invoke send_bytes in parallel on the same websocket (the documentation states this should be OK to do).

The workaround is to wrap send_bytes in an asyncio.Lock that is shared among all the tasks that operate on the same websocket.
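The workaround can be sketched as a thin wrapper (LockedWebSocket and FakeWS are made-up names; the only assumption about the real websocket object is that it exposes an async send_bytes):

```python
import asyncio

class LockedWebSocket:
    """Made-up wrapper: serializes sends on one websocket so many
    tasks can call send_bytes without racing _drain_helper."""

    def __init__(self, ws):
        self._ws = ws                # anything with an async send_bytes
        self._lock = asyncio.Lock()  # one lock shared by all senders

    async def send_bytes(self, data):
        async with self._lock:       # only one task sends/drains at a time
            await self._ws.send_bytes(data)

# Usage sketch with a stand-in websocket that records completed sends:
class FakeWS:
    def __init__(self):
        self.sent = []

    async def send_bytes(self, data):
        await asyncio.sleep(0)       # yield, as a real network send would
        self.sent.append(data)

async def main():
    ws = LockedWebSocket(FakeWS())
    await asyncio.gather(*(ws.send_bytes(b"msg%d" % i) for i in range(50)))
    return ws._ws.sent

sent = asyncio.run(main())
print(len(sent))  # 50
```

The key point is that all tasks must share the same Lock instance; a lock created per call serializes nothing.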

@bdowning

bdowning commented Oct 4, 2020

I also just saw this, and the same fix worked for me.

I believe what's happening here is that when messages are large enough to need to be fragmented, the exact order of frames on the wire matters (since each CONTINUATION frame needs to directly follow the previous one); hence the need for locking so you don't have competing writers. IMHO aiohttp should handle this locking itself, as it is required for correctness here.
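The ordering argument can be demonstrated with a pure simulation (no aiohttp involved; frames are just tuples here): two tasks writing fragmented messages interleave their frames unless a shared lock keeps each message's frames together.

```python
import asyncio

# Pure simulation: frames are (msg_id, kind) tuples.  Per RFC 6455,
# the CONTINUATION frames of a fragmented message must directly
# follow its first frame, with no other frames in between.

async def send_fragmented(wire, msg_id, nfrags, lock):
    async def write_frames():
        wire.append((msg_id, "FIRST"))
        for _ in range(nfrags - 1):
            await asyncio.sleep(0)     # a competing task may run here
            wire.append((msg_id, "CONT"))
    if lock is None:
        await write_frames()           # unprotected: frames can interleave
    else:
        async with lock:               # locked: the message stays whole
            await write_frames()

def contiguous(wire):
    """True iff each message's frames form one uninterrupted run."""
    runs = []
    for msg_id, _ in wire:
        if not runs or runs[-1] != msg_id:
            runs.append(msg_id)
    return len(runs) == len(set(runs))

async def demo(use_lock):
    lock = asyncio.Lock() if use_lock else None
    wire = []
    await asyncio.gather(send_fragmented(wire, 1, 3, lock),
                         send_fragmented(wire, 2, 3, lock))
    return contiguous(wire)

print(asyncio.run(demo(False)))  # False: frames of msgs 1 and 2 interleave
print(asyncio.run(demo(True)))   # True: each message's frames stay together
```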

@kumaraditya303

kumaraditya303 commented Sep 24, 2022

FYI, I have this fixed in CPython upstream in python/cpython#94705, and it has been backported to 3.11 and 3.10.

cc @willmcgugan @webknjaz
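As I understand the upstream change, it replaces the single waiter slot with a collection of waiters, so any number of coroutines can drain concurrently. A toy sketch of that approach (made-up class name, not the actual CPython code):

```python
import asyncio
from collections import deque

class ToyFlowControlFixed:
    """Toy sketch of the multi-waiter approach: instead of one
    _drain_waiter slot guarded by an assert, keep a deque of waiters."""

    def __init__(self):
        self._paused = True
        self._drain_waiters = deque()

    async def _drain_helper(self):
        if not self._paused:
            return
        waiter = asyncio.get_running_loop().create_future()
        self._drain_waiters.append(waiter)   # no assert needed now
        try:
            await waiter
        finally:
            self._drain_waiters.remove(waiter)

    def resume_writing(self):
        self._paused = False
        for waiter in self._drain_waiters:   # wake every parked drainer
            if not waiter.done():
                waiter.set_result(None)

async def main():
    proto = ToyFlowControlFixed()
    drains = [asyncio.ensure_future(proto._drain_helper()) for _ in range(10)]
    await asyncio.sleep(0)       # let all ten park their waiters
    proto.resume_writing()       # buffer drained: wake everyone
    await asyncio.gather(*drains)
    print("10 concurrent drains completed without AssertionError")

asyncio.run(main())
```

Note this fixes the AssertionError, but (per the next comment) it does not by itself guarantee that a fragmented message's frames stay contiguous.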

@Dreamsorcerer
Member

Looks like we can probably close this issue, then. If nobody has implemented a fix here in 4.5 years, I assume we can live with the issue on Python 3.7-3.9 for a little longer.

@bdowning

bdowning commented Oct 29, 2022

Does the upstream fix actually... fix this, though? From my understanding of the WebSocket protocol, a fragmented message must be contiguous in the TCP stream. I think the situation here is that if multiple async tasks are trying to write out messages and a message fragments, the task sending it must write all of its fragments before anything else goes out. If any other task writes anything in between fragments, there will be a protocol error.

AFAIK, unless I'm dramatically misunderstanding either the WebSocket protocol or the upstream change, you will still occasionally get wrong results unless there's some mutexing in aiohttp.

@Dreamsorcerer
Member

Think it's possible for you to create a reproducer/test for this?
