From 142d75118687f4163f0b62ea43c4025859aa2613 Mon Sep 17 00:00:00 2001 From: Tuong Date: Fri, 13 Oct 2023 20:25:10 +0700 Subject: [PATCH 01/10] fix(swarm): prevent overflow in keep-alive computation --- swarm/src/connection.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 8fdc39edae0..1a8680ee8b4 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -354,8 +354,11 @@ where if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) { let effective_keep_alive = max(new_duration, *idle_timeout); - - timer.reset(effective_keep_alive) + + let now = Instant::now(); + let safe_effective_keep_alive = checked_add_fraction(now, effective_keep_alive); + + timer.reset(safe_effective_keep_alive) } } } From 596eeeae5d45823a03c046fcc204f3a9f8151b41 Mon Sep 17 00:00:00 2001 From: Tuong Date: Fri, 13 Oct 2023 21:25:35 +0700 Subject: [PATCH 02/10] refactor(swarm): reorder code flow and add explanation comment --- swarm/src/connection.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 1a8680ee8b4..d562b4a8f52 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -351,13 +351,13 @@ where (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; + let now = Instant::now(); if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) { let effective_keep_alive = max(new_duration, *idle_timeout); - let now = Instant::now(); - let safe_effective_keep_alive = checked_add_fraction(now, effective_keep_alive); - + // `Delay::reset` panics if `now + effective_keep_alive` is `> u64::MAX`. + let safe_effective_keep_alive = checked_add_fraction(now, effective_keep_alive); timer.reset(safe_effective_keep_alive) } } From 2376b89210add97c8384609fd12660723acc6a68 Mon Sep 17 00:00:00 2001 From: Tuong Date: Fri, 13 Oct 2023 21:30:40 +0700 Subject: [PATCH 03/10] docs(swarm): update CHANGELOG --- swarm/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 3e4ccd004b4..63b3d35300d 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.43.7 - unreleased + +- Fix overflow in `KeepAlive` computation that could occur panic at `Delay::reset` if `SwarmBuilder::idle_connection_timeout` is configured too large. + See [PR 4644](https://github.com/libp2p/rust-libp2p/pull/4644). + ## 0.43.6 - unreleased - Deprecate `libp2p::swarm::SwarmBuilder`. From e7a3aa046476753788163fed5cf7fb290c608bcc Mon Sep 17 00:00:00 2001 From: Tuong Date: Sat, 14 Oct 2023 00:20:13 +0700 Subject: [PATCH 04/10] docs(swarm): update changes with the unreleased version --- swarm/CHANGELOG.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 63b3d35300d..489f4ac8980 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,8 +1,3 @@ -## 0.43.7 - unreleased - -- Fix overflow in `KeepAlive` computation that could occur panic at `Delay::reset` if `SwarmBuilder::idle_connection_timeout` is configured too large. - See [PR 4644](https://github.com/libp2p/rust-libp2p/pull/4644). - ## 0.43.6 - unreleased - Deprecate `libp2p::swarm::SwarmBuilder`. @@ -11,6 +6,8 @@ See [PR 4120]. - Make the `Debug` implementation of `StreamProtocol` more concise. See [PR 4631](https://github.com/libp2p/rust-libp2p/pull/4631). +- Fix overflow in `KeepAlive` computation that could occur panic at `Delay::reset` if `SwarmBuilder::idle_connection_timeout` is configured too large. + See [PR 4644](https://github.com/libp2p/rust-libp2p/pull/4644). [PR 4120]: https://github.com/libp2p/rust-libp2p/pull/4120 From 8180fdcaada1492669506d2b9ec3622efbaffd13 Mon Sep 17 00:00:00 2001 From: Tuong Date: Sat, 14 Oct 2023 11:47:26 +0700 Subject: [PATCH 05/10] refactor(swarm): apply suggest fixes --- swarm/CHANGELOG.md | 2 +- swarm/src/connection.rs | 135 +++++++++++++++++++++++++--------------- swarm/src/handler.rs | 26 +++++--- swarm/src/stream.rs | 51 +++++++++++---- 4 files changed, 144 insertions(+), 70 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 489f4ac8980..cb5a78b9374 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,7 +6,7 @@ See [PR 4120]. - Make the `Debug` implementation of `StreamProtocol` more concise. See [PR 4631](https://github.com/libp2p/rust-libp2p/pull/4631). -- Fix overflow in `KeepAlive` computation that could occur panic at `Delay::reset` if `SwarmBuilder::idle_connection_timeout` is configured too large. +- Fix overflow in `KeepAlive` computation that could occur panic at `Delay::new` if `SwarmBuilder::idle_connection_timeout` is configured too large. See [PR 4644](https://github.com/libp2p/rust-libp2p/pull/4644). [PR 4120]: https://github.com/libp2p/rust-libp2p/pull/4120 diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index d562b4a8f52..02a06c253c0 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -57,12 +57,16 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::task::Waker; use std::time::Duration; use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll}; static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1); +/// Counter of the number of active streams on a connection +type ActiveStreamCounter = Arc<()>; + /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ConnectionId(usize); @@ -157,6 +161,8 @@ where local_supported_protocols: HashSet, remote_supported_protocols: HashSet, idle_timeout: Duration, + /// The counter of active streams + stream_counter: ActiveStreamCounter, } impl fmt::Debug for Connection @@ -205,6 +211,7 @@ where local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), idle_timeout, + stream_counter: Arc::new(()), } } @@ -237,6 +244,7 @@ where local_supported_protocols: supported_protocols, remote_supported_protocols, idle_timeout, + stream_counter, } = self.get_mut(); loop { @@ -344,58 +352,17 @@ where } } - // Ask the handler whether it wants the connection (and the handler itself) - // to be kept alive, which determines the planned shutdown, if any. - let keep_alive = handler.connection_keep_alive(); - match (&mut *shutdown, keep_alive) { - (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { - if *deadline != t { - *deadline = t; - let now = Instant::now(); - if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) - { - let effective_keep_alive = max(new_duration, *idle_timeout); - - // `Delay::reset` panics if `now + effective_keep_alive` is `> u64::MAX`. - let safe_effective_keep_alive = checked_add_fraction(now, effective_keep_alive); - timer.reset(safe_effective_keep_alive) - } - } - } - (_, KeepAlive::Until(earliest_shutdown)) => { - let now = Instant::now(); - - if let Some(requested) = earliest_shutdown.checked_duration_since(now) { - let effective_keep_alive = max(requested, *idle_timeout); - - let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); - - // Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch. - // This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See / - *shutdown = Shutdown::Later(Delay::new(safe_keep_alive), earliest_shutdown) - } - } - (_, KeepAlive::No) if idle_timeout == &Duration::ZERO => { - *shutdown = Shutdown::Asap; - } - (Shutdown::Later(_, _), KeepAlive::No) => { - // Do nothing, i.e. let the shutdown timer continue to tick. - } - (_, KeepAlive::No) => { - let now = Instant::now(); - let safe_keep_alive = checked_add_fraction(now, *idle_timeout); - - *shutdown = Shutdown::Later(Delay::new(safe_keep_alive), now + safe_keep_alive); - } - (_, KeepAlive::Yes) => *shutdown = Shutdown::None, - }; - // Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. if negotiating_in.is_empty() && negotiating_out.is_empty() && requested_substreams.is_empty() + && Arc::strong_count(stream_counter) == 1 { + if let Some(new_timeout) = compute_new_shutdown(handler, shutdown, *idle_timeout) { + *shutdown = new_timeout; + } + match shutdown { Shutdown::None => {} Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), @@ -430,6 +397,7 @@ where timeout, upgrade, *substream_upgrade_protocol_override, + stream_counter.clone(), )); continue; // Go back to the top, handler can potentially make progress again. @@ -443,7 +411,11 @@ where Poll::Ready(substream) => { let protocol = handler.listen_protocol(); - negotiating_in.push(StreamUpgrade::new_inbound(substream, protocol)); + negotiating_in.push(StreamUpgrade::new_inbound( + substream, + protocol, + stream_counter.clone(), + )); continue; // Go back to the top, handler can potentially make progress again. } @@ -484,6 +456,69 @@ fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet Option { + // Ask the handler whether it wants the connection (and the handler itself) + // to be kept alive, which determines the planned shutdown, if any. + let keep_alive = handler.connection_keep_alive(); + match (current_shutdown, keep_alive) { + (Shutdown::Later(_, deadline), KeepAlive::Until(t)) => { + let now = Instant::now(); + + if *deadline != t { + let new_deadline = t; + if let Some(new_duration) = new_deadline.checked_duration_since(now) { + let effective_keep_alive = max(new_duration, idle_timeout); + + let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); + return Some(Shutdown::Later( + Delay::new(safe_keep_alive), + new_deadline, + )); + } + } + + None + } + (_, KeepAlive::Until(earliest_shutdown)) => { + let now = Instant::now(); + + if let Some(requested) = earliest_shutdown.checked_duration_since(now) { + let effective_keep_alive = max(requested, idle_timeout); + + let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); + + // Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch. + // This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See / + return Some(Shutdown::Later( + Delay::new(safe_keep_alive), + earliest_shutdown, + )); + } + + None + } + (_, KeepAlive::No) if idle_timeout == Duration::ZERO => Some(Shutdown::Asap), + (Shutdown::Later(_, _), KeepAlive::No) => { + // Do nothing, i.e. let the shutdown timer continue to tick. + None + } + (_, KeepAlive::No) => { + let now = Instant::now(); + let safe_keep_alive = checked_add_fraction(now, idle_timeout); + + Some(Shutdown::Later( + Delay::new(safe_keep_alive), + now + safe_keep_alive, + )) + } + (_, KeepAlive::Yes) => Some(Shutdown::None), + } +} + /// Repeatedly halves and adds the [`Duration`] to the [`Instant`] until [`Instant::checked_add`] succeeds. /// /// [`Instant`] depends on the underlying platform and has a limit of which points in time it can represent. @@ -530,6 +565,7 @@ impl StreamUpgrade { timeout: Delay, upgrade: Upgrade, version_override: Option, + counter: Arc<()>, ) -> Self where Upgrade: OutboundUpgradeSend, @@ -561,7 +597,7 @@ impl StreamUpgrade { .map_err(to_stream_upgrade_error)?; let output = upgrade - .upgrade_outbound(Stream::new(stream), info) + .upgrade_outbound(Stream::new(stream, counter), info) .await .map_err(StreamUpgradeError::Apply)?; @@ -575,6 +611,7 @@ impl StreamUpgrade { fn new_inbound( substream: SubstreamBox, protocol: SubstreamProtocol, + counter: Arc<()>, ) -> Self where Upgrade: InboundUpgradeSend, @@ -593,7 +630,7 @@ impl StreamUpgrade { .map_err(to_stream_upgrade_error)?; let output = upgrade - .upgrade_inbound(Stream::new(stream), info) + .upgrade_inbound(Stream::new(stream, counter), info) .await .map_err(StreamUpgradeError::Apply)?; diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 9374903f9b7..55894622af2 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -125,10 +125,11 @@ pub trait ConnectionHandler: Send + 'static { /// Returns until when the connection should be kept alive. /// - /// This method is called by the `Swarm` after each invocation of - /// [`ConnectionHandler::poll`] to determine if the connection and the associated - /// [`ConnectionHandler`]s should be kept alive as far as this handler is concerned - /// and if so, for how long. + /// `Swarm` checks if there are still active streams on this connection after + /// each invocation of [`ConnectionHandler::poll`]. If no, this method will + /// be called by the `Swarm` to determine if the connection and the associated + /// [`ConnectionHandler`]s should be kept alive as far as this handler is + /// concerned and if so, for how long. /// /// Returning [`KeepAlive::No`] indicates that the connection should be /// closed and this handler destroyed immediately. @@ -139,10 +140,19 @@ pub trait ConnectionHandler: Send + 'static { /// Returning [`KeepAlive::Yes`] indicates that the connection should /// be kept alive until the next call to this method. /// - /// > **Note**: The connection is always closed and the handler destroyed - /// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the - /// > connection may be closed for reasons outside of the control - /// > of the handler. + /// By default, connections are considered active and thus kept-alive whilst: + /// + /// - There are streams currently being upgraded, see [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) and [`OutboundUpgrade`](libp2p_core::upgrade::OutboundUpgrade). + /// - There are still active streams, i.e. instances of [`Stream`](crate::stream::Stream) where the user did not call [Stream::no_keep_alive](crate::stream::Stream::no_keep_alive). + /// - The ConnectionHandler returns Poll::Ready. + /// + /// Only once none of these conditions are true do we invoke this function to determine, + /// whether the connection should be kept alive even further. + /// Note that for most protocols, this is not necessary as it represents a completely idle + /// connection with no active and no pending streams. + /// + /// If you'd like to delay the shutdown of idle connections, consider configuring + /// [SwarmBuilder::idle_connection_timeout](crate::SwarmBuilder) in your applications. fn connection_keep_alive(&self) -> KeepAlive; /// Should behave like `Stream::poll()`. diff --git a/swarm/src/stream.rs b/swarm/src/stream.rs index 3c4c52afc33..13e6588128f 100644 --- a/swarm/src/stream.rs +++ b/swarm/src/stream.rs @@ -1,16 +1,43 @@ use futures::{AsyncRead, AsyncWrite}; use libp2p_core::muxing::SubstreamBox; use libp2p_core::Negotiated; -use std::io::{IoSlice, IoSliceMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + io::{IoSlice, IoSliceMut}, + pin::Pin, + sync::{Arc, Weak}, + task::{Context, Poll}, +}; #[derive(Debug)] -pub struct Stream(Negotiated); +pub struct Stream { + stream: Negotiated, + counter: StreamCounter, +} + +#[derive(Debug)] +enum StreamCounter { + Arc(Arc<()>), + Weak(Weak<()>), +} impl Stream { - pub(crate) fn new(stream: Negotiated) -> Self { - Self(stream) + pub(crate) fn new(stream: Negotiated, counter: Arc<()>) -> Self { + let counter = StreamCounter::Arc(counter); + Self { stream, counter } + } + + /// Opt-out this stream from the [Swarm](crate::Swarm)s connection keep alive algorithm. + /// + /// By default, any active stream keeps a connection alive. For most protocols, + /// this is a good default as it ensures that the protocol is completed before + /// a connection is shut down. + /// Some protocols like libp2p's [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) + /// for example never complete and are of an auxiliary nature. + /// These protocols should opt-out of the keep alive algorithm using this method. + pub fn no_keep_alive(&mut self) { + if let StreamCounter::Arc(arc_counter) = &self.counter { + self.counter = StreamCounter::Weak(Arc::downgrade(arc_counter)); + } } } @@ -20,7 +47,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + Pin::new(&mut self.get_mut().stream).poll_read(cx, buf) } fn poll_read_vectored( @@ -28,7 +55,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) + Pin::new(&mut self.get_mut().stream).poll_read_vectored(cx, bufs) } } @@ -38,7 +65,7 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + Pin::new(&mut self.get_mut().stream).poll_write(cx, buf) } fn poll_write_vectored( @@ -46,14 +73,14 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + Pin::new(&mut self.get_mut().stream).poll_write_vectored(cx, bufs) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_flush(cx) + Pin::new(&mut self.get_mut().stream).poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_close(cx) + Pin::new(&mut self.get_mut().stream).poll_close(cx) } } From d6d7516efedfd8d2994f2241296366f3a03331c0 Mon Sep 17 00:00:00 2001 From: Tuong Date: Sat, 14 Oct 2023 14:46:51 +0700 Subject: [PATCH 06/10] refactor(swarm): remove unrelated parts and extract computing new shutdown --- swarm/src/connection.rs | 39 +++++++++---------------------- swarm/src/handler.rs | 26 +++++++-------------- swarm/src/stream.rs | 51 ++++++++++------------------------------- 3 files changed, 31 insertions(+), 85 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 02a06c253c0..87e7e353fc6 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -57,16 +57,12 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; use std::task::Waker; use std::time::Duration; use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll}; static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1); -/// Counter of the number of active streams on a connection -type ActiveStreamCounter = Arc<()>; - /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ConnectionId(usize); @@ -161,8 +157,6 @@ where local_supported_protocols: HashSet, remote_supported_protocols: HashSet, idle_timeout: Duration, - /// The counter of active streams - stream_counter: ActiveStreamCounter, } impl fmt::Debug for Connection @@ -211,7 +205,6 @@ where local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), idle_timeout, - stream_counter: Arc::new(()), } } @@ -244,7 +237,6 @@ where local_supported_protocols: supported_protocols, remote_supported_protocols, idle_timeout, - stream_counter, } = self.get_mut(); loop { @@ -352,17 +344,17 @@ where } } + // Compute new shutdown + if let Some(new_shutdown) = compute_new_shutdown(handler, shutdown, *idle_timeout) { + *shutdown = new_shutdown; + } + // Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. if negotiating_in.is_empty() && negotiating_out.is_empty() && requested_substreams.is_empty() - && Arc::strong_count(stream_counter) == 1 { - if let Some(new_timeout) = compute_new_shutdown(handler, shutdown, *idle_timeout) { - *shutdown = new_timeout; - } - match shutdown { Shutdown::None => {} Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), @@ -397,7 +389,6 @@ where timeout, upgrade, *substream_upgrade_protocol_override, - stream_counter.clone(), )); continue; // Go back to the top, handler can potentially make progress again. @@ -411,11 +402,7 @@ where Poll::Ready(substream) => { let protocol = handler.listen_protocol(); - negotiating_in.push(StreamUpgrade::new_inbound( - substream, - protocol, - stream_counter.clone(), - )); + negotiating_in.push(StreamUpgrade::new_inbound(substream, protocol)); continue; // Go back to the top, handler can potentially make progress again. } @@ -469,18 +456,17 @@ fn compute_new_shutdown( let now = Instant::now(); if *deadline != t { - let new_deadline = t; - if let Some(new_duration) = new_deadline.checked_duration_since(now) { + let deadline = t; + if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) { let effective_keep_alive = max(new_duration, idle_timeout); let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); return Some(Shutdown::Later( Delay::new(safe_keep_alive), - new_deadline, + deadline, )); } } - None } (_, KeepAlive::Until(earliest_shutdown)) => { @@ -498,7 +484,6 @@ fn compute_new_shutdown( earliest_shutdown, )); } - None } (_, KeepAlive::No) if idle_timeout == Duration::ZERO => Some(Shutdown::Asap), @@ -565,7 +550,6 @@ impl StreamUpgrade { timeout: Delay, upgrade: Upgrade, version_override: Option, - counter: Arc<()>, ) -> Self where Upgrade: OutboundUpgradeSend, @@ -597,7 +581,7 @@ impl StreamUpgrade { .map_err(to_stream_upgrade_error)?; let output = upgrade - .upgrade_outbound(Stream::new(stream, counter), info) + .upgrade_outbound(Stream::new(stream), info) .await .map_err(StreamUpgradeError::Apply)?; @@ -611,7 +595,6 @@ impl StreamUpgrade { fn new_inbound( substream: SubstreamBox, protocol: SubstreamProtocol, - counter: Arc<()>, ) -> Self where Upgrade: InboundUpgradeSend, @@ -630,7 +613,7 @@ impl StreamUpgrade { .map_err(to_stream_upgrade_error)?; let output = upgrade - .upgrade_inbound(Stream::new(stream, counter), info) + .upgrade_inbound(Stream::new(stream), info) .await .map_err(StreamUpgradeError::Apply)?; diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 55894622af2..9374903f9b7 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -125,11 +125,10 @@ pub trait ConnectionHandler: Send + 'static { /// Returns until when the connection should be kept alive. /// - /// `Swarm` checks if there are still active streams on this connection after - /// each invocation of [`ConnectionHandler::poll`]. If no, this method will - /// be called by the `Swarm` to determine if the connection and the associated - /// [`ConnectionHandler`]s should be kept alive as far as this handler is - /// concerned and if so, for how long. + /// This method is called by the `Swarm` after each invocation of + /// [`ConnectionHandler::poll`] to determine if the connection and the associated + /// [`ConnectionHandler`]s should be kept alive as far as this handler is concerned + /// and if so, for how long. /// /// Returning [`KeepAlive::No`] indicates that the connection should be /// closed and this handler destroyed immediately. @@ -140,19 +139,10 @@ pub trait ConnectionHandler: Send + 'static { /// Returning [`KeepAlive::Yes`] indicates that the connection should /// be kept alive until the next call to this method. /// - /// By default, connections are considered active and thus kept-alive whilst: - /// - /// - There are streams currently being upgraded, see [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) and [`OutboundUpgrade`](libp2p_core::upgrade::OutboundUpgrade). - /// - There are still active streams, i.e. instances of [`Stream`](crate::stream::Stream) where the user did not call [Stream::no_keep_alive](crate::stream::Stream::no_keep_alive). - /// - The ConnectionHandler returns Poll::Ready. - /// - /// Only once none of these conditions are true do we invoke this function to determine, - /// whether the connection should be kept alive even further. - /// Note that for most protocols, this is not necessary as it represents a completely idle - /// connection with no active and no pending streams. - /// - /// If you'd like to delay the shutdown of idle connections, consider configuring - /// [SwarmBuilder::idle_connection_timeout](crate::SwarmBuilder) in your applications. + /// > **Note**: The connection is always closed and the handler destroyed + /// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the + /// > connection may be closed for reasons outside of the control + /// > of the handler. fn connection_keep_alive(&self) -> KeepAlive; /// Should behave like `Stream::poll()`. diff --git a/swarm/src/stream.rs b/swarm/src/stream.rs index 13e6588128f..3c4c52afc33 100644 --- a/swarm/src/stream.rs +++ b/swarm/src/stream.rs @@ -1,43 +1,16 @@ use futures::{AsyncRead, AsyncWrite}; use libp2p_core::muxing::SubstreamBox; use libp2p_core::Negotiated; -use std::{ - io::{IoSlice, IoSliceMut}, - pin::Pin, - sync::{Arc, Weak}, - task::{Context, Poll}, -}; +use std::io::{IoSlice, IoSliceMut}; +use std::pin::Pin; +use std::task::{Context, Poll}; #[derive(Debug)] -pub struct Stream { - stream: Negotiated, - counter: StreamCounter, -} - -#[derive(Debug)] -enum StreamCounter { - Arc(Arc<()>), - Weak(Weak<()>), -} +pub struct Stream(Negotiated); impl Stream { - pub(crate) fn new(stream: Negotiated, counter: Arc<()>) -> Self { - let counter = StreamCounter::Arc(counter); - Self { stream, counter } - } - - /// Opt-out this stream from the [Swarm](crate::Swarm)s connection keep alive algorithm. - /// - /// By default, any active stream keeps a connection alive. For most protocols, - /// this is a good default as it ensures that the protocol is completed before - /// a connection is shut down. - /// Some protocols like libp2p's [ping](https://github.com/libp2p/specs/blob/master/ping/ping.md) - /// for example never complete and are of an auxiliary nature. - /// These protocols should opt-out of the keep alive algorithm using this method. - pub fn no_keep_alive(&mut self) { - if let StreamCounter::Arc(arc_counter) = &self.counter { - self.counter = StreamCounter::Weak(Arc::downgrade(arc_counter)); - } + pub(crate) fn new(stream: Negotiated) -> Self { + Self(stream) } } @@ -47,7 +20,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_read(cx, buf) + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) } fn poll_read_vectored( @@ -55,7 +28,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_read_vectored(cx, bufs) + Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) } } @@ -65,7 +38,7 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_write(cx, buf) + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) } fn poll_write_vectored( @@ -73,14 +46,14 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_write_vectored(cx, bufs) + Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_flush(cx) + Pin::new(&mut self.get_mut().0).poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().stream).poll_close(cx) + Pin::new(&mut self.get_mut().0).poll_close(cx) } } From faa6083cda0e41765a16f9456184e486da879973 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 Oct 2023 10:44:43 +0200 Subject: [PATCH 07/10] Add quickcheck test Panic reproduced with test without patch. --- swarm/src/connection.rs | 100 +++++++++++++++++++++++++++++++++++----- 1 file changed, 89 insertions(+), 11 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 87e7e353fc6..ee558d93292 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -345,8 +345,10 @@ where } // Compute new shutdown - if let Some(new_shutdown) = compute_new_shutdown(handler, shutdown, *idle_timeout) { - *shutdown = new_shutdown; + if let Some(new_shutdown) = + compute_new_shutdown(handler.connection_keep_alive(), shutdown, *idle_timeout) + { + *shutdown = new_shutdown; } // Check if the connection (and handler) should be shut down. @@ -444,14 +446,11 @@ fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet Option { - // Ask the handler whether it wants the connection (and the handler itself) - // to be kept alive, which determines the planned shutdown, if any. - let keep_alive = handler.connection_keep_alive(); - match (current_shutdown, keep_alive) { + match (current_shutdown, handler_keep_alive) { (Shutdown::Later(_, deadline), KeepAlive::Until(t)) => { let now = Instant::now(); @@ -461,10 +460,7 @@ fn compute_new_shutdown( let effective_keep_alive = max(new_duration, idle_timeout); let safe_keep_alive = checked_add_fraction(now, effective_keep_alive); - return Some(Shutdown::Later( - Delay::new(safe_keep_alive), - deadline, - )); + return Some(Shutdown::Later(Delay::new(safe_keep_alive), deadline)); } } None @@ -1008,6 +1004,88 @@ mod tests { assert!(start.checked_add(duration).is_some()) } + #[test] + fn compute_new_shutdown_does_not_panic() { + let _ = env_logger::try_init(); + + #[derive(Clone, Debug)] + struct ArbitraryKeepAlive(KeepAlive); + + impl Arbitrary for ArbitraryKeepAlive { + fn arbitrary(g: &mut Gen) -> Self { + let keep_alive = match g.gen_range(1u8..4) { + 1 => KeepAlive::Until( + Instant::now() + .checked_add(Duration::from_secs(u64::arbitrary(g))) + .unwrap_or(Instant::now()), + ), + 2 => KeepAlive::Yes, + 3 => KeepAlive::No, + _ => unreachable!(), + }; + + Self(keep_alive) + } + } + + #[derive(Debug)] + struct ArbitraryShutdown(Shutdown); + + impl Clone for ArbitraryShutdown { + fn clone(&self) -> Self { + let shutdown = match self.0 { + Shutdown::None => Shutdown::None, + Shutdown::Asap => Shutdown::Asap, + Shutdown::Later(_, instant) => Shutdown::Later( + // compute_new_shutdown does not touch the delay. Delay does not + // implement Clone. Thus use a placeholder delay. + Delay::new(Duration::from_secs(1)), + instant.clone(), + ), + }; + + ArbitraryShutdown(shutdown) + } + } + + impl Arbitrary for ArbitraryShutdown { + fn arbitrary(g: &mut Gen) -> Self { + let shutdown = match g.gen_range(1u8..4) { + 1 => Shutdown::None, + 2 => Shutdown::Asap, + 3 => Shutdown::Later( + Delay::new(Duration::from_secs(u32::arbitrary(g) as u64)), + Instant::now() + .checked_add(Duration::from_secs(u64::arbitrary(g))) + .unwrap_or(Instant::now()), + ), + _ => unreachable!(), + }; + + Self(shutdown) + } + } + + #[derive(Clone, Debug)] + struct ArbitraryDuration(Duration); + + impl Arbitrary for ArbitraryDuration { + fn arbitrary(g: &mut Gen) -> Self { + Self(Duration::from_secs(u64::arbitrary(g))) + } + } + + fn prop( + handler_keep_alive: ArbitraryKeepAlive, + current_shutdown: ArbitraryShutdown, + idle_timeout: ArbitraryDuration, + ) { + compute_new_shutdown(handler_keep_alive.0, ¤t_shutdown.0, idle_timeout.0); + } + + QuickCheck::new().quickcheck(prop as fn(_, _, _)); + } + struct KeepAliveUntilConnectionHandler { until: Instant, } From 0361da266483e6f201468c05c3d5ce10678149f9 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 Oct 2023 11:13:52 +0200 Subject: [PATCH 08/10] Use Duration::arbitrary directly --- swarm/src/connection.rs | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index ee558d93292..4cd972ba987 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -1016,7 +1016,7 @@ mod tests { let keep_alive = match g.gen_range(1u8..4) { 1 => KeepAlive::Until( Instant::now() - .checked_add(Duration::from_secs(u64::arbitrary(g))) + .checked_add(Duration::arbitrary(g)) .unwrap_or(Instant::now()), ), 2 => KeepAlive::Yes, @@ -1056,7 +1056,7 @@ mod tests { 3 => Shutdown::Later( Delay::new(Duration::from_secs(u32::arbitrary(g) as u64)), Instant::now() - .checked_add(Duration::from_secs(u64::arbitrary(g))) + .checked_add(Duration::arbitrary(g)) .unwrap_or(Instant::now()), ), _ => unreachable!(), @@ -1066,21 +1066,12 @@ mod tests { } } - #[derive(Clone, Debug)] - struct ArbitraryDuration(Duration); - - impl Arbitrary for ArbitraryDuration { - fn arbitrary(g: &mut Gen) -> Self { - Self(Duration::from_secs(u64::arbitrary(g))) - } - } - fn prop( handler_keep_alive: ArbitraryKeepAlive, current_shutdown: ArbitraryShutdown, - idle_timeout: ArbitraryDuration, + idle_timeout: Duration, ) { - compute_new_shutdown(handler_keep_alive.0, ¤t_shutdown.0, idle_timeout.0); + compute_new_shutdown(handler_keep_alive.0, ¤t_shutdown.0, idle_timeout); } QuickCheck::new().quickcheck(prop as fn(_, _, _)); From 7637af31a0ca8d4a4bd4600c0284c62b6d853908 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 Oct 2023 12:02:06 +0200 Subject: [PATCH 09/10] Implement Arbitrary on KeepAlive directly --- swarm/src/connection.rs | 24 ++---------------------- swarm/src/handler.rs | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 1136b060f82..d1be6e844c2 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -1009,26 +1009,6 @@ mod tests { fn compute_new_shutdown_does_not_panic() { let _ = env_logger::try_init(); - #[derive(Clone, Debug)] - struct ArbitraryKeepAlive(KeepAlive); - - impl Arbitrary for ArbitraryKeepAlive { - fn arbitrary(g: &mut Gen) -> Self { - let keep_alive = match g.gen_range(1u8..4) { - 1 => KeepAlive::Until( - Instant::now() - .checked_add(Duration::arbitrary(g)) - .unwrap_or(Instant::now()), - ), - 2 => KeepAlive::Yes, - 3 => KeepAlive::No, - _ => unreachable!(), - }; - - Self(keep_alive) - } - } - #[derive(Debug)] struct ArbitraryShutdown(Shutdown); @@ -1068,11 +1048,11 @@ mod tests { } fn prop( - handler_keep_alive: ArbitraryKeepAlive, + handler_keep_alive: KeepAlive, current_shutdown: ArbitraryShutdown, idle_timeout: Duration, ) { - compute_new_shutdown(handler_keep_alive.0, ¤t_shutdown.0, idle_timeout); + compute_new_shutdown(handler_keep_alive, ¤t_shutdown.0, idle_timeout); } QuickCheck::new().quickcheck(prop as fn(_, _, _)); diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 67ca095ed1e..02eb9f83935 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -765,6 +765,26 @@ impl Ord for KeepAlive { } } +#[cfg(test)] +impl quickcheck::Arbitrary for KeepAlive { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + match quickcheck::GenRange::gen_range(g, 1u8..4) { + 1 => + { + #[allow(deprecated)] + KeepAlive::Until( + Instant::now() + .checked_add(Duration::arbitrary(g)) + .unwrap_or(Instant::now()), + ) + } + 2 => KeepAlive::Yes, + 3 => KeepAlive::No, + _ => unreachable!(), + } + } +} + /// A statically declared, empty [`HashSet`] allows us to work around borrow-checker rules for /// [`ProtocolsAdded::from_set`]. The lifetimes don't work unless we have a [`HashSet`] with a `'static' lifetime. static EMPTY_HASHSET: Lazy> = Lazy::new(HashSet::new); From daffde46fb54fb87d3ce35de66008cdd4718ed5e Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 Oct 2023 13:23:44 +0200 Subject: [PATCH 10/10] Don't clone Copy type --- swarm/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index d1be6e844c2..a9c56c80d63 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -1021,7 +1021,7 @@ mod tests { // compute_new_shutdown does not touch the delay. Delay does not // implement Clone. Thus use a placeholder delay. Delay::new(Duration::from_secs(1)), - instant.clone(), + instant, ), };