Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncWriteExt::write_all_vectored utility #1741

Merged
merged 1 commit into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable"]
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic"]
bilock = []
read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
write-all-vectored = ["io"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
Expand Down
60 changes: 59 additions & 1 deletion futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ pub use self::write_vectored::WriteVectored;
mod write_all;
pub use self::write_all::WriteAll;

#[cfg(feature = "write_all_vectored")]
mod write_all_vectored;
#[cfg(feature = "write_all_vectored")]
pub use self::write_all_vectored::WriteAllVectored;

/// An extension trait which adds utility methods to `AsyncRead` types.
pub trait AsyncReadExt: AsyncRead {
/// Creates an adaptor which will chain this stream with another.
Expand Down Expand Up @@ -460,6 +465,60 @@ pub trait AsyncWriteExt: AsyncWrite {
WriteAll::new(self, buf)
}

/// Attempts to write multiple buffers into this writer.
///
/// Creates a future that will write the entire contents of `bufs` into this
/// `AsyncWrite` using [vectored writes].
///
/// The returned future will not complete until all the data has been
/// written.
///
/// [vectored writes]: std::io::Write::write_vectored
///
/// # Notes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please carefully review this section, I don't if it clearly states what I mean.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the future successfully completes, bufs is an empty slice, right?

On failure bufs may have been modified, but is not guaranteed to match with what has actually been written out (because I believe the underlying IO doesn't guarantee that it either writes some data or returns an error, it could have written some of the data and then returned an error). That's pretty much equivalent to what write_all guarantees, if it returns an error some part of the data will have been written with no way to know how much.

I think this should try to avoid the "undefined" term since that has such strong memory-safety connotations, and the failure cases are relatively well-defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the future successfully completes, bufs is an empty slice, right?

That is correct, but in the future we could optimise to just return ready if the entire (remaining) buffer is written, not modifying bufs.

On failure bufs may have been modified, but is not guaranteed to match with what has actually been written out (because I believe the underlying IO doesn't guarantee that it either writes some data or returns an error, it could have written some of the data and then returned an error). That's pretty much equivalent to what write_all guarantees, if it returns an error some part of the data will have been written with no way to know how much.

Do you want this to be reflected in the documentation somehow, or leave it as is?

I think this should try to avoid the "undefined" term since that has such strong memory-safety connotations, and the failure cases are relatively well-defined.

Hence I added the it was not about memory safety, but I do understand the concern. Should I replace "undefined" with "unknown" (or something else) or you want to provide the guarantee that bufs will be empty after it successfully returns (also see my first remark in this comment)?

///
/// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
/// a slice of `IoSlice`s, not an immutable one. That's because we need to
/// modify the slice to keep track of the bytes already written.
///
/// Once this futures returns, the contents of `bufs` are unspecified, as
/// this depends on how many calls to `write_vectored` were necessary. It is
/// best to understand this function as taking ownership of `bufs` and to
/// not use `bufs` afterwards. The underlying buffers, to which the
/// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
/// can be reused.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for including this example!

/// use futures::io::AsyncWriteExt;
/// use std::io::{Cursor, IoSlice};
///
/// let mut writer = Cursor::new([0u8; 7]);
/// let bufs = &mut [
/// IoSlice::new(&[1]),
/// IoSlice::new(&[2, 3]),
/// IoSlice::new(&[4, 5, 6]),
/// ];
///
/// writer.write_all_vectored(bufs).await?;
/// // Note: the contents of `bufs` is now undefined, see the Notes section.
///
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
#[cfg(feature = "write_all_vectored")]
fn write_all_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSlice<'a>],
) -> WriteAllVectored<'a, Self>
where
Self: Unpin,
{
WriteAllVectored::new(self, bufs)
}

/// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
/// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
/// Requires the `io-compat` feature to enable.
Expand All @@ -470,7 +529,6 @@ pub trait AsyncWriteExt: AsyncWrite {
Compat::new(self)
}


/// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
///
/// This adapter produces a sink that will write each value passed to it
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/write_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
this.buf = rest;
}
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
}

Expand Down
200 changes: 200 additions & 0 deletions futures-util/src/io/write_all_vectored.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use futures_core::future::Future;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation and tests here look good to me, thanks!

use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use futures_io::IoSlice;
use std::io;
use std::mem;
use std::pin::Pin;

/// Future for the
/// [`write_all_vectored`](super::AsyncWriteExt::write_all_vectored) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAllVectored<'a, W: ?Sized + Unpin> {
writer: &'a mut W,
bufs: &'a mut [IoSlice<'a>],
}

impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {}

impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self {
WriteAllVectored { writer, bufs }
}
}

impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = &mut *self;
while !this.bufs.is_empty() {
let n = ready!(Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs))?;
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
} else {
this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n);
}
}

Poll::Ready(Ok(()))
}
}

#[cfg(test)]
mod tests {
use std::cmp::min;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::io::{AsyncWrite, AsyncWriteExt, IoSlice};
use crate::task::noop_waker;

/// Create a new writer that reads from at most `n_bufs` and reads
/// `per_call` bytes (in total) per call to write.
fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter {
TestWriter {
n_bufs,
per_call,
written: Vec::new(),
}
}

// TODO: maybe move this the future-test crate?
struct TestWriter {
n_bufs: usize,
per_call: usize,
written: Vec<u8>,
}

impl AsyncWrite for TestWriter {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.poll_write_vectored(cx, &[IoSlice::new(buf)])
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut left = self.per_call;
let mut written = 0;
for buf in bufs.iter().take(self.n_bufs) {
let n = min(left, buf.len());
self.written.extend_from_slice(&buf[0..n]);
left -= n;
written += n;
}
Poll::Ready(Ok(written))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

// TODO: maybe move this the future-test crate?
macro_rules! assert_poll_ok {
($e:expr, $expected:expr) => {
let expected = $expected;
match $e {
Poll::Ready(Ok(ok)) if ok == expected => {}
got => panic!(
"unexpected result, got: {:?}, wanted: Ready(Ok({:?}))",
got, expected
),
}
};
}

#[test]
fn test_writer_read_from_one_buf() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

let mut dst = test_writer(1, 2);
let mut dst = Pin::new(&mut dst);

assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[]), 0);
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, &[]), 0);

// Read at most 2 bytes.
assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[1, 1, 1]), 2);
let bufs = &[IoSlice::new(&[2, 2, 2])];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 2);

// Only read from first buf.
let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4, 4])];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 1);

assert_eq!(dst.written, &[1, 1, 2, 2, 3]);
}

#[test]
fn test_writer_read_from_multiple_bufs() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

let mut dst = test_writer(3, 3);
let mut dst = Pin::new(&mut dst);

// Read at most 3 bytes from two buffers.
let bufs = &[IoSlice::new(&[1]), IoSlice::new(&[2, 2, 2])];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3);

// Read at most 3 bytes from three buffers.
let bufs = &[
IoSlice::new(&[3]),
IoSlice::new(&[4]),
IoSlice::new(&[5, 5]),
];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3);

assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]);
}

#[test]
fn test_write_all_vectored() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

#[rustfmt::skip] // Becomes unreadable otherwise.
let tests: Vec<(_, &'static [u8])> = vec![
(vec![], &[]),
(vec![IoSlice::new(&[1])], &[1]),
(vec![IoSlice::new(&[1, 2])], &[1, 2]),
(vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]),
(vec![IoSlice::new(&[1, 2, 3, 4])], &[1, 2, 3, 4]),
(vec![IoSlice::new(&[1, 2, 3, 4, 5])], &[1, 2, 3, 4, 5]),
(vec![IoSlice::new(&[1]), IoSlice::new(&[2])], &[1, 2]),
(vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2])], &[1, 1, 2, 2]),
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2])], &[1, 1, 1, 2, 2, 2]),
(vec![IoSlice::new(&[1, 1, 1, 1]), IoSlice::new(&[2, 2, 2, 2])], &[1, 1, 1, 1, 2, 2, 2, 2]),
(vec![IoSlice::new(&[1]), IoSlice::new(&[2]), IoSlice::new(&[3])], &[1, 2, 3]),
(vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2]), IoSlice::new(&[3, 3])], &[1, 1, 2, 2, 3, 3]),
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]),
];

for (mut input, wanted) in tests.into_iter() {
let mut dst = test_writer(2, 2);
{
let mut future = dst.write_all_vectored(&mut *input);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Ok(())) => {}
other => panic!("unexpected result polling future: {:?}", other),
}
}
assert_eq!(&*dst.written, &*wanted);
}
}
}
1 change: 1 addition & 0 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
Expand Down
1 change: 1 addition & 0 deletions futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/u
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
bilock = ["futures-util/bilock"]
read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
write-all-vectored = ["futures-util/write-all-vectored"]

[package.metadata.docs.rs]
all-features = true
Expand Down