Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ready-cache: Ensure cancelation can be observed #668

Merged
merged 10 commits into from
Jun 17, 2022
74 changes: 50 additions & 24 deletions tower/src/ready_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,10 @@ where
// recreated after the service is used.
self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
} else {
// This should not technically be possible. We must have decided to cancel
// a Service (by sending on the CancelTx), yet that same service then
// returns Ready. Since polling a Pending _first_ polls the CancelRx, that
// _should_ always see our CancelTx send. Yet empirically, that isn't true:
//
// https://github.com/tower-rs/tower/issues/415
//
// So, we instead detect the endpoint as canceled at this point. That
// should be fine, since the oneshot is only really there to ensure that
// the Pending is polled again anyway.
//
// We assert that this can't happen in debug mode so that hopefully one day
// we can find a test that triggers this reliably.
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed when ready");
assert!(
cancel_tx.is_some(),
"services that become ready must have a pending cancelation"
);
}
}
Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
Expand All @@ -292,13 +281,11 @@ where
}
Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
if cancel_tx.is_some() {
return Err(error::Failed(key, e.into())).into();
} else {
// See comment for the same clause under Ready(Some(Ok)).
debug_assert!(cancel_tx.is_some());
debug!("canceled endpoint removed on error");
}
assert!(
cancel_tx.is_some(),
"services that return an error must have a pending cancelation"
);
return Err(error::Failed(key, e.into())).into();
}
}
}
Expand Down Expand Up @@ -410,8 +397,13 @@ where
type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut fut = self.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
// This MUST return ready as soon as the sender has been notified so
// that we don't return a service that has been canceled, so we disable
// cooperative scheduling on the receiver. Otherwise, the receiver can
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit:

Suggested change
// cooperative scheduling on the receiver. Otherwise, the receiver can
// cooperative yielding on the receiver. Otherwise, the receiver can

it's all cooperative scheduling :)

// sporadically return pending even though the sender has fired.
let mut cancel =
tokio::task::unconstrained(self.cancel.as_mut().expect("polled after complete"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit, take it or leave it: might consider something like this, so that it's very obvious that the comment is referring to the use of unconstrained:

Suggested change
// This MUST return ready as soon as the sender has been notified so
// that we don't return a service that has been canceled, so we disable
// cooperative scheduling on the receiver. Otherwise, the receiver can
// sporadically return pending even though the sender has fired.
let mut cancel =
tokio::task::unconstrained(self.cancel.as_mut().expect("polled after complete"));
let cancel = self.cancel.as_mut().expect("polled after complete");
// This MUST return ready as soon as the sender has been notified so
// that we don't return a service that has been canceled, so we disable
// cooperative scheduling on the receiver. Otherwise, the receiver can
// sporadically return pending even though the sender has fired.
let mut cancel = tokio::task::unconstrained(cancel);

if let Poll::Ready(r) = Pin::new(&mut cancel).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
let key = self.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key)).into();
Expand Down Expand Up @@ -456,3 +448,37 @@ where
.finish()
}
}

#[cfg(test)]
mod test {
use super::*;

// Tests https://github.com/tower-rs/tower/issues/415
#[tokio::test(flavor = "current_thread")]
async fn cancelation_observed() {
let mut cache = ReadyCache::default();
let mut handles = vec![];

// NOTE This test passes at 129 items, but fails at 130 items (if the
// cancelation receiver is not marked `unconstrained`).
for _ in 0..130 {
let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
handle.allow(1);
cache.push("ep0", svc);
handles.push(handle);
}

struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>);
impl Unpin for Ready {}
impl std::future::Future for Ready {
type Output = Result<(), error::Failed<&'static str>>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.get_mut().0.poll_pending(cx)
}
}
Ready(cache).await.unwrap();
}
}
olix0r marked this conversation as resolved.
Show resolved Hide resolved