From d60e494d14e927b53fa644ec3e62a5e506e3401f Mon Sep 17 00:00:00 2001 From: Junho Choi Date: Mon, 28 Jun 2021 17:28:37 -0700 Subject: [PATCH] apps: add GSO support for server on Linux When GSO can be enabled (on a supported linux platform), now quiche-server sends multiple packets using sendmsg() with GSO. For non-supported platforms, it should work same as before. --- apps/src/bin/quiche-server.rs | 165 +++++++++++++++------------------- apps/src/common.rs | 6 +- apps/src/lib.rs | 1 + apps/src/sendto.rs | 146 ++++++++++++++++++++++++++++++ 4 files changed, 224 insertions(+), 94 deletions(-) create mode 100644 apps/src/sendto.rs diff --git a/apps/src/bin/quiche-server.rs b/apps/src/bin/quiche-server.rs index 35d04c9896..438bbd6af4 100644 --- a/apps/src/bin/quiche-server.rs +++ b/apps/src/bin/quiche-server.rs @@ -27,10 +27,10 @@ #[macro_use] extern crate log; -use std::net; - use std::io; +use std::net; + use std::io::prelude::*; use std::collections::HashMap; @@ -45,13 +45,15 @@ use quiche_apps::args::*; use quiche_apps::common::*; -const MAX_BUF_SIZE: usize = 65535; +use quiche_apps::sendto::*; + +const MAX_BUF_SIZE: usize = 65507; const MAX_DATAGRAM_SIZE: usize = 1350; fn main() { let mut buf = [0; MAX_BUF_SIZE]; - let mut out = [0; MAX_DATAGRAM_SIZE]; + let mut out = [0; MAX_BUF_SIZE]; let mut pacing = false; env_logger::builder() @@ -87,6 +89,11 @@ fn main() { .register(&mut socket, mio::Token(0), mio::Interest::READABLE) .unwrap(); + let max_datagram_size = MAX_DATAGRAM_SIZE; + let enable_gso = detect_gso(&socket, max_datagram_size); + + trace!("GSO detected: {}", enable_gso); + // Create the configuration for the QUIC connections. let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); @@ -96,8 +103,8 @@ fn main() { config.set_application_protos(&conn_args.alpns).unwrap(); config.set_max_idle_timeout(conn_args.idle_timeout); - config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); - config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); + config.set_max_recv_udp_payload_size(max_datagram_size); + config.set_max_send_udp_payload_size(max_datagram_size); config.set_initial_max_data(conn_args.max_data); config.set_initial_max_stream_data_bidi_local(conn_args.max_stream_data); config.set_initial_max_stream_data_bidi_remote(conn_args.max_stream_data); @@ -366,7 +373,9 @@ fn main() { partial_responses: HashMap::new(), siduck_conn: None, app_proto_selected: false, - bytes_sent: 0, + max_datagram_size, + loss_rate: 0.0, + max_send_burst: MAX_BUF_SIZE, }; clients.insert(client_id, client); @@ -450,6 +459,10 @@ fn main() { client.app_proto_selected = true; } + + // Update max_datagram_size after connection established. + client.max_datagram_size = + client.conn.max_send_udp_payload_size(); } if client.http_conn.is_some() { @@ -515,12 +528,29 @@ fn main() { // packets to be sent. continue_write = false; for client in clients.values_mut() { - let max_send_burst = client.conn.send_quantum().min(MAX_BUF_SIZE) / - MAX_DATAGRAM_SIZE * - MAX_DATAGRAM_SIZE; + // Reduce max_send_burst by 25% if loss is increasing more than 0.1%. + let loss_rate = + client.conn.stats().lost as f64 / client.conn.stats().sent as f64; + if loss_rate > client.loss_rate + 0.001 { + client.max_send_burst = client.max_send_burst / 4 * 3; + // Minimun bound of 10xMSS. + client.max_send_burst = + client.max_send_burst.max(client.max_datagram_size * 10); + client.loss_rate = loss_rate; + } - loop { - let (write, send_info) = match client.conn.send(&mut out) { + let max_send_burst = + client.conn.send_quantum().min(client.max_send_burst) / + client.max_datagram_size * + client.max_datagram_size; + let mut total_write = 0; + let mut dst_info = None; + + while total_write < max_send_burst { + let (write, send_info) = match client + .conn + .send(&mut out[total_write..max_send_burst]) + { Ok(v) => v, Err(quiche::Error::Done) => { @@ -536,33 +566,42 @@ fn main() { }, }; - // TODO: coalesce packets. - if let Err(e) = - send_to(&socket, &out[..write], &send_info, pacing) - { - if e.kind() == std::io::ErrorKind::WouldBlock { - trace!("send() would block"); - break; - } + total_write += write; - panic!("send() failed: {:?}", e); - } + // Use the first packet time to send, not the last. + let _ = dst_info.get_or_insert(send_info); - trace!("{} written {} bytes", client.conn.trace_id(), write); + if write < client.max_datagram_size { + break; + } + } - // limit write bursting - client.bytes_sent += write; + if total_write == 0 || dst_info.is_none() { + break; + } - if client.bytes_sent >= max_send_burst { - trace!( - "{} pause writing at {}", - client.conn.trace_id(), - client.bytes_sent - ); - client.bytes_sent = 0; - continue_write = true; + if let Err(e) = send_to( + &socket, + &out[..total_write], + &dst_info.unwrap(), + client.max_datagram_size, + pacing, + enable_gso, + ) { + if e.kind() == std::io::ErrorKind::WouldBlock { + trace!("send() would block"); break; } + + panic!("send_to() failed: {:?}", e); + } + + trace!("{} written {} bytes", client.conn.trace_id(), total_write); + + if total_write >= max_send_burst { + trace!("{} pause writing", client.conn.trace_id(),); + continue_write = true; + break; } } @@ -741,63 +780,3 @@ fn set_txtime_sockopt(_: &mio::net::UdpSocket) -> io::Result<()> { "Not supported on this platform", )) } - -/// Send outgoing UDP packet to kernel using sendmsg syscall -/// -/// sendmsg syscall also includes the time the packet needs to be -/// sent by the kernel in msghdr. -/// -/// Note that sendmsg syscall is used only on linux platforms. -#[cfg(target_os = "linux")] -fn send_to( - sock: &mio::net::UdpSocket, send_buf: &[u8], send_info: &quiche::SendInfo, - pacing: bool, -) -> io::Result { - use nix::sys::socket::sendmsg; - use nix::sys::socket::ControlMessage; - use nix::sys::socket::MsgFlags; - use nix::sys::socket::SockaddrStorage; - use std::io::IoSlice; - use std::os::unix::io::AsRawFd; - - if !pacing { - return sock.send_to(send_buf, send_info.to); - } - - let nanos_per_sec: u64 = 1_000_000_000; - let sockfd = sock.as_raw_fd(); - let len = send_buf.len(); - let iov = [IoSlice::new(&send_buf[..len])]; - - let mut time_spec = libc::timespec { - tv_sec: 0, - tv_nsec: 0, - }; - - unsafe { - std::ptr::copy_nonoverlapping( - &send_info.at as *const _ as *const libc::timespec, - &mut time_spec, - 1, - ) - }; - - let send_time = - time_spec.tv_sec as u64 * nanos_per_sec + time_spec.tv_nsec as u64; - - let cmsg = ControlMessage::TxTime(&send_time); - let addr = SockaddrStorage::from(send_info.to); - - match sendmsg(sockfd, &iov, &[cmsg], MsgFlags::empty(), Some(&addr)) { - Ok(written) => Ok(written), - Err(e) => Err(e.into()), - } -} - -#[cfg(not(target_os = "linux"))] -fn send_to( - sock: &mio::net::UdpSocket, send_buf: &[u8], send_info: &quiche::SendInfo, - _: bool, -) -> io::Result { - sock.send_to(send_buf, send_info.to) -} diff --git a/apps/src/common.rs b/apps/src/common.rs index 9842e442f2..a4c457b9c4 100644 --- a/apps/src/common.rs +++ b/apps/src/common.rs @@ -96,7 +96,11 @@ pub struct Client { pub partial_responses: std::collections::HashMap, - pub bytes_sent: usize, + pub max_datagram_size: usize, + + pub loss_rate: f64, + + pub max_send_burst: usize, } pub type ClientIdMap = HashMap, ClientId>; diff --git a/apps/src/lib.rs b/apps/src/lib.rs index c6b9225294..6c4941a6b9 100644 --- a/apps/src/lib.rs +++ b/apps/src/lib.rs @@ -30,3 +30,4 @@ extern crate log; pub mod args; pub mod client; pub mod common; +pub mod sendto; diff --git a/apps/src/sendto.rs b/apps/src/sendto.rs new file mode 100644 index 0000000000..652e01c413 --- /dev/null +++ b/apps/src/sendto.rs @@ -0,0 +1,146 @@ +// Copyright (C) 2021, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::cmp; + +use std::io; + +/// For Linux, try to detect GSO is available. +#[cfg(target_os = "linux")] +pub fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool { + use nix::sys::socket::setsockopt; + use nix::sys::socket::sockopt::UdpGsoSegment; + use std::os::unix::io::AsRawFd; + + setsockopt(socket.as_raw_fd(), UdpGsoSegment, &(segment_size as i32)).is_ok() +} + +/// For non-Linux, there is no GSO support. +#[cfg(not(target_os = "linux"))] +pub fn detect_gso(_socket: &mio::net::UdpSocket, _segment_size: usize) -> bool { + false +} + +/// Send packets using sendmsg() with GSO. +#[cfg(target_os = "linux")] +fn send_to_gso_pacing( + socket: &mio::net::UdpSocket, buf: &[u8], send_info: &quiche::SendInfo, + segment_size: usize, +) -> io::Result { + use nix::sys::socket::sendmsg; + use nix::sys::socket::ControlMessage; + use nix::sys::socket::MsgFlags; + use nix::sys::socket::SockaddrStorage; + use std::io::IoSlice; + use std::os::unix::io::AsRawFd; + + let iov = [IoSlice::new(buf)]; + let segment_size = segment_size as u16; + let dst = SockaddrStorage::from(send_info.to); + let sockfd = socket.as_raw_fd(); + + // GSO option. + let cmsg_gso = ControlMessage::UdpGsoSegments(&segment_size); + let nanos_per_sec: u64 = 1_000_000_000; + + // Pacing option. + let mut time_spec = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + + unsafe { + std::ptr::copy_nonoverlapping( + &send_info.at as *const _ as *const libc::timespec, + &mut time_spec, + 1, + ) + }; + + let send_time = + time_spec.tv_sec as u64 * nanos_per_sec + time_spec.tv_nsec as u64; + + let cmsg_txtime = ControlMessage::TxTime(&send_time); + + match sendmsg( + sockfd, + &iov, + &[cmsg_gso, cmsg_txtime], + MsgFlags::empty(), + Some(&dst), + ) { + Ok(v) => Ok(v), + Err(e) => Err(e.into()), + } +} + +/// For non-Linux platforms. +#[cfg(not(target_os = "linux"))] +fn send_to_gso_pacing( + _socket: &mio::net::UdpSocket, _buf: &[u8], _send_info: &quiche::SendInfo, + _segment_size: usize, +) -> io::Result { + panic!("send_to_gso() should not be called on non-linux platforms"); +} + +/// A wrapper function of send_to(). +/// - when GSO and SO_TXTIME enabled, send a packet using send_to_gso(). +/// Otherwise, send packet using socket.send_to(). +pub fn send_to( + socket: &mio::net::UdpSocket, buf: &[u8], send_info: &quiche::SendInfo, + segment_size: usize, pacing: bool, enable_gso: bool, +) -> io::Result { + if pacing && enable_gso { + match send_to_gso_pacing(socket, buf, send_info, segment_size) { + Ok(v) => { + return Ok(v); + }, + Err(e) => { + return Err(e); + }, + } + } + + let mut off = 0; + let mut left = buf.len(); + let mut written = 0; + + while left > 0 { + let pkt_len = cmp::min(left, segment_size); + + match socket.send_to(&buf[off..off + pkt_len], send_info.to) { + Ok(v) => { + written += v; + }, + Err(e) => return Err(e), + } + + off += pkt_len; + left -= pkt_len; + } + + Ok(written) +}