Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More tests 4 #1069

Merged
merged 36 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d4c271c
Remove no longer relevant FIXME
MajorBreakfast Jul 4, 2018
aa84e98
Fix send all
MajorBreakfast Jul 4, 2018
c6362e1
Test for split
MajorBreakfast Jul 4, 2018
b6ba956
Restructure macros
MajorBreakfast Jul 4, 2018
c656939
`FutureExt::inspect` test
MajorBreakfast Jul 4, 2018
0661769
Scope free functions
MajorBreakfast Jul 4, 2018
df75825
recurse test
MajorBreakfast Jul 4, 2018
468943b
Same features attribute for all the tests
MajorBreakfast Jul 4, 2018
824a29f
Remove `extern crate futures;` from tests
MajorBreakfast Jul 4, 2018
0c74e93
Extract contents of await module into submodules
MajorBreakfast Jul 4, 2018
30a17a0
Split test support module into submodules
MajorBreakfast Jul 4, 2018
b5c3aa5
Unfold test
MajorBreakfast Jul 4, 2018
337b8c0
Remove unused macro use warning
MajorBreakfast Jul 4, 2018
05a6b99
Rename `await` module to `async_await`
MajorBreakfast Jul 4, 2018
0ce2116
Refactored various combinators, added eager_drop test
MajorBreakfast Jul 5, 2018
73d454e
Remove unnecessary option in `Chain`
MajorBreakfast Jul 6, 2018
ef01e40
Name `TryChain` callback `op` becaue it is not always async
MajorBreakfast Jul 6, 2018
34e20bc
Restrict visibility of combinator functions
MajorBreakfast Jul 6, 2018
cf8bccb
Move test file `enventual.rs`
MajorBreakfast Jul 6, 2018
785e3ac
Implement tests from eventual.rs
MajorBreakfast Jul 6, 2018
3c7400c
Some name changes
MajorBreakfast Jul 6, 2018
5c39182
Remove accidentally included tests from oneshot.rs
MajorBreakfast Jul 6, 2018
2a78234
Align naming with mpsc::channel() docs
MajorBreakfast Jul 6, 2018
39c3e47
Compatibility with new nightly: FutureObj
MajorBreakfast Jul 7, 2018
3722454
Make docs build, light refactoring, enable tests in CI
MajorBreakfast Jul 7, 2018
4f1653f
Streamline `use` statements
MajorBreakfast Jul 7, 2018
4e7c924
`result()` is now called `ready()`
MajorBreakfast Jul 7, 2018
dd92921
Tweak macro documentation and document pin macros
MajorBreakfast Jul 7, 2018
cc3c9b0
Ensure all future and stream reexports
MajorBreakfast Jul 7, 2018
ad744ac
Add benchmarks and no_std build to CI
MajorBreakfast Jul 7, 2018
9cb2f85
Final tweaks to use statement order
MajorBreakfast Jul 8, 2018
b66bfdf
doc(hidden) top-level reexports
MajorBreakfast Jul 9, 2018
ade4c92
futures_core hidden top level exports
MajorBreakfast Jul 9, 2018
0f82429
task::Context
MajorBreakfast Jul 9, 2018
a89c0dd
Remove `Context` from top-level exports
MajorBreakfast Jul 9, 2018
a5222ba
Change to `task::Context` in docs for `oneshot`
MajorBreakfast Jul 9, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ matrix:
# - cargo test --manifest-path futures/testcrate/Cargo.toml

script:
- cargo build --all # for now build only; tests require the full suite of crates
# - cargo test --all
# - cargo test --all --release
- cargo build --all
- cargo test --all
- cargo test --all --release

env:
global:
Expand Down
2 changes: 1 addition & 1 deletion futures-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#![deny(missing_docs, missing_debug_implementations, warnings)]
#![deny(bare_trait_objects)]

#![doc(html_root_url = "https://docs.rs/futures-channel/0.2.0")]
#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.0-alpha")]

#[cfg(feature = "std")]
extern crate std;
Expand Down
77 changes: 47 additions & 30 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
//! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks.
//!
//! Similarly to the `std`, channel creation provides [`Receiver`](Receiver) and
//! [`Sender`](Sender) handles. [`Receiver`](Receiver) implements
//! [`Stream`](futures_core::Stream) and allows a task to read values out of the
//! channel. If there is no message to read from the channel, the current task
//! will be notified when a new value is sent. [`Sender`](Sender) implements the
//! `Sink` trait and allows a task to send messages into
//! Similarly to the `std`, channel creation provides [`Receiver`] and
//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
//! read values out of the channel. If there is no message to read from the
//! channel, the current task will be notified when a new value is sent.
//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
//! the channel. If the channel is at capacity, the send will be rejected and
//! the task will be notified when additional capacity is available. In other
//! words, the channel provides backpressure.
//!
//! Unbounded channels are also available using the [`unbounded`](unbounded)
//! constructor.
//! Unbounded channels are also available using the `unbounded` constructor.
//!
//! # Disconnection
//!
//! When all [`Sender`](Sender) handles have been dropped, it is no longer
//! When all [`Sender`] handles have been dropped, it is no longer
//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`](Receiver::poll_next)
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Ok(Ready(None))`.
//!
//! If the [`Receiver`](Receiver) handle is dropped, then messages can no longer
//! If the [`Receiver`] handle is dropped, then messages can no longer
//! be read out of the channel. In this case, all further attempts to send will
//! result in an error.
//!
//! # Clean Shutdown
//!
//! If the [`Receiver`](Receiver) is simply dropped, then it is possible for
//! If the [`Receiver`] is simply dropped, then it is possible for
//! there to be messages still in the channel that will not be processed. As
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages to
//! be sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.
//!
//! [`Sender`]: struct.Sender.html
//! [`Receiver`]: struct.Receiver.html
//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
//! [`Receiver::poll_next`]:
//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added these links like so. I wasn't able to make it build otherwise. Seems to be a bug in all //! comments


// At the core, the channel uses an atomic FIFO queue for message passing. This
// queue is used as the primary coordination primitive. In order to enforce
Expand Down Expand Up @@ -74,20 +78,19 @@
// happens-before semantics required for the acquire / release semantics used
// by the queue structure.

use std::mem::PinMut;
use std::marker::Unpin;
use std::fmt;
use std::error::Error;
use futures_core::stream::Stream;
use futures_core::task::{self, Waker, Poll};
use std::any::Any;
use std::error::Error;
use std::fmt;
use std::marker::Unpin;
use std::mem::PinMut;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;

use futures_core::task::{self, Waker};
use futures_core::{Poll, Stream};

use crate::mpsc::queue::{Queue, PopResult};

mod queue;
Expand Down Expand Up @@ -371,7 +374,8 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
channel2(Some(buffer))
}

/// Creates an unbounded mpsc channel for communicating between asynchronous tasks.
/// Creates an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
/// A `send` on this channel will always succeed as long as the receive half has
/// not been closed. If the receiver falls behind, messages will be arbitrarily
Expand Down Expand Up @@ -556,8 +560,8 @@ impl<T> Sender<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
sending this messages would overflow the state");
assert!(state.num_messages < MAX_CAPACITY, "buffer space \
exhausted; sending this messages would overflow the state");

state.num_messages += 1;

Expand Down Expand Up @@ -642,9 +646,13 @@ impl<T> Sender<T> {
///
/// - `Ok(Async::Ready(_))` if there is sufficient capacity;
/// - `Ok(Async::Pending)` if the channel may not have
/// capacity, in which case the current task is queued to be notified once capacity is available;
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Err(SendError)` if the receiver has been dropped.
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<Result<(), SendError>> {
pub fn poll_ready(
&mut self,
cx: &mut task::Context
) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Poll::Ready(Err(SendError {
Expand Down Expand Up @@ -698,7 +706,10 @@ impl<T> Sender<T> {

impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
pub fn poll_ready(&self, _: &mut task::Context) -> Poll<Result<(), SendError>> {
pub fn poll_ready(
&self,
_: &mut task::Context,
) -> Poll<Result<(), SendError>> {
self.0.poll_ready_nb()
}

Expand Down Expand Up @@ -851,8 +862,8 @@ impl<T> Receiver<T> {
loop {
match unsafe { self.inner.message_queue.pop() } {
PopResult::Data(msg) => {
// If there are any parked task handles in the parked queue, pop
// one and unpark it.
// If there are any parked task handles in the parked queue,
// pop one and unpark it.
self.unpark_one();

// Decrement number of messages
Expand Down Expand Up @@ -948,7 +959,10 @@ impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<T>> {
fn poll_next(
mut self: PinMut<Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
loop {
// Try to read a message off of the message queue.
let msg = match self.next_message() {
Expand Down Expand Up @@ -1015,7 +1029,10 @@ impl<T> UnboundedReceiver<T> {
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<T>> {
fn poll_next(
mut self: PinMut<Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
PinMut::new(&mut self.0).poll_next(cx)
}
}
Expand Down
46 changes: 24 additions & 22 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A channel for sending a single message between asynchronous tasks.

use futures_core::future::Future;
use futures_core::task::{self, Poll, Waker};
use std::marker::Unpin;
use std::mem::PinMut;
use std::sync::Arc;
Expand All @@ -8,9 +10,6 @@ use std::sync::atomic::Ordering::SeqCst;
use std::error::Error;
use std::fmt;

use futures_core::{Future, Poll};
use futures_core::task::{self, Waker};

use crate::lock::Lock;

/// A future for a value that will be provided by another asynchronous task.
Expand Down Expand Up @@ -73,11 +72,11 @@ struct Inner<T> {

/// Creates a new one-shot channel for sending values across asynchronous tasks.
///
/// This function is similar to Rust's channel constructor found in the standard library.
/// Two halves are returned, the first of which is a `Sender` handle, used to
/// signal the end of a computation and provide its value. The second half is a
/// `Receiver` which implements the `Future` trait, resolving to the value that
/// was given to the `Sender` handle.
/// This function is similar to Rust's channel constructor found in the standard
/// library. Two halves are returned, the first of which is a `Sender` handle,
/// used to signal the end of a computation and provide its value. The second
/// half is a `Receiver` which implements the `Future` trait, resolving to the
/// value that was given to the `Sender` handle.
///
/// Each half can be separately owned and sent across tasks.
///
Expand All @@ -92,18 +91,18 @@ struct Inner<T> {
/// use futures::*;
///
/// fn main() {
/// let (p, c) = oneshot::channel::<i32>();
/// let (sender, receiver) = oneshot::channel::<i32>();
///
/// # let t =
/// thread::spawn(|| {
/// let future = c.map(|i| {
/// let future = receiver.map(|i| {
/// println!("got: {:?}", i);
/// });
/// // ...
/// # return future;
/// });
///
/// p.send(3).unwrap();
/// sender.send(3).unwrap();
/// # futures::executor::block_on(t.join().unwrap());
/// }
/// ```
Expand Down Expand Up @@ -179,10 +178,10 @@ impl<T> Inner<T> {
// hence we're canceled. If it succeeds then we just store our handle.
//
// Crucially we then check `oneshot_gone` *again* before we return.
// While we were storing our handle inside `notify_cancel` the `Receiver`
// may have been dropped. The first thing it does is set the flag, and
// if it fails to acquire the lock it assumes that we'll see the flag
// later on. So... we then try to see the flag later on!
// While we were storing our handle inside `notify_cancel` the
// `Receiver` may have been dropped. The first thing it does is set the
// flag, and if it fails to acquire the lock it assumes that we'll see
// the flag later on. So... we then try to see the flag later on!
let handle = cx.waker().clone();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
Expand Down Expand Up @@ -284,8 +283,8 @@ impl<T> Inner<T> {
// successfully blocked our task and we return `Pending`.
if done || self.complete.load(SeqCst) {
// If taking the lock fails, the sender will realise that the we're
// `done` when it checks the `complete` flag on the way out, and will
// treat the send as a failure.
// `done` when it checks the `complete` flag on the way out, and
// will treat the send as a failure.
if let Some(mut slot) = self.data.try_lock() {
if let Some(data) = slot.take() {
return Poll::Ready(Ok(data));
Expand Down Expand Up @@ -399,16 +398,16 @@ impl<T> Receiver<T> {
///
/// Any `send` operation which happens after this method returns is
/// guaranteed to fail. After calling this method, you can use
/// [`Receiver::poll`](Receiver::poll) to determine whether a message had
/// previously been sent.
/// [`Receiver::poll`](<Receiver as Future>::poll) to determine whether a
/// message had previously been sent.
pub fn close(&mut self) {
self.inner.close_rx()
}

/// Attempts to receive a message outside of the context of a task.
///
/// Useful when a [`Context`](Context) is not available such as within a
/// `Drop` impl.
/// Useful when a [`Context`](futures_core::task::Context) is not available
/// such as within a `Drop` impl.
///
/// Does not schedule a task wakeup or have any other side effects.
///
Expand All @@ -424,7 +423,10 @@ impl<T> Receiver<T> {
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;

fn poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll<Result<T, Canceled>> {
fn poll(
self: PinMut<Self>,
cx: &mut task::Context,
) -> Poll<Result<T, Canceled>> {
self.inner.recv(cx)
}
}
Expand Down
2 changes: 0 additions & 2 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,6 @@ fn stress_close_receiver() {

async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
for i in (1..=count).rev() {
// FIXME: should assert `is_ok()`, but cannot because assigning the result
// of this expression to a variable causes an ICE. See rust-lang/rust/issues/51995.
await!(sender.send(i)).unwrap();
}
}
Expand Down
5 changes: 2 additions & 3 deletions futures-core/src/future/either.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{task, Stream, Poll};

use crate::stream::{Stream};
use crate::task::{self, Poll};
use core::mem::PinMut;

use either::Either;

// impl<A, B> Future for Either<A, B>
Expand Down
Loading