diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index e7d532d95c..1ea067e378 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -30,6 +30,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable"] cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic"] bilock = [] read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] +write-all-vectored = ["io"] [dependencies] futures-core = { path = "../futures-core", version = "0.3.4", default-features = false } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 43f183f424..0707e72408 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -124,6 +124,11 @@ pub use self::write_vectored::WriteVectored; mod write_all; pub use self::write_all::WriteAll; +#[cfg(feature = "write_all_vectored")] +mod write_all_vectored; +#[cfg(feature = "write_all_vectored")] +pub use self::write_all_vectored::WriteAllVectored; + /// An extension trait which adds utility methods to `AsyncRead` types. pub trait AsyncReadExt: AsyncRead { /// Creates an adaptor which will chain this stream with another. @@ -460,6 +465,60 @@ pub trait AsyncWriteExt: AsyncWrite { WriteAll::new(self, buf) } + /// Attempts to write multiple buffers into this writer. + /// + /// Creates a future that will write the entire contents of `bufs` into this + /// `AsyncWrite` using [vectored writes]. + /// + /// The returned future will not complete until all the data has been + /// written. + /// + /// [vectored writes]: std::io::Write::write_vectored + /// + /// # Notes + /// + /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to + /// a slice of `IoSlice`s, not an immutable one. That's because we need to + /// modify the slice to keep track of the bytes already written. + /// + /// Once this futures returns, the contents of `bufs` are unspecified, as + /// this depends on how many calls to `write_vectored` were necessary. It is + /// best to understand this function as taking ownership of `bufs` and to + /// not use `bufs` afterwards. The underlying buffers, to which the + /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and + /// can be reused. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::AsyncWriteExt; + /// use std::io::{Cursor, IoSlice}; + /// + /// let mut writer = Cursor::new([0u8; 7]); + /// let bufs = &mut [ + /// IoSlice::new(&[1]), + /// IoSlice::new(&[2, 3]), + /// IoSlice::new(&[4, 5, 6]), + /// ]; + /// + /// writer.write_all_vectored(bufs).await?; + /// // Note: the contents of `bufs` is now undefined, see the Notes section. + /// + /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + #[cfg(feature = "write_all_vectored")] + fn write_all_vectored<'a>( + &'a mut self, + bufs: &'a mut [IoSlice<'a>], + ) -> WriteAllVectored<'a, Self> + where + Self: Unpin, + { + WriteAllVectored::new(self, bufs) + } + /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`. /// Requires the `io-compat` feature to enable. @@ -470,7 +529,6 @@ pub trait AsyncWriteExt: AsyncWrite { Compat::new(self) } - /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`>`. /// /// This adapter produces a sink that will write each value passed to it diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs index 57f1400b0e..f9ffb49ea2 100644 --- a/futures-util/src/io/write_all.rs +++ b/futures-util/src/io/write_all.rs @@ -33,7 +33,7 @@ impl Future for WriteAll<'_, W> { this.buf = rest; } if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); } } diff --git a/futures-util/src/io/write_all_vectored.rs b/futures-util/src/io/write_all_vectored.rs new file mode 100644 index 0000000000..fbe7e73ae5 --- /dev/null +++ b/futures-util/src/io/write_all_vectored.rs @@ -0,0 +1,200 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use futures_io::IoSlice; +use std::io; +use std::mem; +use std::pin::Pin; + +/// Future for the +/// [`write_all_vectored`](super::AsyncWriteExt::write_all_vectored) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct WriteAllVectored<'a, W: ?Sized + Unpin> { + writer: &'a mut W, + bufs: &'a mut [IoSlice<'a>], +} + +impl Unpin for WriteAllVectored<'_, W> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> { + pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self { + WriteAllVectored { writer, bufs } + } +} + +impl Future for WriteAllVectored<'_, W> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + while !this.bufs.is_empty() { + let n = ready!(Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs))?; + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } else { + this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n); + } + } + + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +mod tests { + use std::cmp::min; + use std::future::Future; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use crate::io::{AsyncWrite, AsyncWriteExt, IoSlice}; + use crate::task::noop_waker; + + /// Create a new writer that reads from at most `n_bufs` and reads + /// `per_call` bytes (in total) per call to write. + fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter { + TestWriter { + n_bufs, + per_call, + written: Vec::new(), + } + } + + // TODO: maybe move this the future-test crate? + struct TestWriter { + n_bufs: usize, + per_call: usize, + written: Vec, + } + + impl AsyncWrite for TestWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.poll_write_vectored(cx, &[IoSlice::new(buf)]) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let mut left = self.per_call; + let mut written = 0; + for buf in bufs.iter().take(self.n_bufs) { + let n = min(left, buf.len()); + self.written.extend_from_slice(&buf[0..n]); + left -= n; + written += n; + } + Poll::Ready(Ok(written)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + // TODO: maybe move this the future-test crate? + macro_rules! assert_poll_ok { + ($e:expr, $expected:expr) => { + let expected = $expected; + match $e { + Poll::Ready(Ok(ok)) if ok == expected => {} + got => panic!( + "unexpected result, got: {:?}, wanted: Ready(Ok({:?}))", + got, expected + ), + } + }; + } + + #[test] + fn test_writer_read_from_one_buf() { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + let mut dst = test_writer(1, 2); + let mut dst = Pin::new(&mut dst); + + assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[]), 0); + assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, &[]), 0); + + // Read at most 2 bytes. + assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[1, 1, 1]), 2); + let bufs = &[IoSlice::new(&[2, 2, 2])]; + assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 2); + + // Only read from first buf. + let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4, 4])]; + assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 1); + + assert_eq!(dst.written, &[1, 1, 2, 2, 3]); + } + + #[test] + fn test_writer_read_from_multiple_bufs() { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + let mut dst = test_writer(3, 3); + let mut dst = Pin::new(&mut dst); + + // Read at most 3 bytes from two buffers. + let bufs = &[IoSlice::new(&[1]), IoSlice::new(&[2, 2, 2])]; + assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); + + // Read at most 3 bytes from three buffers. + let bufs = &[ + IoSlice::new(&[3]), + IoSlice::new(&[4]), + IoSlice::new(&[5, 5]), + ]; + assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); + + assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]); + } + + #[test] + fn test_write_all_vectored() { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + #[rustfmt::skip] // Becomes unreadable otherwise. + let tests: Vec<(_, &'static [u8])> = vec![ + (vec![], &[]), + (vec![IoSlice::new(&[1])], &[1]), + (vec![IoSlice::new(&[1, 2])], &[1, 2]), + (vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]), + (vec![IoSlice::new(&[1, 2, 3, 4])], &[1, 2, 3, 4]), + (vec![IoSlice::new(&[1, 2, 3, 4, 5])], &[1, 2, 3, 4, 5]), + (vec![IoSlice::new(&[1]), IoSlice::new(&[2])], &[1, 2]), + (vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2])], &[1, 1, 2, 2]), + (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2])], &[1, 1, 1, 2, 2, 2]), + (vec![IoSlice::new(&[1, 1, 1, 1]), IoSlice::new(&[2, 2, 2, 2])], &[1, 1, 1, 1, 2, 2, 2, 2]), + (vec![IoSlice::new(&[1]), IoSlice::new(&[2]), IoSlice::new(&[3])], &[1, 2, 3]), + (vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2]), IoSlice::new(&[3, 3])], &[1, 1, 2, 2, 3, 3]), + (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]), + ]; + + for (mut input, wanted) in tests.into_iter() { + let mut dst = test_writer(2, 2); + { + let mut future = dst.write_all_vectored(&mut *input); + match Pin::new(&mut future).poll(&mut cx) { + Poll::Ready(Ok(())) => {} + other => panic!("unexpected result polling future: {:?}", other), + } + } + assert_eq!(&*dst.written, &*wanted); + } + } +} diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 21645b148d..9657c15a40 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -3,6 +3,7 @@ #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "read-initializer", feature(read_initializer))] +#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 6619121fa4..6b30a8dde7 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -51,6 +51,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/u cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] bilock = ["futures-util/bilock"] read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] +write-all-vectored = ["futures-util/write-all-vectored"] [package.metadata.docs.rs] all-features = true