Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for a fuzzer-discovered integer underflow of the flow control window size #692

Merged
merged 2 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ where

/// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
self.inner.streams.set_target_connection_window_size(size);
let _res = self.inner.streams.set_target_connection_window_size(size);
// TODO: proper error handling
debug_assert!(_res.is_ok());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some debug left around here and in other files. Are these on purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Whenever it was obvious on how to handle a potential overflow, I did. However, many of the call sites do not have a Result return type, so it is not easy to propagate the Error. I assume that those functions should not be able not fail. I did not want to incur additional overhead in release builds, so I stuck with debug_assert! and left the TODO comment. For those TODOs someone with more experience with the code should check whether they are fine to be left as debug_assert!, whether they should be checked with assert! or whether error propagation is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: I did more fuzzing runs with this patch and none of the debug_assert! are triggered, so that gives some assurance that the they are indeed not reachable. So I think we could remove the todo comments, but I would leave the debug_assert! as is.

}

/// Send a new SETTINGS frame with an updated initial window size.
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub type PingPayload = [u8; 8];
pub type WindowSize = u32;

// Constants
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32
pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
Expand Down
73 changes: 40 additions & 33 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ impl FlowControl {
self.window_size > self.available
}

pub fn claim_capacity(&mut self, capacity: WindowSize) {
self.available -= capacity;
pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
self.available.decrease_by(capacity)
}

pub fn assign_capacity(&mut self, capacity: WindowSize) {
self.available += capacity;
pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
self.available.increase_by(capacity)
}

/// If a WINDOW_UPDATE frame should be sent, returns a positive number
Expand Down Expand Up @@ -136,36 +136,38 @@ impl FlowControl {
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_send_window(&mut self, sz: WindowSize) {
pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
// ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
self.window_size.decrease_by(sz)?;
Ok(())
}

/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_recv_window(&mut self, sz: WindowSize) {
pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
self.available -= sz;
self.window_size.decrease_by(sz)?;
self.available.decrease_by(sz)?;
Ok(())
}

/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"send_data; sz={}; window={}; available={}",
sz,
Expand All @@ -176,12 +178,13 @@ impl FlowControl {
// If send size is zero it's meaningless to update flow control window
if sz > 0 {
// Ensure that the argument is correct
assert!(self.window_size >= sz as usize);
assert!(self.window_size.0 >= sz as i32);

// Update values
self.window_size -= sz;
self.available -= sz;
self.window_size.decrease_by(sz)?;
self.available.decrease_by(sz)?;
}
Ok(())
}
}

Expand All @@ -208,6 +211,29 @@ impl Window {
assert!(self.0 >= 0, "negative Window");
self.0 as WindowSize
}

pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
if let Some(v) = self.0.checked_sub(other as i32) {
self.0 = v;
Ok(())
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}

pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
let other = self.add(other)?;
self.0 = other.0;
Ok(())
}

pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
if let Some(v) = self.0.checked_add(other as i32) {
Ok(Self(v))
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}
}

impl PartialEq<usize> for Window {
Expand All @@ -230,25 +256,6 @@ impl PartialOrd<usize> for Window {
}
}

impl ::std::ops::SubAssign<WindowSize> for Window {
fn sub_assign(&mut self, other: WindowSize) {
self.0 -= other as i32;
}
}

impl ::std::ops::Add<WindowSize> for Window {
type Output = Self;
fn add(self, other: WindowSize) -> Self::Output {
Window(self.0 + other as i32)
}
}

impl ::std::ops::AddAssign<WindowSize> for Window {
fn add_assign(&mut self, other: WindowSize) {
self.0 += other as i32;
}
}

impl fmt::Display for Window {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
Expand Down
32 changes: 24 additions & 8 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ impl Prioritize {
flow.inc_window(config.remote_init_window_sz)
.expect("invalid initial window size");

flow.assign_capacity(config.remote_init_window_sz);
// TODO: proper error handling
let _res = flow.assign_capacity(config.remote_init_window_sz);
debug_assert!(_res.is_ok());

tracing::trace!("Prioritize::new; flow={:?}", flow);

Expand Down Expand Up @@ -253,7 +255,9 @@ impl Prioritize {
if available as usize > capacity {
let diff = available - capacity as WindowSize;

stream.send_flow.claim_capacity(diff);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(diff);
debug_assert!(_res.is_ok());

self.assign_connection_capacity(diff, stream, counts);
}
Expand Down Expand Up @@ -324,7 +328,9 @@ impl Prioritize {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
if available > 0 {
stream.send_flow.claim_capacity(available);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(available);
debug_assert!(_res.is_ok());
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
Expand All @@ -337,7 +343,9 @@ impl Prioritize {
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;

stream.send_flow.claim_capacity(reserved);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(reserved);
debug_assert!(_res.is_ok());
self.assign_connection_capacity(reserved, stream, counts);
}
}
Expand All @@ -363,7 +371,9 @@ impl Prioritize {
let span = tracing::trace_span!("assign_connection_capacity", inc);
let _e = span.enter();

self.flow.assign_capacity(inc);
// TODO: proper error handling
let _res = self.flow.assign_capacity(inc);
debug_assert!(_res.is_ok());

// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
Expand Down Expand Up @@ -443,7 +453,9 @@ impl Prioritize {
stream.assign_capacity(assign, self.max_buffer_size);

// Claim the capacity from the connection
self.flow.claim_capacity(assign);
// TODO: proper error handling
let _res = self.flow.claim_capacity(assign);
debug_assert!(_res.is_ok());
}

tracing::trace!(
Expand Down Expand Up @@ -763,12 +775,16 @@ impl Prioritize {
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
self.flow.assign_capacity(len);
// TODO: proper error handling
let _res = self.flow.assign_capacity(len);
debug_assert!(_res.is_ok());
});

let (eos, len) = tracing::trace_span!("updating connection flow")
.in_scope(|| {
self.flow.send_data(len);
// TODO: proper error handling
let _res = self.flow.send_data(len);
debug_assert!(_res.is_ok());

// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
Expand Down
49 changes: 36 additions & 13 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Recv {
// settings
flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
.expect("invalid initial remote window size");
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
f0rki marked this conversation as resolved.
Show resolved Hide resolved

Recv {
init_window_sz: config.local_init_window_sz,
Expand Down Expand Up @@ -354,7 +354,9 @@ impl Recv {
self.in_flight_data -= capacity;

// Assign capacity to connection
self.flow.assign_capacity(capacity);
// TODO: proper error handling
let _res = self.flow.assign_capacity(capacity);
debug_assert!(_res.is_ok());

if self.flow.unclaimed_capacity().is_some() {
if let Some(task) = task.take() {
Expand Down Expand Up @@ -382,7 +384,9 @@ impl Recv {
stream.in_flight_recv_data -= capacity;

// Assign capacity to stream
stream.recv_flow.assign_capacity(capacity);
// TODO: proper error handling
let _res = stream.recv_flow.assign_capacity(capacity);
debug_assert!(_res.is_ok());

if stream.recv_flow.unclaimed_capacity().is_some() {
// Queue the stream for sending the WINDOW_UPDATE frame.
Expand Down Expand Up @@ -428,7 +432,11 @@ impl Recv {
///
/// The `task` is an optional parked task for the `Connection` that might
/// be blocked on needing more window capacity.
pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
pub fn set_target_connection_window(
&mut self,
target: WindowSize,
task: &mut Option<Waker>,
) -> Result<(), Reason> {
tracing::trace!(
"set_target_connection_window; target={}; available={}, reserved={}",
target,
Expand All @@ -441,11 +449,15 @@ impl Recv {
//
// Update the flow controller with the difference between the new
// target and the current target.
let current = (self.flow.available() + self.in_flight_data).checked_size();
let current = self
.flow
.available()
.add(self.in_flight_data)?
.checked_size();
if target > current {
self.flow.assign_capacity(target - current);
self.flow.assign_capacity(target - current)?;
} else {
self.flow.claim_capacity(current - target);
self.flow.claim_capacity(current - target)?;
}

// If changing the target capacity means we gained a bunch of capacity,
Expand All @@ -456,6 +468,7 @@ impl Recv {
task.wake();
}
}
Ok(())
}

pub(crate) fn apply_local_settings(
Expand Down Expand Up @@ -495,9 +508,13 @@ impl Recv {
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);

store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
})
store.try_for_each(|mut stream| {
stream
.recv_flow
.dec_recv_window(dec)
.map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
Ordering::Greater => {
// We must increase the (local) window on every open stream.
Expand All @@ -510,7 +527,10 @@ impl Recv {
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
stream
.recv_flow
.assign_capacity(inc)
.map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
Expand Down Expand Up @@ -617,7 +637,10 @@ impl Recv {
}

// Update stream level flow control
stream.recv_flow.send_data(sz);
stream
.recv_flow
.send_data(sz)
.map_err(proto::Error::library_go_away)?;

// Track the data as in-flight
stream.in_flight_recv_data += sz;
Expand Down Expand Up @@ -658,7 +681,7 @@ impl Recv {
}

// Update connection level flow control
self.flow.send_data(sz);
self.flow.send_data(sz).map_err(Error::library_go_away)?;

// Track the data as in-flight
self.in_flight_data += sz;
Expand Down
Loading