Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More tests 4 #1069

Merged
merged 36 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d4c271c
Remove no longer relevant FIXME
MajorBreakfast Jul 4, 2018
aa84e98
Fix send all
MajorBreakfast Jul 4, 2018
c6362e1
Test for split
MajorBreakfast Jul 4, 2018
b6ba956
Restructure macros
MajorBreakfast Jul 4, 2018
c656939
`FutureExt::inspect` test
MajorBreakfast Jul 4, 2018
0661769
Scope free functions
MajorBreakfast Jul 4, 2018
df75825
recurse test
MajorBreakfast Jul 4, 2018
468943b
Same features attribute for all the tests
MajorBreakfast Jul 4, 2018
824a29f
Remove `extern crate futures;` from tests
MajorBreakfast Jul 4, 2018
0c74e93
Extract contents of await module into submodules
MajorBreakfast Jul 4, 2018
30a17a0
Split test support module into submodules
MajorBreakfast Jul 4, 2018
b5c3aa5
Unfold test
MajorBreakfast Jul 4, 2018
337b8c0
Remove unused macro use warning
MajorBreakfast Jul 4, 2018
05a6b99
Rename `await` module to `async_await`
MajorBreakfast Jul 4, 2018
0ce2116
Refactored various combinators, added eager_drop test
MajorBreakfast Jul 5, 2018
73d454e
Remove unnecessary option in `Chain`
MajorBreakfast Jul 6, 2018
ef01e40
Name `TryChain` callback `op` becaue it is not always async
MajorBreakfast Jul 6, 2018
34e20bc
Restrict visibility of combinator functions
MajorBreakfast Jul 6, 2018
cf8bccb
Move test file `enventual.rs`
MajorBreakfast Jul 6, 2018
785e3ac
Implement tests from eventual.rs
MajorBreakfast Jul 6, 2018
3c7400c
Some name changes
MajorBreakfast Jul 6, 2018
5c39182
Remove accidentally included tests from oneshot.rs
MajorBreakfast Jul 6, 2018
2a78234
Align naming with mpsc::channel() docs
MajorBreakfast Jul 6, 2018
39c3e47
Compatibility with new nightly: FutureObj
MajorBreakfast Jul 7, 2018
3722454
Make docs build, light refactoring, enable tests in CI
MajorBreakfast Jul 7, 2018
4f1653f
Streamline `use` statements
MajorBreakfast Jul 7, 2018
4e7c924
`result()` is now called `ready()`
MajorBreakfast Jul 7, 2018
dd92921
Tweak macro documentation and document pin macros
MajorBreakfast Jul 7, 2018
cc3c9b0
Ensure all future and stream reexports
MajorBreakfast Jul 7, 2018
ad744ac
Add benchmarks and no_std build to CI
MajorBreakfast Jul 7, 2018
9cb2f85
Final tweaks to use statement order
MajorBreakfast Jul 8, 2018
b66bfdf
doc(hidden) top-level reexports
MajorBreakfast Jul 9, 2018
ade4c92
futures_core hidden top level exports
MajorBreakfast Jul 9, 2018
0f82429
task::Context
MajorBreakfast Jul 9, 2018
a89c0dd
Remove `Context` from top-level exports
MajorBreakfast Jul 9, 2018
a5222ba
Change to `task::Context` in docs for `oneshot`
MajorBreakfast Jul 9, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ sudo: false
matrix:
include:
- os: osx
# - rust: stable
# - rust: beta
# - rust: nightly
# script: cargo bench --all && cd futures-util && cargo bench --features=bench
- rust: nightly
script: cargo bench --all && cd futures-util && cargo bench --features=bench
- rust: nightly
before_script:
- pip install ghp-import --user && export PATH=$HOME/.local/bin:$PATH
Expand All @@ -18,38 +16,29 @@ matrix:
after_success:
- ghp-import -n target/doc
- git push -qf https://${GH_TOKEN}@github.com/${TRAVIS_REPO_SLUG}.git gh-pages
# - rust: stable
# script:
# - cargo build --manifest-path futures-core/Cargo.toml --no-default-features
# - cargo build --manifest-path futures/Cargo.toml --no-default-features
# - cargo build --manifest-path futures-channel/Cargo.toml --no-default-features
# - cargo build --manifest-path futures-executor/Cargo.toml --no-default-features
# - cargo build --manifest-path futures-sink/Cargo.toml --no-default-features
# - cargo build --manifest-path futures-util/Cargo.toml --no-default-features
# - rust: nightly
# script:
# - cargo build --manifest-path futures-core/Cargo.toml --features nightly
# - cargo build --manifest-path futures-stable/Cargo.toml --features nightly
# - cargo build --manifest-path futures-async-runtime/Cargo.toml --features nightly
# - cargo build --manifest-path futures-macro-async/Cargo.toml --features nightly
# - cargo build --manifest-path futures/Cargo.toml --features nightly
# - cargo test --manifest-path futures-macro-await/Cargo.toml --features nightly
# - cargo test --manifest-path futures/Cargo.toml --features nightly --test async_await_tests
- rust: nightly
script:
- cargo build --manifest-path futures/Cargo.toml --no-default-features
- cargo build --manifest-path futures-channel/Cargo.toml --no-default-features
- cargo build --manifest-path futures-core/Cargo.toml --no-default-features
- cargo build --manifest-path futures-executor/Cargo.toml --no-default-features
- cargo build --manifest-path futures-sink/Cargo.toml --no-default-features
- cargo build --manifest-path futures-util/Cargo.toml --no-default-features
# - rust: nightly
# script:
# - rustup component add rust-src
# - cargo install xargo
# - xargo build --manifest-path futures/Cargo.toml --target thumbv6m-none-eabi --no-default-features --features nightly
# - xargo build --manifest-path futures/Cargo.toml --target thumbv6m-none-eabi --no-default-features
# - rust: 1.20.0
# script: cargo test --all
# - rust: nightly
# script:
# - cargo test --manifest-path futures/testcrate/Cargo.toml

script:
- cargo build --all # for now build only; tests require the full suite of crates
# - cargo test --all
# - cargo test --all --release
- cargo build --all
- cargo test --all
- cargo test --all --release

env:
global:
Expand Down
2 changes: 1 addition & 1 deletion futures-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#![deny(missing_docs, missing_debug_implementations, warnings)]
#![deny(bare_trait_objects)]

#![doc(html_root_url = "https://docs.rs/futures-channel/0.2.0")]
#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.0-alpha")]

#[cfg(feature = "std")]
extern crate std;
Expand Down
79 changes: 48 additions & 31 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
//! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks.
//!
//! Similarly to the `std`, channel creation provides [`Receiver`](Receiver) and
//! [`Sender`](Sender) handles. [`Receiver`](Receiver) implements
//! [`Stream`](futures_core::Stream) and allows a task to read values out of the
//! channel. If there is no message to read from the channel, the current task
//! will be notified when a new value is sent. [`Sender`](Sender) implements the
//! `Sink` trait and allows a task to send messages into
//! Similarly to the `std`, channel creation provides [`Receiver`] and
//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
//! read values out of the channel. If there is no message to read from the
//! channel, the current task will be notified when a new value is sent.
//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
//! the channel. If the channel is at capacity, the send will be rejected and
//! the task will be notified when additional capacity is available. In other
//! words, the channel provides backpressure.
//!
//! Unbounded channels are also available using the [`unbounded`](unbounded)
//! constructor.
//! Unbounded channels are also available using the `unbounded` constructor.
//!
//! # Disconnection
//!
//! When all [`Sender`](Sender) handles have been dropped, it is no longer
//! When all [`Sender`] handles have been dropped, it is no longer
//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`](Receiver::poll_next)
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Ok(Ready(None))`.
//!
//! If the [`Receiver`](Receiver) handle is dropped, then messages can no longer
//! If the [`Receiver`] handle is dropped, then messages can no longer
//! be read out of the channel. In this case, all further attempts to send will
//! result in an error.
//!
//! # Clean Shutdown
//!
//! If the [`Receiver`](Receiver) is simply dropped, then it is possible for
//! If the [`Receiver`] is simply dropped, then it is possible for
//! there to be messages still in the channel that will not be processed. As
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages to
//! be sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.
//!
//! [`Sender`]: struct.Sender.html
//! [`Receiver`]: struct.Receiver.html
//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
//! [`Receiver::poll_next`]:
//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added these links like so. I wasn't able to make it build otherwise. Seems to be a bug in all //! comments


// At the core, the channel uses an atomic FIFO queue for message passing. This
// queue is used as the primary coordination primitive. In order to enforce
Expand Down Expand Up @@ -74,20 +78,19 @@
// happens-before semantics required for the acquire / release semantics used
// by the queue structure.

use std::mem::PinMut;
use std::marker::Unpin;
use std::fmt;
use std::error::Error;
use futures_core::stream::Stream;
use futures_core::task::{self, Waker, Poll};
use std::any::Any;
use std::error::Error;
use std::fmt;
use std::marker::Unpin;
use std::mem::PinMut;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;

use futures_core::task::{self, Waker};
use futures_core::{Poll, Stream};

use crate::mpsc::queue::{Queue, PopResult};

mod queue;
Expand Down Expand Up @@ -362,7 +365,7 @@ impl SenderTask {
/// `buffer` "first come, first serve" slots available to all senders.
///
/// The [`Receiver`](Receiver) returned implements the
/// [`Stream`](futures_core::Stream) trait, while [`Sender`](Sender) implements
/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
/// `Sink`.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
// Check that the requested buffer size does not exceed the maximum buffer
Expand All @@ -371,7 +374,8 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
channel2(Some(buffer))
}

/// Creates an unbounded mpsc channel for communicating between asynchronous tasks.
/// Creates an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
/// A `send` on this channel will always succeed as long as the receive half has
/// not been closed. If the receiver falls behind, messages will be arbitrarily
Expand Down Expand Up @@ -556,8 +560,8 @@ impl<T> Sender<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
sending this messages would overflow the state");
assert!(state.num_messages < MAX_CAPACITY, "buffer space \
exhausted; sending this messages would overflow the state");

state.num_messages += 1;

Expand Down Expand Up @@ -642,9 +646,13 @@ impl<T> Sender<T> {
///
/// - `Ok(Async::Ready(_))` if there is sufficient capacity;
/// - `Ok(Async::Pending)` if the channel may not have
/// capacity, in which case the current task is queued to be notified once capacity is available;
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Err(SendError)` if the receiver has been dropped.
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<Result<(), SendError>> {
pub fn poll_ready(
&mut self,
cx: &mut task::Context
) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Poll::Ready(Err(SendError {
Expand Down Expand Up @@ -698,7 +706,10 @@ impl<T> Sender<T> {

impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
pub fn poll_ready(&self, _: &mut task::Context) -> Poll<Result<(), SendError>> {
pub fn poll_ready(
&self,
_: &mut task::Context,
) -> Poll<Result<(), SendError>> {
self.0.poll_ready_nb()
}

Expand Down Expand Up @@ -851,8 +862,8 @@ impl<T> Receiver<T> {
loop {
match unsafe { self.inner.message_queue.pop() } {
PopResult::Data(msg) => {
// If there are any parked task handles in the parked queue, pop
// one and unpark it.
// If there are any parked task handles in the parked queue,
// pop one and unpark it.
self.unpark_one();

// Decrement number of messages
Expand Down Expand Up @@ -948,7 +959,10 @@ impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<T>> {
fn poll_next(
mut self: PinMut<Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
loop {
// Try to read a message off of the message queue.
let msg = match self.next_message() {
Expand Down Expand Up @@ -1015,7 +1029,10 @@ impl<T> UnboundedReceiver<T> {
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<T>> {
fn poll_next(
mut self: PinMut<Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
PinMut::new(&mut self.0).poll_next(cx)
}
}
Expand Down
Loading