Skip to content

Commit

Permalink
perf: Improve throughput when vectored IO is not enabled (hyperium#712)
Browse files Browse the repository at this point in the history
As discussed in hyperium#711, the
current implementation of sending data is suboptimal when vectored
I/O is not enabled: a data frame's head is likely to be sent in a
separate TCP segment whose payload is only 9 bytes.

This PR adds a specialized implementation for the non-vectored I/O
case. In short, it sets a larger chain threshold, and also makes
sure a data frame's head is sent along with the beginning part of
the real data payload.

All existing unit tests pass. I also took a look at the e2e benchmarks at
https://github.com/hyperium/hyper/blob/0.14.x/benches/end_to_end.rs
but realized that all the benchmarks there exercise vectored I/O
whenever the OS supports it. There isn't a specific case for
non-vectored I/O, so I am not sure how to proceed with benchmarking
for performance evaluation.

Signed-off-by: Sven Pfennig <s.pfennig@reply.de>
  • Loading branch information
xiaoyawei authored and 0xE282B0 committed Jan 16, 2024
1 parent 7a4e278 commit de4d9ae
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ unstable = []
futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
tokio-util_wasi = { version = "0.7.4", features = ["codec"] }
tokio-util_wasi = { version = "0.7.4", features = ["codec", "io"] }
tokio_wasi = {version = "1", features = ["io-util"] }
bytes = "1"
http = "0.2"
Expand Down
84 changes: 36 additions & 48 deletions src/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use bytes::{Buf, BufMut, BytesMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::io::poll_write_buf;

use std::io::{self, Cursor, IoSlice};
use std::io::{self, Cursor};

// A macro to get around a method needing to borrow &mut self
macro_rules! limited_write_buf {
Expand Down Expand Up @@ -45,8 +46,11 @@ struct Encoder<B> {
/// Max frame size, this is specified by the peer
max_frame_size: FrameSize,

/// Whether or not the wrapped `AsyncWrite` supports vectored IO.
is_write_vectored: bool,
/// Chain payloads bigger than this.
chain_threshold: usize,

/// Min buffer required to attempt to write a frame
min_buffer_capacity: usize,
}

#[derive(Debug)]
Expand All @@ -61,22 +65,28 @@ enum Next<B> {
/// frame that big.
const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024;

/// Min buffer required to attempt to write a frame
const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD;

/// Chain payloads bigger than this. The remote will never advertise a max frame
/// size less than this (well, the spec says the max frame size can't be less
/// than 16kb, so not even close).
/// Chain payloads bigger than this when vectored I/O is enabled. The remote
/// will never advertise a max frame size less than this (well, the spec says
/// the max frame size can't be less than 16kb, so not even close).
const CHAIN_THRESHOLD: usize = 256;

/// Chain payloads bigger than this when vectored I/O is **not** enabled.
/// A larger value in this scenario reduces the amount of small, fragmented
/// data being sent, and thereby improves throughput.
const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024;

// TODO: Make generic
impl<T, B> FramedWrite<T, B>
where
T: AsyncWrite + Unpin,
B: Buf,
{
pub fn new(inner: T) -> FramedWrite<T, B> {
let is_write_vectored = inner.is_write_vectored();
let chain_threshold = if inner.is_write_vectored() {
CHAIN_THRESHOLD
} else {
CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
};
FramedWrite {
inner,
encoder: Encoder {
Expand All @@ -85,7 +95,8 @@ where
next: None,
last_data_frame: None,
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
is_write_vectored,
chain_threshold,
min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
},
}
}
Expand Down Expand Up @@ -126,23 +137,17 @@ where
Some(Next::Data(ref mut frame)) => {
tracing::trace!(queued_data_frame = true);
let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
ready!(write(
&mut self.inner,
self.encoder.is_write_vectored,
&mut buf,
cx,
))?
ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
}
_ => {
tracing::trace!(queued_data_frame = false);
ready!(write(
&mut self.inner,
self.encoder.is_write_vectored,
&mut self.encoder.buf,
ready!(poll_write_buf(
Pin::new(&mut self.inner),
cx,
&mut self.encoder.buf
))?
}
}
};
}

match self.encoder.unset_frame() {
Expand All @@ -165,30 +170,6 @@ where
}
}

fn write<T, B>(
writer: &mut T,
is_write_vectored: bool,
buf: &mut B,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>>
where
T: AsyncWrite + Unpin,
B: Buf,
{
// TODO(eliza): when tokio-util 0.5.1 is released, this
// could just use `poll_write_buf`...
const MAX_IOVS: usize = 64;
let n = if is_write_vectored {
let mut bufs = [IoSlice::new(&[]); MAX_IOVS];
let cnt = buf.chunks_vectored(&mut bufs);
ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))?
} else {
ready!(Pin::new(writer).poll_write(cx, buf.chunk()))?
};
buf.advance(n);
Ok(()).into()
}

#[must_use]
enum ControlFlow {
Continue,
Expand Down Expand Up @@ -240,12 +221,17 @@ where
return Err(PayloadTooBig);
}

if len >= CHAIN_THRESHOLD {
if len >= self.chain_threshold {
let head = v.head();

// Encode the frame head to the buffer
head.encode(len, self.buf.get_mut());

if self.buf.get_ref().remaining() < self.chain_threshold {
let extra_bytes = self.chain_threshold - self.buf.remaining();
self.buf.get_mut().put(v.payload_mut().take(extra_bytes));
}

// Save the data frame
self.next = Some(Next::Data(v));
} else {
Expand Down Expand Up @@ -305,7 +291,9 @@ where
}

fn has_capacity(&self) -> bool {
self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY
self.next.is_none()
&& (self.buf.get_ref().capacity() - self.buf.get_ref().len()
>= self.min_buffer_capacity)
}

fn is_empty(&self) -> bool {
Expand Down

0 comments on commit de4d9ae

Please sign in to comment.