Skip to content

Commit

Permalink
Fix Subscription cancelation when never awaiting
Browse files Browse the repository at this point in the history
`StreamExt::forward` will keep polling a ready `Stream` in a loop. If the
`Stream` is always ready, the `poll` method of `Forward` effectively
blocks (see rust-lang/futures-rs#2552).

The fix consists in manually implementing a simpler version of `Forward`.
  • Loading branch information
hecrj committed May 27, 2022
1 parent 5de337f commit ad69ddd
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions futures/src/subscription/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{BoxFuture, MaybeSend, Subscription};

use futures::{channel::mpsc, sink::Sink};
use futures::{
channel::mpsc,
sink::{Sink, SinkExt},
};
use std::{collections::HashMap, marker::PhantomData};

/// A registry of subscription streams.
Expand Down Expand Up @@ -85,19 +88,30 @@ where
continue;
}

let (cancel, cancelled) = futures::channel::oneshot::channel();
let (cancel, mut canceled) = futures::channel::oneshot::channel();

// TODO: Use bus if/when it supports async
let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100);

let stream = recipe.stream(event_receiver.boxed());

let future = futures::future::select(
cancelled,
stream.map(Ok).forward(receiver.clone()),
)
.map(|_| ());
let mut receiver = receiver.clone();
let mut stream = recipe.stream(event_receiver.boxed());

let future = async move {
loop {
let select =
futures::future::select(&mut canceled, stream.next());

match select.await {
futures::future::Either::Left(_)
| futures::future::Either::Right((None, _)) => break,
futures::future::Either::Right((Some(message), _)) => {
let _ = receiver.send(message).await;
}
}
}
}
.boxed();

let _ = self.subscriptions.insert(
id,
Expand Down

0 comments on commit ad69ddd

Please sign in to comment.