Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for streaming responses to ASGITransport #998

Closed
wants to merge 9 commits into from

Conversation

florimondmanca
Copy link
Member

@florimondmanca florimondmanca commented May 25, 2020

cc @juhovh-aiven

An alternative for #994, with support for asyncio and trio.

@tomchristie I took a different approach than the "nursery on the transport" approach you mentioned in #994 (comment). As it turns out, we can get away with a per-.request() "background task manager" kind of thing. As a result, we don't need any __aenter__()/__aexit__() on transports.

@florimondmanca florimondmanca added the enhancement New feature or request label May 25, 2020
@ghost
Copy link

ghost commented May 26, 2020

I actually started doing something similar, but when I got to the point of calling __aexit__ manually and doing multiple sniffio checks I kind of gave up and decided to go with the simplest approach and drop trio, since I don't know how we want to handle that.

However, in general this seems like it should work and is a very cool hack to work around the API limitations. Also returning the aclose hides the internal implementation nicely, and also using channel + Queue is nice (was considering that as well at some point).

I personally think we should probably go with this approach and consider supporting an internal context manager for transports later on (which in the case of trio could be a nursery). Only suggestion for improvement is that we might want to call the aclose in some finally block just to make sure that it always gets called. Also, I see run_until_first_complete approach is what Starlette is using as well, so it makes sense to go with the same pattern, it does make things quite readable.

httpx/_transports/asgi.py Outdated Show resolved Hide resolved
yeraydiazdiaz
yeraydiazdiaz previously approved these changes Jun 1, 2020
Copy link
Contributor

@yeraydiazdiaz yeraydiazdiaz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great to me, nice job! 💯

My only nitpick would be the request method is getting a bit long and hard to follow, but I don't have any suggestions on how to best break it up atm. In any case I'd rather land this as it stands and revisit later if we feel it's worth it.

Copy link

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I approve this pull request as well. 😄

@tomchristie
Copy link
Member

I'm going to have to be really awkward, and say that I'm not sure I'm comfortable with us jumping in on this one, at least without spending a chunk of review that I've not managed to make for it yet.

@florimondmanca
Copy link
Member Author

@yeraydiazdiaz Yeah I sort of agree; though the only refactor I can think of is a class, with the stuff we initialize as attributes and functions we define as methods. I’m not convinced it would be shorter/easier to read (especially since I predict some new difficulties due to the fact we won’t be able to just use closures), but defo something to explore.

@tomchristie Sure, happy for this to wait on some more reviews. I’d actually probably want to give this a new look myself...

@florimondmanca
Copy link
Member Author

florimondmanca commented Jun 13, 2020

I reworked this PR into something I'm more confident with, using a channel API inspired by trio's for the response body… I think it makes better sense now, and there's no more low-level event-based synchronization.

Happy to have a new pair of 👀 on this. I reckon there are some bits that are not strictly relevant to adding streaming response support (reorganization of variables, run tests on trio too, etc), I'll see if I can add those as separate PRs.

@florimondmanca florimondmanca requested a review from a team June 15, 2020 17:11
@tomchristie
Copy link
Member

So I'm pretty cautious against us including a case where we're calling directly into a nursery __aenter__, __aexit__, since we're essentially just digging into the internals and bypassing the proper protections of structured concurrency by doing that.

I think the streaming ASGI case is a good point for use to really make sure that our transport API also properly encapsulates the case of transports that need to be used in a strictly context-managed manner, and this is a neat side-step and functional, but also bypasses us taking a "here's how a context-managed transport implementation looks" approach.

So. Not necessarily 100% a no on this now, but also not sufficiently convinced that we oughta roll it in.

I'm kinda okay that our ASGI use-case has the non-streaming constraint at the moment, at least until we've got an approach that feels like it's more properly in line with structured concurrency. In the meantime users who do strictly need this could feasibly package it up as an alternate transport implementation.

Hoping I've managed to explain this sufficiently clearly? Perhaps something I could try to spend some time thrashing out more fully in the next few days?

@florimondmanca
Copy link
Member Author

@tomchristie I must say I'm a bit confused, and I don't think I have a vision for what you're thinking of. So…

Perhaps something I could try to spend some time thrashing out more fully in the next few days?

I think that would help. :-)

@tomchristie tomchristie mentioned this pull request Jul 27, 2020
11 tasks
@ghost
Copy link

ghost commented Jul 28, 2020

Just for the record, I'm using this version internally as a custom transport and it's working alright. What's a bit inconvenient on that side is that I had to basically duplicate ContentStream and also the other helpers ByteStream and AsyncIteratorStream since they are only available in private modules. I see @tomchristie has been pondering about making ContentStream public in 1.0 so just thought it might be useful feedback, basically these content streams are quite helpful if making custom transports, but probably not all that useful otherwise. Interestingly also imported StreamConsumed from httpx, and it was the only httpx specific part, everything else is httpcore now which is cool.

I'm not sure, but I think I get the idea that @tomchristie is saying about transports are encapsulated in a context-managed manner, in case it means that the transports should have a clear lifecycle and they would be (async) context managers themselves. I need to point out that in this case it would still require calling __aenter__ and __aexit__ manually, but at least it feels a bit cleaner to call __aenter__ in another __aenter__ and __aexit__ in another __aexit__ which would basically ensure that the contexts are in sync. Currently it's a bit chaotic and it's theoretically possible that in some corner cases they get out of sync one way or another (exit called before enter or enter called without exit).

@tomchristie
Copy link
Member

@juhovh-aiven Thanks, there's some good observations there.

Something to note is that we don't really need to be using ContentStream here. They provide for optionally replay-able streams, which we use when sending requests. Really we probably ought to switch the implementation over to instead just using the plain httpcore.AsyncByteStream() class to keep things clearer.

Similarly I don't think you should need to be importing StreamConsumed.

I think I get the idea that @tomchristie is saying about transports are encapsulated in a context-managed manner, in case it means that the transports should have a clear lifecycle and they would be (async) context managers themselves.

That's about it, yup. Essentially two things here:

  • We ought to be calling into the transport __aenter__ and __aexit__ methods from the client __aenter__ and __exit__ methods, so that installed transports end up with the same context management as the client.
  • If we're calling into a trio nursery __aenter__/__aexit__ directly, then we really ought to make sure to only do that from a the __aenter__/__aexit__ of the transport class. Otherwise we're essentially just bypassing the structured concurrency constraint that it's intended for by using a bit of internal trickery.

@florimondmanca
Copy link
Member Author

florimondmanca commented Aug 8, 2020

Also, the reason I'm using anyio here is because implementing a task group for asyncio with proper exception handling is super hard. (I tried. Currently I think a proper task group implementation needs to interact with events and queues so that exceptions that occur concurrently during await operations are immediately surfaced instead of making the await's wait forever. But I'm not sure.)

@florimondmanca florimondmanca force-pushed the asgi-streaming branch 2 times, most recently from 111ed19 to dc940e7 Compare August 8, 2020 12:32
@tomchristie
Copy link
Member

tomchristie commented Aug 8, 2020

This is pretty elegant actually.

I’m a bit cautious wrt. calling directly into __aenter__ and __aexit__, since it doesn’t really provide the same resilience that structured concurrency brings. (Ie. We’re actually side-stepping it slightly, and if we fail to call into the stream.aclose() then we’re leaking tasks.) but having said that, it’s done in a really limited way, that might be okay.

The alternative here would be a task group instance that is scoped to the transport __aenter__/__aexit__, which is broader scoping that we really want, but at least holds more tightly that they’ll def. be cleaned up.

I haven’t quite gotten my head around if there’s an alternative to the Transport API, that would more neatly also encapsulate context management around the response lifetime, rather than the rather loose “you’ve got to call .aclose on this resource” that we’re working with at the moment.

@ghost
Copy link

ghost commented Aug 9, 2020

I also read through the changes and like this version quite a bit, anyio seems to nicely hide the complexity behind supporting both asyncio and trio, and making the run_asgi an async contextmanager encapsulates the context into a very simple __aenter__ and __aexit__ pair, which is quite clever.

I personally think the .aclose is not that big of a problem, since it's supported by httpcore.AsyncIteratorByteStream and a failure to call it in some error case is likely to impact not just this transport but any other component built on that stream. I think the main problem we have here is that the context manager pattern doesn't fit this type of return headers first and then stream until finished case very well. If we want to keep the context manager per request (which would make the most sense, since don't want to leak resources between requests), every alternative solution I can currently think of would look more or less like the current one.

Basically the only thing I can think of that we could do is to push the .aclose logic to one level higher and make the transport itself the context manager, the end result would be pretty much the same but it would not have to lie inside the transport. That would make the transport look like run_asgi async context manager, and the request would simply return that async context manager without doing anything else. Only difference would be to push the b"HTTP/1.1" inside the run_asgi function.

Even in that case we might want to consider merging this (assuming anyio is alright), since refactoring would be fairly trivial and the current solution is pretty low risk.

@florimondmanca
Copy link
Member Author

Basically the only thing I can think of that we could do is to push the .aclose logic to one level higher and make the transport itself the context manager, the end result would be pretty much the same but it would not have to lie inside the transport. That would make the transport look like run_asgi async context manager, and the request would simply return that async context manager without doing anything else. Only difference would be to push the b"HTTP/1.1" inside the run_asgi function.

Transports are technically already context managers (async with ASGITransport(...) as http: ...), so I suppose you meant "make transport.request() a context manager". Well, this is exactly the conclusion I arrived to reading your answer and Tom's! And it sounds ✨ brilliant ✨.

Lucky as we are, Python's with <ctx> as <...>: ... allows <...> to be destructured, so that it's possible for __enter__()/__aenter__() to return a tuple of values and destructure them into single variables.

So, such a proposal would mean transports would be used like this…

async with ASGITransport(app=app) as transport:
    async with transport.request(method, url, headers, stream) as (
        http_version,
        status_code,
        reason_phrase,
        headers,
        stream,
    ):
        pass

There are several benefits to this:

  • This better follows principles of structured concurrency, in that the async with control flow at the request level now guarantees that the response stream is now always cleaned up on exit. (Currently it's possible to eg only inspect status_code and forget that stream.aclose() must be called, which is a pretty big gotcha.)
  • The ASGITransport implementation could be written in a much more natural style, without bridging from the async with world of structured concurrency and the callback-based world where we manually call __aenter__() and __aexit__(). So, like so:
from contextlib import asynccontextmanager
import httpcore

class ASGITransport(httpcore.AsyncHTTPTransport):
    @asynccontextmanager
    async def request(self, ...):
        async with anyio.create_task_group() as tg:
            await tg.spawn(app, ...)
            yield (http_version, status_code, reason_phrase, headers, stream)

Lemme pitch this in the HTTPCore repo. It's a pretty fundamental idea since it changes the transport API a bit, which is something we ought to do before HTTPX reaches 1.0.

@florimondmanca
Copy link
Member Author

-> Filed encode/httpcore#145

@tomchristie
Copy link
Member

So, the tricky bit here that I've not figured out is how that would work together with the internal client flow, in particular dealing with redirects. (Or if it even could work there.)

@florimondmanca
Copy link
Member Author

Haven't thought about that either, and there's probably going to be some refactoring required to accomodate this change. :-) But I'm having the strong intuition that this change would lead to a more correct API, that we can then accomodate in HTTPX. (HTTPX does a good job of separating the streaming case, which enforces the async with stream() as response: ... usage, vs the non-streaming case, which closes all resources before returning a plain Response object, so I think we can make things work on that front, at least without breaking any public API.)

@tomchristie
Copy link
Member

Righty. We'll need to update this given the 0.18 Transport API, then it's probably time to think about getting this in.

@tomchristie
Copy link
Member

Dropping the milestone on this. We want to get it in for sure, but it's not API changing so it doesn't really need to block our 0.18 release.

@stale
Copy link

stale bot commented Feb 19, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request wontfix
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants