Skip to content

Commit

Permalink
fix: set MAX_CONCURRENT_STREAMS to usize::MAX if no value is advertis…
Browse files Browse the repository at this point in the history
…ed initially
  • Loading branch information
magurotuna authored and seanmonstar committed Jan 13, 2024
1 parent 66a1ed8 commit d2f09fb
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 7 deletions.
18 changes: 16 additions & 2 deletions src/proto/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub(crate) struct Settings {
/// the socket first then the settings applied **before** receiving any
/// further frames.
remote: Option<frame::Settings>,
/// Whether the connection has received the initial SETTINGS frame from the
/// remote peer.
has_received_remote_initial_settings: bool,
}

#[derive(Debug)]
Expand All @@ -32,6 +35,7 @@ impl Settings {
// the handshake process.
local: Local::WaitingAck(local),
remote: None,
has_received_remote_initial_settings: false,
}
}

Expand Down Expand Up @@ -96,6 +100,15 @@ impl Settings {
}
}

/// Sets `true` to `self.has_received_remote_initial_settings`.
/// Returns `true` if this method is called for the first time.
/// (i.e. it is the initial SETTINGS frame from the remote peer)
fn mark_remote_initial_settings_as_received(&mut self) -> bool {
let has_received = self.has_received_remote_initial_settings;
self.has_received_remote_initial_settings = true;
!has_received
}

pub(crate) fn poll_send<T, B, C, P>(
&mut self,
cx: &mut Context,
Expand All @@ -108,7 +121,7 @@ impl Settings {
C: Buf,
P: Peer,
{
if let Some(settings) = &self.remote {
if let Some(settings) = self.remote.clone() {
if !dst.poll_ready(cx)?.is_ready() {
return Poll::Pending;
}
Expand All @@ -121,7 +134,8 @@ impl Settings {

tracing::trace!("ACK sent; applying settings");

streams.apply_remote_settings(settings)?;
let is_initial = self.mark_remote_initial_settings_as_received();
streams.apply_remote_settings(&settings, is_initial)?;

if let Some(val) = settings.header_table_size() {
dst.set_send_header_table_size(val as usize);
Expand Down
8 changes: 5 additions & 3 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ impl Counts {
self.num_remote_reset_streams -= 1;
}

pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
match settings.max_concurrent_streams() {
Some(val) => self.max_send_streams = val as usize,
None if is_initial => self.max_send_streams = usize::MAX,
None => {}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,18 @@ where
me.poll_complete(&self.send_buffer, cx, dst)
}

pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
pub fn apply_remote_settings(
&mut self,
frame: &frame::Settings,
is_initial: bool,
) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;

me.counts.apply_remote_settings(frame);
me.counts.apply_remote_settings(frame, is_initial);

me.actions.send.apply_remote_settings(
frame,
Expand Down
161 changes: 161 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,167 @@ async fn client_builder_header_table_size() {
join(srv, h2).await;
}

#[tokio::test]
async fn configured_max_concurrent_send_streams_and_update_it_based_on_empty_settings_frame() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send empty SETTINGS frame (no MAX_CONCURRENT_STREAMS is provided)
srv.send_frame(frames::settings()).await;
};

let h2 = async move {
let (_client, h2) = client::Builder::new()
// Configure the initial value to 2024
.initial_max_send_streams(2024)
.handshake::<_, bytes::Bytes>(io)
.await
.unwrap();
let mut h2 = std::pin::pin!(h2);
// It should be pre-configured value before it receives the initial
// SETTINGS frame from the server
assert_eq!(h2.max_concurrent_send_streams(), 2024);
h2.as_mut().await.unwrap();
// If the server's initial SETTINGS frame does not include
// MAX_CONCURRENT_STREAMS, this should be updated to usize::MAX.
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
};

join(srv, h2).await;
}

#[tokio::test]
async fn configured_max_concurrent_send_streams_and_update_it_based_on_non_empty_settings_frame() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;
};

let h2 = async move {
let (_client, h2) = client::Builder::new()
// Configure the initial value to 2024
.initial_max_send_streams(2024)
.handshake::<_, bytes::Bytes>(io)
.await
.unwrap();
let mut h2 = std::pin::pin!(h2);
// It should be pre-configured value before it receives the initial
// SETTINGS frame from the server
assert_eq!(h2.max_concurrent_send_streams(), 2024);
h2.as_mut().await.unwrap();
// Now the client has received the initial SETTINGS frame from the
// server, which should update the value accordingly
assert_eq!(h2.max_concurrent_send_streams(), 42);
};

join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings()).await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
h2.as_mut().await.unwrap();
// Even though the second SETTINGS frame contained no value for
// MAX_CONCURRENT_STREAMS, update to usize::MAX should not happen
assert_eq!(h2.max_concurrent_send_streams(), 42);
};

join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_non_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings().max_concurrent_streams(2024))
.await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
h2.as_mut().await.unwrap();
// The most-recently advertised value should be used
assert_eq!(h2.max_concurrent_send_streams(), 2024);
};

join(srv, h2).await;
}

const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];

Expand Down

0 comments on commit d2f09fb

Please sign in to comment.