From 93b8a4289b412d0c714a8b80e5b3eefffacc5819 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 15 Jun 2023 13:14:59 +0100 Subject: [PATCH 01/18] Add 'AsyncShieldCancellation' context manager --- httpcore/_synchronization.py | 68 ++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index cca84665..2cb6d421 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -171,6 +171,58 @@ async def release(self) -> None: self._anyio_semaphore.release() +class AsyncShieldCancellation: + # For certain portions of our codebase where we're dealing with + # closing connections during exception handling we want to shield + # the operation from being cancelled. + # + # with AsyncShieldCancellation(): + # ... # clean-up operations, shielded from cancellation. + + def __init__(self) -> None: + """ + Detect if we're running under 'asyncio' or 'trio' and create + a semaphore with the correct implementation. + """ + self._backend = sniffio.current_async_library() + + if self._backend == "trio": + if trio is None: # pragma: nocover + raise RuntimeError( + "Running under trio requires the 'trio' package to be installed." + ) + + self._trio_shield = trio.CancelScope(shield=True) + else: + if anyio is None: # pragma: nocover + raise RuntimeError( + "Running under asyncio requires the 'anyio' package to be installed." + ) + + self._anyio_shield = anyio.CancelScope(shield=True) + + def __enter__(self) -> "AsyncShieldCancellation": + if not self._backend: + self.setup() + + if self._backend == "trio": + self._trio_shield.__enter__() + else: + self._anyio_shield.__enter__() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + if self._backend == "trio": + self._trio_shield.__exit__(exc_type, exc_value, traceback) + else: + self._anyio_shield.__exit__(exc_type, exc_value, traceback) + + # Our thread-based synchronization primitives... @@ -212,3 +264,19 @@ def acquire(self) -> None: def release(self) -> None: self._semaphore.release() + + +class ShieldCancellation: + # Thread-synchronous codebases don't support cancellation semantics. + # We have this class because we need to mirror the async and sync + # cases within our package, but it's just a no-op. + def __enter__(self) -> "ShieldCancellation": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + pass \ No newline at end of file From a3ba6dfaf880afbeefe29f2ff584c6494350a6ac Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 21 Jun 2023 11:27:21 +0100 Subject: [PATCH 02/18] Update _synchronization.py --- httpcore/_synchronization.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index 2cb6d421..ee7281f9 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -202,9 +202,6 @@ def __init__(self) -> None: self._anyio_shield = anyio.CancelScope(shield=True) def __enter__(self) -> "AsyncShieldCancellation": - if not self._backend: - self.setup() - if self._backend == "trio": self._trio_shield.__enter__() else: @@ -279,4 +276,4 @@ def __exit__( exc_value: Optional[BaseException] = None, traceback: Optional[TracebackType] = None, ) -> None: - pass \ No newline at end of file + pass From 61cef57bb9fc20513d16e74976df8a42e162608f Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 21 Jun 2023 11:33:27 +0100 Subject: [PATCH 03/18] Linting --- httpcore/_synchronization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index ee7281f9..8aba6a8d 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -266,7 +266,7 @@ def release(self) -> None: class ShieldCancellation: # Thread-synchronous codebases don't support cancellation semantics. # We have this class because we need to mirror the async and sync - # cases within our package, but it's just a no-op. + # cases within our package, but it's just a no-op. def __enter__(self) -> "ShieldCancellation": return self From aa86c6e617be66ffe7cf9361aeca3624bb8dc654 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 22 Jun 2023 12:50:27 +0100 Subject: [PATCH 04/18] Fix docstring wording --- httpcore/_synchronization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index 8aba6a8d..bae27c1b 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -182,7 +182,7 @@ class AsyncShieldCancellation: def __init__(self) -> None: """ Detect if we're running under 'asyncio' or 'trio' and create - a semaphore with the correct implementation. + a shielded scope with the correct implementation. """ self._backend = sniffio.current_async_library() From 962eef9defec684f2f69471ae106d96ea67a28dd Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 10:41:24 +0100 Subject: [PATCH 05/18] Add interim 'nocover' to show tests passing. --- httpcore/_synchronization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index bae27c1b..76391c6a 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -171,7 +171,7 @@ async def release(self) -> None: self._anyio_semaphore.release() -class AsyncShieldCancellation: +class AsyncShieldCancellation: # pragma: nocover # For certain portions of our codebase where we're dealing with # closing connections during exception handling we want to shield # the operation from being cancelled. @@ -263,7 +263,7 @@ def release(self) -> None: self._semaphore.release() -class ShieldCancellation: +class ShieldCancellation: # pragma: nocover # Thread-synchronous codebases don't support cancellation semantics. # We have this class because we need to mirror the async and sync # cases within our package, but it's just a no-op. From a7afdf5b2f52aab974285c6c99fb1a648101451b Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 10:44:32 +0100 Subject: [PATCH 06/18] Add failing test case for HTTP/1.1 cancellations --- tests/test_cancellations.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 tests/test_cancellations.py diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py new file mode 100644 index 00000000..6165aac9 --- /dev/null +++ b/tests/test_cancellations.py @@ -0,0 +1,30 @@ +import typing + +import anyio +import pytest + +import httpcore + + +class SlowWriteStream(httpcore.AsyncNetworkStream): + async def write( + self, buffer: bytes, timeout: typing.Optional[float] = None + ) -> None: + await anyio.sleep(2) + + async def aclose(self) -> None: + pass + + +@pytest.mark.anyio +async def test_h11_response_closed(): + """ + An async timeout on an HTTP/1.1 connection should leave the connection + in a neatly closed state. + """ + origin = httpcore.Origin(b"http", b"example.com", 80) + stream = SlowWriteStream() + async with httpcore.AsyncHTTP11Connection(origin, stream) as conn: + with anyio.move_on_after(0.001): + await conn.request("GET", "http://example.com") + assert conn.is_closed() From 6846a4f4b7dd5742dcbde3645152ed2d6aba97b6 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 10:46:29 +0100 Subject: [PATCH 07/18] Neat cleanup for HTTP/1.1 write cancellations --- httpcore/_async/http11.py | 7 ++++--- httpcore/_sync/http11.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 3ef6c80f..3d26b6f4 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -23,7 +23,7 @@ map_exceptions, ) from .._models import Origin, Request, Response -from .._synchronization import AsyncLock +from .._synchronization import AsyncLock, AsyncShieldCancellation from .._trace import Trace from .interfaces import AsyncConnectionInterface @@ -115,8 +115,9 @@ async def handle_async_request(self, request: Request) -> Response: }, ) except BaseException as exc: - async with Trace("response_closed", logger, request) as trace: - await self._response_closed() + with AsyncShieldCancellation(): + async with Trace("response_closed", logger, request) as trace: + await self._response_closed() raise exc # Sending the request... diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index 448cf8de..a21058ff 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -23,7 +23,7 @@ map_exceptions, ) from .._models import Origin, Request, Response -from .._synchronization import Lock +from .._synchronization import Lock, ShieldCancellation from .._trace import Trace from .interfaces import ConnectionInterface @@ -115,8 +115,9 @@ def handle_request(self, request: Request) -> Response: }, ) except BaseException as exc: - with Trace("response_closed", logger, request) as trace: - self._response_closed() + with ShieldCancellation(): + with Trace("response_closed", logger, request) as trace: + self._response_closed() raise exc # Sending the request... From 664d02b31d3dc3c3aad220520faf15b1b9afb2a1 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 10:48:25 +0100 Subject: [PATCH 08/18] Drop 'nocover' for ShieldCancellation --- httpcore/_synchronization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index 76391c6a..bae27c1b 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -171,7 +171,7 @@ async def release(self) -> None: self._anyio_semaphore.release() -class AsyncShieldCancellation: # pragma: nocover +class AsyncShieldCancellation: # For certain portions of our codebase where we're dealing with # closing connections during exception handling we want to shield # the operation from being cancelled. @@ -263,7 +263,7 @@ def release(self) -> None: self._semaphore.release() -class ShieldCancellation: # pragma: nocover +class ShieldCancellation: # Thread-synchronous codebases don't support cancellation semantics. # We have this class because we need to mirror the async and sync # cases within our package, but it's just a no-op. From 956dbfd7e6dbe9a6837e2f6694613ee6b48beb9f Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 10:55:14 +0100 Subject: [PATCH 09/18] Add failing test case for HTTP/1.1 cancellations during response reading --- tests/test_cancellations.py | 58 ++++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index 6165aac9..d70b983d 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -7,20 +7,48 @@ class SlowWriteStream(httpcore.AsyncNetworkStream): + """ + A stream that we can use to test cancellations during + the request writing. + """ + async def write( self, buffer: bytes, timeout: typing.Optional[float] = None ) -> None: - await anyio.sleep(2) + await anyio.sleep(999) async def aclose(self) -> None: pass +class SlowReadStream(httpcore.AsyncNetworkStream): + """ + A stream that we can use to test cancellations during + the response reading. + """ + + def __init__(self, buffer: typing.List[bytes]): + self._buffer = buffer + + async def write(self, buffer, timeout=None): + pass + + async def read( + self, max_bytes: int, timeout: typing.Optional[float] = None + ) -> bytes: + if not self._buffer: + await anyio.sleep(999) + return self._buffer.pop(0) + + async def aclose(self): + pass + + @pytest.mark.anyio -async def test_h11_response_closed(): +async def test_h11_timeout_during_request(): """ - An async timeout on an HTTP/1.1 connection should leave the connection - in a neatly closed state. + An async timeout on an HTTP/1.1 during the request writing + should leave the connection in a neatly closed state. """ origin = httpcore.Origin(b"http", b"example.com", 80) stream = SlowWriteStream() @@ -28,3 +56,25 @@ async def test_h11_response_closed(): with anyio.move_on_after(0.001): await conn.request("GET", "http://example.com") assert conn.is_closed() + + +@pytest.mark.anyio +async def test_h11_timeout_during_response(): + """ + An async timeout on an HTTP/1.1 during the response reading + should leave the connection in a neatly closed state. + """ + origin = httpcore.Origin(b"http", b"example.com", 80) + stream = SlowReadStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 1000\r\n", + b"\r\n", + b"Hello, world!...", + ] + ) + async with httpcore.AsyncHTTP11Connection(origin, stream) as conn: + with anyio.move_on_after(0.001): + await conn.request("GET", "http://example.com") + assert conn.is_closed() From 991888af33eaa551735bfcee9a5864ed0c3185f1 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 10:57:25 +0100 Subject: [PATCH 10/18] Resolve failing test case --- httpcore/_async/http11.py | 3 ++- httpcore/_sync/http11.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 3d26b6f4..7ad36642 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -320,7 +320,8 @@ async def __aiter__(self) -> AsyncIterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - await self.aclose() + with AsyncShieldCancellation(): + await self.aclose() raise exc async def aclose(self) -> None: diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index a21058ff..edcce72a 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -320,7 +320,8 @@ def __iter__(self) -> Iterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - self.close() + with ShieldCancellation(): + self.close() raise exc def close(self) -> None: From 57bee0f6546e6a524eb00f3fdd60e2f555a67622 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 11:17:52 +0100 Subject: [PATCH 11/18] Add failing test cases for cancellations on connection pools --- tests/test_cancellations.py | 53 +++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index d70b983d..2adbb680 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -44,6 +44,59 @@ async def aclose(self): pass +class SlowWriteBackend(httpcore.AsyncNetworkBackend): + async def connect_tcp( + self, + host: str, + port: int, + timeout: typing.Optional[float] = None, + local_address: typing.Optional[str] = None, + socket_options: typing.Optional[typing.Iterable[httpcore.SOCKET_OPTION]] = None, + ) -> httpcore.AsyncNetworkStream: + return SlowWriteStream() + + +class SlowReadBackend(httpcore.AsyncNetworkBackend): + def __init__(self, buffer: typing.List[bytes]): + self._buffer = buffer + + async def connect_tcp( + self, + host: str, + port: int, + timeout: typing.Optional[float] = None, + local_address: typing.Optional[str] = None, + socket_options: typing.Optional[typing.Iterable[httpcore.SOCKET_OPTION]] = None, + ) -> httpcore.AsyncNetworkStream: + return SlowReadStream(self._buffer) + + +@pytest.mark.anyio +async def test_connection_pool_timeout_during_request(): + network_backend = SlowWriteBackend() + async with httpcore.AsyncConnectionPool(network_backend=network_backend) as pool: + with anyio.move_on_after(0.001): + await pool.request("GET", "http://example.com") + assert not pool.connections + + +@pytest.mark.anyio +async def test_connection_pool_timeout_during_response(): + network_backend = SlowReadBackend( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 1000\r\n", + b"\r\n", + b"Hello, world!...", + ] + ) + async with httpcore.AsyncConnectionPool(network_backend=network_backend) as pool: + with anyio.move_on_after(0.001): + await pool.request("GET", "http://example.com") + assert not pool.connections + + @pytest.mark.anyio async def test_h11_timeout_during_request(): """ From 8bd548ff2e818d7931b1ff1b675ed5646bf76f51 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 11:19:10 +0100 Subject: [PATCH 12/18] Resolve failing test cases --- httpcore/_async/connection_pool.py | 8 +++++--- httpcore/_sync/connection_pool.py | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 935f34db..ddc0510e 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -7,7 +7,7 @@ from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import AsyncEvent, AsyncLock +from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface @@ -257,7 +257,8 @@ async def handle_async_request(self, request: Request) -> Response: status.unset_connection() await self._attempt_to_acquire_connection(status) except BaseException as exc: - await self.response_closed(status) + with AsyncShieldCancellation(): + await self.response_closed(status) raise exc else: break @@ -351,4 +352,5 @@ async def aclose(self) -> None: if hasattr(self._stream, "aclose"): await self._stream.aclose() finally: - await self._pool.response_closed(self._status) + with AsyncShieldCancellation(): + await self._pool.response_closed(self._status) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index f64334af..dbcaff1f 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -7,7 +7,7 @@ from .._backends.base import SOCKET_OPTION, NetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import Event, Lock +from .._synchronization import Event, Lock, ShieldCancellation from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface @@ -257,7 +257,8 @@ def handle_request(self, request: Request) -> Response: status.unset_connection() self._attempt_to_acquire_connection(status) except BaseException as exc: - self.response_closed(status) + with ShieldCancellation(): + self.response_closed(status) raise exc else: break @@ -351,4 +352,5 @@ def close(self) -> None: if hasattr(self._stream, "close"): self._stream.close() finally: - self._pool.response_closed(self._status) + with ShieldCancellation(): + self._pool.response_closed(self._status) From 7b9a3f4d9129efb73c4667f809b5d793389a63ec Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 19:16:53 +0100 Subject: [PATCH 13/18] Add failing test cases for cancellations on HTTP/2 connections --- tests/test_cancellations.py | 63 ++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index 2adbb680..00ba87fa 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -21,6 +21,27 @@ async def aclose(self) -> None: pass +class HandshakeThenSlowWriteStream(httpcore.AsyncNetworkStream): + """ + A stream that we can use to test cancellations during + the request writing. + """ + + def __init__(self) -> None: + self._handshake_complete = False + + async def write( + self, buffer: bytes, timeout: typing.Optional[float] = None + ) -> None: + if not self._handshake_complete: + self._handshake_complete = True + else: + await anyio.sleep(999) + + async def aclose(self) -> None: + pass + + class SlowReadStream(httpcore.AsyncNetworkStream): """ A stream that we can use to test cancellations during @@ -75,7 +96,7 @@ async def connect_tcp( async def test_connection_pool_timeout_during_request(): network_backend = SlowWriteBackend() async with httpcore.AsyncConnectionPool(network_backend=network_backend) as pool: - with anyio.move_on_after(0.001): + with anyio.move_on_after(0.01): await pool.request("GET", "http://example.com") assert not pool.connections @@ -92,7 +113,7 @@ async def test_connection_pool_timeout_during_response(): ] ) async with httpcore.AsyncConnectionPool(network_backend=network_backend) as pool: - with anyio.move_on_after(0.001): + with anyio.move_on_after(0.01): await pool.request("GET", "http://example.com") assert not pool.connections @@ -106,7 +127,7 @@ async def test_h11_timeout_during_request(): origin = httpcore.Origin(b"http", b"example.com", 80) stream = SlowWriteStream() async with httpcore.AsyncHTTP11Connection(origin, stream) as conn: - with anyio.move_on_after(0.001): + with anyio.move_on_after(0.01): await conn.request("GET", "http://example.com") assert conn.is_closed() @@ -128,6 +149,40 @@ async def test_h11_timeout_during_response(): ] ) async with httpcore.AsyncHTTP11Connection(origin, stream) as conn: - with anyio.move_on_after(0.001): + with anyio.move_on_after(0.01): + await conn.request("GET", "http://example.com") + assert conn.is_closed() + + +@pytest.mark.anyio +async def test_h2_timeout_during_handshake(): + """ + An async timeout on an HTTP/2 during the initial handshake + should leave the connection in a neatly closed state. + """ + origin = httpcore.Origin(b"http", b"example.com", 80) + stream = SlowWriteStream() + async with httpcore.AsyncHTTP2Connection(origin, stream) as conn: + with anyio.move_on_after(0.01): await conn.request("GET", "http://example.com") assert conn.is_closed() + + +@pytest.mark.anyio +async def test_h2_timeout_during_request(): + """ + An async timeout on an HTTP/2 during a request + should leave the connection in a neatly idle state. + + The connection is not closed because it is multiplexed, + and a timeout on one request does not require the entire + connection be closed. + """ + origin = httpcore.Origin(b"http", b"example.com", 80) + stream = HandshakeThenSlowWriteStream() + async with httpcore.AsyncHTTP2Connection(origin, stream) as conn: + with anyio.move_on_after(0.01): + await conn.request("GET", "http://example.com") + + assert not conn.is_closed() + assert conn.is_idle() From 9c85920d1f205a4ac168e3ca2e157200fb1e89cb Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 19:17:44 +0100 Subject: [PATCH 14/18] Resolve failing test cases --- httpcore/_async/http2.py | 23 +++++++++++++++-------- httpcore/_sync/http2.py | 23 +++++++++++++++-------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index cc957601..20c0b8a2 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -17,7 +17,7 @@ RemoteProtocolError, ) from .._models import Origin, Request, Response -from .._synchronization import AsyncLock, AsyncSemaphore +from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation from .._trace import Trace from .interfaces import AsyncConnectionInterface @@ -103,9 +103,15 @@ async def handle_async_request(self, request: Request) -> Response: async with self._init_lock: if not self._sent_connection_init: - kwargs = {"request": request} - async with Trace("send_connection_init", logger, request, kwargs): - await self._send_connection_init(**kwargs) + try: + kwargs = {"request": request} + async with Trace("send_connection_init", logger, request, kwargs): + await self._send_connection_init(**kwargs) + except BaseException as exc: + with AsyncShieldCancellation(): + await self.aclose() + raise exc + self._sent_connection_init = True # Initially start with just 1 until the remote server provides @@ -154,10 +160,11 @@ async def handle_async_request(self, request: Request) -> Response: "stream_id": stream_id, }, ) - except Exception as exc: # noqa: PIE786 - kwargs = {"stream_id": stream_id} - async with Trace("response_closed", logger, request, kwargs): - await self._response_closed(stream_id=stream_id) + except BaseException as exc: # noqa: PIE786 + with AsyncShieldCancellation(): + kwargs = {"stream_id": stream_id} + async with Trace("response_closed", logger, request, kwargs): + await self._response_closed(stream_id=stream_id) if isinstance(exc, h2.exceptions.ProtocolError): # One case where h2 can raise a protocol error is when a diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index fbbc67bf..02870514 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -17,7 +17,7 @@ RemoteProtocolError, ) from .._models import Origin, Request, Response -from .._synchronization import Lock, Semaphore +from .._synchronization import Lock, Semaphore, ShieldCancellation from .._trace import Trace from .interfaces import ConnectionInterface @@ -103,9 +103,15 @@ def handle_request(self, request: Request) -> Response: with self._init_lock: if not self._sent_connection_init: - kwargs = {"request": request} - with Trace("send_connection_init", logger, request, kwargs): - self._send_connection_init(**kwargs) + try: + kwargs = {"request": request} + with Trace("send_connection_init", logger, request, kwargs): + self._send_connection_init(**kwargs) + except BaseException as exc: + with ShieldCancellation(): + self.close() + raise exc + self._sent_connection_init = True # Initially start with just 1 until the remote server provides @@ -154,10 +160,11 @@ def handle_request(self, request: Request) -> Response: "stream_id": stream_id, }, ) - except Exception as exc: # noqa: PIE786 - kwargs = {"stream_id": stream_id} - with Trace("response_closed", logger, request, kwargs): - self._response_closed(stream_id=stream_id) + except BaseException as exc: # noqa: PIE786 + with ShieldCancellation(): + kwargs = {"stream_id": stream_id} + with Trace("response_closed", logger, request, kwargs): + self._response_closed(stream_id=stream_id) if isinstance(exc, h2.exceptions.ProtocolError): # One case where h2 can raise a protocol error is when a From 317e17c79303963141f38e781ef2addecbdd6d2a Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 19:55:25 +0100 Subject: [PATCH 15/18] Add failing test cases for cancellations on HTTP/2 connections when reading response --- tests/test_cancellations.py | 56 ++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index 00ba87fa..b5a71871 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -1,6 +1,8 @@ import typing import anyio +import hpack +import hyperframe import pytest import httpcore @@ -24,7 +26,8 @@ async def aclose(self) -> None: class HandshakeThenSlowWriteStream(httpcore.AsyncNetworkStream): """ A stream that we can use to test cancellations during - the request writing. + the HTTP/2 request writing, after allowing the initial + handshake to complete. """ def __init__(self) -> None: @@ -94,6 +97,13 @@ async def connect_tcp( @pytest.mark.anyio async def test_connection_pool_timeout_during_request(): + """ + An async timeout when writing an HTTP/1.1 response on the connection pool + should leave the pool in a consistent state. + + In this case, that means the connection will become closed, and no + longer remain in the pool. + """ network_backend = SlowWriteBackend() async with httpcore.AsyncConnectionPool(network_backend=network_backend) as pool: with anyio.move_on_after(0.01): @@ -103,6 +113,13 @@ async def test_connection_pool_timeout_during_request(): @pytest.mark.anyio async def test_connection_pool_timeout_during_response(): + """ + An async timeout when reading an HTTP/1.1 response on the connection pool + should leave the pool in a consistent state. + + In this case, that means the connection will become closed, and no + longer remain in the pool. + """ network_backend = SlowReadBackend( [ b"HTTP/1.1 200 OK\r\n", @@ -186,3 +203,40 @@ async def test_h2_timeout_during_request(): assert not conn.is_closed() assert conn.is_idle() + + +@pytest.mark.anyio +async def test_h2_timeout_during_response(): + """ + An async timeout on an HTTP/2 during the response reading + should leave the connection in a neatly idle state. + + The connection is not closed because it is multiplexed, + and a timeout on one request does not require the entire + connection be closed. + """ + origin = httpcore.Origin(b"http", b"example.com", 80) + stream = SlowReadStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Hello, world!...", flags=[] + ).serialize(), + ] + ) + async with httpcore.AsyncHTTP2Connection(origin, stream) as conn: + with anyio.move_on_after(0.01): + await conn.request("GET", "http://example.com") + + assert not conn.is_closed() + assert conn.is_idle() From 5dc0af8a3dd1170e2233bb317d6fe22ad08b908b Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 19:56:15 +0100 Subject: [PATCH 16/18] Resolve failing test cases --- httpcore/_async/http2.py | 3 ++- httpcore/_sync/http2.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 20c0b8a2..8dc776ff 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -577,7 +577,8 @@ async def __aiter__(self) -> typing.AsyncIterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - await self.aclose() + with AsyncShieldCancellation(): + await self.aclose() raise exc async def aclose(self) -> None: diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 02870514..d141d459 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -577,7 +577,8 @@ def __iter__(self) -> typing.Iterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - self.close() + with ShieldCancellation(): + self.close() raise exc def close(self) -> None: From 988bab076ff106c50e4bf3ea0b25a642dead863e Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 26 Jun 2023 19:59:19 +0100 Subject: [PATCH 17/18] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 911371d5..4fc3b5bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## unreleased - The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699) +- Support async cancellations, ensuring that the connection pool is left in a clean state when cancellations occur. (#726) - Graceful handling of HTTP/2 GoAway frames, with requests being transparently retried on a new connection. (#730) - Add exceptions when a synchronous `trace callback` is passed to an asynchronous request or an asynchronous `trace callback` is passed to a synchronous request. (#717) From ab359f5596f86e2ba90d7812954384a3472f74f1 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 27 Jun 2023 11:16:07 +0100 Subject: [PATCH 18/18] Fix yield behaviour --- tests/test_cancellations.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_cancellations.py b/tests/test_cancellations.py index b5a71871..fe8d3c91 100644 --- a/tests/test_cancellations.py +++ b/tests/test_cancellations.py @@ -20,7 +20,7 @@ async def write( await anyio.sleep(999) async def aclose(self) -> None: - pass + await anyio.sleep(0) class HandshakeThenSlowWriteStream(httpcore.AsyncNetworkStream): @@ -42,7 +42,7 @@ async def write( await anyio.sleep(999) async def aclose(self) -> None: - pass + await anyio.sleep(0) class SlowReadStream(httpcore.AsyncNetworkStream): @@ -65,7 +65,7 @@ async def read( return self._buffer.pop(0) async def aclose(self): - pass + await anyio.sleep(0) class SlowWriteBackend(httpcore.AsyncNetworkBackend):