Skip to content

Commit

Permalink
Add a method to learn about cancel on Complete
Browse files Browse the repository at this point in the history
This commits adds a `poll_cancel` method to learn about when the `Oneshot` half
of a complete/oneshot pair has gone away. This can then be used to detect when a
computation is no longer wanted.

Closes rust-lang#63
  • Loading branch information
alexcrichton committed Aug 19, 2016
1 parent df0e550 commit f30ba82
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 1 deletion.
78 changes: 77 additions & 1 deletion src/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use {Future, Poll};
use slot::{Slot, Token};
use task;
use lock::Lock;
use task::{self, TaskHandle};

/// A future representing the completion of a computation happening elsewhere in
/// memory.
Expand All @@ -24,6 +26,8 @@ pub struct Complete<T> {

struct Inner<T> {
slot: Slot<Option<T>>,
oneshot_gone: AtomicBool,
notify_cancel: Lock<Option<TaskHandle>>,
}

/// Creates a new in-memory oneshot used to represent completing a computation.
Expand Down Expand Up @@ -55,6 +59,8 @@ struct Inner<T> {
pub fn oneshot<T>() -> (Complete<T>, Oneshot<T>) {
let inner = Arc::new(Inner {
slot: Slot::new(None),
oneshot_gone: AtomicBool::new(false),
notify_cancel: Lock::new(None),
});
let oneshot = Oneshot {
inner: inner.clone(),
Expand All @@ -78,6 +84,57 @@ impl<T> Complete<T> {
self.send(Some(t))
}

/// Polls this `Complete` half to detect whether the `Oneshot` this has
/// paired with has gone away.
///
/// This function can be used to learn about when the `Oneshot` (consumer)
/// half has gone away and nothing will be able to receive a message sent
/// from `complete`.
///
/// Like `Future::poll`, this function will panic if it's not called from
/// within the context of a task. In otherwords, this should only ever be
/// called from inside another future.
///
/// If `Poll::Ok` is returned then it means that the `Oneshot` has
/// disappeared and the result this `Complete` would otherwise produce
/// should no longer be produced.
///
/// If `Poll::NotReady` is returned then the `Oneshot` is still alive and
/// may be able to receive a message if sent. The current task, however,
/// is scheduled to receive a notification if the corresponding `Oneshot`
/// goes away.
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
// Fast path up first, just read the flag and see if our other half is
// gone.
if self.inner.oneshot_gone.load(Ordering::SeqCst) {
return Poll::Ok(())
}

// If our other half is not gone then we need to park our current task
// and move it into the `notify_cancel` slot to get notified when it's
// actually gone.
//
// If `try_lock` fails, then the `Oneshot` is in the process of using
// it, so we can deduce that it's now in the process of going away and
// hence we're canceled. If it succeeds then we just store our handle.
//
// Crucially we then check `oneshot_gone` *again* before we return.
// While we were storing our handle inside `notify_cancel` the `Oneshot`
// may have been dropped. The first thing it does is set the flag, and
// if it fails to acquire the lock it assumes that we'll see the flag
// later on. So... we then try to see the flag later on!
let handle = task::park();
match self.inner.notify_cancel.try_lock() {
Some(mut p) => *p = Some(handle),
None => return Poll::Ok(()),
}
if self.inner.oneshot_gone.load(Ordering::SeqCst) {
Poll::Ok(())
} else {
Poll::NotReady
}
}

fn send(&mut self, t: Option<T>) {
if let Err(e) = self.inner.slot.try_produce(t) {
self.inner.slot.on_empty(Some(e.into_inner()), |slot, item| {
Expand Down Expand Up @@ -125,8 +182,27 @@ impl<T> Future for Oneshot<T> {

impl<T> Drop for Oneshot<T> {
fn drop(&mut self) {
// First up, if we squirreled away a task to get notified once the
// oneshot was filled in, we cancel that notification. We'll never end
// up actually receiving data (as we're being dropped) so no need to
// hold onto the task.
if let Some(cancel_token) = self.cancel_token.take() {
self.inner.slot.cancel(cancel_token)
}

// Next up, inform the `Complete` half that we're going away. First up
// we flag ourselves as gone, and next we'll attempt to wake up any
// handle that was stored.
//
// If we fail to acquire the lock on the handle, that means that a
// `Complete` is in the process of storing one, and it'll check
// `oneshot_gone` on its way out to see our write here.
self.inner.oneshot_gone.store(true, Ordering::SeqCst);
if let Some(mut handle) = self.inner.notify_cancel.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.unpark()
}
}
}
}
64 changes: 64 additions & 0 deletions tests/oneshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
extern crate futures;

use std::sync::mpsc::{channel, Sender};
use std::thread;

use futures::{oneshot, Complete, Future, Poll};
use futures::task::Task;

#[test]
fn smoke_poll() {
let (mut tx, rx) = oneshot::<u32>();
Task::new().enter(|| {
assert!(tx.poll_cancel().is_not_ready());
assert!(tx.poll_cancel().is_not_ready());
drop(rx);
assert!(tx.poll_cancel().is_ready());
assert!(tx.poll_cancel().is_ready());
})
}

#[test]
fn cancel_notifies() {
let (tx, rx) = oneshot::<u32>();
let (tx2, rx2) = channel();

WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
drop(rx);
rx2.recv().unwrap().unwrap();
}

struct WaitForCancel {
tx: Complete<u32>,
}

impl Future for WaitForCancel {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.tx.poll_cancel()
}
}

#[test]
fn cancel_lots() {
let (tx, rx) = channel::<(Complete<_>, Sender<_>)>();
let t = thread::spawn(move || {
for (tx, tx2) in rx {
WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
}

});

for _ in 0..20000 {
let (otx, orx) = oneshot::<u32>();
let (tx2, rx2) = channel();
tx.send((otx, tx2)).unwrap();
drop(orx);
rx2.recv().unwrap().unwrap();
}
drop(tx);

t.join().unwrap();
}

0 comments on commit f30ba82

Please sign in to comment.