diff --git a/tokio-stream/src/stream_ext/all.rs b/tokio-stream/src/stream_ext/all.rs index 11573f9b973..b4dbc1e97c3 100644 --- a/tokio-stream/src/stream_ext/all.rs +++ b/tokio-stream/src/stream_ext/all.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if !(me.f)(v) { - Poll::Ready(false) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if !(me.f)(v) { + return Poll::Ready(false); + } } + None => return Poll::Ready(true), } - None => Poll::Ready(true), } + + cx.waker().wake_by_ref(); + Poll::Pending } } diff --git a/tokio-stream/src/stream_ext/any.rs b/tokio-stream/src/stream_ext/any.rs index 4c4c5939483..31394f249b8 100644 --- a/tokio-stream/src/stream_ext/any.rs +++ b/tokio-stream/src/stream_ext/any.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if (me.f)(v) { - Poll::Ready(true) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + return Poll::Ready(true); + } } + None => return Poll::Ready(false), } - None => Poll::Ready(false), } + + cx.waker().wake_by_ref(); + Poll::Pending } }