From ad69ddd54d2dd9f303384e0d01dbb93528c5969d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9ctor=20Ram=C3=B3n=20Jim=C3=A9nez?= Date: Fri, 27 May 2022 19:43:50 +0200 Subject: [PATCH] Fix `Subscription` cancelation when never awaiting `StreamExt::forward` will keep polling a ready `Stream` in a loop. If the `Stream` is always ready, the `poll` method of `Forward` effectively blocks (see https://github.com/rust-lang/futures-rs/issues/2552). The fix consists in manually implementing a simpler version of `Forward`. --- futures/src/subscription/tracker.rs | 32 +++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 421fb91730..f93f7c2c52 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,6 +1,9 @@ use crate::{BoxFuture, MaybeSend, Subscription}; -use futures::{channel::mpsc, sink::Sink}; +use futures::{ + channel::mpsc, + sink::{Sink, SinkExt}, +}; use std::{collections::HashMap, marker::PhantomData}; /// A registry of subscription streams. @@ -85,19 +88,30 @@ where continue; } - let (cancel, cancelled) = futures::channel::oneshot::channel(); + let (cancel, mut canceled) = futures::channel::oneshot::channel(); // TODO: Use bus if/when it supports async let (event_sender, event_receiver) = futures::channel::mpsc::channel(100); - let stream = recipe.stream(event_receiver.boxed()); - - let future = futures::future::select( - cancelled, - stream.map(Ok).forward(receiver.clone()), - ) - .map(|_| ()); + let mut receiver = receiver.clone(); + let mut stream = recipe.stream(event_receiver.boxed()); + + let future = async move { + loop { + let select = + futures::future::select(&mut canceled, stream.next()); + + match select.await { + futures::future::Either::Left(_) + | futures::future::Either::Right((None, _)) => break, + futures::future::Either::Right((Some(message), _)) => { + let _ = receiver.send(message).await; + } + } + } + } + .boxed(); let _ = self.subscriptions.insert( id,