Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Implement server push #40

Merged
merged 13 commits into from
Apr 9, 2014
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ In chronological order:
- Alek Storm (@alekstorm)

- Implemented Python 2.7 support.
- Implemented server push.
52 changes: 52 additions & 0 deletions docs/source/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,55 @@ Note that we don't plug an instance of the class in, we plug the class itself
in. We do this because the connection object will spawn instances of the class
in order to manage the flow control windows of streams in addition to managing
the window of the connection itself.

.. _server-push:

Server Push
-----------

HTTP/2.0 provides servers with the ability to "push" additional resources to
clients in response to a request, as if the client had requested the resources
themselves. When minimizing the number of round trips is more critical than
maximizing bandwidth usage, this can be a significant performance improvement.

Servers may declare their intention to push a given resource by sending the
headers and other metadata of a request that would return that resource - this
is referred to as a "push promise". They may do this before sending the response
headers for the original request, after, or in the middle of sending the
response body.

In order to receive pushed resources, the
:class:`HTTP20Connection <hyper.HTTP20Connection>` object must be constructed
with ``enable_push=True``.

You may retrieve the push promises that the server has sent *so far* by calling
:meth:`getpushes() <hyper.HTTP20Connection.getpushes>`, which returns a
generator that yields :class:`HTTP20Push <hyper.HTTP20Push>` objects. Note that
this method is not idempotent; promises returned in one call will not be
returned in subsequent calls. If ``capture_all=False`` is passed (the default),
the generator will yield all buffered push promises without blocking. However,
if ``capture_all=True`` is passed, the generator will first yield all buffered
push promises, then yield additional ones as they arrive, and terminate when the
original stream closes. Using this parameter is only recommended when it is
known that all pushed streams, or a specific one, are of higher priority than
the original response, or when also processing the original response in a
separate thread (N.B. do not do this; ``hyper`` is not yet thread-safe)::

conn.request('GET', '/')
response = conn.getheaders()
for push in conn.getpushes(): # all pushes promised before response headers
print(push.path)
conn.read()
for push in conn.getpushes(): # all other pushes
print(push.path)

To cancel an in-progress pushed stream (for example, if the user already has
the given path in cache), call
:meth:`HTTP20Push.cancel() <hyper.HTTP20Push.cancel>`.

``hyper`` does not currently verify that pushed resources comply with the
Same-Origin Policy, so users must take care that they do not treat pushed
resources as authoritative without performing this check themselves (since
the server push mechanism is only an optimization, and clients are free to
issue requests for any pushed resources manually, there is little downside to
simply ignoring suspicious ones).
3 changes: 3 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ Primary HTTP/2.0 Interface
.. autoclass:: hyper.HTTP20Response
:inherited-members:

.. autoclass:: hyper.HTTP20Push
:inherited-members:

Requests Transport Adapter
--------------------------

Expand Down
10 changes: 1 addition & 9 deletions docs/source/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,7 @@ It should! If you find it doesn't, that's a bug: please `report it on GitHub`_.
Does ``hyper`` support Server Push?
-----------------------------------

No, and I don't think it ever will directly. Support for Server Push
effectively *mandates* a multithreaded or event-loop based programming style,
which is incompatible with most current Python HTTP code. For that reason,
``hyper``'s default API is unlikely to ever allow Server Push.

However, there's no reason the underlying framing and stream layers couldn't
support it. If ``hyper`` ever grows a server implementation or a fully
event-loop based implementation, I'll revisit the decision not to support
Server Push.
Yes! See :ref:`server-push`.

I hit a bug! What should I do?
------------------------------
Expand Down
4 changes: 2 additions & 2 deletions hyper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
__version__ = '0.0.4'

from .http20.connection import HTTP20Connection
from .http20.response import HTTP20Response
from .http20.response import HTTP20Response, HTTP20Push

# Throw import errors on Python <2.7 and 3.0-3.2.
import sys as _sys
if _sys.version_info < (2,7) or (3,0) <= _sys.version_info < (3,3):
raise ImportError("hyper only supports Python 2.7 and Python 3.3 or higher.")

__all__ = [HTTP20Response, HTTP20Connection]
__all__ = [HTTP20Response, HTTP20Push, HTTP20Connection]

# Set default logging handler.
import logging
Expand Down
90 changes: 67 additions & 23 deletions hyper/http20/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
from .stream import Stream
from .tls import wrap_socket
from .frame import (
DataFrame, HeadersFrame, SettingsFrame, Frame, WindowUpdateFrame,
GoAwayFrame
DataFrame, HeadersFrame, PushPromiseFrame, RstStreamFrame, SettingsFrame,
Frame, WindowUpdateFrame, GoAwayFrame
)
from .response import HTTP20Response, HTTP20Push
from .window import FlowControlManager
from .exceptions import ConnectionError

import errno
import logging
import socket

Expand Down Expand Up @@ -42,8 +44,11 @@ class HTTP20Connection(object):
If not provided,
:class:`FlowControlManager <hyper.http20.window.FlowControlManager>`
will be used.
:param enable_push: Whether the server is allowed to push resources to the
client (see :meth:`getpushes() <hyper.HTTP20Connection.getpushes>`).
"""
def __init__(self, host, port=None, window_manager=None, **kwargs):
def __init__(self, host, port=None, window_manager=None, enable_push=False,
**kwargs):
"""
Creates an HTTP/2.0 connection to a specific server.
"""
Expand All @@ -56,6 +61,8 @@ def __init__(self, host, port=None, window_manager=None, **kwargs):
else:
self.host, self.port = host, port

self._enable_push = enable_push
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for now, but it'd be an interesting enhancement to have it be a property that causes the appropriate SETTINGS frame to be emitted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.


# Create the mutable state.
self.__wm_class = window_manager or FlowControlManager
self.__init_state()
Expand Down Expand Up @@ -134,6 +141,10 @@ def request(self, method, url, body=None, headers={}):

return stream_id

def _get_stream(self, stream_id):
return (self.streams[stream_id] if stream_id is not None
else self.recent_stream)

def getresponse(self, stream_id=None):
"""
Should be called after a request is sent to get a response from the
Expand All @@ -146,9 +157,27 @@ def getresponse(self, stream_id=None):
get a response.
:returns: A HTTP response object.
"""
stream = (self.streams[stream_id] if stream_id is not None
else self.recent_stream)
return stream.getresponse()
stream = self._get_stream(stream_id)
return HTTP20Response(stream.getheaders(), stream)

def getpushes(self, stream_id=None, capture_all=False):
"""
Returns a generator that yields push promises from the server. Note that
this method is not idempotent; promises returned in one call will not be
returned in subsequent calls. Iterating through generators returned by
multiple calls to this method simultaneously results in undefined
behavior.

:param stream_id: (optional) The stream ID of the request for which to
get push promises.
:param capture_all: If ``False``, the generator will yield all buffered
push promises without blocking. If ``True``, the generator will
first yield all buffered push promises, then yield additional ones
as they arrive, and terminate when the original stream closes.
"""
stream = self._get_stream(stream_id)
for promised_stream_id, headers in stream.getpushes(capture_all):
yield HTTP20Push(headers, self.streams[promised_stream_id])

def connect(self):
"""
Expand All @@ -166,7 +195,7 @@ def connect(self):
# connection, followed by an initial settings frame.
sock.send(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n')
f = SettingsFrame(0)
f.settings[SettingsFrame.ENABLE_PUSH] = 0
f.settings[SettingsFrame.ENABLE_PUSH] = int(self._enable_push)
self._send_cb(f)

# The server will also send an initial settings frame, so get it.
Expand Down Expand Up @@ -209,7 +238,6 @@ def putrequest(self, method, selector, **kwargs):
s.add_header(":path", selector)

# Save the stream.
self.streams[s.stream_id] = s
self.recent_stream = s

return s.stream_id
Expand All @@ -229,9 +257,7 @@ def putheader(self, header, argument, stream_id=None):
header to.
:returns: Nothing.
"""
stream = (self.streams[stream_id] if stream_id is not None
else self.recent_stream)

stream = self._get_stream(stream_id)
stream.add_header(header, argument)

return
Expand All @@ -256,8 +282,7 @@ def endheaders(self, message_body=None, final=False, stream_id=None):
"""
self.connect()

stream = (self.streams[stream_id] if stream_id is not None
else self.recent_stream)
stream = self._get_stream(stream_id)

# Close this if we've been told no more data is coming and we don't
# have any to send.
Expand All @@ -283,9 +308,7 @@ def send(self, data, final=False, stream_id=None):
data on.
:returns: Nothing.
"""
stream = (self.streams[stream_id] if stream_id is not None
else self.recent_stream)

stream = self._get_stream(stream_id)
stream.send_data(data, final)

return
Expand Down Expand Up @@ -340,27 +363,33 @@ def _update_settings(self, frame):

self._settings[SettingsFrame.INITIAL_WINDOW_SIZE] = newsize

def _new_stream(self):
def _new_stream(self, stream_id=None, local_closed=False):
"""
Returns a new stream object for this connection.
"""
window_size = self._settings[SettingsFrame.INITIAL_WINDOW_SIZE]
s = Stream(
self.next_stream_id, self._send_cb, self._recv_cb,
stream_id or self.next_stream_id, self._send_cb, self._recv_cb,
self._close_stream, self.encoder, self.decoder,
self.__wm_class(window_size)
self.__wm_class(window_size), local_closed
)
self.streams[s.stream_id] = s
self.next_stream_id += 2

return s

def _close_stream(self, stream_id):
def _close_stream(self, stream_id, error_code=None):
"""
Called by a stream when it would like to be 'closed'.
"""
if error_code is not None:
f = RstStreamFrame(stream_id)
f.error_code = error_code
self._send_cb(f)

del self.streams[stream_id]

def _send_cb(self, frame):
def _send_cb(self, frame, tolerate_peer_gone=False):
"""
This is the callback used by streams to send data on the connection.

Expand All @@ -387,7 +416,11 @@ def _send_cb(self, frame):
frame.stream_id
)

self._sock.send(data)
try:
self._sock.send(data)
except socket.error as e:
if not tolerate_peer_gone or e.errno not in (errno.EPIPE, errno.ECONNRESET):
raise

def _adjust_receive_window(self, frame_len):
"""
Expand All @@ -399,7 +432,7 @@ def _adjust_receive_window(self, frame_len):
if increment:
f = WindowUpdateFrame(0)
f.window_increment = increment
self._send_cb(f)
self._send_cb(f, True)

return

Expand Down Expand Up @@ -440,6 +473,17 @@ def _recv_cb(self):
# Inform the WindowManager of how much data was received. If the
# manager tells us to increment the window, do so.
self._adjust_receive_window(len(frame.data))
elif isinstance(frame, PushPromiseFrame):
if self._enable_push:
self._new_stream(frame.promised_stream_id, local_closed=True)
else:
# Servers are forbidden from sending push promises when
# the ENABLE_PUSH setting is 0, but the spec leaves the client
# action undefined when they do it anyway. So we just refuse
# the stream and go about our business.
f = RstStreamFrame(frame.promised_stream_id)
f.error_code = 7 # REFUSED_STREAM
self._send_cb(f)

# Work out to whom this frame should go.
if frame.stream_id != 0:
Expand Down
15 changes: 10 additions & 5 deletions hyper/http20/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,19 @@ class PushPromiseFrame(Frame):
"""
The PUSH_PROMISE frame is used to notify the peer endpoint in advance of
streams the sender intends to initiate.

Right now hyper doesn't support these, so we treat the body data as totally
opaque, along with the flags.
"""
defined_flags = [('END_PUSH_PROMISE', 0x01)]

type = 0x05

def __init__(self, stream_id):
raise NotImplementedError("hyper doesn't support server push")
def serialize(self):
data = self.build_frame_header(len(self.data) + 4)
data += struct.pack("!L", self.promised_stream_id)
return b''.join([data, self.data])

def parse_body(self, data):
self.promised_stream_id, = struct.unpack("!L", data[:4])
self.data = data[4:]


class PingFrame(Frame):
Expand Down
Loading