Skip to content

Commit

Permalink
stream: avoid yielding in AllFuture and AnyFuture (#3625)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai Jewson committed Mar 21, 2021
1 parent 0bfcbc8 commit 69b129b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
21 changes: 12 additions & 9 deletions tokio-stream/src/stream_ext/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
let mut stream = Pin::new(me.stream);

match next {
Some(v) => {
if !(me.f)(v) {
Poll::Ready(false)
} else {
cx.waker().wake_by_ref();
Poll::Pending
// Take a maximum of 32 items from the stream before yielding.
for _ in 0..32 {
match futures_core::ready!(stream.as_mut().poll_next(cx)) {
Some(v) => {
if !(me.f)(v) {
return Poll::Ready(false);
}
}
None => return Poll::Ready(true),
}
None => Poll::Ready(true),
}

cx.waker().wake_by_ref();
Poll::Pending
}
}
21 changes: 12 additions & 9 deletions tokio-stream/src/stream_ext/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
let mut stream = Pin::new(me.stream);

match next {
Some(v) => {
if (me.f)(v) {
Poll::Ready(true)
} else {
cx.waker().wake_by_ref();
Poll::Pending
// Take a maximum of 32 items from the stream before yielding.
for _ in 0..32 {
match futures_core::ready!(stream.as_mut().poll_next(cx)) {
Some(v) => {
if (me.f)(v) {
return Poll::Ready(true);
}
}
None => return Poll::Ready(false),
}
None => Poll::Ready(false),
}

cx.waker().wake_by_ref();
Poll::Pending
}
}

0 comments on commit 69b129b

Please sign in to comment.