Skip to content

Commit

Permalink
apps: Linux GSO support for quiche-server
Browse files Browse the repository at this point in the history
When GSO can be enabled (on a supported linux platform in nix),
GSO can be used for a large transfer using sendmsg(). For
non-supported platforms, should work as same.
  • Loading branch information
Junho Choi committed Jun 29, 2021
1 parent f86de8f commit c9eeef2
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 23 deletions.
1 change: 1 addition & 0 deletions tools/apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ default = ["qlog"]
docopt = "1"
env_logger = "0.6"
mio = "0.6"
nix = "0.21"
url = "1"
log = "0.4"
ring = "0.16"
Expand Down
166 changes: 143 additions & 23 deletions tools/apps/src/bin/quiche-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#[macro_use]
extern crate log;

use std::cmp;

use std::io;

use std::net;

use std::io::prelude::*;
Expand All @@ -43,13 +47,15 @@ use quiche_apps::args::*;

use quiche_apps::common::*;

const MAX_BUF_SIZE: usize = 65536;

const MAX_DATAGRAM_SIZE: usize = 1350;

const MAX_SEND_BURST_LIMIT: usize = MAX_DATAGRAM_SIZE * 10;
const MAX_SEND_BURST_PACKETS: usize = 10;

fn main() {
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let mut buf = [0; MAX_BUF_SIZE];
let mut out = [0; MAX_BUF_SIZE];

env_logger::builder()
.default_format_timestamp_nanos(true)
Expand Down Expand Up @@ -78,6 +84,11 @@ fn main() {
)
.unwrap();

let max_datagram_size = MAX_DATAGRAM_SIZE;
let enable_gso = detect_gso(&socket, max_datagram_size);

trace!("GSO detected: {}", enable_gso);

// Create the configuration for the QUIC connections.
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();

Expand All @@ -87,8 +98,8 @@ fn main() {
config.set_application_protos(&conn_args.alpns).unwrap();

config.set_max_idle_timeout(conn_args.idle_timeout);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_recv_udp_payload_size(max_datagram_size);
config.set_max_send_udp_payload_size(max_datagram_size);
config.set_initial_max_data(conn_args.max_data);
config.set_initial_max_stream_data_bidi_local(conn_args.max_stream_data);
config.set_initial_max_stream_data_bidi_remote(conn_args.max_stream_data);
Expand Down Expand Up @@ -342,6 +353,11 @@ fn main() {
siduck_conn: None,
app_proto_selected: false,
bytes_sent: 0,
max_datagram_size,
max_send_burst: cmp::min(
MAX_BUF_SIZE,
max_datagram_size * MAX_SEND_BURST_PACKETS,
),
};

clients.insert(scid.clone(), client);
Expand Down Expand Up @@ -415,6 +431,14 @@ fn main() {

client.app_proto_selected = true;
}

// Update max_datagram_size after connection established.
client.max_datagram_size =
client.conn.max_send_udp_payload_size();
client.max_send_burst = cmp::min(
MAX_BUF_SIZE,
client.max_datagram_size * MAX_SEND_BURST_PACKETS,
);
}

if client.http_conn.is_some() {
Expand Down Expand Up @@ -458,39 +482,73 @@ fn main() {
// packets to be sent.
continue_write = false;
for client in clients.values_mut() {
loop {
let (write, send_info) = match client.conn.send(&mut out) {
Ok(v) => v,
let max_send_burst = client.max_send_burst;
let mut total_write = 0;
let mut dst_info = None;

'write: loop {
while total_write < max_send_burst {
let (write, send_info) = match client
.conn
.send(&mut out[total_write..max_send_burst])
{
Ok(v) => v,

Err(quiche::Error::Done) => {
trace!("{} done writing", client.conn.trace_id());
break;
},

Err(e) => {
error!(
"{} send failed: {:?}",
client.conn.trace_id(),
e
);

client.conn.close(false, 0x1, b"fail").ok();
break 'write;
},
};

Err(quiche::Error::Done) => {
trace!("{} done writing", client.conn.trace_id());
break;
},
total_write += write;

Err(e) => {
error!("{} send failed: {:?}", client.conn.trace_id(), e);
dst_info = Some(send_info);

client.conn.close(false, 0x1, b"fail").ok();
// No GSO, or last packet.
if !enable_gso || write < client.max_datagram_size {
break;
},
};
}
}

if total_write == 0 || dst_info.is_none() {
break;
}

// TODO: coalesce packets.
if let Err(e) = socket.send_to(&out[..write], &send_info.to) {
if let Err(e) = send_to(
&socket,
&out[..total_write],
&dst_info.unwrap().to,
client.max_datagram_size,
) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("send() would block");
break;
}

panic!("send() failed: {:?}", e);
panic!("send_to() failed: {:?}", e);
}

trace!("{} written {} bytes", client.conn.trace_id(), write);
trace!(
"{} written {} bytes",
client.conn.trace_id(),
total_write
);

// limit write bursting
client.bytes_sent += write;
client.bytes_sent += total_write;

if client.bytes_sent >= MAX_SEND_BURST_LIMIT {
if client.bytes_sent >= max_send_burst {
trace!(
"{} pause writing at {}",
client.conn.trace_id(),
Expand Down Expand Up @@ -575,3 +633,65 @@ fn validate_token<'a>(

Some(quiche::ConnectionId::from_ref(&token[addr.len()..]))
}

/// For Linux, try to detect GSO is available.
#[cfg(target_os = "linux")]
fn detect_gso(socket: &mio::net::UdpSocket, segment_size: usize) -> bool {
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::UdpGsoSegment;
use std::os::unix::io::AsRawFd;

setsockopt(socket.as_raw_fd(), UdpGsoSegment, &(segment_size as i32)).is_ok()
}

/// For non-Linux, there is no GSO support.
#[cfg(not(target_os = "linux"))]
fn detect_gso(_socket: &mio::net::UdpSocket, _segment_size: usize) -> bool {
false
}

/// When GSO enabled, send packets using sendmsg().
#[cfg(target_os = "linux")]
fn send_to(
socket: &mio::net::UdpSocket, buf: &[u8], target: &net::SocketAddr,
segment_size: usize,
) -> io::Result<usize> {
use nix::sys::socket::sendmsg;
use nix::sys::socket::ControlMessage;
use nix::sys::socket::InetAddr;
use nix::sys::socket::MsgFlags;
use nix::sys::socket::SockAddr;
use nix::sys::uio::IoVec;
use std::os::unix::io::AsRawFd;

let iov = [IoVec::from_slice(buf)];
let segment_size = segment_size as u16;
let cmsg = ControlMessage::UdpGsoSegments(&segment_size);
let dst = SockAddr::new_inet(InetAddr::from_std(target));

match sendmsg(
socket.as_raw_fd(),
&iov,
&[cmsg],
MsgFlags::empty(),
Some(&dst),
) {
Ok(v) => Ok(v),
Err(e) => {
let e = match e.as_errno() {
Some(v) => io::Error::from(v),
None => io::Error::new(io::ErrorKind::Other, e),
};
Err(e)
},
}
}

/// When GSO disabled, send a packet using send_to().
#[cfg(not(target_os = "linux"))]
fn send_to(
socket: &mio::net::UdpSocket, buf: &[u8], target: &net::SocketAddr,
_segment_size: usize,
) -> io::Result<usize> {
socket.send_to(buf, target)
}
4 changes: 4 additions & 0 deletions tools/apps/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub struct Client {
pub partial_responses: std::collections::HashMap<u64, PartialResponse>,

pub bytes_sent: usize,

pub max_datagram_size: usize,

pub max_send_burst: usize,
}

pub type ClientMap = HashMap<ConnectionId<'static>, Client>;
Expand Down

0 comments on commit c9eeef2

Please sign in to comment.