diff --git a/futures-util/src/stream/stream/for_each.rs b/futures-util/src/stream/stream/for_each.rs index c8af21becd..d27f8c541b 100644 --- a/futures-util/src/stream/stream/for_each.rs +++ b/futures-util/src/stream/stream/for_each.rs @@ -30,9 +30,10 @@ where } impl ForEach -where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { pub(super) fn new(stream: St, f: F) -> ForEach { ForEach { @@ -44,9 +45,10 @@ where St: Stream, } impl FusedFuture for ForEach - where St: FusedStream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: FusedStream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { fn is_terminated(&self) -> bool { self.future.is_none() && self.stream.is_terminated() @@ -54,24 +56,27 @@ impl FusedFuture for ForEach } impl Future for ForEach - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut this = self.project(); - loop { - if let Some(fut) = this.future.as_mut().as_pin_mut() { - ready!(fut.poll(cx)); - this.future.set(None); - } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { - this.future.set(Some((this.f)(item))); - } else { - break; - } + + if let Some(fut) = this.future.as_mut().as_pin_mut() { + ready!(fut.poll(cx)); + cx.waker().wake_by_ref(); + this.future.set(None); + Poll::Pending + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + cx.waker().wake_by_ref(); + this.future.set(Some((this.f)(item))); + Poll::Pending + } else { + Poll::Ready(()) } - Poll::Ready(()) } }