Skip to content

Commit

Permalink
Add AsyncWriteExt::write_all_vectored utility
Browse files Browse the repository at this point in the history
This adds a new feature to future-util, named write_all_vectored, to
enable the utility since it requires the unstable io_slice_advance
Rust feature.

This matches the same API found in io::Write::write_all_vectored in the
std lib.
  • Loading branch information
Thomasdezeeuw authored and cramertj committed Apr 22, 2020
1 parent 28dfa30 commit d7d8216
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 2 deletions.
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable"]
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic"]
bilock = []
read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
write-all-vectored = ["io"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
Expand Down
60 changes: 59 additions & 1 deletion futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ pub use self::write_vectored::WriteVectored;
mod write_all;
pub use self::write_all::WriteAll;

#[cfg(feature = "write_all_vectored")]
mod write_all_vectored;
#[cfg(feature = "write_all_vectored")]
pub use self::write_all_vectored::WriteAllVectored;

/// An extension trait which adds utility methods to `AsyncRead` types.
pub trait AsyncReadExt: AsyncRead {
/// Creates an adaptor which will chain this stream with another.
Expand Down Expand Up @@ -460,6 +465,60 @@ pub trait AsyncWriteExt: AsyncWrite {
WriteAll::new(self, buf)
}

/// Attempts to write multiple buffers into this writer.
///
/// Creates a future that will write the entire contents of `bufs` into this
/// `AsyncWrite` using [vectored writes].
///
/// The returned future will not complete until all the data has been
/// written.
///
/// [vectored writes]: std::io::Write::write_vectored
///
/// # Notes
///
/// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
/// a slice of `IoSlice`s, not an immutable one. That's because we need to
/// modify the slice to keep track of the bytes already written.
///
/// Once this futures returns, the contents of `bufs` are unspecified, as
/// this depends on how many calls to `write_vectored` were necessary. It is
/// best to understand this function as taking ownership of `bufs` and to
/// not use `bufs` afterwards. The underlying buffers, to which the
/// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
/// can be reused.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::AsyncWriteExt;
/// use std::io::{Cursor, IoSlice};
///
/// let mut writer = Cursor::new([0u8; 7]);
/// let bufs = &mut [
/// IoSlice::new(&[1]),
/// IoSlice::new(&[2, 3]),
/// IoSlice::new(&[4, 5, 6]),
/// ];
///
/// writer.write_all_vectored(bufs).await?;
/// // Note: the contents of `bufs` is now undefined, see the Notes section.
///
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
#[cfg(feature = "write_all_vectored")]
fn write_all_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSlice<'a>],
) -> WriteAllVectored<'a, Self>
where
Self: Unpin,
{
WriteAllVectored::new(self, bufs)
}

/// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
/// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
/// Requires the `io-compat` feature to enable.
Expand All @@ -470,7 +529,6 @@ pub trait AsyncWriteExt: AsyncWrite {
Compat::new(self)
}


/// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
///
/// This adapter produces a sink that will write each value passed to it
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/write_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
this.buf = rest;
}
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
}

Expand Down
200 changes: 200 additions & 0 deletions futures-util/src/io/write_all_vectored.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use futures_io::IoSlice;
use std::io;
use std::mem;
use std::pin::Pin;

/// Future for the
/// [`write_all_vectored`](super::AsyncWriteExt::write_all_vectored) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAllVectored<'a, W: ?Sized + Unpin> {
writer: &'a mut W,
bufs: &'a mut [IoSlice<'a>],
}

impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {}

impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self {
WriteAllVectored { writer, bufs }
}
}

impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = &mut *self;
while !this.bufs.is_empty() {
let n = ready!(Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs))?;
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
} else {
this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n);
}
}

Poll::Ready(Ok(()))
}
}

#[cfg(test)]
mod tests {
use std::cmp::min;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::io::{AsyncWrite, AsyncWriteExt, IoSlice};
use crate::task::noop_waker;

/// Create a new writer that reads from at most `n_bufs` and reads
/// `per_call` bytes (in total) per call to write.
fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter {
TestWriter {
n_bufs,
per_call,
written: Vec::new(),
}
}

// TODO: maybe move this the future-test crate?
struct TestWriter {
n_bufs: usize,
per_call: usize,
written: Vec<u8>,
}

impl AsyncWrite for TestWriter {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.poll_write_vectored(cx, &[IoSlice::new(buf)])
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut left = self.per_call;
let mut written = 0;
for buf in bufs.iter().take(self.n_bufs) {
let n = min(left, buf.len());
self.written.extend_from_slice(&buf[0..n]);
left -= n;
written += n;
}
Poll::Ready(Ok(written))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

// TODO: maybe move this the future-test crate?
macro_rules! assert_poll_ok {
($e:expr, $expected:expr) => {
let expected = $expected;
match $e {
Poll::Ready(Ok(ok)) if ok == expected => {}
got => panic!(
"unexpected result, got: {:?}, wanted: Ready(Ok({:?}))",
got, expected
),
}
};
}

#[test]
fn test_writer_read_from_one_buf() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

let mut dst = test_writer(1, 2);
let mut dst = Pin::new(&mut dst);

assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[]), 0);
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, &[]), 0);

// Read at most 2 bytes.
assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[1, 1, 1]), 2);
let bufs = &[IoSlice::new(&[2, 2, 2])];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 2);

// Only read from first buf.
let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4, 4])];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 1);

assert_eq!(dst.written, &[1, 1, 2, 2, 3]);
}

#[test]
fn test_writer_read_from_multiple_bufs() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

let mut dst = test_writer(3, 3);
let mut dst = Pin::new(&mut dst);

// Read at most 3 bytes from two buffers.
let bufs = &[IoSlice::new(&[1]), IoSlice::new(&[2, 2, 2])];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3);

// Read at most 3 bytes from three buffers.
let bufs = &[
IoSlice::new(&[3]),
IoSlice::new(&[4]),
IoSlice::new(&[5, 5]),
];
assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3);

assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]);
}

#[test]
fn test_write_all_vectored() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);

#[rustfmt::skip] // Becomes unreadable otherwise.
let tests: Vec<(_, &'static [u8])> = vec![
(vec![], &[]),
(vec![IoSlice::new(&[1])], &[1]),
(vec![IoSlice::new(&[1, 2])], &[1, 2]),
(vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]),
(vec![IoSlice::new(&[1, 2, 3, 4])], &[1, 2, 3, 4]),
(vec![IoSlice::new(&[1, 2, 3, 4, 5])], &[1, 2, 3, 4, 5]),
(vec![IoSlice::new(&[1]), IoSlice::new(&[2])], &[1, 2]),
(vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2])], &[1, 1, 2, 2]),
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2])], &[1, 1, 1, 2, 2, 2]),
(vec![IoSlice::new(&[1, 1, 1, 1]), IoSlice::new(&[2, 2, 2, 2])], &[1, 1, 1, 1, 2, 2, 2, 2]),
(vec![IoSlice::new(&[1]), IoSlice::new(&[2]), IoSlice::new(&[3])], &[1, 2, 3]),
(vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2]), IoSlice::new(&[3, 3])], &[1, 1, 2, 2, 3, 3]),
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]),
];

for (mut input, wanted) in tests.into_iter() {
let mut dst = test_writer(2, 2);
{
let mut future = dst.write_all_vectored(&mut *input);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(Ok(())) => {}
other => panic!("unexpected result polling future: {:?}", other),
}
}
assert_eq!(&*dst.written, &*wanted);
}
}
}
1 change: 1 addition & 0 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
Expand Down
1 change: 1 addition & 0 deletions futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/u
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
bilock = ["futures-util/bilock"]
read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
write-all-vectored = ["futures-util/write-all-vectored"]

[package.metadata.docs.rs]
all-features = true
Expand Down

0 comments on commit d7d8216

Please sign in to comment.