From d30d6a1e536d041560ce3e5d2fc19ac9f342ef1f Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 10 Nov 2016 10:55:24 -0800 Subject: [PATCH] Add a select combinator for streams Similar to merge but requires the items/errors to be the same and then only yields one at a time. Closes #239 --- src/stream/mod.rs | 22 ++++++++++++++- src/stream/select.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++ tests/stream.rs | 15 +++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 src/stream/select.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 89a57b6df81..1d6fee0a364 100755 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -31,6 +31,7 @@ mod merge; mod once; mod or_else; mod peek; +mod select; mod skip; mod skip_while; mod take; @@ -50,12 +51,13 @@ pub use self::map_err::MapErr; pub use self::merge::{Merge, MergedItem}; pub use self::once::{Once, once}; pub use self::or_else::OrElse; +pub use self::peek::Peekable; +pub use self::select::Select; pub use self::skip::Skip; pub use self::skip_while::SkipWhile; pub use self::take::Take; pub use self::then::Then; pub use self::zip::Zip; -pub use self::peek::Peekable; if_std! { use std; @@ -801,6 +803,24 @@ pub trait Stream { { chunks::new(self, capacity) } + + /// Creates a stream that selects the next element from either this stream + /// or the provided one, whichever is ready first. + /// + /// This combinator will attempt to pull items from both streams. Each + /// stream will be polled in a round-robin fashion, and whenever a stream is + /// ready to yield an item that item is yielded. + /// + /// The `select` function is similar to `merge` except that it requires both + /// streams to have the same item and error types. + /// + /// Error are passed through from either stream. + fn select(self, other: S) -> Select + where S: Stream, + Self: Sized, + { + select::new(self, other) + } } impl<'a, S: ?Sized + Stream> Stream for &'a mut S { diff --git a/src/stream/select.rs b/src/stream/select.rs new file mode 100644 index 00000000000..f0bce2c18d4 --- /dev/null +++ b/src/stream/select.rs @@ -0,0 +1,64 @@ +use {Poll, Async}; +use stream::{Stream, Fuse}; + +/// An adapter for merging the output of two streams. +/// +/// The merged stream produces items from either of the underlying streams as +/// they become available, and the streams are polled in a round-robin fashion. +/// Errors, however, are not merged: you get at most one error at a time. +#[must_use = "streams do nothing unless polled"] +pub struct Select { + stream1: Fuse, + stream2: Fuse, + flag: bool, +} + +pub fn new(stream1: S1, stream2: S2) -> Select + where S1: Stream, + S2: Stream +{ + Select { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + flag: false, + } +} + +impl Stream for Select + where S1: Stream, + S2: Stream +{ + type Item = S1::Item; + type Error = S1::Error; + + fn poll(&mut self) -> Poll, S1::Error> { + let (a, b) = if self.flag { + (&mut self.stream2 as &mut Stream, + &mut self.stream1 as &mut Stream) + } else { + (&mut self.stream1 as &mut Stream, + &mut self.stream2 as &mut Stream) + }; + self.flag = !self.flag; + + let a_done = match try!(a.poll()) { + Async::Ready(Some(item)) => return Ok(Some(item).into()), + Async::Ready(None) => true, + Async::NotReady => false, + }; + + match try!(b.poll()) { + Async::Ready(Some(item)) => { + // If the other stream isn't finished yet, give them a chance to + // go first next time as we pulled something off `b`. + if !a_done { + self.flag = !self.flag; + } + return Ok(Some(item).into()) + } + Async::Ready(None) if a_done => Ok(None.into()), + Async::Ready(None) => Ok(Async::NotReady), + Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/tests/stream.rs b/tests/stream.rs index 695f71d27c3..c5cb98f71d0 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -287,3 +287,18 @@ fn chunks() { fn chunks_panic_on_cap_zero() { let _ = list().chunks(0); } + +#[test] +fn select() { + let a = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]); + let b = iter(vec![Ok(4), Ok(5), Ok(6)]); + assert_done(|| a.select(b).collect(), Ok(vec![1, 4, 2, 5, 3, 6])); + + let a = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]); + let b = iter(vec![Ok(1), Ok(2)]); + assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3])); + + let a = iter(vec![Ok(1), Ok(2)]); + let b = iter(vec![Ok::<_, u32>(1), Ok(2), Ok(3)]); + assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3])); +}