Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apps: Linux GSO + pacing support for quiche-server #1138

Merged
merged 1 commit into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 72 additions & 93 deletions apps/src/bin/quiche-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
#[macro_use]
extern crate log;

use std::net;

use std::io;

use std::net;

use std::io::prelude::*;

use std::collections::HashMap;
Expand All @@ -45,13 +45,15 @@ use quiche_apps::args::*;

use quiche_apps::common::*;

const MAX_BUF_SIZE: usize = 65535;
use quiche_apps::sendto::*;

const MAX_BUF_SIZE: usize = 65507;

const MAX_DATAGRAM_SIZE: usize = 1350;

fn main() {
let mut buf = [0; MAX_BUF_SIZE];
let mut out = [0; MAX_DATAGRAM_SIZE];
let mut out = [0; MAX_BUF_SIZE];
let mut pacing = false;

env_logger::builder()
Expand Down Expand Up @@ -87,6 +89,11 @@ fn main() {
.register(&mut socket, mio::Token(0), mio::Interest::READABLE)
.unwrap();

let max_datagram_size = MAX_DATAGRAM_SIZE;
let enable_gso = detect_gso(&socket, max_datagram_size);

trace!("GSO detected: {}", enable_gso);

// Create the configuration for the QUIC connections.
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();

Expand All @@ -96,8 +103,8 @@ fn main() {
config.set_application_protos(&conn_args.alpns).unwrap();

config.set_max_idle_timeout(conn_args.idle_timeout);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_recv_udp_payload_size(max_datagram_size);
config.set_max_send_udp_payload_size(max_datagram_size);
config.set_initial_max_data(conn_args.max_data);
config.set_initial_max_stream_data_bidi_local(conn_args.max_stream_data);
config.set_initial_max_stream_data_bidi_remote(conn_args.max_stream_data);
Expand Down Expand Up @@ -366,7 +373,9 @@ fn main() {
partial_responses: HashMap::new(),
siduck_conn: None,
app_proto_selected: false,
bytes_sent: 0,
max_datagram_size,
loss_rate: 0.0,
max_send_burst: MAX_BUF_SIZE,
};

clients.insert(client_id, client);
Expand Down Expand Up @@ -450,6 +459,10 @@ fn main() {

client.app_proto_selected = true;
}

// Update max_datagram_size after connection established.
client.max_datagram_size =
client.conn.max_send_udp_payload_size();
}

if client.http_conn.is_some() {
Expand Down Expand Up @@ -515,12 +528,29 @@ fn main() {
// packets to be sent.
continue_write = false;
for client in clients.values_mut() {
let max_send_burst = client.conn.send_quantum().min(MAX_BUF_SIZE) /
MAX_DATAGRAM_SIZE *
MAX_DATAGRAM_SIZE;
// Reduce max_send_burst by 25% if loss is increasing more than 0.1%.
let loss_rate =
client.conn.stats().lost as f64 / client.conn.stats().sent as f64;
if loss_rate > client.loss_rate + 0.001 {
client.max_send_burst = client.max_send_burst / 4 * 3;
// Minimun bound of 10xMSS.
client.max_send_burst =
client.max_send_burst.max(client.max_datagram_size * 10);
client.loss_rate = loss_rate;
}

loop {
let (write, send_info) = match client.conn.send(&mut out) {
let max_send_burst =
client.conn.send_quantum().min(client.max_send_burst) /
client.max_datagram_size *
client.max_datagram_size;
let mut total_write = 0;
let mut dst_info = None;

while total_write < max_send_burst {
let (write, send_info) = match client
.conn
.send(&mut out[total_write..max_send_burst])
{
Ok(v) => v,

Err(quiche::Error::Done) => {
Expand All @@ -536,33 +566,42 @@ fn main() {
},
};

// TODO: coalesce packets.
if let Err(e) =
send_to(&socket, &out[..write], &send_info, pacing)
{
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
}
total_write += write;

panic!("send() failed: {:?}", e);
}
// Use the first packet time to send, not the last.
let _ = dst_info.get_or_insert(send_info);

trace!("{} written {} bytes", client.conn.trace_id(), write);
if write < client.max_datagram_size {
break;
}
}

// limit write bursting
client.bytes_sent += write;
if total_write == 0 || dst_info.is_none() {
break;
}

if client.bytes_sent >= max_send_burst {
trace!(
"{} pause writing at {}",
client.conn.trace_id(),
client.bytes_sent
);
client.bytes_sent = 0;
continue_write = true;
if let Err(e) = send_to(
&socket,
&out[..total_write],
&dst_info.unwrap(),
client.max_datagram_size,
pacing,
enable_gso,
) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
}

panic!("send_to() failed: {:?}", e);
}

trace!("{} written {} bytes", client.conn.trace_id(), total_write);

if total_write >= max_send_burst {
trace!("{} pause writing", client.conn.trace_id(),);
continue_write = true;
break;
}
}

Expand Down Expand Up @@ -741,63 +780,3 @@ fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> {
"Not supported on this platform",
))
}

/// Send outgoing UDP packet to kernel using sendmsg syscall
///
/// sendmsg syscall also includes the time the packet needs to be
/// sent by the kernel in msghdr.
///
/// Note that sendmsg syscall is used only on linux platforms.
#[cfg(target_os = "linux")]
fn send_to(
sock: &mio::net::UdpSocket, send_buf: &[u8], send_info: &quiche::SendInfo,
pacing: bool,
) -> io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::SockaddrStorage;
use std::io::IoSlice;
use std::os::unix::io::AsRawFd;

if !pacing {
return sock.send_to(send_buf, send_info.to);
}

let nanos_per_sec: u64 = 1_000_000_000;
let sockfd = sock.as_raw_fd();
let len = send_buf.len();
let iov = [IoSlice::new(&send_buf[..len])];

let mut time_spec = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};

unsafe {
std::ptr::copy_nonoverlapping(
&send_info.at as *const _ as *const libc::timespec,
&mut time_spec,
1,
)
};

let send_time =
time_spec.tv_sec as u64 * nanos_per_sec + time_spec.tv_nsec as u64;

let cmsg = ControlMessage::TxTime(&send_time);
let addr = SockaddrStorage::from(send_info.to);

match sendmsg(sockfd, &iov, &[cmsg], MsgFlags::empty(), Some(&addr)) {
Ok(written) => Ok(written),
Err(e) => Err(e.into()),
}
}

#[cfg(not(target_os = "linux"))]
fn send_to(
sock: &mio::net::UdpSocket, send_buf: &[u8], send_info: &quiche::SendInfo,
_: bool,
) -> io::Result<usize> {
sock.send_to(send_buf, send_info.to)
}
6 changes: 5 additions & 1 deletion apps/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ pub struct Client {

pub partial_responses: std::collections::HashMap<u64, PartialResponse>,

pub bytes_sent: usize,
pub max_datagram_size: usize,

pub loss_rate: f64,

pub max_send_burst: usize,
}

pub type ClientIdMap = HashMap<ConnectionId<'static>, ClientId>;
Expand Down
1 change: 1 addition & 0 deletions apps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ extern crate log;
pub mod args;
pub mod client;
pub mod common;
pub mod sendto;
Loading