Skip to content

Commit

Permalink
lib: window-based flow control
Browse files Browse the repository at this point in the history
This PR reimplments #529 which tried to implement a QUIC flow
control specified in https://docs.google.com/document/d/1F2YfdDXKpy20WVKJueEf4abn_LVZHhMUMS5gX6Pgjl4/edit?usp=sharing

It's basically window-based approach where window size is determined
by current receiving speed.

Since lot of things changed from #529, I tried to minimize the
changes in the existing code. Most of the logic is in src/flowcontrol.rs
and it will replace max_data and max_data_next logic.
  • Loading branch information
junhochoi committed Sep 12, 2021
1 parent 37bd501 commit 92e025a
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 39 deletions.
217 changes: 217 additions & 0 deletions src/flowcontrol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright (C) 2021, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;
use std::time::Instant;

// When autotuning the receiver window, decide how much
// we increase the window.
const WINDOW_INCREASE_FACTOR: u64 = 2;

// When autotuning the receiver window, check if the last
// update is within RTT * this constant.
const WINDOW_TRIGGER_FACTOR: u32 = 2;

#[derive(Default, Debug)]
pub struct FlowControl {
/// Total consumed bytes by the receiver.
consumed: u64,

/// Flow control limit.
max_data: u64,

/// The maximum receive window. This value is used for updating
/// flow control limit.
window: u64,

/// Last update time of max_data for autotuning the window.
last_update: Option<Instant>,
}

impl FlowControl {
pub fn new(max_data: u64, window: u64) -> Self {
Self {
max_data,

window,

..Default::default()
}
}

/// Returns the current window size.
pub fn window(&self) -> u64 {
self.window
}

/// Returns the current flow limit.
pub fn max_data(&self) -> u64 {
self.max_data
}

/// Update consumed bytes.
pub fn add_consumed(&mut self, consumed: u64) {
self.consumed += consumed;
}

/// Returns true if the flow control needs to update max_data.
///
/// This happens when the available window is smaller than the half
/// of the current window.
pub fn should_update_max_data(&self) -> bool {
let available_window = self.max_data - self.consumed;

available_window < (self.window / 2)
}

/// Returns the new max_data limit.
pub fn max_data_next(&self) -> u64 {
self.consumed + self.window
}

/// Commits the new max_data limit.
pub fn update_max_data(&mut self, now: Instant) {
self.max_data = self.max_data_next();
self.last_update = Some(now);
}

/// Autotune the window size. When there is an another update
/// within RTT x 2, bump the window x 1.5, capped by
/// max_window.
pub fn autotune_window(
&mut self, now: Instant, rtt: Duration, max_window: u64,
) {
if let Some(last_update) = self.last_update {
if now - last_update < rtt * WINDOW_TRIGGER_FACTOR {
self.window = std::cmp::min(
self.window * WINDOW_INCREASE_FACTOR,
max_window,
);
}
}
}

/// Make sure the lower bound of the window is same to
/// the current window.
pub fn ensure_window_lower_bound(&mut self, min_window: u64) {
if min_window > self.window {
self.window = min_window;
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn max_data() {
let fc = FlowControl::new(100, 20);

assert_eq!(fc.max_data(), 100);
}

#[test]
fn should_update_max_data() {
let mut fc = FlowControl::new(100, 20);

fc.add_consumed(85);
assert_eq!(fc.should_update_max_data(), false);

fc.add_consumed(10);
assert_eq!(fc.should_update_max_data(), true);
}

#[test]
fn max_data_next() {
let mut fc = FlowControl::new(100, 20);

let consumed = 95;

fc.add_consumed(consumed);
assert_eq!(fc.should_update_max_data(), true);
assert_eq!(fc.max_data_next(), consumed + 20);
}

#[test]
fn update_max_data() {
let mut fc = FlowControl::new(100, 20);

let consumed = 95;

fc.add_consumed(consumed);
assert_eq!(fc.should_update_max_data(), true);

let max_data_next = fc.max_data_next();
assert_eq!(fc.max_data_next(), consumed + 20);

fc.update_max_data(Instant::now());
assert_eq!(fc.max_data(), max_data_next);
}

#[test]
fn autotune_window() {
let w = 20;
let mut fc = FlowControl::new(100, w);

let consumed = 95;

fc.add_consumed(consumed);
assert_eq!(fc.should_update_max_data(), true);

let max_data_next = fc.max_data_next();
assert_eq!(max_data_next, consumed + w);

fc.update_max_data(Instant::now());
assert_eq!(fc.max_data(), max_data_next);

// Window size should be doubled.
fc.autotune_window(Instant::now(), Duration::from_millis(100), 100);

let w = w * 2;
let consumed_inc = 15;

fc.add_consumed(consumed_inc);
assert_eq!(fc.should_update_max_data(), true);

let max_data_next = fc.max_data_next();
assert_eq!(max_data_next, consumed + consumed_inc + w);
}

#[test]
fn ensure_window_lower_bound() {
let w = 20;
let mut fc = FlowControl::new(100, w);

// Window doesn't change.
fc.ensure_window_lower_bound(w);
assert_eq!(fc.window(), 20);

// Window changed to the new value.
fc.ensure_window_lower_bound(w * 2);
assert_eq!(fc.window(), 40);
}
}
Loading

0 comments on commit 92e025a

Please sign in to comment.