Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set MAX_CONCURRENT_STREAMS to usize::MAX if no value is advertised initially #736

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,11 @@ pub struct Builder {
reset_stream_duration: Duration,

/// Initial maximum number of locally initiated (send) streams.
/// After receiving a Settings frame from the remote peer,
/// After receiving a SETTINGS frame from the remote peer,
/// the connection will overwrite this value with the
/// MAX_CONCURRENT_STREAMS specified in the frame.
/// If no value is advertised by the remote peer in the initial SETTINGS
/// frame, it will be set to usize::MAX.
initial_max_send_streams: usize,

/// Initial target window size for new connections.
Expand Down Expand Up @@ -844,8 +846,10 @@ impl Builder {
/// Sets the initial maximum of locally initiated (send) streams.
///
/// The initial settings will be overwritten by the remote peer when
/// the Settings frame is received. The new value will be set to the
/// `max_concurrent_streams()` from the frame.
/// the SETTINGS frame is received. The new value will be set to the
/// `max_concurrent_streams()` from the frame. If no value is advertised in
/// the initial SETTINGS frame from the remote peer as part of
/// [HTTP/2 Connection Preface], `usize::MAX` will be set.
///
/// This setting prevents the caller from exceeding this number of
/// streams that are counted towards the concurrency limit.
Expand All @@ -855,7 +859,10 @@ impl Builder {
///
/// See [Section 5.1.2] in the HTTP/2 spec for more details.
///
/// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
/// The default value is `usize::MAX`.
///
/// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
/// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
///
/// # Examples
///
Expand Down
18 changes: 16 additions & 2 deletions src/proto/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub(crate) struct Settings {
/// the socket first then the settings applied **before** receiving any
/// further frames.
remote: Option<frame::Settings>,
/// Whether the connection has received the initial SETTINGS frame from the
/// remote peer.
has_received_remote_initial_settings: bool,
}

#[derive(Debug)]
Expand All @@ -32,6 +35,7 @@ impl Settings {
// the handshake process.
local: Local::WaitingAck(local),
remote: None,
has_received_remote_initial_settings: false,
}
}

Expand Down Expand Up @@ -96,6 +100,15 @@ impl Settings {
}
}

/// Sets `true` to `self.has_received_remote_initial_settings`.
/// Returns `true` if this method is called for the first time.
/// (i.e. it is the initial SETTINGS frame from the remote peer)
fn mark_remote_initial_settings_as_received(&mut self) -> bool {
let has_received = self.has_received_remote_initial_settings;
self.has_received_remote_initial_settings = true;
!has_received
}

pub(crate) fn poll_send<T, B, C, P>(
&mut self,
cx: &mut Context,
Expand All @@ -108,7 +121,7 @@ impl Settings {
C: Buf,
P: Peer,
{
if let Some(settings) = &self.remote {
if let Some(settings) = self.remote.clone() {
if !dst.poll_ready(cx)?.is_ready() {
return Poll::Pending;
}
Expand All @@ -121,7 +134,8 @@ impl Settings {

tracing::trace!("ACK sent; applying settings");

streams.apply_remote_settings(settings)?;
let is_initial = self.mark_remote_initial_settings_as_received();
streams.apply_remote_settings(&settings, is_initial)?;

if let Some(val) = settings.header_table_size() {
dst.set_send_header_table_size(val as usize);
Expand Down
8 changes: 5 additions & 3 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ impl Counts {
self.num_remote_reset_streams -= 1;
}

pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
match settings.max_concurrent_streams() {
Some(val) => self.max_send_streams = val as usize,
None if is_initial => self.max_send_streams = usize::MAX,
None => {}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,18 @@ where
me.poll_complete(&self.send_buffer, cx, dst)
}

pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
pub fn apply_remote_settings(
&mut self,
frame: &frame::Settings,
is_initial: bool,
) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;

me.counts.apply_remote_settings(frame);
me.counts.apply_remote_settings(frame, is_initial);

me.actions.send.apply_remote_settings(
frame,
Expand Down
161 changes: 161 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,167 @@ async fn client_builder_header_table_size() {
join(srv, h2).await;
}

#[tokio::test]
async fn configured_max_concurrent_send_streams_and_update_it_based_on_empty_settings_frame() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send empty SETTINGS frame (no MAX_CONCURRENT_STREAMS is provided)
srv.send_frame(frames::settings()).await;
};

let h2 = async move {
let (_client, h2) = client::Builder::new()
// Configure the initial value to 2024
.initial_max_send_streams(2024)
.handshake::<_, bytes::Bytes>(io)
.await
.unwrap();
let mut h2 = std::pin::pin!(h2);
// It should be pre-configured value before it receives the initial
// SETTINGS frame from the server
assert_eq!(h2.max_concurrent_send_streams(), 2024);
h2.as_mut().await.unwrap();
// If the server's initial SETTINGS frame does not include
// MAX_CONCURRENT_STREAMS, this should be updated to usize::MAX.
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
};

join(srv, h2).await;
}

#[tokio::test]
async fn configured_max_concurrent_send_streams_and_update_it_based_on_non_empty_settings_frame() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;
};

let h2 = async move {
let (_client, h2) = client::Builder::new()
// Configure the initial value to 2024
.initial_max_send_streams(2024)
.handshake::<_, bytes::Bytes>(io)
.await
.unwrap();
let mut h2 = std::pin::pin!(h2);
// It should be pre-configured value before it receives the initial
// SETTINGS frame from the server
assert_eq!(h2.max_concurrent_send_streams(), 2024);
h2.as_mut().await.unwrap();
// Now the client has received the initial SETTINGS frame from the
// server, which should update the value accordingly
assert_eq!(h2.max_concurrent_send_streams(), 42);
};

join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings()).await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
h2.as_mut().await.unwrap();
// Even though the second SETTINGS frame contained no value for
// MAX_CONCURRENT_STREAMS, update to usize::MAX should not happen
assert_eq!(h2.max_concurrent_send_streams(), 42);
};

join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_non_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings().max_concurrent_streams(2024))
.await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
h2.as_mut().await.unwrap();
// The most-recently advertised value should be used
assert_eq!(h2.max_concurrent_send_streams(), 2024);
};

join(srv, h2).await;
}

const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];

Expand Down