Skip to content

Commit

Permalink
docs(h2mux): add doc comments & publish v0.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Gowee committed Sep 14, 2023
1 parent 50d541f commit 59aa158
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 381 deletions.
11 changes: 8 additions & 3 deletions h2mux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
[package]
name = "h2mux"
version = "0.1.0"
name = "tokio-h2mux"
version = "0.0.1"
edition = "2021"
description = "Stream multiplexing via HTTP/2 (like yamux)"
homepage = "https://github.com/Gowee/noisy-shuttle/tree/main/h2mux"
license = "MIT"
keywords = [ "http2", "networking" ]
categories = [ "network-programming" ]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = "1.4.0"
h2 = "0.3"
h2 = "0.3.21"
http = "0.2.9"
tokio = { version = "1.29", features = ["time"] }
thiserror = "1.0"
Expand Down
136 changes: 72 additions & 64 deletions h2mux/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,40 @@
// use pin_project_lite::pin_project;
use std::error::Error as StdError;
use std::future::poll_fn;
use std::future::Future;
use std::future::Pending;
use std::io::{self, Cursor, IoSlice};
//! Client implementation of h2mux.
use std::future::{poll_fn, Future};
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{self, ready, Context, Poll};
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use bytes::Bytes;
use h2::client::{ResponseFuture, SendRequest};
use h2::{Reason, RecvStream, SendStream};
use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
use h2::SendStream;
use http::response::Parts;
use http::{request, HeaderMap, Request};
use thiserror::Error;
use http::Request;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::time::Duration;
use tracing::{debug, info, trace, warn};
use tracing::{debug, warn};

use crate::ping::{self, Ponged, Recorder};
use crate::stream::{
poll_read, poll_shutdown, poll_write, H2Upgraded, SendBuf, UpgradedSendStream,
};
use crate::utils::H2MapIoErr;
use crate::stream::{poll_shutdown, poll_write, H2Stream, H2Upgraded, SendBuf, UpgradedSendStream};
use crate::utils::h2_to_io_error;
use crate::SPEC_WINDOW_SIZE;

#[derive(Error, Debug)]
pub enum Error {
#[error("h2 layer error")]
H2Error(#[from] h2::Error),
#[error("pong timed out")]
KeepAliveTimedOut,
}

// pub type Result = Result

/// Client connection that wraps underlying I/O resource.
///
/// Sub-streams may be opened with [`Control`]. The object must be drived persistently with
/// [`Connection::poll`].
pub struct Connection<IO: AsyncRead + AsyncWrite + Unpin> {
conn: h2::client::Connection<IO, SendBuf<Bytes>>,
ponger: Option<ping::Ponger>,
}

/// Controller of [`Connection`].
pub struct Control {
send_request: SendRequest<SendBuf<Bytes>>,
ping: Recorder,
}

/// Builder of client connection with custom configurations.
#[derive(Clone, Debug)]
pub struct Builder {
proto_builder: h2::client::Builder,
Expand All @@ -55,11 +44,15 @@ pub struct Builder {
keep_alive_while_idle: bool,
}

pub struct PendingStream(EPendingStream);
/// Stream multiplexed over a HTTP/2 connection.
///
/// It exists as a prior state of [`H2Stream`] that might not be accepted by the server yet. It can
/// be converted into `H2Stream` as soon as the server responds.
pub struct InFlightH2Stream(StreamInner);

enum EPendingStream {
enum StreamInner {
Pending(SendStream<SendBuf<Bytes>>, ResponseFuture, Recorder),
Ready(H2Upgraded<Bytes>, Parts),
Ready(Parts, H2Stream),
Poisoned,
}

Expand All @@ -70,7 +63,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> Future for Connection<IO> {
if let Some(ponger) = &mut self.ponger {
match ponger.poll(cx) {
Poll::Ready(Ponged::SizeUpdate(wnd)) => {
info!(wnd = wnd, "New window size calculated");
debug!(wnd = wnd, "New window size calculated");
self.conn.set_target_window_size(wnd);
self.conn.set_initial_window_size(wnd)?;
}
Expand All @@ -88,12 +81,19 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> Future for Connection<IO> {
}

impl Control {
pub async fn open_stream(&mut self, request: Request<()>) -> Result<PendingStream, Error> {
use EPendingStream::*;
/// Open a new sub stream with custom request headers.
///
/// The content of request uri, method and headers has nothing to with the functionality of
/// h2mux. The caller may store any information or leave it to be [`Default::default`].
pub async fn open_stream(
&mut self,
request: Request<()>,
) -> Result<InFlightH2Stream, crate::Error> {
use StreamInner::*;

poll_fn(|cx: &mut Context<'_>| self.send_request.poll_ready(cx)).await?;
let (response_fut, send_stream) = self.send_request.send_request(request, false)?;
Ok(PendingStream(Pending(
Ok(InFlightH2Stream(Pending(
send_stream,
response_fut,
self.ping.clone(),
Expand All @@ -115,6 +115,7 @@ impl Default for Builder {

// Some methods are ported from hyper (licensed under MIT).
impl Builder {
/// Create a builder from a h2 builder.
pub fn new(proto_builder: h2::client::Builder) -> Self {
Self {
proto_builder,
Expand Down Expand Up @@ -172,10 +173,11 @@ impl Builder {
self
}

/// Perform h2c handshake over an I/O resource (typically a TLS stream).
pub async fn handshake<IO: AsyncRead + AsyncWrite + Unpin>(
&self,
io: IO,
) -> Result<(Control, Connection<IO>), Error> {
) -> Result<(Control, Connection<IO>), crate::Error> {
let (send_request, mut conn) = self
.proto_builder
.handshake::<_, SendBuf<Bytes>>(io)
Expand Down Expand Up @@ -208,40 +210,43 @@ impl Builder {
}
}

impl PendingStream {
pub async fn into_ready(self) -> Result<(H2Upgraded<Bytes>, Parts), Error> {
impl InFlightH2Stream {
/// Convert the inflight stream into an ready [`H2Stream`] and retrieve back the response headers.
pub async fn into_ready(self) -> Result<(Parts, H2Stream), h2::Error> {
match self.0 {
EPendingStream::Pending(send_stream, response_fut, ping) => {
StreamInner::Pending(send_stream, response_fut, ping) => {
let (head, recv_stream) = response_fut.await?.into_parts();
Ok((
head,
H2Upgraded {
ping,
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
recv_stream,
buf: Bytes::new(),
},
head,
))
}
EPendingStream::Ready(stream, parts) => Ok((stream, parts)),
StreamInner::Ready(head, stream) => Ok((head, stream)),
_ => unreachable!(),
}
}
pub fn try_into_ready(self) -> Option<(H2Upgraded<Bytes>, Parts)> {

/// Try to convert the inflight stream into an [`H2Stream`] and retrieve back the response headers, if they are ready.
pub fn try_into_ready(self) -> Option<(Parts, H2Stream)> {
match self.0 {
EPendingStream::Ready(stream, parts) => Some((stream, parts)),
StreamInner::Ready(head, stream) => Some((head, stream)),
_ => None,
}
}
}

impl AsyncRead for PendingStream {
impl AsyncRead for InFlightH2Stream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
use EPendingStream::*;
use StreamInner::*;

let this = self.get_mut();
loop {
Expand All @@ -254,32 +259,33 @@ impl AsyncRead for PendingStream {
let (head, recv_stream) = response.into_parts();
// TODO: handle head
this.0 = Ready(
head,
H2Upgraded {
ping,
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
recv_stream,
buf: Bytes::new(),
},
head,
);
}
Poll::Ready(Err(e)) => {
// TODO: give back before?
ping.ensure_not_timed_out()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

debug!("client response error: {}", e);
this.0 = Pending(send_stream, response_fut, ping);
return Poll::Ready(Err(e).map_io_err());
return Poll::Ready(Err(h2_to_io_error(e)));
}
Poll::Pending => {
this.0 = Pending(send_stream, response_fut, ping);
return Poll::Pending;
}
}
}
Ready(mut stream, parts) => {
Ready(head, mut stream) => {
let r = Pin::new(&mut stream).poll_read(cx, buf);
this.0 = Ready(stream, parts);
this.0 = Ready(head, stream);
return r;
}
Poisoned => {
Expand All @@ -290,13 +296,13 @@ impl AsyncRead for PendingStream {
}
}

impl AsyncWrite for PendingStream {
impl AsyncWrite for InFlightH2Stream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
use EPendingStream::*;
use StreamInner::*;
let this = self.get_mut();
match mem::replace(&mut this.0, Poisoned) {
Pending(send_stream, response_fut, ping) => {
Expand All @@ -305,9 +311,9 @@ impl AsyncWrite for PendingStream {
this.0 = Pending(unsafe { send_stream.into() }, response_fut, ping);
r
}
Ready(mut stream, parts) => {
Ready(head, mut stream) => {
let r = Pin::new(&mut stream).poll_write(cx, buf);
this.0 = Ready(stream, parts);
this.0 = Ready(head, stream);
r
}
Poisoned => {
Expand All @@ -316,12 +322,12 @@ impl AsyncWrite for PendingStream {
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
use EPendingStream::*;
use StreamInner::*;

let this = self.get_mut();
match mem::replace(&mut this.0, Poisoned) {
Expand All @@ -331,9 +337,9 @@ impl AsyncWrite for PendingStream {
this.0 = Pending(unsafe { send_stream.into() }, response_fut, ping);
r
}
Ready(mut stream, parts) => {
Ready(head, mut stream) => {
let r = Pin::new(&mut stream).poll_shutdown(cx);
this.0 = Ready(stream, parts);
this.0 = Ready(head, stream);
r
}
Poisoned => {
Expand All @@ -343,14 +349,16 @@ impl AsyncWrite for PendingStream {
}
}

/// Perform h2c handshake over an I/O resource (typically a TLS stream).
///
/// It is a shortcut for [`Builder::handshake`] with default configs.
///
/// # Note
/// The default configs leaves initial connection/stream window size to the default spec value of
/// 64KiB, which is too small for general network environment. It is possible to specify reasonable
/// values (e.g. 15MiB/6MiB in [Chromium](https://source.chromium.org/search?q=kSpdySessionMaxRecvWindowSize)) or activate adaptive window with [`Builder`].
pub async fn handshake<IO: AsyncRead + AsyncWrite + Unpin>(
io: IO,
) -> Result<(Control, Connection<IO>), Error> {
Builder::default()
.adaptive_window(true)
.keep_alive_interval(Some(Duration::from_secs(5)))
.keep_alive_timeout(Duration::from_secs(20))
.keep_alive_while_idle(true)
.handshake(io)
.await
) -> Result<(Control, Connection<IO>), crate::Error> {
Builder::default().handshake(io).await
}
35 changes: 12 additions & 23 deletions h2mux/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,24 @@
// use pin_project_lite::pin_project;
use std::error::Error as StdError;
use std::future::poll_fn;
use std::future::Future;
use std::future::Pending;
use std::io::{self, Cursor, IoSlice};
use std::mem;
use std::pin::Pin;
use std::task::{self, ready, Context, Poll};
//! Multiplexing streams over HTTP/2, built upon [h2](https://docs.rs/h2/).
//!
//! Unlike streams opened via the standard HTTP CONNECT method, h2mux streams can be written to
//! with data immediately after being opened by the client, without waiting 1 extra RTT for the
//! server to respond to the request.
//!
//! It supports auto-scaling HTTP/2 window size based on BDP estimation, ported from [hyper](https://docs.rs/hyper/)
//! with some params tuned.

use bytes::{Buf, Bytes};
use h2::client::{ResponseFuture, SendRequest};
use h2::{Reason, RecvStream, SendStream};
use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
use http::response::Parts;
use http::{request, HeaderMap, Request};
use ping::{Ponged, Recorder};
use stream::SendBuf;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::{debug, trace, warn};

pub mod client;
mod ping;
pub mod server;

mod ping;
mod stream;
mod utils;

pub use crate::stream::H2Upgraded;
use crate::utils::H2MapIoErr;
pub use crate::{client::InFlightH2Stream, stream::H2Stream};

/// hyper::proto::h2: Default initial stream window size defined in HTTP2 spec.
/// hyper::proto::h2::SPEC_WINDOW_SIZE: Default initial stream window size defined in HTTP2 spec.
const SPEC_WINDOW_SIZE: u32 = 65_535;

#[derive(Error, Debug)]
Expand Down
Loading

0 comments on commit 59aa158

Please sign in to comment.