diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index e7d532d95c..dea7f67c2a 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -45,6 +45,7 @@ memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0-alpha.4" +pin-project = "0.4.8" [dev-dependencies] futures = { path = "../futures", version = "0.3.4", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/future/future/chain.rs b/futures-util/src/future/future/chain.rs deleted file mode 100644 index 3f248e80fe..0000000000 --- a/futures-util/src/future/future/chain.rs +++ /dev/null @@ -1,58 +0,0 @@ -use core::pin::Pin; -use futures_core::future::Future; -use futures_core::task::{Context, Poll}; - -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] -pub(crate) enum Chain { - First(Fut1, Option), - Second(Fut2), - Empty, -} - -impl Unpin for Chain {} - -impl Chain { - pub(crate)fn is_terminated(&self) -> bool { - if let Chain::Empty = *self { true } else { false } - } -} - -impl Chain - where Fut1: Future, - Fut2: Future, -{ - pub(crate) fn new(fut1: Fut1, data: Data) -> Chain { - Chain::First(fut1, Some(data)) - } - - pub(crate) fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - f: F, - ) -> Poll - where F: FnOnce(Fut1::Output, Data) -> Fut2, - { - let mut f = Some(f); - - // Safe to call `get_unchecked_mut` because we won't move the futures. - let this = unsafe { self.get_unchecked_mut() }; - - loop { - let (output, data) = match this { - Chain::First(fut1, data) => { - let output = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx)); - (output, data.take().unwrap()) - } - Chain::Second(fut2) => { - return unsafe { Pin::new_unchecked(fut2) }.poll(cx); - } - Chain::Empty => unreachable!() - }; - - *this = Chain::Empty; // Drop fut1 - let fut2 = (f.take().unwrap())(output, data); - *this = Chain::Second(fut2) - } - } -} diff --git a/futures-util/src/future/future/flatten.rs b/futures-util/src/future/future/flatten.rs index 16b3a19de9..58b4ecddee 100644 --- a/futures-util/src/future/future/flatten.rs +++ b/futures-util/src/future/future/flatten.rs @@ -1,56 +1,84 @@ -use super::chain::Chain; -use core::fmt; +use core::fmt::{self, Debug}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::pin_project; -/// Future for the [`flatten`](super::FutureExt::flatten) method. -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Flatten +#[pin_project] +#[derive(Debug)] +enum InternalFlatten { + First(#[pin] Fut), + Second(#[pin] Fut::Output), + Empty, +} + +impl InternalFlatten { + fn new(future: Fut) -> Self { + Self::First(future) + } +} + +impl FusedFuture for InternalFlatten where Fut: Future, + Fut::Output: Future, { - state: Chain, + fn is_terminated(&self) -> bool { + match self { + Self::Empty => true, + _ => false, + } + } } -impl Flatten +impl Future for InternalFlatten where Fut: Future, Fut::Output: Future, { - unsafe_pinned!(state: Chain); + type Output = ::Output; - pub(super) fn new(future: Fut) -> Flatten { - Flatten { - state: Chain::new(future, ()), - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(loop { + match self.as_mut().project() { + __InternalFlattenProjection::First(f) => { + let f = ready!(f.poll(cx)); + self.set(Self::Second(f)); + }, + __InternalFlattenProjection::Second(f) => { + let output = ready!(f.poll(cx)); + self.set(Self::Empty); + break output; + }, + __InternalFlattenProjection::Empty => unreachable!() + } + }) } } -impl fmt::Debug for Flatten - where Fut: Future + fmt::Debug, - Fut::Output: fmt::Debug, -{ +/// Future for the [`flatten`](super::FutureExt::flatten) method. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[pin_project] +pub struct Flatten(#[pin] InternalFlatten); + +impl Debug for Flatten where Fut::Output: Debug { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Flatten") - .field("state", &self.state) - .finish() + self.0.fmt(f) } } -impl FusedFuture for Flatten - where Fut: Future, - Fut::Output: Future, -{ - fn is_terminated(&self) -> bool { self.state.is_terminated() } +impl Flatten { + pub(super) fn new(future: Fut) -> Self { + Self(InternalFlatten::new(future)) + } } -impl Future for Flatten - where Fut: Future, - Fut::Output: Future, -{ +impl FusedFuture for Flatten where Fut::Output: Future { + fn is_terminated(&self) -> bool { self.0.is_terminated() } +} + +impl Future for Flatten where Fut::Output: Future { type Output = ::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.state().poll(cx, |a, ()| a) + self.project().0.poll(cx) } } diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index e58cafc8c0..ff59907ce6 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -73,11 +73,6 @@ mod shared; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::shared::Shared; -// Implementation details - -mod chain; -pub(crate) use self::chain::Chain; - impl FutureExt for T where T: Future {} /// An extension trait for `Future`s that provides a variety of convenient @@ -137,13 +132,13 @@ pub trait FutureExt: Future { /// assert_eq!(future_of_4.await, 4); /// # }); /// ``` - fn then(self, f: F) -> Then + fn then(self, f: F) -> Then where F: FnOnce(Self::Output) -> Fut, Fut: Future, Self: Sized, { - assert_future::(Then::new(self, f)) + assert_future::(Flatten::new(Map::new(self, f))) } /// Wrap this future in an `Either` future, making it the left-hand variant diff --git a/futures-util/src/future/future/then.rs b/futures-util/src/future/future/then.rs index 9f30f09864..2cc3f4fb1c 100644 --- a/futures-util/src/future/future/then.rs +++ b/futures-util/src/future/future/then.rs @@ -1,46 +1,4 @@ -use super::Chain; -use core::pin::Pin; -use futures_core::future::{FusedFuture, Future}; -use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use super::{Map, Flatten}; /// Future for the [`then`](super::FutureExt::then) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Then { - chain: Chain, -} - -impl Then - where Fut1: Future, - Fut2: Future, -{ - unsafe_pinned!(chain: Chain); - - /// Creates a new `Then`. - pub(super) fn new(future: Fut1, f: F) -> Then { - Then { - chain: Chain::new(future, f), - } - } -} - -impl FusedFuture for Then - where Fut1: Future, - Fut2: Future, - F: FnOnce(Fut1::Output) -> Fut2, -{ - fn is_terminated(&self) -> bool { self.chain.is_terminated() } -} - -impl Future for Then - where Fut1: Future, - Fut2: Future, - F: FnOnce(Fut1::Output) -> Fut2, -{ - type Output = Fut2::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().chain().poll(cx, |output, f| f(output)) - } -} +pub type Then = Flatten>;