From bbe67c9a599ee9f828ba455f0a3174945f47b28e Mon Sep 17 00:00:00 2001 From: KaiJewson Date: Sat, 20 Mar 2021 11:27:30 +0000 Subject: [PATCH 1/2] stream: avoid yielding in AllFuture and AnyFuture --- tokio-stream/src/stream_ext/all.rs | 17 ++++++++--------- tokio-stream/src/stream_ext/any.rs | 17 ++++++++--------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/tokio-stream/src/stream_ext/all.rs b/tokio-stream/src/stream_ext/all.rs index 11573f9b973..ddd10352854 100644 --- a/tokio-stream/src/stream_ext/all.rs +++ b/tokio-stream/src/stream_ext/all.rs @@ -38,18 +38,17 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if !(me.f)(v) { - Poll::Ready(false) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + loop { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if !(me.f)(v) { + break Poll::Ready(false); + } } + None => break Poll::Ready(true), } - None => Poll::Ready(true), } } } diff --git a/tokio-stream/src/stream_ext/any.rs b/tokio-stream/src/stream_ext/any.rs index 4c4c5939483..efeeebee8a8 100644 --- a/tokio-stream/src/stream_ext/any.rs +++ b/tokio-stream/src/stream_ext/any.rs @@ -38,18 +38,17 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if (me.f)(v) { - Poll::Ready(true) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + loop { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + break Poll::Ready(true); + } } + None => break Poll::Ready(false), } - None => Poll::Ready(false), } } } From 07232cfa71bb90362984736e6f50b7e29ad3817e Mon Sep 17 00:00:00 2001 From: KaiJewson Date: Sat, 20 Mar 2021 19:53:13 +0000 Subject: [PATCH 2/2] stream: yield after 32 items in `all` and `any` --- tokio-stream/src/stream_ext/all.rs | 10 +++++++--- tokio-stream/src/stream_ext/any.rs | 10 +++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tokio-stream/src/stream_ext/all.rs b/tokio-stream/src/stream_ext/all.rs index ddd10352854..b4dbc1e97c3 100644 --- a/tokio-stream/src/stream_ext/all.rs +++ b/tokio-stream/src/stream_ext/all.rs @@ -40,15 +40,19 @@ where let me = self.project(); let mut stream = Pin::new(me.stream); - loop { + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { match futures_core::ready!(stream.as_mut().poll_next(cx)) { Some(v) => { if !(me.f)(v) { - break Poll::Ready(false); + return Poll::Ready(false); } } - None => break Poll::Ready(true), + None => return Poll::Ready(true), } } + + cx.waker().wake_by_ref(); + Poll::Pending } } diff --git a/tokio-stream/src/stream_ext/any.rs b/tokio-stream/src/stream_ext/any.rs index efeeebee8a8..31394f249b8 100644 --- a/tokio-stream/src/stream_ext/any.rs +++ b/tokio-stream/src/stream_ext/any.rs @@ -40,15 +40,19 @@ where let me = self.project(); let mut stream = Pin::new(me.stream); - loop { + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { match futures_core::ready!(stream.as_mut().poll_next(cx)) { Some(v) => { if (me.f)(v) { - break Poll::Ready(true); + return Poll::Ready(true); } } - None => break Poll::Ready(false), + None => return Poll::Ready(false), } } + + cx.waker().wake_by_ref(); + Poll::Pending } }