diff --git a/examples/browser-webrtc/src/main.rs b/examples/browser-webrtc/src/main.rs index f919f047af5..8a4034a436e 100644 --- a/examples/browser-webrtc/src/main.rs +++ b/examples/browser-webrtc/src/main.rs @@ -13,11 +13,12 @@ use libp2p::{ identity, multiaddr::{Multiaddr, Protocol}, ping, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, }; use libp2p_webrtc as webrtc; use rand::thread_rng; use std::net::{Ipv4Addr, SocketAddr}; +use std::time::Duration; use tower_http::cors::{Any, CorsLayer}; #[tokio::main] @@ -36,12 +37,10 @@ async fn main() -> anyhow::Result<()> { .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) .boxed(); - let behaviour = Behaviour { - ping: ping::Behaviour::new(ping::Config::new()), - keep_alive: keep_alive::Behaviour, - }; - - let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); + let mut swarm = + SwarmBuilder::with_tokio_executor(transport, ping::Behaviour::default(), local_peer_id) + .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe the pings. + .build(); let address_webrtc = Multiaddr::from(Ipv4Addr::UNSPECIFIED) .with(Protocol::Udp(0)) @@ -84,12 +83,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -#[derive(NetworkBehaviour)] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, -} - #[derive(rust_embed::RustEmbed)] #[folder = "$CARGO_MANIFEST_DIR/static"] struct StaticFiles; diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index fa9d5bd37e7..b28abaee941 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -26,12 +26,13 @@ use futures::stream::StreamExt; use libp2p::core::{upgrade::Version, Multiaddr, Transport}; use libp2p::identity::PeerId; use libp2p::metrics::{Metrics, Recorder}; -use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; use libp2p::{identify, identity, noise, ping, tcp, yamux}; use log::info; use prometheus_client::registry::Registry; use std::error::Error; use std::thread; +use std::time::Duration; mod http_service; @@ -51,6 +52,7 @@ fn main() -> Result<(), Box> { Behaviour::new(local_pub_key), local_peer_id, ) + .idle_connection_timeout(Duration::from_secs(60)) .build(); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; @@ -87,13 +89,9 @@ fn main() -> Result<(), Box> { } /// Our network behaviour. -/// -/// For illustrative purposes, this includes the [`keep_alive::Behaviour`]) behaviour so the ping actually happen -/// and can be observed via the metrics. #[derive(NetworkBehaviour)] struct Behaviour { identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, ping: ping::Behaviour, } @@ -105,7 +103,6 @@ impl Behaviour { "/ipfs/0.1.0".into(), local_pub_key, )), - keep_alive: keep_alive::Behaviour, } } } diff --git a/examples/ping-example/src/main.rs b/examples/ping-example/src/main.rs index 6af079ccfd0..898a25813e0 100644 --- a/examples/ping-example/src/main.rs +++ b/examples/ping-example/src/main.rs @@ -24,10 +24,11 @@ use futures::prelude::*; use libp2p::core::upgrade::Version; use libp2p::{ identity, noise, ping, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::error::Error; +use std::time::Duration; #[async_std::main] async fn main() -> Result<(), Box> { @@ -42,7 +43,8 @@ async fn main() -> Result<(), Box> { .boxed(); let mut swarm = - SwarmBuilder::with_async_std_executor(transport, Behaviour::default(), local_peer_id) + SwarmBuilder::with_async_std_executor(transport, ping::Behaviour::default(), local_peer_id) + .idle_connection_timeout(Duration::from_secs(60)) // For illustrative purposes, keep idle connections alive for a minute so we can observe a few pings. .build(); // Tell the swarm to listen on all interfaces and a random, OS-assigned @@ -65,13 +67,3 @@ async fn main() -> Result<(), Box> { } } } - -/// Our network behaviour. -/// -/// For illustrative purposes, this includes the [`KeepAlive`](keep_alive::Behaviour) behaviour so a continuous sequence of -/// pings can be observed. -#[derive(NetworkBehaviour, Default)] -struct Behaviour { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour, -} diff --git a/examples/rendezvous/src/bin/rzv-discover.rs b/examples/rendezvous/src/bin/rzv-discover.rs index bd510496ae3..ac45afae840 100644 --- a/examples/rendezvous/src/bin/rzv-discover.rs +++ b/examples/rendezvous/src/bin/rzv-discover.rs @@ -24,7 +24,7 @@ use libp2p::{ identity, multiaddr::Protocol, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,10 +50,10 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); swarm.dial(rendezvous_point_address.clone()).unwrap(); @@ -127,5 +127,4 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-identify.rs b/examples/rendezvous/src/bin/rzv-identify.rs index 1ef8569c612..95ed7a5ccd8 100644 --- a/examples/rendezvous/src/bin/rzv-identify.rs +++ b/examples/rendezvous/src/bin/rzv-identify.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,10 +50,10 @@ async fn main() { )), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); @@ -133,5 +133,4 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-register.rs b/examples/rendezvous/src/bin/rzv-register.rs index 95407dd9c7d..51acfee2a71 100644 --- a/examples/rendezvous/src/bin/rzv-register.rs +++ b/examples/rendezvous/src/bin/rzv-register.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -46,10 +46,10 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); // In production the external address should be the publicly facing IP address of the rendezvous point. @@ -130,5 +130,4 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/examples/rendezvous/src/main.rs b/examples/rendezvous/src/main.rs index 44b1716e176..a3ed3c0fce5 100644 --- a/examples/rendezvous/src/main.rs +++ b/examples/rendezvous/src/main.rs @@ -24,7 +24,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, }; use std::time::Duration; @@ -48,10 +48,10 @@ async fn main() { )), rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/62649".parse().unwrap()); @@ -97,5 +97,4 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::server::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, } diff --git a/interop-tests/src/lib.rs b/interop-tests/src/lib.rs index 54d94430c8e..40c06b57810 100644 --- a/interop-tests/src/lib.rs +++ b/interop-tests/src/lib.rs @@ -3,8 +3,8 @@ use std::time::Duration; use anyhow::{bail, Context, Result}; use futures::{FutureExt, StreamExt}; -use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; -use libp2p::{identify, identity, ping, Multiaddr, PeerId}; +use libp2p::swarm::SwarmEvent; +use libp2p::{identify, identity, ping, swarm::NetworkBehaviour, Multiaddr, PeerId}; #[cfg(target_arch = "wasm32")] use wasm_bindgen::prelude::*; @@ -33,8 +33,7 @@ pub async fn run_test( let mut swarm = swarm_builder( boxed_transport, Behaviour { - ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, + ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(10))), // Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved. identify: identify::Behaviour::new(identify::Config::new( "/interop-tests".to_owned(), @@ -43,6 +42,7 @@ pub async fn run_test( }, local_peer_id, ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); log::info!("Running ping test: {}", swarm.local_peer_id()); @@ -242,7 +242,6 @@ impl FromStr for SecProtocol { #[derive(NetworkBehaviour)] struct Behaviour { ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, identify: identify::Behaviour, } diff --git a/libp2p/src/tutorials/ping.rs b/libp2p/src/tutorials/ping.rs index 006c807803c..aedc149228e 100644 --- a/libp2p/src/tutorials/ping.rs +++ b/libp2p/src/tutorials/ping.rs @@ -143,7 +143,7 @@ //! With the above in mind, let's extend our example, creating a [`ping::Behaviour`](crate::ping::Behaviour) at the end: //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour}; +//! use libp2p::swarm::NetworkBehaviour; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! @@ -155,20 +155,10 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Swarm @@ -185,7 +175,7 @@ //! (In our example, `env_logger` is used) //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! @@ -197,21 +187,46 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! //! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); //! //! Ok(()) //! } +//! ``` +//! +//! ## Idle connection timeout +//! +//! Now, for this example in particular, we need set the idle connection timeout. +//! Otherwise, the connection will be closed immediately. +//! +//! Whether you need to set this in your application too depends on your usecase. +//! Typically, connections are kept alive if they are "in use" by a certain protocol. +//! The ping protocol however is only an "auxiliary" kind of protocol. +//! Thus, without any other behaviour in place, we would not be able to observe the pings. +//! +//! ```rust +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; +//! use libp2p::{identity, ping, PeerId}; +//! use std::error::Error; +//! use std::time::Duration; +//! +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { +//! use std::time::Duration; +//! let local_key = identity::Keypair::generate_ed25519(); +//! let local_peer_id = PeerId::from(local_key.public()); +//! println!("Local peer id: {local_peer_id:?}"); +//! +//! let transport = libp2p::development_transport(local_key).await?; //! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour:: -//! /// KeepAlive) behaviour so a continuous sequence of pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, +//! let behaviour = ping::Behaviour::default(); +//! +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); +//! +//! Ok(()) //! } //! ``` //! @@ -242,9 +257,10 @@ //! remote peer. //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { @@ -254,9 +270,11 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! -//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); //! //! // Tell the swarm to listen on all interfaces and a random, OS-assigned //! // port. @@ -272,16 +290,6 @@ //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Continuously polling the Swarm @@ -292,9 +300,10 @@ //! //! ```no_run //! use futures::prelude::*; -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmEvent, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { @@ -304,9 +313,11 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! -//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); //! //! // Tell the swarm to listen on all interfaces and a random, OS-assigned //! // port. @@ -328,16 +339,6 @@ //! } //! } //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Running two nodes diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index eed79d740a1..1950c47f28b 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -283,14 +283,11 @@ mod tests { #[async_std::test] async fn cannot_dial_blocked_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - dialer - .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { panic!("unexpected dial error") @@ -300,17 +297,13 @@ mod tests { #[async_std::test] async fn can_dial_unblocked_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); dialer .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); - dialer - .behaviour_mut() - .list .unblock_peer(*listener.local_peer_id()); dial(&mut dialer, &listener).unwrap(); @@ -318,14 +311,11 @@ mod tests { #[async_std::test] async fn blocked_peer_cannot_dial_us() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - listener - .behaviour_mut() - .list - .block_peer(*dialer.local_peer_id()); + listener.behaviour_mut().block_peer(*dialer.local_peer_id()); dial(&mut dialer, &listener).unwrap(); async_std::task::spawn(dialer.loop_on_next()); @@ -343,15 +333,12 @@ mod tests { #[async_std::test] async fn connections_get_closed_upon_blocked() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; dialer.connect(&mut listener).await; - dialer - .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); let ( [SwarmEvent::ConnectionClosed { @@ -372,8 +359,8 @@ mod tests { #[async_std::test] async fn cannot_dial_peer_unless_allowed() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { @@ -381,26 +368,19 @@ mod tests { }; assert!(cause.downcast::().is_ok()); - dialer - .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); assert!(dial(&mut dialer, &listener).is_ok()); } #[async_std::test] async fn cannot_dial_disallowed_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); dialer .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); - dialer - .behaviour_mut() - .list .disallow_peer(*listener.local_peer_id()); let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { @@ -411,8 +391,8 @@ mod tests { #[async_std::test] async fn not_allowed_peer_cannot_dial_us() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; dialer @@ -448,23 +428,16 @@ mod tests { #[async_std::test] async fn connections_get_closed_upon_disallow() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - dialer - .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); - listener - .behaviour_mut() - .list - .allow_peer(*dialer.local_peer_id()); + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); + listener.behaviour_mut().allow_peer(*dialer.local_peer_id()); dialer.connect(&mut listener).await; dialer .behaviour_mut() - .list .disallow_peer(*listener.local_peer_id()); let ( [SwarmEvent::ConnectionClosed { @@ -496,27 +469,4 @@ mod tests { .build(), ) } - - #[derive(libp2p_swarm_derive::NetworkBehaviour)] - #[behaviour(prelude = "libp2p_swarm::derive_prelude")] - struct Behaviour { - list: super::Behaviour, - keep_alive: libp2p_swarm::keep_alive::Behaviour, - } - - impl Behaviour - where - S: Default, - { - fn new() -> Self { - Self { - list: super::Behaviour { - waker: None, - close_connections: VecDeque::new(), - state: S::default(), - }, - keep_alive: libp2p_swarm::keep_alive::Behaviour, - } - } - } } diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index e4723dd95c6..7de96cc1736 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -529,7 +529,6 @@ mod tests { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { limits: super::Behaviour, - keep_alive: libp2p_swarm::keep_alive::Behaviour, connection_denier: Toggle, } @@ -537,14 +536,12 @@ mod tests { fn new(limits: ConnectionLimits) -> Self { Self { limits: super::Behaviour::new(limits), - keep_alive: libp2p_swarm::keep_alive::Behaviour, connection_denier: None.into(), } } fn new_with_connection_denier(limits: ConnectionLimits) -> Self { Self { limits: super::Behaviour::new(limits), - keep_alive: libp2p_swarm::keep_alive::Behaviour, connection_denier: Some(ConnectionDenier {}).into(), } } diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index c70ab3181b4..c1926b4125f 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,6 +1,6 @@ use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; -use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use std::iter; @@ -9,7 +9,7 @@ async fn periodic_identify() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -17,7 +17,7 @@ async fn periodic_identify() { let swarm1_peer_id = *swarm1.local_peer_id(); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("c".to_string(), identity.public()) .with_agent_version("d".to_string()), ) @@ -33,20 +33,20 @@ async fn periodic_identify() { match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { ( - [BehaviourEvent::Identify(Received { info: s1_info, .. }), BehaviourEvent::Identify(Sent { .. })], - [BehaviourEvent::Identify(Received { info: s2_info, .. }), BehaviourEvent::Identify(Sent { .. })], + [Received { info: s1_info, .. }, Sent { .. }], + [Received { info: s2_info, .. }, Sent { .. }], ) | ( - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s1_info, .. })], - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s2_info, .. })], + [Sent { .. }, Received { info: s1_info, .. }], + [Sent { .. }, Received { info: s2_info, .. }], ) | ( - [BehaviourEvent::Identify(Received { info: s1_info, .. }), BehaviourEvent::Identify(Sent { .. })], - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s2_info, .. })], + [Received { info: s1_info, .. }, Sent { .. }], + [Sent { .. }, Received { info: s2_info, .. }], ) | ( - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s1_info, .. })], - [BehaviourEvent::Identify(Received { info: s2_info, .. }), BehaviourEvent::Identify(Sent { .. })], + [Sent { .. }, Received { info: s1_info, .. }], + [Received { info: s2_info, .. }, Sent { .. }], ) => { assert_eq!(s1_info.public_key.to_peer_id(), swarm2_peer_id); assert_eq!(s1_info.protocol_version, "c"); @@ -83,10 +83,10 @@ async fn identify_push() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -96,33 +96,25 @@ async fn identify_push() { swarm2.connect(&mut swarm1).await; // First, let the periodic identify do its thing. - match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ( - [BehaviourEvent::Identify(e1), BehaviourEvent::Identify(e2)], - [BehaviourEvent::Identify(e3), BehaviourEvent::Identify(e4)], - ) => { - use identify::Event::{Received, Sent}; + let ([e1, e2], [e3, e4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - // These can be received in any order, hence assert them here instead of the pattern above. - assert!(matches!(e1, Received { .. } | Sent { .. })); - assert!(matches!(e2, Received { .. } | Sent { .. })); - assert!(matches!(e3, Received { .. } | Sent { .. })); - assert!(matches!(e4, Received { .. } | Sent { .. })); - } - other => panic!("Unexpected events: {other:?}"), - }; + { + use identify::Event::{Received, Sent}; + + // These can be received in any order, hence assert them here. + assert!(matches!(e1, Received { .. } | Sent { .. })); + assert!(matches!(e2, Received { .. } | Sent { .. })); + assert!(matches!(e3, Received { .. } | Sent { .. })); + assert!(matches!(e4, Received { .. } | Sent { .. })); + } // Second, actively push. swarm2 .behaviour_mut() - .identify .push(iter::once(*swarm1.local_peer_id())); let swarm1_received_info = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ( - [BehaviourEvent::Identify(identify::Event::Received { info, .. })], - [BehaviourEvent::Identify(identify::Event::Pushed { .. })], - ) => info, + ([identify::Event::Received { info, .. }], [identify::Event::Pushed { .. }]) => info, other => panic!("Unexpected events: {other:?}"), }; @@ -141,10 +133,10 @@ async fn discover_peer_after_disconnect() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -161,7 +153,7 @@ async fn discover_peer_after_disconnect() { .wait(|event| { matches!( event, - SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. })) + SwarmEvent::Behaviour(identify::Event::Received { .. }) ) .then_some(()) }) @@ -186,23 +178,3 @@ async fn discover_peer_after_disconnect() { assert_eq!(connected_peer, swarm1_peer_id); } - -/// Combined behaviour to keep the connection alive after the periodic identify. -/// -/// The identify implementation sets `keep_alive` to `No` once it has done its thing. -/// This can result in unexpected connection closures if one peer is faster than the other. -#[derive(libp2p_swarm::NetworkBehaviour)] -#[behaviour(prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, -} - -impl Behaviour { - fn new(config: identify::Config) -> Self { - Self { - identify: identify::Behaviour::new(config), - keep_alive: keep_alive::Behaviour, - } - } -} diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 63836a15a78..946a2daadb6 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -21,8 +21,8 @@ //! Integration tests for the `Ping` network behaviour. use libp2p_ping as ping; -use libp2p_swarm::keep_alive; -use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_swarm::dummy; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use quickcheck::*; use std::{num::NonZeroU8, time::Duration}; @@ -32,18 +32,16 @@ fn ping_pong() { fn prop(count: NonZeroU8) { let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); - let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone())); - let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone())); + let mut swarm1 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); + let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); async_std::task::block_on(async { swarm1.listen().await; swarm2.connect(&mut swarm1).await; for _ in 0..count.get() { - let (e1, e2) = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ([BehaviourEvent::Ping(e1)], [BehaviourEvent::Ping(e2)]) => (e1, e2), - events => panic!("Unexpected events: {events:?}"), - }; + let ([e1], [e2]): ([ping::Event; 1], [ping::Event; 1]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; assert_eq!(&e1.peer, swarm2.local_peer_id()); assert_eq!(&e2.peer, swarm1.local_peer_id()); @@ -65,8 +63,8 @@ fn assert_ping_rtt_less_than_50ms(e: ping::Event) { #[test] fn unsupported_doesnt_fail() { - let mut swarm1 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); - let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(ping::Config::new())); + let mut swarm1 = Swarm::new_ephemeral(|_| dummy::Behaviour); + let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(ping::Config::new())); let result = async_std::task::block_on(async { swarm1.listen().await; @@ -76,10 +74,10 @@ fn unsupported_doesnt_fail() { loop { match swarm2.next_swarm_event().await { - SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { + SwarmEvent::Behaviour(ping::Event { result: Err(ping::Failure::Unsupported), .. - })) => { + }) => { swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); } SwarmEvent::ConnectionClosed { cause: Some(e), .. } => { @@ -95,19 +93,3 @@ fn unsupported_doesnt_fail() { result.expect("node with ping should not fail connection due to unsupported protocol"); } - -#[derive(NetworkBehaviour, Default)] -#[behaviour(prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour, -} - -impl Behaviour { - fn new(config: ping::Config) -> Self { - Self { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour::new(config), - } - } -} diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index 0ed8dbce220..819db33ba88 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -218,7 +218,9 @@ where .timeout(Duration::from_secs(20)) .boxed(); - SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id).build() + SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id) + .idle_connection_timeout(Duration::from_secs(5)) // Some tests need connections to be kept alive beyond what the individual behaviour configures. + .build() } async fn connect(&mut self, other: &mut Swarm) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 77ddf3458d1..06ddd740873 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,8 +6,12 @@ - Improve error message when `DialPeerCondition` prevents a dial. See [PR 4409]. +- Introduce `SwarmBuilder::idle_conncetion_timeout` and deprecate `keep_alive::Behaviour` as a result. + See [PR 4161]. + [PR 4426]: https://github.com/libp2p/rust-libp2p/pull/4426 [PR 4409]: https://github.com/libp2p/rust-libp2p/pull/4409 +[PR 4161]: https://github.com/libp2p/rust-libp2p/pull/4161 ## 0.43.3 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index d00829ce8b2..86f4c158387 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -55,6 +55,7 @@ quickcheck = { workspace = true } void = "1" once_cell = "1.18.0" trybuild = "1.0.85" +tokio = { version = "1.29.1", features = ["time", "rt", "macros"] } [[test]] name = "swarm_derive" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 3796d9a027d..310cf3a81e3 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -52,6 +52,7 @@ use libp2p_core::upgrade; use libp2p_core::upgrade::{NegotiationError, ProtocolError}; use libp2p_core::Endpoint; use libp2p_identity::PeerId; +use std::cmp::max; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::future::Future; @@ -156,6 +157,7 @@ where local_supported_protocols: HashSet, remote_supported_protocols: HashSet, + idle_timeout: Duration, } impl fmt::Debug for Connection @@ -183,9 +185,9 @@ where mut handler: THandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, + idle_timeout: Duration, ) -> Self { let initial_protocols = gather_supported_protocols(&handler); - if !initial_protocols.is_empty() { handler.on_connection_event(ConnectionEvent::LocalProtocolsChange( ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)), @@ -203,6 +205,7 @@ where requested_substreams: Default::default(), local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), + idle_timeout, } } @@ -234,6 +237,7 @@ where substream_upgrade_protocol_override, local_supported_protocols: supported_protocols, remote_supported_protocols, + idle_timeout, } = self.get_mut(); loop { @@ -348,17 +352,36 @@ where (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - timer.reset(dur) + if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) + { + let effective_keep_alive = max(new_duration, *idle_timeout); + + timer.reset(effective_keep_alive) } } } - (_, KeepAlive::Until(t)) => { - if let Some(dur) = t.checked_duration_since(Instant::now()) { - *shutdown = Shutdown::Later(Delay::new(dur), t) + (_, KeepAlive::Until(earliest_shutdown)) => { + if let Some(requested_keep_alive) = + earliest_shutdown.checked_duration_since(Instant::now()) + { + let effective_keep_alive = max(requested_keep_alive, *idle_timeout); + + // Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch. + // This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See / + *shutdown = + Shutdown::Later(Delay::new(effective_keep_alive), earliest_shutdown) } } - (_, KeepAlive::No) => *shutdown = Shutdown::Asap, + (_, KeepAlive::No) if idle_timeout == &Duration::ZERO => { + *shutdown = Shutdown::Asap; + } + (Shutdown::Later(_, _), KeepAlive::No) => { + // Do nothing, i.e. let the shutdown timer continue to tick. + } + (_, KeepAlive::No) => { + let deadline = Instant::now() + *idle_timeout; + *shutdown = Shutdown::Later(Delay::new(*idle_timeout), deadline); + } (_, KeepAlive::Yes) => *shutdown = Shutdown::None, }; @@ -696,7 +719,7 @@ enum Shutdown { #[cfg(test)] mod tests { use super::*; - use crate::keep_alive; + use crate::dummy; use futures::future; use futures::AsyncRead; use futures::AsyncWrite; @@ -704,6 +727,7 @@ mod tests { use libp2p_core::StreamMuxer; use quickcheck::*; use std::sync::{Arc, Weak}; + use std::time::Instant; use void::Void; #[test] @@ -712,14 +736,14 @@ mod tests { let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); let alive_substream_counter = Arc::new(()); - let mut connection = Connection::new( StreamMuxerBox::new(DummyStreamMuxer { counter: alive_substream_counter.clone(), }), - keep_alive::ConnectionHandler, + MockConnectionHandler::new(Duration::ZERO), None, max_negotiating_inbound_streams, + Duration::ZERO, ); let result = connection.poll_noop_waker(); @@ -743,6 +767,7 @@ mod tests { MockConnectionHandler::new(upgrade_timeout), None, 2, + Duration::ZERO, ); connection.handler.open_new_outbound(); @@ -765,6 +790,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, + Duration::ZERO, ); // First, start listening on a single protocol. @@ -803,6 +829,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, + Duration::ZERO, ); // First, remote supports a single protocol. @@ -846,6 +873,141 @@ mod tests { assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]); } + #[tokio::test] + async fn idle_timeout_with_keep_alive_no() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + dummy::ConnectionHandler, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + #[tokio::test] + async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + KeepAliveUntilConnectionHandler { + until: Instant::now() + idle_timeout * 2, + }, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!( + connection.poll_noop_waker().is_pending(), + "`KeepAlive::Until` is greater than idle-timeout, continue sleeping" + ); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + #[tokio::test] + async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + KeepAliveUntilConnectionHandler { + until: Instant::now() + idle_timeout / 2, + }, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout / 2).await; + + assert!( + connection.poll_noop_waker().is_pending(), + "`KeepAlive::Until` is less than idle-timeout, honor idle-timeout" + ); + + tokio::time::sleep(idle_timeout / 2).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + struct KeepAliveUntilConnectionHandler { + until: Instant, + } + + impl ConnectionHandler for KeepAliveUntilConnectionHandler { + type FromBehaviour = Void; + type ToBehaviour = Void; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = Void; + + fn listen_protocol( + &self, + ) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Until(self.until) + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + Poll::Pending + } + + fn on_behaviour_event(&mut self, _: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + _: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } + } + struct DummyStreamMuxer { counter: Arc<()>, } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e9f7504f529..07fc9075806 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -37,7 +37,7 @@ use futures::{ ready, stream::FuturesUnordered, }; -use instant::Instant; +use instant::{Duration, Instant}; use libp2p_core::connection::Endpoint; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use std::task::Waker; @@ -135,6 +135,9 @@ where /// Receivers for [`NewConnection`] objects that are dropped. new_connection_dropped_listeners: FuturesUnordered>, + + /// How long a connection should be kept alive once it starts idling. + idle_connection_timeout: Duration, } #[derive(Debug)] @@ -322,6 +325,7 @@ where substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, per_connection_event_buffer_size: config.per_connection_event_buffer_size, + idle_connection_timeout: config.idle_connection_timeout, executor, pending_connection_events_tx, pending_connection_events_rx, @@ -518,6 +522,7 @@ where handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, + self.idle_connection_timeout, ); self.executor.spawn(task::new_for_established_connection( @@ -947,6 +952,8 @@ pub(crate) struct PoolConfig { pub(crate) per_connection_event_buffer_size: usize, /// Number of addresses concurrently dialed for a single outbound connection attempt. pub(crate) dial_concurrency_factor: NonZeroU8, + /// How long a connection should be kept alive once it is idling. + pub(crate) idle_connection_timeout: Duration, /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, @@ -963,6 +970,7 @@ impl PoolConfig { task_command_buffer_size: 32, per_connection_event_buffer_size: 7, dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), + idle_connection_timeout: Duration::ZERO, substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 92bc614777e..93238d04da0 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,6 +67,9 @@ pub mod behaviour; pub mod dial_opts; pub mod dummy; pub mod handler; +#[deprecated( + note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead. To keep connections alive 'forever', use `Duration::from_secs(u64::MAX)`." +)] pub mod keep_alive; mod listen_opts; @@ -146,6 +149,7 @@ use libp2p_identity::PeerId; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize}; +use std::time::Duration; use std::{ convert::TryFrom, error, fmt, io, @@ -1518,6 +1522,14 @@ where self } + /// How long to keep a connection alive once it is idling. + /// + /// Defaults to 0. + pub fn idle_connection_timeout(mut self, timeout: Duration) -> Self { + self.pool_config.idle_connection_timeout = timeout; + self + } + /// Builds a `Swarm` with the current configuration. pub fn build(self) -> Swarm { log::info!("Local peer id: {}", self.local_peer_id); @@ -1808,6 +1820,7 @@ fn p2p_addr(peer: Option, addr: Multiaddr) -> Result( - handler_proto: T, - ) -> SwarmBuilder>> - where - T: ConnectionHandler + Clone, - T::ToBehaviour: Clone, - O: Send + 'static, - { + fn new_test_swarm( + ) -> SwarmBuilder>> { let id_keys = identity::Keypair::generate_ed25519(); let local_public_key = id_keys.public(); let transport = transport::MemoryTransport::default() @@ -1846,13 +1853,15 @@ mod tests { }) .multiplex(yamux::Config::default()) .boxed(); - let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - match ThreadPool::new().ok() { + let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler)); + let builder = match ThreadPool::new().ok() { Some(tp) => { SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp) } None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), - } + }; + + builder.idle_connection_timeout(Duration::from_secs(5)) } fn swarms_connected( @@ -1903,12 +1912,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_swarm_disconnect() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -1969,12 +1974,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_behaviour_disconnect_all() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2039,12 +2040,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_behaviour_disconnect_one() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2122,7 +2119,7 @@ mod tests { fn prop(concurrency_factor: DialConcurrencyFactor) { block_on(async { - let mut swarm = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler) + let mut swarm = new_test_swarm() .dial_concurrency_factor(concurrency_factor.0) .build(); @@ -2190,8 +2187,8 @@ mod tests { // Checks whether dialing an address containing the wrong peer id raises an error // for the expected peer id instead of the obtained peer id. - let mut swarm1 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); - let mut swarm2 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); swarm1.listen_on("/memory/0".parse().unwrap()).unwrap(); @@ -2250,7 +2247,7 @@ mod tests { // // The last two can happen in any order. - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm = new_test_swarm().build(); swarm.listen_on("/memory/0".parse().unwrap()).unwrap(); let local_address = @@ -2310,7 +2307,7 @@ mod tests { fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - let swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let swarm = new_test_swarm().build(); let peer_id = *swarm.local_peer_id(); assert!(!swarm.is_connected(&peer_id)); } @@ -2321,7 +2318,7 @@ mod tests { let target = PeerId::random(); - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm = new_test_swarm().build(); let addresses = HashSet::from([ multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())], @@ -2367,8 +2364,8 @@ mod tests { fn aborting_pending_connection_surfaces_error() { let _ = env_logger::try_init(); - let mut dialer = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); - let mut listener = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut dialer = new_test_swarm().build(); + let mut listener = new_test_swarm().build(); let listener_peer_id = *listener.local_peer_id(); listener.listen_on(multiaddr![Memory(0u64)]).unwrap(); diff --git a/transports/pnet/tests/smoke.rs b/transports/pnet/tests/smoke.rs index a7635c00ca3..5e02ed856c6 100644 --- a/transports/pnet/tests/smoke.rs +++ b/transports/pnet/tests/smoke.rs @@ -6,7 +6,7 @@ use libp2p_core::upgrade::Version; use libp2p_core::Transport; use libp2p_core::{multiaddr::Protocol, Multiaddr}; use libp2p_pnet::{PnetConfig, PreSharedKey}; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{dummy, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; const TIMEOUT: Duration = Duration::from_secs(5); @@ -98,7 +98,7 @@ where assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm +fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm where T: Transport + Send + Unpin + 'static, ::Error: Send + Sync + 'static, @@ -113,12 +113,9 @@ where .authenticate(libp2p_noise::Config::new(&identity).unwrap()) .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::with_tokio_executor( - transport, - keep_alive::Behaviour, - identity.public().to_peer_id(), - ) - .build() + SwarmBuilder::with_tokio_executor(transport, dummy::Behaviour, identity.public().to_peer_id()) + .idle_connection_timeout(Duration::from_secs(5)) + .build() } async fn listen_on(swarm: &mut Swarm, addr: Multiaddr) -> Multiaddr { diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 17aa959c4b2..0db39edf280 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -3,7 +3,8 @@ use libp2p_core::multiaddr::Protocol; use libp2p_core::transport::MemoryTransport; use libp2p_core::upgrade::Version; use libp2p_core::Transport; -use libp2p_swarm::{keep_alive, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{dummy, Swarm, SwarmBuilder, SwarmEvent}; +use std::time::Duration; #[tokio::test] async fn can_establish_connection() { @@ -55,7 +56,7 @@ async fn can_establish_connection() { assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm() -> Swarm { +fn make_swarm() -> Swarm { let identity = libp2p_identity::Keypair::generate_ed25519(); let transport = MemoryTransport::default() @@ -64,10 +65,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::without_executor( - transport, - keep_alive::Behaviour, - identity.public().to_peer_id(), - ) - .build() + SwarmBuilder::without_executor(transport, dummy::Behaviour, identity.public().to_peer_id()) + .idle_connection_timeout(Duration::from_secs(5)) + .build() } diff --git a/transports/webrtc/examples/listen_ping.rs b/transports/webrtc/examples/listen_ping.rs index 8475195a1ce..ad867f26db0 100644 --- a/transports/webrtc/examples/listen_ping.rs +++ b/transports/webrtc/examples/listen_ping.rs @@ -4,8 +4,9 @@ use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::Transport; use libp2p_identity as identity; use libp2p_ping as ping; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder}; +use libp2p_swarm::{Swarm, SwarmBuilder}; use rand::thread_rng; +use std::time::Duration; use void::Void; /// An example WebRTC server that will accept connections and run the ping protocol on them. @@ -21,7 +22,7 @@ async fn main() -> Result<()> { } } -fn create_swarm() -> Result> { +fn create_swarm() -> Result> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().to_peer_id(); let transport = libp2p_webrtc::tokio::Transport::new( @@ -29,18 +30,16 @@ fn create_swarm() -> Result> { libp2p_webrtc::tokio::Certificate::generate(&mut thread_rng())?, ); + let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); let transport = transport .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) .boxed(); - Ok(SwarmBuilder::with_tokio_executor(transport, Behaviour::default(), peer_id).build()) -} - -#[derive(NetworkBehaviour, Default)] -#[behaviour(to_swarm = "Event", prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + Ok( + SwarmBuilder::with_tokio_executor(transport, ping::Behaviour::new(cfg.clone()), peer_id) + .idle_connection_timeout(Duration::from_secs(5)) + .build(), + ) } #[derive(Debug)]