From 073cee1708523fd57fb381f0411843a5475e0414 Mon Sep 17 00:00:00 2001 From: startup-dreamer Date: Sat, 8 Jul 2023 17:36:39 +0530 Subject: [PATCH 01/26] added idle_timeout and test for swarm builder --- swarm/src/connection.rs | 42 ++++++++++++++++++++++++++++++++++-- swarm/src/connection/pool.rs | 4 +++- swarm/src/lib.rs | 2 +- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 6646967f590..dcb6b1bcc27 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -149,6 +149,7 @@ where local_supported_protocols: HashSet, remote_supported_protocols: HashSet, + idle_timeout: Duration, } impl fmt::Debug for Connection @@ -176,15 +177,16 @@ where mut handler: THandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, + idle_timeout: Option, ) -> Self { let initial_protocols = gather_supported_protocols(&handler); - if !initial_protocols.is_empty() { handler.on_connection_event(ConnectionEvent::LocalProtocolsChange( ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)), )); } + let timeout = idle_timeout.unwrap_or_else(|| Duration::new(0, 0)); Connection { muxing: muxer, handler, @@ -196,6 +198,7 @@ where requested_substreams: Default::default(), local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), + idle_timeout: timeout, } } @@ -227,6 +230,7 @@ where substream_upgrade_protocol_override, local_supported_protocols: supported_protocols, remote_supported_protocols, + idle_timeout, } = self.get_mut(); loop { @@ -351,7 +355,16 @@ where *shutdown = Shutdown::Later(Delay::new(dur), t) } } - (_, KeepAlive::No) => *shutdown = Shutdown::Asap, + (_, KeepAlive::No) => { + // handle idle_timeout + let duration = *idle_timeout; // Default timeout is 0 seconds + if duration > Duration::new(0, 0) { + let deadline = Instant::now() + duration; + *shutdown = Shutdown::Later(Delay::new(duration), deadline); + } else { + *shutdown = Shutdown::Asap; + } + } (_, KeepAlive::Yes) => *shutdown = Shutdown::None, }; @@ -713,6 +726,7 @@ mod tests { keep_alive::ConnectionHandler, None, max_negotiating_inbound_streams, + None, ); let result = connection.poll_noop_waker(); @@ -736,6 +750,7 @@ mod tests { MockConnectionHandler::new(upgrade_timeout), None, 2, + None, ); connection.handler.open_new_outbound(); @@ -750,6 +765,27 @@ mod tests { StreamUpgradeError::Timeout )) } + + #[test] + fn test_idle_timeout() { + // Create a custom idle timeout + let idle_timeout = Duration::from_secs(5); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + ConfigurableProtocolConnectionHandler::default(), + None, + 0, + Some(idle_timeout), + ); + + // Create a mock context and pin the connection + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + let connection = Pin::new(&mut connection); + + let poll_result = connection.poll(&mut cx); + assert!(poll_result.is_pending()); + } #[test] fn propagates_changes_to_supported_inbound_protocols() { @@ -758,6 +794,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, + None, ); // First, start listening on a single protocol. @@ -796,6 +833,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, + None, ); // First, remote supports a single protocol. diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e9f7504f529..e34dc645d5d 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -37,7 +37,7 @@ use futures::{ ready, stream::FuturesUnordered, }; -use instant::Instant; +use instant::{Duration, Instant}; use libp2p_core::connection::Endpoint; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use std::task::Waker; @@ -492,6 +492,7 @@ where endpoint: &ConnectedPoint, connection: NewConnection, handler: THandler, + idle_timeout: Option, ) { let connection = connection.extract(); @@ -518,6 +519,7 @@ where handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, + idle_timeout, ); self.executor.spawn(task::new_for_established_connection( diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 36fe082f3ff..ce2d1df3de9 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -736,7 +736,7 @@ where .expect("n + 1 is always non-zero; qed"); self.pool - .spawn_connection(id, peer_id, &endpoint, connection, handler); + .spawn_connection(id, peer_id, &endpoint, connection, handler, None); log::debug!( "Connection established: {:?} {:?}; Total (peer): {}.", From 459cb4d04f979b4a7d4d417273a7995bec26dbbb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 31 Jul 2023 14:29:51 +0200 Subject: [PATCH 02/26] Allow configuration of idle timeout via `SwarmBuilder` --- swarm/src/connection.rs | 18 ++++++++---------- swarm/src/connection/pool.rs | 10 ++++++++-- swarm/src/lib.rs | 11 ++++++++++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index dcb6b1bcc27..ee2440cc244 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -177,7 +177,7 @@ where mut handler: THandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, - idle_timeout: Option, + idle_timeout: Duration, ) -> Self { let initial_protocols = gather_supported_protocols(&handler); if !initial_protocols.is_empty() { @@ -186,7 +186,6 @@ where )); } - let timeout = idle_timeout.unwrap_or_else(|| Duration::new(0, 0)); Connection { muxing: muxer, handler, @@ -198,7 +197,7 @@ where requested_substreams: Default::default(), local_supported_protocols: initial_protocols, remote_supported_protocols: Default::default(), - idle_timeout: timeout, + idle_timeout, } } @@ -726,7 +725,7 @@ mod tests { keep_alive::ConnectionHandler, None, max_negotiating_inbound_streams, - None, + Duration::ZERO, ); let result = connection.poll_noop_waker(); @@ -750,7 +749,7 @@ mod tests { MockConnectionHandler::new(upgrade_timeout), None, 2, - None, + Duration::ZERO, ); connection.handler.open_new_outbound(); @@ -765,18 +764,17 @@ mod tests { StreamUpgradeError::Timeout )) } - + #[test] fn test_idle_timeout() { // Create a custom idle timeout - let idle_timeout = Duration::from_secs(5); let mut connection = Connection::new( StreamMuxerBox::new(PendingStreamMuxer), ConfigurableProtocolConnectionHandler::default(), None, 0, - Some(idle_timeout), + Duration::from_secs(5), ); // Create a mock context and pin the connection @@ -794,7 +792,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, - None, + Duration::ZERO, ); // First, start listening on a single protocol. @@ -833,7 +831,7 @@ mod tests { ConfigurableProtocolConnectionHandler::default(), None, 0, - None, + Duration::ZERO, ); // First, remote supports a single protocol. diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e34dc645d5d..07fc9075806 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -135,6 +135,9 @@ where /// Receivers for [`NewConnection`] objects that are dropped. new_connection_dropped_listeners: FuturesUnordered>, + + /// How long a connection should be kept alive once it starts idling. + idle_connection_timeout: Duration, } #[derive(Debug)] @@ -322,6 +325,7 @@ where substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, per_connection_event_buffer_size: config.per_connection_event_buffer_size, + idle_connection_timeout: config.idle_connection_timeout, executor, pending_connection_events_tx, pending_connection_events_rx, @@ -492,7 +496,6 @@ where endpoint: &ConnectedPoint, connection: NewConnection, handler: THandler, - idle_timeout: Option, ) { let connection = connection.extract(); @@ -519,7 +522,7 @@ where handler, self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, - idle_timeout, + self.idle_connection_timeout, ); self.executor.spawn(task::new_for_established_connection( @@ -949,6 +952,8 @@ pub(crate) struct PoolConfig { pub(crate) per_connection_event_buffer_size: usize, /// Number of addresses concurrently dialed for a single outbound connection attempt. pub(crate) dial_concurrency_factor: NonZeroU8, + /// How long a connection should be kept alive once it is idling. + pub(crate) idle_connection_timeout: Duration, /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, @@ -965,6 +970,7 @@ impl PoolConfig { task_command_buffer_size: 32, per_connection_event_buffer_size: 7, dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), + idle_connection_timeout: Duration::ZERO, substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index ce2d1df3de9..aecaf75567f 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -146,6 +146,7 @@ use libp2p_identity::PeerId; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize}; +use std::time::Duration; use std::{ convert::TryFrom, error, fmt, io, @@ -736,7 +737,7 @@ where .expect("n + 1 is always non-zero; qed"); self.pool - .spawn_connection(id, peer_id, &endpoint, connection, handler, None); + .spawn_connection(id, peer_id, &endpoint, connection, handler); log::debug!( "Connection established: {:?} {:?}; Total (peer): {}.", @@ -1515,6 +1516,14 @@ where self } + /// How long to keep a connection alive once it is idling. + /// + /// Defaults to 0. + pub fn idle_connection_timeout(mut self, timeout: Duration) -> Self { + self.pool_config.idle_connection_timeout = timeout; + self + } + /// Builds a `Swarm` with the current configuration. pub fn build(self) -> Swarm { Swarm { From 6134c5611bfc2cd6ca607a20fb3112ad0433ace1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 31 Jul 2023 14:38:33 +0200 Subject: [PATCH 03/26] Set `idle_connection_timeout` in `Swarm::new_ephemeral` and rewrite ping tests --- protocols/ping/tests/ping.rs | 38 ++++++++++-------------------------- swarm-test/src/lib.rs | 4 +++- swarm/src/keep_alive.rs | 2 ++ 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 63836a15a78..946a2daadb6 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -21,8 +21,8 @@ //! Integration tests for the `Ping` network behaviour. use libp2p_ping as ping; -use libp2p_swarm::keep_alive; -use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_swarm::dummy; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use quickcheck::*; use std::{num::NonZeroU8, time::Duration}; @@ -32,18 +32,16 @@ fn ping_pong() { fn prop(count: NonZeroU8) { let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); - let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone())); - let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(cfg.clone())); + let mut swarm1 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); + let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(cfg.clone())); async_std::task::block_on(async { swarm1.listen().await; swarm2.connect(&mut swarm1).await; for _ in 0..count.get() { - let (e1, e2) = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ([BehaviourEvent::Ping(e1)], [BehaviourEvent::Ping(e2)]) => (e1, e2), - events => panic!("Unexpected events: {events:?}"), - }; + let ([e1], [e2]): ([ping::Event; 1], [ping::Event; 1]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; assert_eq!(&e1.peer, swarm2.local_peer_id()); assert_eq!(&e2.peer, swarm1.local_peer_id()); @@ -65,8 +63,8 @@ fn assert_ping_rtt_less_than_50ms(e: ping::Event) { #[test] fn unsupported_doesnt_fail() { - let mut swarm1 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); - let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(ping::Config::new())); + let mut swarm1 = Swarm::new_ephemeral(|_| dummy::Behaviour); + let mut swarm2 = Swarm::new_ephemeral(|_| ping::Behaviour::new(ping::Config::new())); let result = async_std::task::block_on(async { swarm1.listen().await; @@ -76,10 +74,10 @@ fn unsupported_doesnt_fail() { loop { match swarm2.next_swarm_event().await { - SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { + SwarmEvent::Behaviour(ping::Event { result: Err(ping::Failure::Unsupported), .. - })) => { + }) => { swarm2.disconnect_peer_id(swarm1_peer_id).unwrap(); } SwarmEvent::ConnectionClosed { cause: Some(e), .. } => { @@ -95,19 +93,3 @@ fn unsupported_doesnt_fail() { result.expect("node with ping should not fail connection due to unsupported protocol"); } - -#[derive(NetworkBehaviour, Default)] -#[behaviour(prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour, -} - -impl Behaviour { - fn new(config: ping::Config) -> Self { - Self { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour::new(config), - } - } -} diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index 0ed8dbce220..819db33ba88 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -218,7 +218,9 @@ where .timeout(Duration::from_secs(20)) .boxed(); - SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id).build() + SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id) + .idle_connection_timeout(Duration::from_secs(5)) // Some tests need connections to be kept alive beyond what the individual behaviour configures. + .build() } async fn connect(&mut self, other: &mut Swarm) diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index 05cbcdf7b8c..e8fd1d6bf06 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -18,8 +18,10 @@ use void::Void; /// they are not in use. Connections can also fail at any time so really, your application should be /// designed to establish them when necessary, making the use of this behaviour likely redundant. #[derive(Default)] +#[deprecated(note = "Configure a very large idle timeout via `SwarmBuilder")] pub struct Behaviour; +#[allow(deprecated)] impl NetworkBehaviour for Behaviour { type ConnectionHandler = ConnectionHandler; type ToSwarm = Void; From 837a038248f1c10b18a6181d778976e505a3366a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 31 Jul 2023 14:50:01 +0200 Subject: [PATCH 04/26] Fix deprecation message --- swarm/src/keep_alive.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index e8fd1d6bf06..56111ed0fb3 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -18,7 +18,9 @@ use void::Void; /// they are not in use. Connections can also fail at any time so really, your application should be /// designed to establish them when necessary, making the use of this behaviour likely redundant. #[derive(Default)] -#[deprecated(note = "Configure a very large idle timeout via `SwarmBuilder")] +#[deprecated( + note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead." +)] pub struct Behaviour; #[allow(deprecated)] From 647e2730e469999e725641b3c15a9db96fa605f9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 31 Jul 2023 14:50:59 +0200 Subject: [PATCH 05/26] Move deprecation attribute to module declaration instead --- swarm/src/keep_alive.rs | 4 ---- swarm/src/lib.rs | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index 56111ed0fb3..05cbcdf7b8c 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -18,12 +18,8 @@ use void::Void; /// they are not in use. Connections can also fail at any time so really, your application should be /// designed to establish them when necessary, making the use of this behaviour likely redundant. #[derive(Default)] -#[deprecated( - note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead." -)] pub struct Behaviour; -#[allow(deprecated)] impl NetworkBehaviour for Behaviour { type ConnectionHandler = ConnectionHandler; type ToSwarm = Void; diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index aecaf75567f..a386818ac19 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,6 +67,9 @@ pub mod behaviour; pub mod dial_opts; pub mod dummy; pub mod handler; +#[deprecated( + note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead." +)] pub mod keep_alive; mod listen_opts; From 3dcec1494cd9e811746ba8cb03736365f9792610 Mon Sep 17 00:00:00 2001 From: Sumit kumar <106421807+startup-dreamer@users.noreply.github.com> Date: Fri, 4 Aug 2023 07:55:01 +0000 Subject: [PATCH 06/26] refactor: deprecated keep_alive in transports tls smoke test --- transports/tls/tests/smoke.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 17aa959c4b2..3f85e9e9767 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -3,7 +3,9 @@ use libp2p_core::multiaddr::Protocol; use libp2p_core::transport::MemoryTransport; use libp2p_core::upgrade::Version; use libp2p_core::Transport; -use libp2p_swarm::{keep_alive, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::dummy::Behaviour; +use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; +use std::time::Duration; #[tokio::test] async fn can_establish_connection() { @@ -55,7 +57,7 @@ async fn can_establish_connection() { assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm() -> Swarm { +fn make_swarm() -> Swarm { let identity = libp2p_identity::Keypair::generate_ed25519(); let transport = MemoryTransport::default() @@ -64,10 +66,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::without_executor( - transport, - keep_alive::Behaviour, - identity.public().to_peer_id(), - ) - .build() + SwarmBuilder::without_executor(transport, Behaviour, identity.public().to_peer_id()) + .idle_connection_timeout(Duration::from_secs(5)) + .build() } From fdc555b6897ef020c185f4677dba8cce4b17f807 Mon Sep 17 00:00:00 2001 From: Sumit kumar <106421807+startup-dreamer@users.noreply.github.com> Date: Fri, 4 Aug 2023 18:42:22 +0000 Subject: [PATCH 07/26] refactor: deprecated keep_alive in interop-test suite --- interop-tests/src/lib.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/interop-tests/src/lib.rs b/interop-tests/src/lib.rs index beb7c91c63d..7d111b8724b 100644 --- a/interop-tests/src/lib.rs +++ b/interop-tests/src/lib.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::{bail, Context, Result}; use futures::{FutureExt, StreamExt}; -use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; +use libp2p::swarm::SwarmEvent; use libp2p::{identity, ping, Multiaddr, PeerId}; #[cfg(target_arch = "wasm32")] use wasm_bindgen::prelude::*; @@ -27,17 +27,16 @@ pub async fn run_test( let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?; + let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); // Build the transport from the passed ENV var. let (boxed_transport, local_addr) = build_transport(local_key, ip, transport)?; let mut swarm = swarm_builder( boxed_transport, - Behaviour { - ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, - }, + ping::Behaviour::new(cfg.clone()), local_peer_id, ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); log::info!("Running ping test: {}", swarm.local_peer_id()); @@ -67,10 +66,9 @@ pub async fn run_test( log::info!("Test instance, dialing multiaddress on: {}.", other); let rtt = loop { - if let Some(SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { - result: Ok(rtt), - .. - }))) = swarm.next().await + if let Some(SwarmEvent::Behaviour(ping::Event { + result: Ok(rtt), .. + })) = swarm.next().await { log::info!("Ping successful: {rtt:?}"); break rtt.as_micros() as f32 / 1000.; @@ -233,12 +231,6 @@ impl FromStr for SecProtocol { } } -#[derive(NetworkBehaviour)] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, -} - /// Helper function to get a ENV variable into an test parameter like `Transport`. pub fn from_env(env_var: &str) -> Result where From d4eea1217cbbd625e372ec9905d79876fd9924c1 Mon Sep 17 00:00:00 2001 From: Sumit kumar <106421807+startup-dreamer@users.noreply.github.com> Date: Fri, 4 Aug 2023 20:43:52 +0000 Subject: [PATCH 08/26] refactor: deprecated keep_alive in transport webrtc example --- transports/webrtc/examples/listen_ping.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/transports/webrtc/examples/listen_ping.rs b/transports/webrtc/examples/listen_ping.rs index 8475195a1ce..ad867f26db0 100644 --- a/transports/webrtc/examples/listen_ping.rs +++ b/transports/webrtc/examples/listen_ping.rs @@ -4,8 +4,9 @@ use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::Transport; use libp2p_identity as identity; use libp2p_ping as ping; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder}; +use libp2p_swarm::{Swarm, SwarmBuilder}; use rand::thread_rng; +use std::time::Duration; use void::Void; /// An example WebRTC server that will accept connections and run the ping protocol on them. @@ -21,7 +22,7 @@ async fn main() -> Result<()> { } } -fn create_swarm() -> Result> { +fn create_swarm() -> Result> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().to_peer_id(); let transport = libp2p_webrtc::tokio::Transport::new( @@ -29,18 +30,16 @@ fn create_swarm() -> Result> { libp2p_webrtc::tokio::Certificate::generate(&mut thread_rng())?, ); + let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); let transport = transport .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) .boxed(); - Ok(SwarmBuilder::with_tokio_executor(transport, Behaviour::default(), peer_id).build()) -} - -#[derive(NetworkBehaviour, Default)] -#[behaviour(to_swarm = "Event", prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + Ok( + SwarmBuilder::with_tokio_executor(transport, ping::Behaviour::new(cfg.clone()), peer_id) + .idle_connection_timeout(Duration::from_secs(5)) + .build(), + ) } #[derive(Debug)] From 42b9e3bf7e5c9156a550e4e7019736e44f12ee77 Mon Sep 17 00:00:00 2001 From: Sumit kumar <106421807+startup-dreamer@users.noreply.github.com> Date: Sun, 6 Aug 2023 04:27:08 +0000 Subject: [PATCH 09/26] fix: idle timeout test case and added zero constant in comparison --- swarm/src/connection.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index ee2440cc244..a8738789078 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -357,7 +357,7 @@ where (_, KeepAlive::No) => { // handle idle_timeout let duration = *idle_timeout; // Default timeout is 0 seconds - if duration > Duration::new(0, 0) { + if duration > Duration::ZERO { let deadline = Instant::now() + duration; *shutdown = Shutdown::Later(Delay::new(duration), deadline); } else { @@ -701,7 +701,7 @@ enum Shutdown { #[cfg(test)] mod tests { use super::*; - use crate::keep_alive; + use crate::dummy; use futures::future; use futures::AsyncRead; use futures::AsyncWrite; @@ -717,12 +717,11 @@ mod tests { let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); let alive_substream_counter = Arc::new(()); - let mut connection = Connection::new( StreamMuxerBox::new(DummyStreamMuxer { counter: alive_substream_counter.clone(), }), - keep_alive::ConnectionHandler, + MockConnectionHandler::new(Duration::ZERO), None, max_negotiating_inbound_streams, Duration::ZERO, @@ -771,17 +770,13 @@ mod tests { let mut connection = Connection::new( StreamMuxerBox::new(PendingStreamMuxer), - ConfigurableProtocolConnectionHandler::default(), + dummy::ConnectionHandler, None, 0, Duration::from_secs(5), ); - // Create a mock context and pin the connection - let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - let connection = Pin::new(&mut connection); - - let poll_result = connection.poll(&mut cx); + let poll_result = connection.poll_noop_waker(); assert!(poll_result.is_pending()); } From f91d242890abbae8606f5ab4ee4916ce7f9967e5 Mon Sep 17 00:00:00 2001 From: Sumit kumar <106421807+startup-dreamer@users.noreply.github.com> Date: Sun, 6 Aug 2023 08:26:18 +0000 Subject: [PATCH 10/26] refactor: deprecated keep_alive from swarm lib tests --- swarm/src/lib.rs | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a386818ac19..decfc4242bb 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,10 +67,6 @@ pub mod behaviour; pub mod dial_opts; pub mod dummy; pub mod handler; -#[deprecated( - note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead." -)] -pub mod keep_alive; mod listen_opts; /// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro. @@ -1816,6 +1812,7 @@ fn p2p_addr(peer: Option, addr: Multiaddr) -> Result(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()) + .idle_connection_timeout(Duration::from_secs(5)) + .build(); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto) + .idle_connection_timeout(Duration::from_secs(5)) + .build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -1979,10 +1980,14 @@ mod tests { fn test_behaviour_disconnect_all() { // Since the test does not try to open any substreams, we can // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; + let handler_proto = dummy::ConnectionHandler; - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()) + .idle_connection_timeout(Duration::from_secs(5)) + .build(); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto) + .idle_connection_timeout(Duration::from_secs(5)) + .build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2049,10 +2054,14 @@ mod tests { fn test_behaviour_disconnect_one() { // Since the test does not try to open any substreams, we can // use the dummy protocols handler. - let handler_proto = keep_alive::ConnectionHandler; + let handler_proto = dummy::ConnectionHandler; - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build(); + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()) + .idle_connection_timeout(Duration::from_secs(5)) + .build(); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto) + .idle_connection_timeout(Duration::from_secs(5)) + .build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2130,7 +2139,8 @@ mod tests { fn prop(concurrency_factor: DialConcurrencyFactor) { block_on(async { - let mut swarm = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler) + let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler) + .idle_connection_timeout(Duration::from_secs(5)) .dial_concurrency_factor(concurrency_factor.0) .build(); From 5cfd7f5ae7375f66a91ef59c7acab76ca270996b Mon Sep 17 00:00:00 2001 From: Sumit kumar <106421807+startup-dreamer@users.noreply.github.com> Date: Mon, 7 Aug 2023 07:32:12 +0000 Subject: [PATCH 11/26] refactor: deprecated keep_alive from the rest of the tests --- examples/metrics/src/main.rs | 10 ++++++---- examples/ping-example/src/main.rs | 16 ++++------------ examples/rendezvous/src/bin/rzv-discover.rs | 7 ++++--- examples/rendezvous/src/bin/rzv-identify.rs | 7 ++++--- examples/rendezvous/src/bin/rzv-register.rs | 7 ++++--- examples/rendezvous/src/main.rs | 7 ++++--- misc/allow-block-list/src/lib.rs | 4 ++-- misc/connection-limits/src/lib.rs | 4 ++-- protocols/identify/tests/smoke.rs | 6 +++--- swarm/src/lib.rs | 4 ++++ transports/pnet/tests/smoke.rs | 13 +++++-------- 11 files changed, 42 insertions(+), 43 deletions(-) diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index 177ff3af09d..bcab8b2706c 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -26,12 +26,13 @@ use futures::stream::StreamExt; use libp2p::core::{upgrade::Version, Multiaddr, Transport}; use libp2p::identity::PeerId; use libp2p::metrics::{Metrics, Recorder}; -use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}; use libp2p::{identify, identity, noise, ping, tcp, yamux}; use log::info; use prometheus_client::registry::Registry; use std::error::Error; use std::thread; +use std::time::Duration; mod http_service; @@ -52,6 +53,7 @@ fn main() -> Result<(), Box> { Behaviour::new(local_pub_key), local_peer_id, ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; @@ -89,12 +91,12 @@ fn main() -> Result<(), Box> { /// Our network behaviour. /// -/// For illustrative purposes, this includes the [`keep_alive::Behaviour`]) behaviour so the ping actually happen +/// For illustrative purposes, this includes the [`dummy::Behaviour`]) behaviour so the ping actually happen /// and can be observed via the metrics. #[derive(NetworkBehaviour)] struct Behaviour { identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, ping: ping::Behaviour, } @@ -106,7 +108,7 @@ impl Behaviour { "/ipfs/0.1.0".into(), local_pub_key, )), - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } } } diff --git a/examples/ping-example/src/main.rs b/examples/ping-example/src/main.rs index 6b993bcb6e3..c26bf65658c 100644 --- a/examples/ping-example/src/main.rs +++ b/examples/ping-example/src/main.rs @@ -24,10 +24,11 @@ use futures::prelude::*; use libp2p::core::upgrade::Version; use libp2p::{ identity, noise, ping, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::error::Error; +use std::time::Duration; #[async_std::main] async fn main() -> Result<(), Box> { @@ -42,7 +43,8 @@ async fn main() -> Result<(), Box> { .boxed(); let mut swarm = - SwarmBuilder::with_async_std_executor(transport, Behaviour::default(), local_peer_id) + SwarmBuilder::with_async_std_executor(transport, ping::Behaviour::default(), local_peer_id) + .idle_connection_timeout(Duration::from_secs(5)) .build(); // Tell the swarm to listen on all interfaces and a random, OS-assigned @@ -65,13 +67,3 @@ async fn main() -> Result<(), Box> { } } } - -/// Our network behaviour. -/// -/// For illustrative purposes, this includes the [`KeepAlive`](keep_alive::Behaviour) behaviour so a continuous sequence of -/// pings can be observed. -#[derive(NetworkBehaviour, Default)] -struct Behaviour { - keep_alive: keep_alive::Behaviour, - ping: ping::Behaviour, -} diff --git a/examples/rendezvous/src/bin/rzv-discover.rs b/examples/rendezvous/src/bin/rzv-discover.rs index 710b491ff0a..528a4617440 100644 --- a/examples/rendezvous/src/bin/rzv-discover.rs +++ b/examples/rendezvous/src/bin/rzv-discover.rs @@ -24,7 +24,7 @@ use libp2p::{ identity, multiaddr::Protocol, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,10 +50,11 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); log::info!("Local peer id: {}", swarm.local_peer_id()); @@ -129,5 +130,5 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-identify.rs b/examples/rendezvous/src/bin/rzv-identify.rs index 7c326688231..580743dea6f 100644 --- a/examples/rendezvous/src/bin/rzv-identify.rs +++ b/examples/rendezvous/src/bin/rzv-identify.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,10 +50,11 @@ async fn main() { )), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); log::info!("Local peer id: {}", swarm.local_peer_id()); @@ -135,5 +136,5 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-register.rs b/examples/rendezvous/src/bin/rzv-register.rs index f9fd12b1493..47de4a728fd 100644 --- a/examples/rendezvous/src/bin/rzv-register.rs +++ b/examples/rendezvous/src/bin/rzv-register.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -46,10 +46,11 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); // In production the external address should be the publicly facing IP address of the rendezvous point. @@ -132,5 +133,5 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } diff --git a/examples/rendezvous/src/main.rs b/examples/rendezvous/src/main.rs index 4f5aca75e52..4f0b9da0c0c 100644 --- a/examples/rendezvous/src/main.rs +++ b/examples/rendezvous/src/main.rs @@ -24,7 +24,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, }; use std::time::Duration; @@ -48,10 +48,11 @@ async fn main() { )), rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) + .idle_connection_timeout(Duration::from_secs(5)) .build(); log::info!("Local peer id: {}", swarm.local_peer_id()); @@ -99,5 +100,5 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::server::Behaviour, ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index a9e3280c417..5766bef79c0 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -474,7 +474,7 @@ mod tests { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { list: super::Behaviour, - keep_alive: libp2p_swarm::keep_alive::Behaviour, + dummy_behaviour: libp2p_swarm::dummy::Behaviour, } impl Behaviour @@ -488,7 +488,7 @@ mod tests { close_connections: VecDeque::new(), state: S::default(), }, - keep_alive: libp2p_swarm::keep_alive::Behaviour, + dummy_behaviour: libp2p_swarm::dummy::Behaviour, } } } diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index 52d0aa62c39..d4efda70768 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -470,14 +470,14 @@ mod tests { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { limits: super::Behaviour, - keep_alive: libp2p_swarm::keep_alive::Behaviour, + dummy_behaviour: libp2p_swarm::dummy::Behaviour, } impl Behaviour { fn new(limits: ConnectionLimits) -> Self { Self { limits: super::Behaviour::new(limits), - keep_alive: libp2p_swarm::keep_alive::Behaviour, + dummy_behaviour: libp2p_swarm::dummy::Behaviour, } } } diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index c70ab3181b4..6799299f332 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,6 +1,6 @@ use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; -use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; +use libp2p_swarm::{dummy, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use std::iter; @@ -195,14 +195,14 @@ async fn discover_peer_after_disconnect() { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { identify: identify::Behaviour, - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } impl Behaviour { fn new(config: identify::Config) -> Self { Self { identify: identify::Behaviour::new(config), - keep_alive: keep_alive::Behaviour, + dummy_behaviour: dummy::Behaviour, } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index decfc4242bb..bbf87fc40bf 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,6 +67,10 @@ pub mod behaviour; pub mod dial_opts; pub mod dummy; pub mod handler; +#[deprecated( + note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead." +)] +pub mod keep_alive; mod listen_opts; /// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro. diff --git a/transports/pnet/tests/smoke.rs b/transports/pnet/tests/smoke.rs index a7635c00ca3..5e02ed856c6 100644 --- a/transports/pnet/tests/smoke.rs +++ b/transports/pnet/tests/smoke.rs @@ -6,7 +6,7 @@ use libp2p_core::upgrade::Version; use libp2p_core::Transport; use libp2p_core::{multiaddr::Protocol, Multiaddr}; use libp2p_pnet::{PnetConfig, PreSharedKey}; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{dummy, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; const TIMEOUT: Duration = Duration::from_secs(5); @@ -98,7 +98,7 @@ where assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm +fn make_swarm(transport: T, pnet: PnetConfig) -> Swarm where T: Transport + Send + Unpin + 'static, ::Error: Send + Sync + 'static, @@ -113,12 +113,9 @@ where .authenticate(libp2p_noise::Config::new(&identity).unwrap()) .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::with_tokio_executor( - transport, - keep_alive::Behaviour, - identity.public().to_peer_id(), - ) - .build() + SwarmBuilder::with_tokio_executor(transport, dummy::Behaviour, identity.public().to_peer_id()) + .idle_connection_timeout(Duration::from_secs(5)) + .build() } async fn listen_on(swarm: &mut Swarm, addr: Multiaddr) -> Multiaddr { From 73c90f8627ec7ac115d5cdd833838c3fe80bcb12 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:18:43 +1000 Subject: [PATCH 12/26] Remove `dummy::Behaviour` --- examples/metrics/src/main.rs | 7 +- examples/rendezvous/src/bin/rzv-discover.rs | 4 +- examples/rendezvous/src/bin/rzv-identify.rs | 4 +- examples/rendezvous/src/bin/rzv-register.rs | 4 +- examples/rendezvous/src/main.rs | 4 +- misc/allow-block-list/src/lib.rs | 2 - protocols/identify/tests/smoke.rs | 82 +++++++-------------- 7 files changed, 32 insertions(+), 75 deletions(-) diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index bcab8b2706c..0126bb598da 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -26,7 +26,7 @@ use futures::stream::StreamExt; use libp2p::core::{upgrade::Version, Multiaddr, Transport}; use libp2p::identity::PeerId; use libp2p::metrics::{Metrics, Recorder}; -use libp2p::swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; use libp2p::{identify, identity, noise, ping, tcp, yamux}; use log::info; use prometheus_client::registry::Registry; @@ -90,13 +90,9 @@ fn main() -> Result<(), Box> { } /// Our network behaviour. -/// -/// For illustrative purposes, this includes the [`dummy::Behaviour`]) behaviour so the ping actually happen -/// and can be observed via the metrics. #[derive(NetworkBehaviour)] struct Behaviour { identify: identify::Behaviour, - dummy_behaviour: dummy::Behaviour, ping: ping::Behaviour, } @@ -108,7 +104,6 @@ impl Behaviour { "/ipfs/0.1.0".into(), local_pub_key, )), - dummy_behaviour: dummy::Behaviour, } } } diff --git a/examples/rendezvous/src/bin/rzv-discover.rs b/examples/rendezvous/src/bin/rzv-discover.rs index 528a4617440..1b687a6e5ac 100644 --- a/examples/rendezvous/src/bin/rzv-discover.rs +++ b/examples/rendezvous/src/bin/rzv-discover.rs @@ -24,7 +24,7 @@ use libp2p::{ identity, multiaddr::Protocol, noise, ping, rendezvous, - swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,7 +50,6 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) @@ -130,5 +129,4 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - dummy_behaviour: dummy::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-identify.rs b/examples/rendezvous/src/bin/rzv-identify.rs index 580743dea6f..be0fbb2480b 100644 --- a/examples/rendezvous/src/bin/rzv-identify.rs +++ b/examples/rendezvous/src/bin/rzv-identify.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -50,7 +50,6 @@ async fn main() { )), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) @@ -136,5 +135,4 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - dummy_behaviour: dummy::Behaviour, } diff --git a/examples/rendezvous/src/bin/rzv-register.rs b/examples/rendezvous/src/bin/rzv-register.rs index 47de4a728fd..f5849f5f200 100644 --- a/examples/rendezvous/src/bin/rzv-register.rs +++ b/examples/rendezvous/src/bin/rzv-register.rs @@ -22,7 +22,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identity, noise, ping, rendezvous, - swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, Multiaddr, PeerId, Transport, }; use std::time::Duration; @@ -46,7 +46,6 @@ async fn main() { MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) @@ -133,5 +132,4 @@ async fn main() { struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, - dummy_behaviour: dummy::Behaviour, } diff --git a/examples/rendezvous/src/main.rs b/examples/rendezvous/src/main.rs index 4f0b9da0c0c..edba2dbf572 100644 --- a/examples/rendezvous/src/main.rs +++ b/examples/rendezvous/src/main.rs @@ -24,7 +24,7 @@ use futures::StreamExt; use libp2p::{ core::transport::upgrade::Version, identify, identity, noise, ping, rendezvous, - swarm::{dummy, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, tcp, yamux, PeerId, Transport, }; use std::time::Duration; @@ -48,7 +48,6 @@ async fn main() { )), rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), - dummy_behaviour: dummy::Behaviour, }, PeerId::from(key_pair.public()), ) @@ -100,5 +99,4 @@ struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::server::Behaviour, ping: ping::Behaviour, - dummy_behaviour: dummy::Behaviour, } diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index eba2eb17ff1..d8be7bab5ee 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -501,7 +501,6 @@ mod tests { #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { list: super::Behaviour, - dummy_behaviour: libp2p_swarm::dummy::Behaviour, } impl Behaviour @@ -515,7 +514,6 @@ mod tests { close_connections: VecDeque::new(), state: S::default(), }, - dummy_behaviour: libp2p_swarm::dummy::Behaviour, } } } diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 6799299f332..c1926b4125f 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,6 +1,6 @@ use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; -use libp2p_swarm::{dummy, Swarm, SwarmEvent}; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use std::iter; @@ -9,7 +9,7 @@ async fn periodic_identify() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -17,7 +17,7 @@ async fn periodic_identify() { let swarm1_peer_id = *swarm1.local_peer_id(); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("c".to_string(), identity.public()) .with_agent_version("d".to_string()), ) @@ -33,20 +33,20 @@ async fn periodic_identify() { match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { ( - [BehaviourEvent::Identify(Received { info: s1_info, .. }), BehaviourEvent::Identify(Sent { .. })], - [BehaviourEvent::Identify(Received { info: s2_info, .. }), BehaviourEvent::Identify(Sent { .. })], + [Received { info: s1_info, .. }, Sent { .. }], + [Received { info: s2_info, .. }, Sent { .. }], ) | ( - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s1_info, .. })], - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s2_info, .. })], + [Sent { .. }, Received { info: s1_info, .. }], + [Sent { .. }, Received { info: s2_info, .. }], ) | ( - [BehaviourEvent::Identify(Received { info: s1_info, .. }), BehaviourEvent::Identify(Sent { .. })], - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s2_info, .. })], + [Received { info: s1_info, .. }, Sent { .. }], + [Sent { .. }, Received { info: s2_info, .. }], ) | ( - [BehaviourEvent::Identify(Sent { .. }), BehaviourEvent::Identify(Received { info: s1_info, .. })], - [BehaviourEvent::Identify(Received { info: s2_info, .. }), BehaviourEvent::Identify(Sent { .. })], + [Sent { .. }, Received { info: s1_info, .. }], + [Received { info: s2_info, .. }, Sent { .. }], ) => { assert_eq!(s1_info.public_key.to_peer_id(), swarm2_peer_id); assert_eq!(s1_info.protocol_version, "c"); @@ -83,10 +83,10 @@ async fn identify_push() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -96,33 +96,25 @@ async fn identify_push() { swarm2.connect(&mut swarm1).await; // First, let the periodic identify do its thing. - match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ( - [BehaviourEvent::Identify(e1), BehaviourEvent::Identify(e2)], - [BehaviourEvent::Identify(e3), BehaviourEvent::Identify(e4)], - ) => { - use identify::Event::{Received, Sent}; + let ([e1, e2], [e3, e4]) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; - // These can be received in any order, hence assert them here instead of the pattern above. - assert!(matches!(e1, Received { .. } | Sent { .. })); - assert!(matches!(e2, Received { .. } | Sent { .. })); - assert!(matches!(e3, Received { .. } | Sent { .. })); - assert!(matches!(e4, Received { .. } | Sent { .. })); - } - other => panic!("Unexpected events: {other:?}"), - }; + { + use identify::Event::{Received, Sent}; + + // These can be received in any order, hence assert them here. + assert!(matches!(e1, Received { .. } | Sent { .. })); + assert!(matches!(e2, Received { .. } | Sent { .. })); + assert!(matches!(e3, Received { .. } | Sent { .. })); + assert!(matches!(e4, Received { .. } | Sent { .. })); + } // Second, actively push. swarm2 .behaviour_mut() - .identify .push(iter::once(*swarm1.local_peer_id())); let swarm1_received_info = match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ( - [BehaviourEvent::Identify(identify::Event::Received { info, .. })], - [BehaviourEvent::Identify(identify::Event::Pushed { .. })], - ) => info, + ([identify::Event::Received { info, .. }], [identify::Event::Pushed { .. }]) => info, other => panic!("Unexpected events: {other:?}"), }; @@ -141,10 +133,10 @@ async fn discover_peer_after_disconnect() { let _ = env_logger::try_init(); let mut swarm1 = Swarm::new_ephemeral(|identity| { - Behaviour::new(identify::Config::new("a".to_string(), identity.public())) + identify::Behaviour::new(identify::Config::new("a".to_string(), identity.public())) }); let mut swarm2 = Swarm::new_ephemeral(|identity| { - Behaviour::new( + identify::Behaviour::new( identify::Config::new("a".to_string(), identity.public()) .with_agent_version("b".to_string()), ) @@ -161,7 +153,7 @@ async fn discover_peer_after_disconnect() { .wait(|event| { matches!( event, - SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { .. })) + SwarmEvent::Behaviour(identify::Event::Received { .. }) ) .then_some(()) }) @@ -186,23 +178,3 @@ async fn discover_peer_after_disconnect() { assert_eq!(connected_peer, swarm1_peer_id); } - -/// Combined behaviour to keep the connection alive after the periodic identify. -/// -/// The identify implementation sets `keep_alive` to `No` once it has done its thing. -/// This can result in unexpected connection closures if one peer is faster than the other. -#[derive(libp2p_swarm::NetworkBehaviour)] -#[behaviour(prelude = "libp2p_swarm::derive_prelude")] -struct Behaviour { - identify: identify::Behaviour, - dummy_behaviour: dummy::Behaviour, -} - -impl Behaviour { - fn new(config: identify::Config) -> Self { - Self { - identify: identify::Behaviour::new(config), - dummy_behaviour: dummy::Behaviour, - } - } -} From 45b2bd254fe876c36298c2aca87e90df37ff01d4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:23:03 +1000 Subject: [PATCH 13/26] Simplify behaviours where possible --- libp2p/src/tutorials/ping.rs | 40 ------------- misc/allow-block-list/src/lib.rs | 96 ++++++++------------------------ 2 files changed, 24 insertions(+), 112 deletions(-) diff --git a/libp2p/src/tutorials/ping.rs b/libp2p/src/tutorials/ping.rs index 976b45e1e22..a469b172473 100644 --- a/libp2p/src/tutorials/ping.rs +++ b/libp2p/src/tutorials/ping.rs @@ -158,16 +158,6 @@ //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Swarm @@ -198,16 +188,6 @@ //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour:: -//! /// KeepAlive) behaviour so a continuous sequence of pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Multiaddr @@ -267,16 +247,6 @@ //! //! Ok(()) //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Continuously polling the Swarm @@ -323,16 +293,6 @@ //! } //! } //! } -//! -//! /// Our network behaviour. -//! /// -//! /// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of -//! /// pings can be observed. -//! #[derive(NetworkBehaviour, Default)] -//! struct Behaviour { -//! keep_alive: keep_alive::Behaviour, -//! ping: ping::Behaviour, -//! } //! ``` //! //! ## Running two nodes diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index d8be7bab5ee..1950c47f28b 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -283,14 +283,11 @@ mod tests { #[async_std::test] async fn cannot_dial_blocked_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - dialer - .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { panic!("unexpected dial error") @@ -300,17 +297,13 @@ mod tests { #[async_std::test] async fn can_dial_unblocked_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); dialer .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); - dialer - .behaviour_mut() - .list .unblock_peer(*listener.local_peer_id()); dial(&mut dialer, &listener).unwrap(); @@ -318,14 +311,11 @@ mod tests { #[async_std::test] async fn blocked_peer_cannot_dial_us() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - listener - .behaviour_mut() - .list - .block_peer(*dialer.local_peer_id()); + listener.behaviour_mut().block_peer(*dialer.local_peer_id()); dial(&mut dialer, &listener).unwrap(); async_std::task::spawn(dialer.loop_on_next()); @@ -343,15 +333,12 @@ mod tests { #[async_std::test] async fn connections_get_closed_upon_blocked() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; dialer.connect(&mut listener).await; - dialer - .behaviour_mut() - .list - .block_peer(*listener.local_peer_id()); + dialer.behaviour_mut().block_peer(*listener.local_peer_id()); let ( [SwarmEvent::ConnectionClosed { @@ -372,8 +359,8 @@ mod tests { #[async_std::test] async fn cannot_dial_peer_unless_allowed() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { @@ -381,26 +368,19 @@ mod tests { }; assert!(cause.downcast::().is_ok()); - dialer - .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); assert!(dial(&mut dialer, &listener).is_ok()); } #[async_std::test] async fn cannot_dial_disallowed_peer() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); dialer .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); - dialer - .behaviour_mut() - .list .disallow_peer(*listener.local_peer_id()); let DialError::Denied { cause } = dial(&mut dialer, &listener).unwrap_err() else { @@ -411,8 +391,8 @@ mod tests { #[async_std::test] async fn not_allowed_peer_cannot_dial_us() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; dialer @@ -448,23 +428,16 @@ mod tests { #[async_std::test] async fn connections_get_closed_upon_disallow() { - let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::new()); - let mut listener = Swarm::new_ephemeral(|_| Behaviour::::new()); + let mut dialer = Swarm::new_ephemeral(|_| Behaviour::::default()); + let mut listener = Swarm::new_ephemeral(|_| Behaviour::::default()); listener.listen().await; - dialer - .behaviour_mut() - .list - .allow_peer(*listener.local_peer_id()); - listener - .behaviour_mut() - .list - .allow_peer(*dialer.local_peer_id()); + dialer.behaviour_mut().allow_peer(*listener.local_peer_id()); + listener.behaviour_mut().allow_peer(*dialer.local_peer_id()); dialer.connect(&mut listener).await; dialer .behaviour_mut() - .list .disallow_peer(*listener.local_peer_id()); let ( [SwarmEvent::ConnectionClosed { @@ -496,25 +469,4 @@ mod tests { .build(), ) } - - #[derive(libp2p_swarm_derive::NetworkBehaviour)] - #[behaviour(prelude = "libp2p_swarm::derive_prelude")] - struct Behaviour { - list: super::Behaviour, - } - - impl Behaviour - where - S: Default, - { - fn new() -> Self { - Self { - list: super::Behaviour { - waker: None, - close_connections: VecDeque::new(), - state: S::default(), - }, - } - } - } } From 40388df299f44e076a5b5371fa7583282ec0c851 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:29:40 +1000 Subject: [PATCH 14/26] Set `idle_connection_timeout` in tutorial --- libp2p/src/tutorials/ping.rs | 50 +++++++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/libp2p/src/tutorials/ping.rs b/libp2p/src/tutorials/ping.rs index a469b172473..d2e02ace7ef 100644 --- a/libp2p/src/tutorials/ping.rs +++ b/libp2p/src/tutorials/ping.rs @@ -154,7 +154,7 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! //! Ok(()) //! } @@ -182,7 +182,7 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! //! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); //! @@ -190,6 +190,40 @@ //! } //! ``` //! +//! ## Idle connection timeout +//! +//! Now, for this example in particular, we need set the idle connection timeout. +//! Otherwise, the connection will be closed immediately. +//! +//! Whether you need to set this in your application too depends on your usecase. +//! Typically, connections are kept alive if they are "in use" by a certain protocol. +//! The ping protocol however is only an "auxiliary" kind of protocol. +//! Thus, without any other behaviour in place, we would not be able to observe the pings. +//! +//! ```rust +//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::{identity, ping, PeerId}; +//! use std::error::Error; +//! +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { +//! use std::time::Duration; +//! let local_key = identity::Keypair::generate_ed25519(); +//! let local_peer_id = PeerId::from(local_key.public()); +//! println!("Local peer id: {local_peer_id:?}"); +//! +//! let transport = libp2p::development_transport(local_key).await?; +//! +//! let behaviour = ping::Behaviour::default(); +//! +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); +//! +//! Ok(()) +//! } +//! ``` +//! //! ## Multiaddr //! //! With the [`Swarm`] in place, we are all set to listen for incoming @@ -229,9 +263,11 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! -//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); //! //! // Tell the swarm to listen on all interfaces and a random, OS-assigned //! // port. @@ -269,9 +305,11 @@ //! //! let transport = libp2p::development_transport(local_key).await?; //! -//! let behaviour = Behaviour::default(); +//! let behaviour = ping::Behaviour::default(); //! -//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id).build(); +//! let mut swarm = SwarmBuilder::with_async_std_executor(transport, behaviour, local_peer_id) +//! .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe pings for 30 seconds. +//! .build(); //! //! // Tell the swarm to listen on all interfaces and a random, OS-assigned //! // port. From 7a790970b6995b3e79037ba3a5a225c94465645f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:30:47 +1000 Subject: [PATCH 15/26] Update deprecation notice --- swarm/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 15e376ca2d2..d28c8069332 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -68,7 +68,7 @@ pub mod dial_opts; pub mod dummy; pub mod handler; #[deprecated( - note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead." + note = "Configure an appropriate idle connection timeout via `SwarmBuilder::idle_connection_timeout` instead. To keep connections alive 'forever', use `Duration::from_secs(u64::MAX)`." )] pub mod keep_alive; mod listen_opts; From 7f492b389d8405748e166eded30ceb5f82d701f9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:31:35 +1000 Subject: [PATCH 16/26] Increase to 1min --- examples/metrics/src/main.rs | 2 +- examples/ping-example/src/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index 0126bb598da..e41c2aef559 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -53,7 +53,7 @@ fn main() -> Result<(), Box> { Behaviour::new(local_pub_key), local_peer_id, ) - .idle_connection_timeout(Duration::from_secs(5)) + .idle_connection_timeout(Duration::from_secs(60)) .build(); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; diff --git a/examples/ping-example/src/main.rs b/examples/ping-example/src/main.rs index c26bf65658c..2a521dfad7e 100644 --- a/examples/ping-example/src/main.rs +++ b/examples/ping-example/src/main.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { let mut swarm = SwarmBuilder::with_async_std_executor(transport, ping::Behaviour::default(), local_peer_id) - .idle_connection_timeout(Duration::from_secs(5)) + .idle_connection_timeout(Duration::from_secs(60)) // For illustrative purposes, keep idle connections alive for a minute so we can observe a few pings. .build(); // Tell the swarm to listen on all interfaces and a random, OS-assigned From 9c7ce2c5ff48a30b5b4d4112b0c5e53b0b05aaae Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:34:21 +1000 Subject: [PATCH 17/26] Use via module import --- transports/tls/tests/smoke.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 3f85e9e9767..0db39edf280 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -3,8 +3,7 @@ use libp2p_core::multiaddr::Protocol; use libp2p_core::transport::MemoryTransport; use libp2p_core::upgrade::Version; use libp2p_core::Transport; -use libp2p_swarm::dummy::Behaviour; -use libp2p_swarm::{Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_swarm::{dummy, Swarm, SwarmBuilder, SwarmEvent}; use std::time::Duration; #[tokio::test] @@ -57,7 +56,7 @@ async fn can_establish_connection() { assert_eq!(&outbound_peer_id, swarm1.local_peer_id()); } -fn make_swarm() -> Swarm { +fn make_swarm() -> Swarm { let identity = libp2p_identity::Keypair::generate_ed25519(); let transport = MemoryTransport::default() @@ -66,7 +65,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p_yamux::Config::default()) .boxed(); - SwarmBuilder::without_executor(transport, Behaviour, identity.public().to_peer_id()) + SwarmBuilder::without_executor(transport, dummy::Behaviour, identity.public().to_peer_id()) .idle_connection_timeout(Duration::from_secs(5)) .build() } From 1a0674964e6c99af4fe5bf5f3d6d0c30e731f2f9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:51:29 +1000 Subject: [PATCH 18/26] Refactor usage of `new_test_swarm` --- swarm/src/lib.rs | 71 ++++++++++++++---------------------------------- 1 file changed, 21 insertions(+), 50 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index d28c8069332..573ba6006bf 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1841,14 +1841,8 @@ mod tests { Disconnecting, } - fn new_test_swarm( - handler_proto: T, - ) -> SwarmBuilder>> - where - T: ConnectionHandler + Clone, - T::ToBehaviour: Clone, - O: Send + 'static, - { + fn new_test_swarm( + ) -> SwarmBuilder>> { let id_keys = identity::Keypair::generate_ed25519(); let local_public_key = id_keys.public(); let transport = transport::MemoryTransport::default() @@ -1858,13 +1852,15 @@ mod tests { }) .multiplex(yamux::Config::default()) .boxed(); - let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - match ThreadPool::new().ok() { + let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler)); + let builder = match ThreadPool::new().ok() { Some(tp) => { SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp) } None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), - } + }; + + builder.idle_connection_timeout(Duration::from_secs(5)) } fn swarms_connected( @@ -1915,16 +1911,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_swarm_disconnect() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = dummy::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()) - .idle_connection_timeout(Duration::from_secs(5)) - .build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto) - .idle_connection_timeout(Duration::from_secs(5)) - .build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -1985,16 +1973,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_behaviour_disconnect_all() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = dummy::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()) - .idle_connection_timeout(Duration::from_secs(5)) - .build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto) - .idle_connection_timeout(Duration::from_secs(5)) - .build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2059,16 +2039,8 @@ mod tests { /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] #[test] fn test_behaviour_disconnect_one() { - // Since the test does not try to open any substreams, we can - // use the dummy protocols handler. - let handler_proto = dummy::ConnectionHandler; - - let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()) - .idle_connection_timeout(Duration::from_secs(5)) - .build(); - let mut swarm2 = new_test_swarm::<_, ()>(handler_proto) - .idle_connection_timeout(Duration::from_secs(5)) - .build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); @@ -2146,8 +2118,7 @@ mod tests { fn prop(concurrency_factor: DialConcurrencyFactor) { block_on(async { - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler) - .idle_connection_timeout(Duration::from_secs(5)) + let mut swarm = new_test_swarm() .dial_concurrency_factor(concurrency_factor.0) .build(); @@ -2215,8 +2186,8 @@ mod tests { // Checks whether dialing an address containing the wrong peer id raises an error // for the expected peer id instead of the obtained peer id. - let mut swarm1 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); - let mut swarm2 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm1 = new_test_swarm().build(); + let mut swarm2 = new_test_swarm().build(); swarm1.listen_on("/memory/0".parse().unwrap()).unwrap(); @@ -2275,7 +2246,7 @@ mod tests { // // The last two can happen in any order. - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm = new_test_swarm().build(); swarm.listen_on("/memory/0".parse().unwrap()).unwrap(); let local_address = @@ -2335,7 +2306,7 @@ mod tests { fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - let swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let swarm = new_test_swarm().build(); let peer_id = *swarm.local_peer_id(); assert!(!swarm.is_connected(&peer_id)); } @@ -2346,7 +2317,7 @@ mod tests { let target = PeerId::random(); - let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut swarm = new_test_swarm().build(); let addresses = HashSet::from([ multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())], @@ -2392,8 +2363,8 @@ mod tests { fn aborting_pending_connection_surfaces_error() { let _ = env_logger::try_init(); - let mut dialer = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); - let mut listener = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build(); + let mut dialer = new_test_swarm().build(); + let mut listener = new_test_swarm().build(); let listener_peer_id = *listener.local_peer_id(); listener.listen_on(multiaddr![Memory(0u64)]).unwrap(); From d7898adf280b5ae1e51300368ca4d1b8a53a127e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 14:54:19 +1000 Subject: [PATCH 19/26] Use match-arm guard to reduce indentation --- swarm/src/connection.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index d60db88157d..a082e27e55f 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -361,15 +361,12 @@ where *shutdown = Shutdown::Later(Delay::new(dur), t) } } + (_, KeepAlive::No) if idle_timeout == &Duration::ZERO => { + *shutdown = Shutdown::Asap; + } (_, KeepAlive::No) => { - // handle idle_timeout - let duration = *idle_timeout; // Default timeout is 0 seconds - if duration > Duration::ZERO { - let deadline = Instant::now() + duration; - *shutdown = Shutdown::Later(Delay::new(duration), deadline); - } else { - *shutdown = Shutdown::Asap; - } + let deadline = Instant::now() + *idle_timeout; + *shutdown = Shutdown::Later(Delay::new(*idle_timeout), deadline); } (_, KeepAlive::Yes) => *shutdown = Shutdown::None, }; From f86240ee5b6327e56e2b5bb6ff0688a0f11c5128 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 15:02:03 +1000 Subject: [PATCH 20/26] Fix tests and bug in impl! --- swarm/Cargo.toml | 1 + swarm/src/connection.rs | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index ca49d92031f..fb04aaee560 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -55,6 +55,7 @@ quickcheck = { workspace = true } void = "1" once_cell = "1.18.0" trybuild = "1.0.84" +tokio = { version = "1.29.1", features = ["time", "rt", "macros"] } [[test]] name = "swarm_derive" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index a082e27e55f..0a7172e5975 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -364,6 +364,9 @@ where (_, KeepAlive::No) if idle_timeout == &Duration::ZERO => { *shutdown = Shutdown::Asap; } + (Shutdown::Later(_, _), KeepAlive::No) => { + // Do nothing, i.e. let the shutdown timer continue to tick. + } (_, KeepAlive::No) => { let deadline = Instant::now() + *idle_timeout; *shutdown = Shutdown::Later(Delay::new(*idle_timeout), deadline); @@ -768,20 +771,26 @@ mod tests { )) } - #[test] - fn test_idle_timeout() { - // Create a custom idle timeout + #[tokio::test] + async fn idle_timeout() { + let idle_timeout = Duration::from_millis(100); let mut connection = Connection::new( StreamMuxerBox::new(PendingStreamMuxer), dummy::ConnectionHandler, None, 0, - Duration::from_secs(5), + idle_timeout, ); - let poll_result = connection.poll_noop_waker(); - assert!(poll_result.is_pending()); + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); } #[test] From 0b73c5e7d8122e25c4c34c17f21cb8fda5258649 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 15:11:13 +1000 Subject: [PATCH 21/26] Fix missing import in tutorial --- libp2p/src/tutorials/ping.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libp2p/src/tutorials/ping.rs b/libp2p/src/tutorials/ping.rs index d2e02ace7ef..a65f1ceadc6 100644 --- a/libp2p/src/tutorials/ping.rs +++ b/libp2p/src/tutorials/ping.rs @@ -204,6 +204,7 @@ //! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { @@ -254,6 +255,7 @@ //! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { @@ -296,6 +298,7 @@ //! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; +//! use std::time::Duration; //! //! #[async_std::main] //! async fn main() -> Result<(), Box> { From e33817870df9cc68a10372331905be8c45be8afd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 15:12:11 +1000 Subject: [PATCH 22/26] Remove deprecated imports --- libp2p/src/tutorials/ping.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libp2p/src/tutorials/ping.rs b/libp2p/src/tutorials/ping.rs index a65f1ceadc6..cc2e3d3c675 100644 --- a/libp2p/src/tutorials/ping.rs +++ b/libp2p/src/tutorials/ping.rs @@ -142,7 +142,7 @@ //! With the above in mind, let's extend our example, creating a [`ping::Behaviour`](crate::ping::Behaviour) at the end: //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour}; +//! use libp2p::swarm::NetworkBehaviour; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! @@ -170,7 +170,7 @@ //! [`Transport`] to the [`NetworkBehaviour`]. //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! @@ -201,7 +201,7 @@ //! Thus, without any other behaviour in place, we would not be able to observe the pings. //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, PeerId}; //! use std::error::Error; //! use std::time::Duration; @@ -252,7 +252,7 @@ //! remote peer. //! //! ```rust -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; //! use std::time::Duration; @@ -295,7 +295,7 @@ //! //! ```no_run //! use futures::prelude::*; -//! use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent, SwarmBuilder}; +//! use libp2p::swarm::{NetworkBehaviour, SwarmEvent, SwarmBuilder}; //! use libp2p::{identity, ping, Multiaddr, PeerId}; //! use std::error::Error; //! use std::time::Duration; From 88f1f1ba7c30a9adc929ee9c0aaecf1bbe772480 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Sep 2023 15:40:15 +1000 Subject: [PATCH 23/26] Add changelog entry --- swarm/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 77ddf3458d1..06ddd740873 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,8 +6,12 @@ - Improve error message when `DialPeerCondition` prevents a dial. See [PR 4409]. +- Introduce `SwarmBuilder::idle_conncetion_timeout` and deprecate `keep_alive::Behaviour` as a result. + See [PR 4161]. + [PR 4426]: https://github.com/libp2p/rust-libp2p/pull/4426 [PR 4409]: https://github.com/libp2p/rust-libp2p/pull/4409 +[PR 4161]: https://github.com/libp2p/rust-libp2p/pull/4161 ## 0.43.3 From f566a48c4e12cc49bac24a2874a3bb504c5502cd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 18 Sep 2023 09:45:46 +1000 Subject: [PATCH 24/26] Shutdown `ConnectionHandler` after `max(KeepAlive::Until, idle_timeout)` --- swarm/src/connection.rs | 179 ++++++++++++++++++++++++++++++++++------ 1 file changed, 152 insertions(+), 27 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 0a7172e5975..310cf3a81e3 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -52,6 +52,7 @@ use libp2p_core::upgrade; use libp2p_core::upgrade::{NegotiationError, ProtocolError}; use libp2p_core::Endpoint; use libp2p_identity::PeerId; +use std::cmp::max; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::future::Future; @@ -351,14 +352,24 @@ where (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - timer.reset(dur) + if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) + { + let effective_keep_alive = max(new_duration, *idle_timeout); + + timer.reset(effective_keep_alive) } } } - (_, KeepAlive::Until(t)) => { - if let Some(dur) = t.checked_duration_since(Instant::now()) { - *shutdown = Shutdown::Later(Delay::new(dur), t) + (_, KeepAlive::Until(earliest_shutdown)) => { + if let Some(requested_keep_alive) = + earliest_shutdown.checked_duration_since(Instant::now()) + { + let effective_keep_alive = max(requested_keep_alive, *idle_timeout); + + // Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch. + // This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See / + *shutdown = + Shutdown::Later(Delay::new(effective_keep_alive), earliest_shutdown) } } (_, KeepAlive::No) if idle_timeout == &Duration::ZERO => { @@ -716,6 +727,7 @@ mod tests { use libp2p_core::StreamMuxer; use quickcheck::*; use std::sync::{Arc, Weak}; + use std::time::Instant; use void::Void; #[test] @@ -771,28 +783,6 @@ mod tests { )) } - #[tokio::test] - async fn idle_timeout() { - let idle_timeout = Duration::from_millis(100); - - let mut connection = Connection::new( - StreamMuxerBox::new(PendingStreamMuxer), - dummy::ConnectionHandler, - None, - 0, - idle_timeout, - ); - - assert!(connection.poll_noop_waker().is_pending()); - - tokio::time::sleep(idle_timeout).await; - - assert!(matches!( - connection.poll_noop_waker(), - Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) - )); - } - #[test] fn propagates_changes_to_supported_inbound_protocols() { let mut connection = Connection::new( @@ -883,6 +873,141 @@ mod tests { assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]); } + #[tokio::test] + async fn idle_timeout_with_keep_alive_no() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + dummy::ConnectionHandler, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + #[tokio::test] + async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + KeepAliveUntilConnectionHandler { + until: Instant::now() + idle_timeout * 2, + }, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout).await; + + assert!( + connection.poll_noop_waker().is_pending(), + "`KeepAlive::Until` is greater than idle-timeout, continue sleeping" + ); + + tokio::time::sleep(idle_timeout).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + #[tokio::test] + async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() { + let idle_timeout = Duration::from_millis(100); + + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + KeepAliveUntilConnectionHandler { + until: Instant::now() + idle_timeout / 2, + }, + None, + 0, + idle_timeout, + ); + + assert!(connection.poll_noop_waker().is_pending()); + + tokio::time::sleep(idle_timeout / 2).await; + + assert!( + connection.poll_noop_waker().is_pending(), + "`KeepAlive::Until` is less than idle-timeout, honor idle-timeout" + ); + + tokio::time::sleep(idle_timeout / 2).await; + + assert!(matches!( + connection.poll_noop_waker(), + Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + )); + } + + struct KeepAliveUntilConnectionHandler { + until: Instant, + } + + impl ConnectionHandler for KeepAliveUntilConnectionHandler { + type FromBehaviour = Void; + type ToBehaviour = Void; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = Void; + + fn listen_protocol( + &self, + ) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Until(self.until) + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + Poll::Pending + } + + fn on_behaviour_event(&mut self, _: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + _: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } + } + struct DummyStreamMuxer { counter: Arc<()>, } From 21d4d171ab29dbc1442f157683c48177eec54dc6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 18 Sep 2023 20:05:09 +1000 Subject: [PATCH 25/26] Update swarm/Cargo.toml --- swarm/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index b96cd688dfe..86f4c158387 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -45,7 +45,7 @@ env_logger = "0.10" futures = "0.3.28" libp2p-identify = { path = "../protocols/identify" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. libp2p-identity = { workspace = true, features = ["ed25519"] } -libp2p-kad = { path = "../protocols/kad" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks re +libp2p-kad = { path = "../protocols/kad" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. libp2p-ping = { path = "../protocols/ping" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. libp2p-plaintext = { path = "../transports/plaintext" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. libp2p-swarm-derive = { path = "../swarm-derive" } # Using `path` here because this is a cyclic dev-dependency which otherwise breaks releasing. From 4287c500846f6474ef4a496aa3206b15e7f33095 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 20 Sep 2023 08:18:52 +1000 Subject: [PATCH 26/26] Update webrtc example to remove keep-alive --- examples/browser-webrtc/src/main.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/examples/browser-webrtc/src/main.rs b/examples/browser-webrtc/src/main.rs index f919f047af5..8a4034a436e 100644 --- a/examples/browser-webrtc/src/main.rs +++ b/examples/browser-webrtc/src/main.rs @@ -13,11 +13,12 @@ use libp2p::{ identity, multiaddr::{Multiaddr, Protocol}, ping, - swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, }; use libp2p_webrtc as webrtc; use rand::thread_rng; use std::net::{Ipv4Addr, SocketAddr}; +use std::time::Duration; use tower_http::cors::{Any, CorsLayer}; #[tokio::main] @@ -36,12 +37,10 @@ async fn main() -> anyhow::Result<()> { .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) .boxed(); - let behaviour = Behaviour { - ping: ping::Behaviour::new(ping::Config::new()), - keep_alive: keep_alive::Behaviour, - }; - - let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); + let mut swarm = + SwarmBuilder::with_tokio_executor(transport, ping::Behaviour::default(), local_peer_id) + .idle_connection_timeout(Duration::from_secs(30)) // Allows us to observe the pings. + .build(); let address_webrtc = Multiaddr::from(Ipv4Addr::UNSPECIFIED) .with(Protocol::Udp(0)) @@ -84,12 +83,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -#[derive(NetworkBehaviour)] -struct Behaviour { - ping: ping::Behaviour, - keep_alive: keep_alive::Behaviour, -} - #[derive(rust_embed::RustEmbed)] #[folder = "$CARGO_MANIFEST_DIR/static"] struct StaticFiles;