diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f49e57af0f7..97638d1ae6a 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,9 +1,12 @@ ## 0.17.3 - Use `web-time` instead of `instant`. See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347). +- Fix manual closure of relayed listener. + See [PR 5491](https://github.com/libp2p/rust-libp2p/pull/5491) - Add resource limits to `CircuitReq` to be set See [PR 5493](https://github.com/libp2p/rust-libp2p/pull/5493) + ## 0.17.2 - Fix support for unlimited relay connection according to spec. diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 7147f0b5e55..b4374aa4672 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -27,7 +27,6 @@ use crate::RequestId; use futures::channel::mpsc; use futures::channel::oneshot; use futures::future::{ready, BoxFuture, FutureExt, Ready}; -use futures::ready; use futures::sink::SinkExt; use futures::stream::SelectAll; use futures::stream::{Stream, StreamExt}; @@ -36,7 +35,7 @@ use libp2p_core::transport::{ListenerId, TransportError, TransportEvent}; use libp2p_identity::PeerId; use std::collections::VecDeque; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use thiserror::Error; /// A [`Transport`] enabling client relay capabilities. @@ -151,6 +150,7 @@ impl libp2p_core::Transport for Transport { queued_events: Default::default(), from_behaviour, is_closed: false, + waker: None, }; self.listeners.push(listener); Ok(()) @@ -313,6 +313,7 @@ pub(crate) struct Listener { /// The listener can be closed either manually with [`Transport::remove_listener`](libp2p_core::Transport) or if /// the sender side of the `from_behaviour` channel is dropped. is_closed: bool, + waker: Option, } impl Listener { @@ -328,6 +329,10 @@ impl Listener { reason, }); self.is_closed = true; + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } } @@ -337,18 +342,27 @@ impl Stream for Listener { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let Some(event) = self.queued_events.pop_front() { + self.waker = None; return Poll::Ready(Some(event)); } if self.is_closed { // Terminate the stream if the listener closed and all remaining events have been reported. + self.waker = None; return Poll::Ready(None); } - let Some(msg) = ready!(self.from_behaviour.poll_next_unpin(cx)) else { - // Sender of `from_behaviour` has been dropped, signaling listener to close. - self.close(Ok(())); - continue; + let msg = match self.from_behaviour.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => { + // Sender of `from_behaviour` has been dropped, signaling listener to close. + self.close(Ok(())); + continue; + } + Poll::Pending => { + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } }; match msg {