From 52f11b968ab57da239c4e2a22e289b8da9ab140f Mon Sep 17 00:00:00 2001 From: Sumit <106421807+startup-dreamer@users.noreply.github.com> Date: Wed, 20 Sep 2023 04:02:29 +0530 Subject: [PATCH] feat(swarm): allow configuration to idle connection timeout Previously, a connection would be shut down immediately as soon as its `ConnectionHandler` reports `KeepAlive::No`. As we have gained experience with libp2p, it turned out that this isn't ideal. For one, tests often need to keep connections alive longer than the configured protocols require. Plus, some usecases require connections to be kept alive in general. Both of these needs are currently served by the `keep_alive::Behaviour`. That one does essentially nothing other than statically returning `KeepAlive::Yes` from its `ConnectionHandler`. It makes much more sense to deprecate `keep_alive::Behaviour` and instead allow users to globally configure an `idle_conncetion_timeout` on the `Swarm`. This timeout comes into effect once a `ConnectionHandler` reports `KeepAlive::No`. To start with, this timeout is 0. Together with https://github.com/libp2p/rust-libp2p/issues/3844, this will allow us to move towards a much more aggressive closing of idle connections, together with a more ergonomic way of opting out of this behaviour. Fixes #4121. Pull-Request: #4161. --- examples/browser-webrtc/src/main.rs | 19 +- examples/metrics/src/main.rs | 9 +- examples/ping-example/src/main.rs | 16 +- examples/rendezvous/src/bin/rzv-discover.rs | 5 +- examples/rendezvous/src/bin/rzv-identify.rs | 5 +- examples/rendezvous/src/bin/rzv-register.rs | 5 +- examples/rendezvous/src/main.rs | 5 +- interop-tests/src/lib.rs | 9 +- libp2p/src/tutorials/ping.rs | 97 +++++------ misc/allow-block-list/src/lib.rs | 98 +++-------- misc/connection-limits/src/lib.rs | 3 - protocols/identify/tests/smoke.rs | 82 +++------ protocols/ping/tests/ping.rs | 38 ++-- swarm-test/src/lib.rs | 4 +- swarm/CHANGELOG.md | 4 + swarm/Cargo.toml | 1 + swarm/src/connection.rs | 182 ++++++++++++++++++-- swarm/src/connection/pool.rs | 10 +- swarm/src/lib.rs | 71 ++++---- transports/pnet/tests/smoke.rs | 13 +- transports/tls/tests/smoke.rs | 14 +- transports/webrtc/examples/listen_ping.rs | 19 +- 22 files changed, 378 insertions(+), 331 deletions(-) diff --git a/examples/browser-webrtc/src/main.rs b/examples/browser-webrtc/src/main.rs index f919f047af55..8a4034a436ee 100644 --- a/examples/browser-webrtc/src/main.rs +++ b/examples/browser-webrtc/src/main.rs @@ -13,11 +13,12 @@ use libp2p::{ identity, multiaddr::{Multiaddr, Protocol}, ping, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, }; use libp2p_webrtc as webrtc; use rand::thread_rng; use std::net::{Ipv4Addr, SocketAddr}; +use std::time::Duration; use tower_http::cors::{Any, CorsLayer}; #[tokio::main] @@ -36,12 +37,10 @@ async fn main() -> anyhow::Result<()> { .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) .boxed(); - let behaviour = Behaviour { - ping: ping::Behaviour::new(ping::Config::new()), - keep_alive: keep_alive::Behaviour, - }; - - let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); + let mut swarm = + SwarmBuilder::with_tokio_executor(transport, ping::Behaviour::default(), local_peer_id) + .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe the pings. + .build(); let address_webrtc = Multiaddr::from(Ipv4Addr::UNSPECIFIED) .with(Protocol::Udp(0)) @@ -84,12 +83,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -#[derive(NetworkBehaviour)] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, -} - #[derive(rust_embed::RustEmbed)] #[folder = "$CARGO_MANIFEST_DIR/static"] struct StaticFiles; diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index fa9d5bd37e70..b28abaee9418 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -26,12 +26,13 @@ use futures::stream::StreamExt; use libp2p::core::{upgrade::Version, Multiaddr, Transport}; use libp2p::identity::PeerId; use libp2p::metrics::{Metrics, Recorder}; -use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; use libp2p::{identify, identity, noise, ping, tcp, yamux}; use log::info; use prometheus_client::registry::Registry; use std::error::Error; use std::thread; +use std::time::Duration; mod http_service; @@ -51,6 +52,7 @@ fn main() -> Result<(), Box> { Behaviour::new(local_pub_key), local_peer_id, ) + .idle_connection_timeout(Duration::from_secs(60)) .build(); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; @@ -87,13 +89,9 @@ fn main() -> Result<(), Box> { } /// Our network behaviour. -/// -/// For illustrative purposes, this includes the [`keep_alive::Behaviour`]) behaviour so the ping actually happen -/// and can be observed via the metrics. #[derive(NetworkBehaviour)] struct Behaviour { identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, ping: ping::Behaviour, } @@ -105,7 +103,6 @@ impl Behaviour { "/ipfs/0.1.0".into(), local_pub_key, )), - keep_alive: keep_alive::Behaviour, } } } diff --git a/examples/ping-example/src/main.rs b/examples/ping-example/src/main.rs index 6af079ccfd08..898a25813e05 100644 --- a/examples/ping-example/src/main.rs +++ b/examples/ping-example/src/main.rs @@ -24,10 +24,11 @@ use futures::prelude::*; use libp2p::core::upgrade::Version; use libp2p::{ identity, noise, ping, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::error::Error; +use std::time::Duration; #[async_std::main] async fn main() -> Result<(), Box> { @@ -42,7 +43,8 @@ async fn main() -> Result<(), Box> { .boxed(); let mut swarm = - SwarmBuilder::with_async_std_executor(transport, Behaviour::default(), local_peer_id) + SwarmBuilder::with_async_std_executor(transport, ping::Behaviour::default(), local_peer_id) + .idle_connection_timeout(Duration::from_secs(60)) // For illustrative purposes, keep idle connections alive for a minute so we can observe a few pings. .build(); // Tell the swarm to listen on all interfaces and a random, OS-assigned @@ -65,13 +67,3 @@ async fn main() -> Result<(), Box> { } } } - -/// Our network behaviour. -/// -/// For illustrative purposes, this includes the [`KeepAlive`](keep_alive::Behaviour) behaviour so a continuous sequence of -/// pings can be observed. -#[derive(NetworkBehaviour, Default)] -struct Behaviour { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour, -} diff --git a/examples/rendezvous/src/bin/rzv-discover.rs b/examples/rendezvous/src/bin/rzv-discover.rs index bd510496ae37..ac45afae8408 100644 --- a/examples/rendezvous/src/bin/rzv-discover.rs +++ b/examples/rendezvous/src/bin/rzv-discover.rs @@ -24,7 +24,7 @@ use libp2p::{ identity, multiaddr::Protocol, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,10 +50,10 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); swarm.dial(rendezvous_point_address.clone()).unwrap(); @@ -127,5 +127,4 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-identify.rs b/examples/rendezvous/src/bin/rzv-identify.rs index 1ef8569c612b..95ed7a5ccd8a 100644 --- a/examples/rendezvous/src/bin/rzv-identify.rs +++ b/examples/rendezvous/src/bin/rzv-identify.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,10 +50,10 @@ async fn main() { )), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); @@ -133,5 +133,4 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-register.rs b/examples/rendezvous/src/bin/rzv-register.rs index 95407dd9c7d7..51acfee2a71b 100644 --- a/examples/rendezvous/src/bin/rzv-register.rs +++ b/examples/rendezvous/src/bin/rzv-register.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -46,10 +46,10 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); // In production the external address should be the publicly facing IP address of the rendezvous point. @@ -130,5 +130,4 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/examples/rendezvous/src/main.rs b/examples/rendezvous/src/main.rs index 44b1716e176d..a3ed3c0fce5c 100644 --- a/examples/rendezvous/src/main.rs +++ b/examples/rendezvous/src/main.rs @@ -24,7 +24,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, }; use std::time::Duration; @@ -48,10 +48,10 @@ async fn main() { )), rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/62649".parse().unwrap()); @@ -97,5 +97,4 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::server::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/interop-tests/src/lib.rs b/interop-tests/src/lib.rs index 54d94430c8e9..40c06b57810e 100644 --- a/interop-tests/src/lib.rs +++ b/interop-tests/src/lib.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::{bail, Context, Result}; use futures::{FutureExt, StreamExt}; -use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; -use libp2p::{identify, identity, ping, Multiaddr, PeerId}; +use libp2p::swarm::SwarmEvent; +use libp2p::{identify, identity, ping, swarm::NetworkBehaviour, Multiaddr, PeerId}; #[cfg(target_arch = "wasm32")] use wasm_bindgen::prelude::*; @@ -33,8 +33,7 @@ pub async fn run_test( let mut swarm = swarm_builder( boxed_transport, Behaviour { - ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, + ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(10))), // Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved. identify: identify::Behaviour::new(identify::Config::new( "/interop-tests".to_owned(), @@ -43,6 +42,7 @@ pub async fn run_test( }, local_peer_id, ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); log::info!("Running ping test: {}", swarm.local_peer_id()); @@ -242,7 +242,6 @@ impl FromStr for SecProtocol { #[derive(NetworkBehaviour)] struct Behaviour { ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, identify: identify::Behaviour, } diff --git a/libp2p/src/tutorials/ping.rs b/libp2p/src/tutorials/ping.rs index 006c807803cc..aedc149228e5 100644 --- a/libp2p/src/tutorials/ping.rs +++ b/libp2p/src/tutorials/ping.rs @@ -143,7 +143,7 @@ //! With the above in mind, let's extend our example, creating a [`ping::Behaviour`](crate::ping::Behaviour) at the end: //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour}; +//! use libp2p::swarm::NetworkBehaviour; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! @@ -155,20 +155,10 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Swarm @@ -185,7 +175,7 @@ //! (In our example, `env_logger` is used) //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! @@ -197,21 +187,46 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! //! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); //! //! Ok(()) //! } +//! ``` +//! +//! ## Idle connection timeout +//! +//! Now, for this example in particular, we need set the idle connection timeout. +//! Otherwise, the connection will be closed immediately. +//! +//! Whether you need to set this in your application too depends on your usecase. +//! Typically, connections are kept alive if they are "in use" by a certain protocol. +//! The ping protocol however is only an "auxiliary" kind of protocol. +//! Thus, without any other behaviour in place, we would not be able to observe the pings. +//! +//! ```rust +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; +//! use libp2p::{identity, ping, PeerId}; +//! use std::error::Error; +//! use std::time::Duration; +//! +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { +//! use std::time::Duration; +//! let local_key = identity::Keypair::generate_ed25519(); +//! let local_peer_id = PeerId::from(local_key.public()); +//! println!("Local peer id: {local_peer_id:?}"); +//! +//! let transport = libp2p::development_transport(local_key).await?; //! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour:: -//! /// KeepAlive) behaviour so a continuous sequence of pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, +//! let behaviour = ping::Behaviour::default(); +//! +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); +//! +//! Ok(()) //! } //! ``` //! @@ -242,9 +257,10 @@ //! remote peer. //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { @@ -254,9 +270,11 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! -//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); //! //! // Tell the swarm to listen on all interfaces and a random, OS-assigned //! // port. @@ -272,16 +290,6 @@ //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Continuously polling the Swarm @@ -292,9 +300,10 @@ //! //! ```no_run //! use futures::prelude::*; -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmEvent, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { @@ -304,9 +313,11 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! -//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); //! //! // Tell the swarm to listen on all interfaces and a random, OS-assigned //! // port. @@ -328,16 +339,6 @@ //! } //! } //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Running two nodes diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index b1be0ca29dbf..8f8892b78103 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -282,14 +282,11 @@ mod tests { #[async_std::test] async fn cannot_dial_blocked_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - dialer - .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { panic!("unexpected dial error") @@ -299,17 +296,13 @@ mod tests { #[async_std::test] async fn can_dial_unblocked_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); dialer .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); - dialer - .behaviour_mut() - .list .unblock_peer(*listener.local_peer_id()); dial(&mut dialer, &listener).unwrap(); @@ -317,14 +310,11 @@ mod tests { #[async_std::test] async fn blocked_peer_cannot_dial_us() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - listener - .behaviour_mut() - .list - .block_peer(*dialer.local_peer_id()); + listener.behaviour_mut().block_peer(*dialer.local_peer_id()); dial(&mut dialer, &listener).unwrap(); async_std::task::spawn(dialer.loop_on_next()); @@ -342,15 +332,12 @@ mod tests { #[async_std::test] async fn connections_get_closed_upon_blocked() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; dialer.connect(&mut listener).await; - dialer - .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); let ( [SwarmEvent::ConnectionClosed { @@ -371,8 +358,8 @@ mod tests { #[async_std::test] async fn cannot_dial_peer_unless_allowed() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { @@ -380,26 +367,19 @@ mod tests { }; assert!(cause.downcast::().is_ok()); - dialer - .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); assert!(dial(&mut dialer, &listener).is_ok()); } #[async_std::test] async fn cannot_dial_disallowed_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); dialer .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); - dialer - .behaviour_mut() - .list .disallow_peer(*listener.local_peer_id()); let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { @@ -410,8 +390,8 @@ mod tests { #[async_std::test] async fn not_allowed_peer_cannot_dial_us() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; dialer @@ -447,23 +427,16 @@ mod tests { #[async_std::test] async fn connections_get_closed_upon_disallow() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - dialer - .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); - listener - .behaviour_mut() - .list - .allow_peer(*dialer.local_peer_id()); + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); + listener.behaviour_mut().allow_peer(*dialer.local_peer_id()); dialer.connect(&mut listener).await; dialer .behaviour_mut() - .list .disallow_peer(*listener.local_peer_id()); let ( [SwarmEvent::ConnectionClosed { @@ -495,27 +468,4 @@ mod tests { .build(), ) } - - #[derive(libp2p_swarm_derive::NetworkBehaviour)] - #[behaviour(prelude = "libp2p_swarm::derive_prelude")] - struct Behaviour { - list: super::Behaviour, - keep_alive: libp2p_swarm::keep_alive::Behaviour, - } - - impl Behaviour - where - S: Default, - { - fn new() -> Self { - Self { - list: super::Behaviour { - waker: None, - close_connections: VecDeque::new(), - state: S::default(), - }, - keep_alive: libp2p_swarm::keep_alive::Behaviour, - } - } - } } diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index 57edf5d3f64b..0cad67b9a98f 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -525,7 +525,6 @@ mod tests { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { limits: super::Behaviour, - keep_alive: libp2p_swarm::keep_alive::Behaviour, connection_denier: Toggle, } @@ -533,14 +532,12 @@ mod tests { fn new(limits: ConnectionLimits) -> Self { Self { limits: super::Behaviour::new(limits), - keep_alive: libp2p_swarm::keep_alive::Behaviour, connection_denier: None.into(), } } fn new_with_connection_denier(limits: ConnectionLimits) -> Self { Self { limits: super::Behaviour::new(limits), - keep_alive: libp2p_swarm::keep_alive::Behaviour, connection_denier: Some(ConnectionDenier {}).into(), } } diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index c70ab3181b4a..c1926b4125f5 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,6 +1,6 @@ use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; -use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use std::iter; @@ -9,7 +9,7 @@ async fn periodic_identify() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -17,7 +17,7 @@ async fn periodic_identify() { let swarm1_peer_id = *swarm1.local_peer_id(); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("c".to_string(), identity.public()) .with_agent_version("d".to_string()), ) @@ -33,20 +33,20 @@ async fn periodic_identify() { match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { ( - [BehaviourEvent::Identify(Received { info: s1_info, .. }), BehaviourEvent::Identify(Sent { .. })], - [BehaviourEvent::Identify(Received { info: s2_info, .. }), BehaviourEvent::Identify(Sent { .. })], + [Received { info: s1_info, .. }, Sent { .. }], + [Received { info: s2_info, .. }, Sent { .. }], ) | ( - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s1_info, .. })], - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s2_info, .. })], + [Sent { .. }, Received { info: s1_info, .. }], + [Sent { .. }, Received { info: s2_info, .. }], ) | ( - [BehaviourEvent::Identify(Received { info: s1_info, .. }), BehaviourEvent::Identify(Sent { .. })], - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s2_info, .. })], + [Received { info: s1_info, .. }, Sent { .. }], + [Sent { .. }, Received { info: s2_info, .. }], ) | ( - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s1_info, .. })], - [BehaviourEvent::Identify(Received { info: s2_info, .. }), BehaviourEvent::Identify(Sent { .. })], + [Sent { .. }, Received { info: s1_info, .. }], + [Received { info: s2_info, .. }, Sent { .. }], ) => { assert_eq!(s1_info.public_key.to_peer_id(), swarm2_peer_id); assert_eq!(s1_info.protocol_version, "c"); @@ -83,10 +83,10 @@ async fn identify_push() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -96,33 +96,25 @@ async fn identify_push() { swarm2.connect(&mut swarm1).await; // First, let the periodic identify do its thing. - match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ( - [BehaviourEvent::Identify(e1), BehaviourEvent::Identify(e2)], - [BehaviourEvent::Identify(e3), BehaviourEvent::Identify(e4)], - ) => { - use identify::Event::{Received, Sent}; + let ([e1, e2], [e3, e4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - // These can be received in any order, hence assert them here instead of the pattern above. - assert!(matches!(e1, Received { .. } | Sent { .. })); - assert!(matches!(e2, Received { .. } | Sent { .. })); - assert!(matches!(e3, Received { .. } | Sent { .. })); - assert!(matches!(e4, Received { .. } | Sent { .. })); - } - other => panic!("Unexpected events: {other:?}"), - }; + { + use identify::Event::{Received, Sent}; + + // These can be received in any order, hence assert them here. + assert!(matches!(e1, Received { .. } | Sent { .. })); + assert!(matches!(e2, Received { .. } | Sent { .. })); + assert!(matches!(e3, Received { .. } | Sent { .. })); + assert!(matches!(e4, Received { .. } | Sent { .. })); + } // Second, actively push. swarm2 .behaviour_mut() - .identify .push(iter::once(*swarm1.local_peer_id())); let swarm1_received_info = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ( - [BehaviourEvent::Identify(identify::Event::Received { info, .. })], - [BehaviourEvent::Identify(identify::Event::Pushed { .. })], - ) => info, + ([identify::Event::Received { info, .. }], [identify::Event::Pushed { .. }]) => info, other => panic!("Unexpected events: {other:?}"), }; @@ -141,10 +133,10 @@ async fn discover_peer_after_disconnect() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -161,7 +153,7 @@ async fn discover_peer_after_disconnect() { .wait(|event| { matches!( event, - SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. })) + SwarmEvent::Behaviour(identify::Event::Received { .. }) ) .then_some(()) }) @@ -186,23 +178,3 @@ async fn discover_peer_after_disconnect() { assert_eq!(connected_peer, swarm1_peer_id); } - -/// Combined behaviour to keep the connection alive after the periodic identify. -/// -/// The identify implementation sets `keep_alive` to `No` once it has done its thing. -/// This can result in unexpected connection closures if one peer is faster than the other. -#[derive(libp2p_swarm::NetworkBehaviour)] -#[behaviour(prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, -} - -impl Behaviour { - fn new(config: identify::Config) -> Self { - Self { - identify: identify::Behaviour::new(config), - keep_alive: keep_alive::Behaviour, - } - } -} diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 63836a15a788..946a2daadb61 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -21,8 +21,8 @@ //! Integration tests for the `Ping` network behaviour. use libp2p_ping as ping; -use libp2p_swarm::keep_alive; -use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_swarm::dummy; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use quickcheck::*; use std::{num::NonZeroU8, time::Duration}; @@ -32,18 +32,16 @@ fn ping_pong() { fn prop(count: NonZeroU8) { let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); - let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone())); - let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone())); + let mut swarm1 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); + let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); async_std::task::block_on(async { swarm1.listen().await; swarm2.connect(&mut swarm1).await; for _ in 0..count.get() { - let (e1, e2) = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ([BehaviourEvent::Ping(e1)], [BehaviourEvent::Ping(e2)]) => (e1, e2), - events => panic!("Unexpected events: {events:?}"), - }; + let ([e1], [e2]): ([ping::Event; 1], [ping::Event; 1]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; assert_eq!(&e1.peer, swarm2.local_peer_id()); assert_eq!(&e2.peer, swarm1.local_peer_id()); @@ -65,8 +63,8 @@ fn assert_ping_rtt_less_than_50ms(e: ping::Event) { #[test] fn unsupported_doesnt_fail() { - let mut swarm1 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); - let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(ping::Config::new())); + let mut swarm1 = Swarm::new_ephemeral(|_| dummy::Behaviour); + let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(ping::Config::new())); let result = async_std::task::block_on(async { swarm1.listen().await; @@ -76,10 +74,10 @@ fn unsupported_doesnt_fail() { loop { match swarm2.next_swarm_event().await { - SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { + SwarmEvent::Behaviour(ping::Event { result: Err(ping::Failure::Unsupported), .. - })) => { + }) => { swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); } SwarmEvent::ConnectionClosed { cause: Some(e), .. } => { @@ -95,19 +93,3 @@ fn unsupported_doesnt_fail() { result.expect("node with ping should not fail connection due to unsupported protocol"); } - -#[derive(NetworkBehaviour, Default)] -#[behaviour(prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour, -} - -impl Behaviour { - fn new(config: ping::Config) -> Self { - Self { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour::new(config), - } - } -} diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index 0ed8dbce2208..819db33ba88c 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -218,7 +218,9 @@ where .timeout(Duration::from_secs(20)) .boxed(); - SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id).build() + SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id) + .idle_connection_timeout(Duration::from_secs(5)) // Some tests need connections to be kept alive beyond what the individual behaviour configures. + .build() } async fn connect(&mut self, other: &mut Swarm) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 77ddf3458d13..06ddd740873a 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,8 +6,12 @@ - Improve error message when `DialPeerCondition` prevents a dial. See [PR 4409]. +- Introduce `SwarmBuilder::idle_conncetion_timeout` and deprecate `keep_alive::Behaviour` as a result. + See [PR 4161]. + [PR 4426]: https://github.com/libp2p/rust-libp2p/pull/4426 [PR 4409]: https://github.com/libp2p/rust-libp2p/pull/4409 +[PR 4161]: https://github.com/libp2p/rust-libp2p/pull/4161 ## 0.43.3 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index d00829ce8b25..86f4c1583873 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -55,6 +55,7 @@ quickcheck = { workspace = true } void = "1" once_cell = "1.18.0" trybuild = "1.0.85" +tokio = { version = "1.29.1", features = ["time", "rt", "macros"] } [[test]] name = "swarm_derive" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 3796d9a027db..310cf3a81e3f 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -52,6 +52,7 @@ use libp2p_core::upgrade; use libp2p_core::upgrade::{NegotiationError, ProtocolError}; use libp2p_core::Endpoint; use libp2p_identity::PeerId; +use std::cmp::max; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::future::Future; @@ -156,6 +157,7 @@ where local_supported_protocols: HashSet, remote_supported_protocols: HashSet, + idle_timeout: Duration, } impl fmt::Debug for Connection @@ -183,9 +185,9 @@ where mut handler: THandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, + idle_timeout: Duration, ) -> Self { let initial_protocols = gather_supported_protocols(&handler); - if !initial_protocols.is_empty() { handler.on_connection_event(ConnectionEvent::LocalProtocolsChange( ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)), @@ -203,6 +205,7 @@ where requested_substreams: Default::default(), local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), + idle_timeout, } } @@ -234,6 +237,7 @@ where substream_upgrade_protocol_override, local_supported_protocols: supported_protocols, remote_supported_protocols, + idle_timeout, } = self.get_mut(); loop { @@ -348,17 +352,36 @@ where (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - timer.reset(dur) + if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) + { + let effective_keep_alive = max(new_duration, *idle_timeout); + + timer.reset(effective_keep_alive) } } } - (_, KeepAlive::Until(t)) => { - if let Some(dur) = t.checked_duration_since(Instant::now()) { - *shutdown = Shutdown::Later(Delay::new(dur), t) + (_, KeepAlive::Until(earliest_shutdown)) => { + if let Some(requested_keep_alive) = + earliest_shutdown.checked_duration_since(Instant::now()) + { + let effective_keep_alive = max(requested_keep_alive, *idle_timeout); + + // Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch. + // This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See / + *shutdown = + Shutdown::Later(Delay::new(effective_keep_alive), earliest_shutdown) } } - (_, KeepAlive::No) => *shutdown = Shutdown::Asap, + (_, KeepAlive::No) if idle_timeout == &Duration::ZERO => { + *shutdown = Shutdown::Asap; + } + (Shutdown::Later(_, _), KeepAlive::No) => { + // Do nothing, i.e. let the shutdown timer continue to tick. + } + (_, KeepAlive::No) => { + let deadline = Instant::now() + *idle_timeout; + *shutdown = Shutdown::Later(Delay::new(*idle_timeout), deadline); + } (_, KeepAlive::Yes) => *shutdown = Shutdown::None, }; @@ -696,7 +719,7 @@ enum Shutdown { #[cfg(test)] mod tests { use super::*; - use crate::keep_alive; + use crate::dummy; use futures::future; use futures::AsyncRead; use futures::AsyncWrite; @@ -704,6 +727,7 @@ mod tests { use libp2p_core::StreamMuxer; use quickcheck::*; use std::sync::{Arc, Weak}; + use std::time::Instant; use void::Void; #[test] @@ -712,14 +736,14 @@ mod tests { let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); let alive_substream_counter = Arc::new(()); - let mut connection = Connection::new( StreamMuxerBox::new(DummyStreamMuxer { counter: alive_substream_counter.clone(), }), - keep_alive::ConnectionHandler, + MockConnectionHandler::new(Duration::ZERO), None, max_negotiating_inbound_streams, + Duration::ZERO, ); let result = connection.poll_noop_waker(); @@ -743,6 +767,7 @@ mod tests { MockConnectionHandler::new(upgrade_timeout), None, 2, + Duration::ZERO, ); connection.handler.open_new_outbound(); @@ -765,6 +790,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, + Duration::ZERO, ); // First, start listening on a single protocol. @@ -803,6 +829,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, + Duration::ZERO, ); // First, remote supports a single protocol. @@ -846,6 +873,141 @@ mod tests { assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]); } + #[tokio::test] + async fn idle_timeout_with_keep_alive_no() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + dummy::ConnectionHandler, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + #[tokio::test] + async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + KeepAliveUntilConnectionHandler { + until: Instant::now() + idle_timeout * 2, + }, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!( + connection.poll_noop_waker().is_pending(), + "`KeepAlive::Until` is greater than idle-timeout, continue sleeping" + ); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + #[tokio::test] + async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + KeepAliveUntilConnectionHandler { + until: Instant::now() + idle_timeout / 2, + }, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout / 2).await; + + assert!( + connection.poll_noop_waker().is_pending(), + "`KeepAlive::Until` is less than idle-timeout, honor idle-timeout" + ); + + tokio::time::sleep(idle_timeout / 2).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + struct KeepAliveUntilConnectionHandler { + until: Instant, + } + + impl ConnectionHandler for KeepAliveUntilConnectionHandler { + type FromBehaviour = Void; + type ToBehaviour = Void; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = Void; + + fn listen_protocol( + &self, + ) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Until(self.until) + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + Poll::Pending + } + + fn on_behaviour_event(&mut self, _: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + _: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } + } + struct DummyStreamMuxer { counter: Arc<()>, } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e9f7504f5292..07fc9075806c 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -37,7 +37,7 @@ use futures::{ ready, stream::FuturesUnordered, }; -use instant::Instant; +use instant::{Duration, Instant}; use libp2p_core::connection::Endpoint; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use std::task::Waker; @@ -135,6 +135,9 @@ where /// Receivers for [`NewConnection`] objects that are dropped. new_connection_dropped_listeners: FuturesUnordered>, + + /// How long a connection should be kept alive once it starts idling. + idle_connection_timeout: Duration, } #[derive(Debug)] @@ -322,6 +325,7 @@ where substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, per_connection_event_buffer_size: config.per_connection_event_buffer_size, + idle_connection_timeout: config.idle_connection_timeout, executor, pending_connection_events_tx, pending_connection_events_rx, @@ -518,6 +522,7 @@ where handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, + self.idle_connection_timeout, ); self.executor.spawn(task::new_for_established_connection( @@ -947,6 +952,8 @@ pub(crate) struct PoolConfig { pub(crate) per_connection_event_buffer_size: usize, /// Number of addresses concurrently dialed for a single outbound connection attempt. pub(crate) dial_concurrency_factor: NonZeroU8, + /// How long a connection should be kept alive once it is idling. + pub(crate) idle_connection_timeout: Duration, /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, @@ -963,6 +970,7 @@ impl PoolConfig { task_command_buffer_size: 32, per_connection_event_buffer_size: 7, dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), + idle_connection_timeout: Duration::ZERO, substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index bf94e9370b32..6d484d0691fb 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,6 +67,9 @@ pub mod behaviour; pub mod dial_opts; pub mod dummy; pub mod handler; +#[deprecated( + note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead. To keep connections alive 'forever', use `Duration::from_secs(u64::MAX)`." +)] pub mod keep_alive; mod listen_opts; @@ -145,6 +148,7 @@ use libp2p_identity::PeerId; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize}; +use std::time::Duration; use std::{ convert::TryFrom, error, fmt, io, @@ -1492,6 +1496,14 @@ where self } + /// How long to keep a connection alive once it is idling. + /// + /// Defaults to 0. + pub fn idle_connection_timeout(mut self, timeout: Duration) -> Self { + self.pool_config.idle_connection_timeout = timeout; + self + } + /// Builds a `Swarm` with the current configuration. pub fn build(self) -> Swarm { log::info!("Local peer id: {}", self.local_peer_id); @@ -1782,6 +1794,7 @@ fn p2p_addr(peer: Option, addr: Multiaddr) -> Result( - handler_proto: T, - ) -> SwarmBuilder>> - where - T: ConnectionHandler + Clone, - T::ToBehaviour: Clone, - O: Send + 'static, - { + fn new_test_swarm( + ) -> SwarmBuilder>> { let id_keys = identity::Keypair::generate_ed25519(); let local_public_key = id_keys.public(); let transport = transport::MemoryTransport::default() @@ -1820,13 +1827,15 @@ mod tests { }) .multiplex(yamux::Config::default()) .boxed(); - let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - match ThreadPool::new().ok() { + let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler)); + let builder = match ThreadPool::new().ok() { Some(tp) => { SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp) } None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), - } + }; + + builder.idle_connection_timeout(Duration::from_secs(5)) } fn swarms_connected( @@ -1877,12 +1886,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_swarm_disconnect() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -1943,12 +1948,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_behaviour_disconnect_all() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2013,12 +2014,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_behaviour_disconnect_one() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2096,7 +2093,7 @@ mod tests { fn prop(concurrency_factor: DialConcurrencyFactor) { block_on(async { - let mut swarm = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler) + let mut swarm = new_test_swarm() .dial_concurrency_factor(concurrency_factor.0) .build(); @@ -2164,8 +2161,8 @@ mod tests { // Checks whether dialing an address containing the wrong peer id raises an error // for the expected peer id instead of the obtained peer id. - let mut swarm1 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); - let mut swarm2 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); swarm1.listen_on("/memory/0".parse().unwrap()).unwrap(); @@ -2224,7 +2221,7 @@ mod tests { // // The last two can happen in any order. - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm = new_test_swarm().build(); swarm.listen_on("/memory/0".parse().unwrap()).unwrap(); let local_address = @@ -2284,7 +2281,7 @@ mod tests { fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - let swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let swarm = new_test_swarm().build(); let peer_id = *swarm.local_peer_id(); assert!(!swarm.is_connected(&peer_id)); } @@ -2295,7 +2292,7 @@ mod tests { let target = PeerId::random(); - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm = new_test_swarm().build(); let addresses = HashSet::from([ multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())], @@ -2341,8 +2338,8 @@ mod tests { fn aborting_pending_connection_surfaces_error() { let _ = env_logger::try_init(); - let mut dialer = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); - let mut listener = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut dialer = new_test_swarm().build(); + let mut listener = new_test_swarm().build(); let listener_peer_id = *listener.local_peer_id(); listener.listen_on(multiaddr![Memory(0u64)]).unwrap(); diff --git a/transports/pnet/tests/smoke.rs b/transports/pnet/tests/smoke.rs index a7635c00ca3e..5e02ed856c6f 100644 --- a/transports/pnet/tests/smoke.rs +++ b/transports/pnet/tests/smoke.rs @@ -6,7 +6,7 @@ use libp2p_core::upgrade::Version; use libp2p_core::Transport; use libp2p_core::{multiaddr::Protocol, Multiaddr}; use libp2p_pnet::{PnetConfig, PreSharedKey}; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{dummy, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; const TIMEOUT: Duration = Duration::from_secs(5); @@ -98,7 +98,7 @@ where assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm +fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm where T: Transport + Send + Unpin + 'static, ::Error: Send + Sync + 'static, @@ -113,12 +113,9 @@ where .authenticate(libp2p_noise::Config::new(&identity).unwrap()) .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::with_tokio_executor( - transport, - keep_alive::Behaviour, - identity.public().to_peer_id(), - ) - .build() + SwarmBuilder::with_tokio_executor(transport, dummy::Behaviour, identity.public().to_peer_id()) + .idle_connection_timeout(Duration::from_secs(5)) + .build() } async fn listen_on(swarm: &mut Swarm, addr: Multiaddr) -> Multiaddr { diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 17aa959c4b2a..0db39edf2809 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -3,7 +3,8 @@ use libp2p_core::multiaddr::Protocol; use libp2p_core::transport::MemoryTransport; use libp2p_core::upgrade::Version; use libp2p_core::Transport; -use libp2p_swarm::{keep_alive, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{dummy, Swarm, SwarmBuilder, SwarmEvent}; +use std::time::Duration; #[tokio::test] async fn can_establish_connection() { @@ -55,7 +56,7 @@ async fn can_establish_connection() { assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm() -> Swarm { +fn make_swarm() -> Swarm { let identity = libp2p_identity::Keypair::generate_ed25519(); let transport = MemoryTransport::default() @@ -64,10 +65,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::without_executor( - transport, - keep_alive::Behaviour, - identity.public().to_peer_id(), - ) - .build() + SwarmBuilder::without_executor(transport, dummy::Behaviour, identity.public().to_peer_id()) + .idle_connection_timeout(Duration::from_secs(5)) + .build() } diff --git a/transports/webrtc/examples/listen_ping.rs b/transports/webrtc/examples/listen_ping.rs index 8475195a1ce9..ad867f26db0e 100644 --- a/transports/webrtc/examples/listen_ping.rs +++ b/transports/webrtc/examples/listen_ping.rs @@ -4,8 +4,9 @@ use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::Transport; use libp2p_identity as identity; use libp2p_ping as ping; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder}; +use libp2p_swarm::{Swarm, SwarmBuilder}; use rand::thread_rng; +use std::time::Duration; use void::Void; /// An example WebRTC server that will accept connections and run the ping protocol on them. @@ -21,7 +22,7 @@ async fn main() -> Result<()> { } } -fn create_swarm() -> Result> { +fn create_swarm() -> Result> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().to_peer_id(); let transport = libp2p_webrtc::tokio::Transport::new( @@ -29,18 +30,16 @@ fn create_swarm() -> Result> { libp2p_webrtc::tokio::Certificate::generate(&mut thread_rng())?, ); + let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); let transport = transport .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) .boxed(); - Ok(SwarmBuilder::with_tokio_executor(transport, Behaviour::default(), peer_id).build()) -} - -#[derive(NetworkBehaviour, Default)] -#[behaviour(to_swarm = "Event", prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + Ok( + SwarmBuilder::with_tokio_executor(transport, ping::Behaviour::new(cfg.clone()), peer_id) + .idle_connection_timeout(Duration::from_secs(5)) + .build(), + ) } #[derive(Debug)]