From 69b129b405518e60e51a26b6a95428878331a4c8 Mon Sep 17 00:00:00 2001 From: Kai Jewson Date: Sun, 21 Mar 2021 08:34:18 +0000 Subject: [PATCH] stream: avoid yielding in AllFuture and AnyFuture (#3625) --- tokio-stream/src/stream_ext/all.rs | 21 ++++++++++++--------- tokio-stream/src/stream_ext/any.rs | 21 ++++++++++++--------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/tokio-stream/src/stream_ext/all.rs b/tokio-stream/src/stream_ext/all.rs index 11573f9b973..b4dbc1e97c3 100644 --- a/tokio-stream/src/stream_ext/all.rs +++ b/tokio-stream/src/stream_ext/all.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if !(me.f)(v) { - Poll::Ready(false) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if !(me.f)(v) { + return Poll::Ready(false); + } } + None => return Poll::Ready(true), } - None => Poll::Ready(true), } + + cx.waker().wake_by_ref(); + Poll::Pending } } diff --git a/tokio-stream/src/stream_ext/any.rs b/tokio-stream/src/stream_ext/any.rs index 4c4c5939483..31394f249b8 100644 --- a/tokio-stream/src/stream_ext/any.rs +++ b/tokio-stream/src/stream_ext/any.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if (me.f)(v) { - Poll::Ready(true) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + return Poll::Ready(true); + } } + None => return Poll::Ready(false), } - None => Poll::Ready(false), } + + cx.waker().wake_by_ref(); + Poll::Pending } }