From 661717f1ee8a634bb12e09daf4d43d0692225f01 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Jun 2023 22:33:16 +0200 Subject: [PATCH 01/15] Remove `handler` field from `ConnectionClosed` --- misc/allow-block-list/src/lib.rs | 2 +- misc/connection-limits/src/lib.rs | 2 +- protocols/autonat/src/behaviour.rs | 6 +- protocols/dcutr/src/behaviour_impl.rs | 4 +- protocols/floodsub/src/layer.rs | 4 +- protocols/gossipsub/src/behaviour.rs | 4 +- protocols/gossipsub/src/behaviour/tests.rs | 4 - protocols/identify/src/behaviour.rs | 2 +- protocols/kad/src/behaviour.rs | 4 +- protocols/mdns/src/behaviour.rs | 2 +- protocols/perf/src/client.rs | 3 +- protocols/perf/src/client/behaviour.rs | 2 +- protocols/perf/src/server.rs | 2 +- protocols/perf/src/server/behaviour.rs | 2 +- protocols/ping/src/lib.rs | 2 +- protocols/relay/src/behaviour.rs | 4 +- protocols/relay/src/priv_client.rs | 4 +- protocols/rendezvous/src/client.rs | 2 +- protocols/rendezvous/src/server.rs | 2 +- protocols/request-response/src/lib.rs | 4 +- swarm-derive/src/lib.rs | 19 +--- swarm/CHANGELOG.md | 5 + swarm/src/behaviour.rs | 112 +-------------------- swarm/src/behaviour/either.rs | 12 +-- swarm/src/behaviour/external_addresses.rs | 19 ++-- swarm/src/behaviour/listen_addresses.rs | 7 +- swarm/src/behaviour/toggle.rs | 6 +- swarm/src/connection.rs | 20 +++- swarm/src/connection/pool.rs | 9 +- swarm/src/connection/pool/task.rs | 34 +++++-- swarm/src/dummy.rs | 2 +- swarm/src/handler.rs | 9 ++ swarm/src/keep_alive.rs | 2 +- swarm/src/lib.rs | 2 - swarm/src/test.rs | 8 +- swarm/tests/listener.rs | 2 +- swarm/tests/swarm_derive.rs | 2 +- 37 files changed, 116 insertions(+), 215 deletions(-) diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index 7aa0dd87822..64c3c9cf8cb 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -231,7 +231,7 @@ where Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionClosed(_) => {} FromSwarm::ConnectionEstablished(_) => {} diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index 52d0aa62c39..acee9c0f8e3 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -314,7 +314,7 @@ impl NetworkBehaviour for Behaviour { Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 439543f8318..81eb4aacaa8 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -362,16 +362,14 @@ impl Behaviour { peer_id, connection_id, endpoint, - handler, remaining_established, - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { self.inner .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, endpoint, - handler, remaining_established, })); @@ -536,7 +534,7 @@ impl NetworkBehaviour for Behaviour { .handle_established_outbound_connection(connection_id, peer, addr, role_override) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); match event { diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index 4993da655d3..3c7927c31ae 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -164,7 +164,7 @@ impl Behaviour { connection_id, endpoint: connected_point, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if !connected_point.is_relayed() { let connections = self @@ -357,7 +357,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.external_addresses.on_swarm_event(&event); match event { diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 29fe8ba250f..e7c71c813fa 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -307,7 +307,7 @@ impl Floodsub { peer_id, remaining_established, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if remaining_established > 0 { // we only care about peer disconnections @@ -478,7 +478,7 @@ impl NetworkBehaviour for Floodsub { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index db4ee58864f..3d8869f9739 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3135,7 +3135,7 @@ where endpoint, remaining_established, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { @@ -3484,7 +3484,7 @@ where Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 37f25631ae4..7298cd42c9d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -21,7 +21,6 @@ // Collection of tests for the gossipsub network behaviour use super::*; -use crate::protocol::ProtocolConfig; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; @@ -272,13 +271,10 @@ where for connection_id in peer_connections.connections.clone() { active_connections = active_connections.checked_sub(1).unwrap(); - let dummy_handler = Handler::new(ProtocolConfig::default(), Duration::ZERO); - gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id: *peer_id, connection_id, endpoint: &fake_endpoint, - handler: dummy_handler, remaining_established: active_connections, })); } diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index f572b937d38..24f14562488 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -339,7 +339,7 @@ impl NetworkBehaviour for Behaviour { Ok(self.discovered_peers.get(&peer)) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { let listen_addr_changed = self.listen_addresses.on_swarm_event(&event); let external_addr_changed = self.external_addresses.on_swarm_event(&event); diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 7df17c91e1a..f3489515477 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1941,7 +1941,7 @@ where remaining_established, connection_id, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { self.connections.remove(&connection_id); @@ -2433,7 +2433,7 @@ where } } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); let external_addresses_changed = self.external_addresses.on_swarm_event(&event); diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index bc102f832df..da518930945 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -224,7 +224,7 @@ where void::unreachable(ev) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); match event { diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 93c2086a49e..03cfe1e117f 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -155,7 +155,7 @@ impl NetworkBehaviour for Behaviour { ) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { self.connected.insert(peer_id); @@ -164,7 +164,6 @@ impl NetworkBehaviour for Behaviour { peer_id, connection_id: _, endpoint: _, - handler: _, remaining_established, }) => { if remaining_established == 0 { diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 912f6d5bb9e..e48447c91e0 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -104,7 +104,7 @@ impl NetworkBehaviour for Behaviour { Ok(Handler::default()) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { self.connected.insert(peer_id); diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs index 79f77c74650..a23f188bbc8 100644 --- a/protocols/perf/src/server.rs +++ b/protocols/perf/src/server.rs @@ -118,7 +118,7 @@ impl NetworkBehaviour for Behaviour { ) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.request_response.on_swarm_event(event); } diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs index b15cb70110d..b59b027308b 100644 --- a/protocols/perf/src/server/behaviour.rs +++ b/protocols/perf/src/server/behaviour.rs @@ -77,7 +77,7 @@ impl NetworkBehaviour for Behaviour { Ok(Handler::default()) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) => {} FromSwarm::ConnectionClosed(_) => {} diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index d1c4a2facaf..b2e8b38abc6 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -153,7 +153,7 @@ impl NetworkBehaviour for Behaviour { } } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index eb2f662581f..451611aac32 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -219,7 +219,7 @@ impl Behaviour { peer_id, connection_id, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) { peer.get_mut().remove(&connection_id); @@ -301,7 +301,7 @@ impl NetworkBehaviour for Behaviour { ))) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.external_addresses.on_swarm_event(&event); match event { diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index c3c80c5b504..b82fbbacdb6 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -125,7 +125,7 @@ impl Behaviour { connection_id, endpoint, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if !endpoint.is_relayed() { match self.directly_connected_peers.entry(peer_id) { @@ -193,7 +193,7 @@ impl NetworkBehaviour for Behaviour { Ok(Either::Left(handler)) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index d1a514f1820..7234575ccb6 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -293,7 +293,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.external_addresses.on_swarm_event(&event); match event { diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 6d64938ca3d..9cd7fb9ce27 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -179,7 +179,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 7b1a8088443..725f469d92f 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -644,7 +644,7 @@ where connection_id, remaining_established, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { let connections = self .connected @@ -765,7 +765,7 @@ where )) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index e54cd058daf..8ddc2636c09 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -286,26 +286,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result quote! { self.#i.on_swarm_event(#from_swarm::ConnectionClosed(#connection_closed { peer_id, connection_id, endpoint, - handler, remaining_established, })); }, @@ -314,14 +303,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result syn::Result) { + fn on_swarm_event(&mut self, event: #from_swarm) { match event { #from_swarm::ConnectionEstablished( #connection_established { peer_id, connection_id, endpoint, failed_addresses, other_established }) @@ -851,7 +838,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result { #(#on_address_change_stmts)* } #from_swarm::ConnectionClosed( - #connection_closed { peer_id, connection_id, endpoint, handler: handlers, remaining_established }) + #connection_closed { peer_id, connection_id, endpoint, remaining_established }) => { #(#on_connection_closed_stmts)* } #from_swarm::DialFailure( #dial_failure { peer_id, connection_id, error }) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 13147908721..3e2de9851c8 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -66,6 +66,10 @@ - Add ability to `downcast_ref` ConnectionDenied errors. See [PR 4020]. +- Remove `handler` field from `ConnectionClosed`. + If you need to transfer state from a `ConnectionHandler` to its `NetworkBehaviour` when a connection closes, use `ConnectionHandler::poll_close`. + See [PR XXXX. + [PR 3292]: https://github.com/libp2p/rust-libp2p/pull/3292 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 @@ -84,6 +88,7 @@ [PR 3956]: https://github.com/libp2p/rust-libp2p/pull/3956 [PR 4020]: https://github.com/libp2p/rust-libp2p/pull/4020 [PR 4037]: https://github.com/libp2p/rust-libp2p/pull/4037 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX ## 0.42.2 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 0615457291a..50b1e7c7c1d 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -191,7 +191,7 @@ pub trait NetworkBehaviour: 'static { ) -> Result, ConnectionDenied>; /// Informs the behaviour about an event from the [`Swarm`](crate::Swarm). - fn on_swarm_event(&mut self, event: FromSwarm); + fn on_swarm_event(&mut self, event: FromSwarm); /// Informs the behaviour about an event generated by the [`ConnectionHandler`](crate::ConnectionHandler) /// dedicated to the peer identified by `peer_id`. for the behaviour. @@ -408,7 +408,7 @@ pub enum CloseConnection { /// Enumeration with the list of the possible events /// to pass to [`on_swarm_event`](NetworkBehaviour::on_swarm_event). -pub enum FromSwarm<'a, Handler> { +pub enum FromSwarm<'a> { /// Informs the behaviour about a newly established connection to a peer. ConnectionEstablished(ConnectionEstablished<'a>), /// Informs the behaviour about a closed connection to a peer. @@ -416,7 +416,7 @@ pub enum FromSwarm<'a, Handler> { /// This event is always paired with an earlier /// [`FromSwarm::ConnectionEstablished`] with the same peer ID, connection ID /// and endpoint. - ConnectionClosed(ConnectionClosed<'a, Handler>), + ConnectionClosed(ConnectionClosed<'a>), /// Informs the behaviour that the [`ConnectedPoint`] of an existing /// connection has changed. AddressChange(AddressChange<'a>), @@ -464,11 +464,10 @@ pub struct ConnectionEstablished<'a> { /// This event is always paired with an earlier /// [`FromSwarm::ConnectionEstablished`] with the same peer ID, connection ID /// and endpoint. -pub struct ConnectionClosed<'a, Handler> { +pub struct ConnectionClosed<'a> { pub peer_id: PeerId, pub connection_id: ConnectionId, pub endpoint: &'a ConnectedPoint, - pub handler: Handler, pub remaining_established: usize, } @@ -559,106 +558,3 @@ pub struct ExternalAddrConfirmed<'a> { pub struct ExternalAddrExpired<'a> { pub addr: &'a Multiaddr, } - -impl<'a, Handler> FromSwarm<'a, Handler> { - fn map_handler( - self, - map_handler: impl FnOnce(Handler) -> NewHandler, - ) -> FromSwarm<'a, NewHandler> { - self.maybe_map_handler(|h| Some(map_handler(h))) - .expect("To return Some as all closures return Some.") - } - - fn maybe_map_handler( - self, - map_handler: impl FnOnce(Handler) -> Option, - ) -> Option> { - match self { - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - handler, - remaining_established, - }) => Some(FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - handler: map_handler(handler)?, - remaining_established, - })), - FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint, - failed_addresses, - other_established, - }) => Some(FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint, - failed_addresses, - other_established, - })), - FromSwarm::AddressChange(AddressChange { - peer_id, - connection_id, - old, - new, - }) => Some(FromSwarm::AddressChange(AddressChange { - peer_id, - connection_id, - old, - new, - })), - FromSwarm::DialFailure(DialFailure { - peer_id, - error, - connection_id, - }) => Some(FromSwarm::DialFailure(DialFailure { - peer_id, - error, - connection_id, - })), - FromSwarm::ListenFailure(ListenFailure { - local_addr, - send_back_addr, - connection_id, - error, - }) => Some(FromSwarm::ListenFailure(ListenFailure { - local_addr, - send_back_addr, - connection_id, - error, - })), - FromSwarm::NewListener(NewListener { listener_id }) => { - Some(FromSwarm::NewListener(NewListener { listener_id })) - } - FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => { - Some(FromSwarm::NewListenAddr(NewListenAddr { - listener_id, - addr, - })) - } - FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => { - Some(FromSwarm::ExpiredListenAddr(ExpiredListenAddr { - listener_id, - addr, - })) - } - FromSwarm::ListenerError(ListenerError { listener_id, err }) => { - Some(FromSwarm::ListenerError(ListenerError { listener_id, err })) - } - FromSwarm::ListenerClosed(ListenerClosed { - listener_id, - reason, - }) => Some(FromSwarm::ListenerClosed(ListenerClosed { - listener_id, - reason, - })), - FromSwarm::NewExternalAddrCandidate(e) => Some(FromSwarm::NewExternalAddrCandidate(e)), - FromSwarm::ExternalAddrExpired(e) => Some(FromSwarm::ExternalAddrExpired(e)), - FromSwarm::ExternalAddrConfirmed(e) => Some(FromSwarm::ExternalAddrConfirmed(e)), - } - } -} diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index c6e0870d11c..3de861e3bb4 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -122,16 +122,10 @@ where Ok(handler) } - fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { + fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { match self { - Either::Left(b) => b.on_swarm_event(event.map_handler(|h| match h { - Either::Left(h) => h, - Either::Right(_) => unreachable!(), - })), - Either::Right(b) => b.on_swarm_event(event.map_handler(|h| match h { - Either::Right(h) => h, - Either::Left(_) => unreachable!(), - })), + Either::Left(b) => b.on_swarm_event(event), + Either::Right(b) => b.on_swarm_event(event), } } diff --git a/swarm/src/behaviour/external_addresses.rs b/swarm/src/behaviour/external_addresses.rs index 307f0f938dd..14cdb301fbd 100644 --- a/swarm/src/behaviour/external_addresses.rs +++ b/swarm/src/behaviour/external_addresses.rs @@ -25,7 +25,7 @@ impl ExternalAddresses { /// Feed a [`FromSwarm`] event to this struct. /// /// Returns whether the event changed our set of external addresses. - pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { + pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { match event { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => { if let Some(pos) = self @@ -81,7 +81,6 @@ impl ExternalAddresses { #[cfg(test)] mod tests { use super::*; - use crate::dummy; use libp2p_core::multiaddr::Protocol; use once_cell::sync::Lazy; use rand::Rng; @@ -129,13 +128,9 @@ mod tests { while addresses.as_slice().len() < MAX_LOCAL_EXTERNAL_ADDRS { let random_address = Multiaddr::empty().with(Protocol::Memory(rand::thread_rng().gen_range(0..1000))); - addresses.on_swarm_event( - &FromSwarm::<'_, dummy::ConnectionHandler>::ExternalAddrConfirmed( - ExternalAddrConfirmed { - addr: &random_address, - }, - ), - ); + addresses.on_swarm_event(&FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { + addr: &random_address, + })); } addresses.on_swarm_event(&new_external_addr2()); @@ -158,19 +153,19 @@ mod tests { ); } - fn new_external_addr1() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn new_external_addr1() -> FromSwarm<'static> { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr: &MEMORY_ADDR_1000, }) } - fn new_external_addr2() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn new_external_addr2() -> FromSwarm<'static> { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr: &MEMORY_ADDR_2000, }) } - fn expired_external_addr1() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn expired_external_addr1() -> FromSwarm<'static> { FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr: &MEMORY_ADDR_1000, }) diff --git a/swarm/src/behaviour/listen_addresses.rs b/swarm/src/behaviour/listen_addresses.rs index 8882db64a50..6076f5e7923 100644 --- a/swarm/src/behaviour/listen_addresses.rs +++ b/swarm/src/behaviour/listen_addresses.rs @@ -17,7 +17,7 @@ impl ListenAddresses { /// Feed a [`FromSwarm`] event to this struct. /// /// Returns whether the event changed our set of listen addresses. - pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { + pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { match event { FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => { self.addresses.insert((*addr).clone()) @@ -33,7 +33,6 @@ impl ListenAddresses { #[cfg(test)] mod tests { use super::*; - use crate::dummy; use libp2p_core::{multiaddr::Protocol, transport::ListenerId}; use once_cell::sync::Lazy; @@ -60,14 +59,14 @@ mod tests { assert!(!changed) } - fn new_listen_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn new_listen_addr() -> FromSwarm<'static> { FromSwarm::NewListenAddr(NewListenAddr { listener_id: ListenerId::next(), addr: &MEMORY_ADDR, }) } - fn expired_listen_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn expired_listen_addr() -> FromSwarm<'static> { FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id: ListenerId::next(), addr: &MEMORY_ADDR, diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 92bd8963502..f2ed999583f 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -159,11 +159,9 @@ where }) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { if let Some(behaviour) = &mut self.inner { - if let Some(event) = event.maybe_map_handler(|h| h.inner) { - behaviour.on_swarm_event(event); - } + behaviour.on_swarm_event(event); } } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 6646967f590..6f6695c5e76 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -41,8 +41,8 @@ use crate::{ }; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::FutureExt; use futures::StreamExt; +use futures::{stream, FutureExt}; use futures_timer::Delay; use instant::Instant; use libp2p_core::connection::ConnectedPoint; @@ -206,8 +206,22 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub(crate) fn close(self) -> (THandler, impl Future>) { - (self.handler, self.muxing.close()) + pub(crate) fn close( + self, + ) -> ( + impl futures::Stream, + impl Future>, + ) { + let Connection { + mut handler, + muxing, + .. + } = self; + + ( + stream::poll_fn(move |cx| handler.poll_close(cx)), + muxing.close(), + ) } /// Polls the handler and the substream, forwarding events from the former to the latter and diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e9f7504f529..59b2758c7ad 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -257,7 +257,6 @@ pub(crate) enum PoolEvent { error: Option>, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, - handler: THandler, }, /// An outbound connection attempt failed. @@ -571,12 +570,7 @@ where old_endpoint, }); } - Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { - id, - peer_id, - error, - handler, - })) => { + Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => { let connections = self .established .get_mut(&peer_id) @@ -594,7 +588,6 @@ where connected: Connected { endpoint, peer_id }, error, remaining_established_connection_ids, - handler, }); } } diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 175da668bda..bc925da6153 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -32,6 +32,7 @@ use crate::{ }; use futures::{ channel::{mpsc, oneshot}, + future, future::{poll_fn, Either, Future}, SinkExt, StreamExt, }; @@ -87,7 +88,6 @@ pub(crate) enum EstablishedConnectionEvent { id: ConnectionId, peer_id: PeerId, error: Option>, - handler: THandler, }, } @@ -187,15 +187,27 @@ pub(crate) async fn new_for_established_connection( Command::NotifyHandler(event) => connection.on_behaviour_event(event), Command::Close => { command_receiver.close(); - let (handler, closing_muxer) = connection.close(); + let (remaining_events, closing_muxer) = connection.close(); + + let (_, muxer_close_error) = future::join( + events.send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })), + closing_muxer, + ) + .await; + + let error = muxer_close_error.err().map(ConnectionError::IO); - let error = closing_muxer.await.err().map(ConnectionError::IO); let _ = events .send(EstablishedConnectionEvent::Closed { id: connection_id, peer_id, error, - handler, }) .await; return; @@ -227,14 +239,24 @@ pub(crate) async fn new_for_established_connection( } Err(error) => { command_receiver.close(); - let (handler, _closing_muxer) = connection.close(); + let (remaining_events, _closing_muxer) = connection.close(); + + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; + // Terminate the task with the error, dropping the connection. let _ = events .send(EstablishedConnectionEvent::Closed { id: connection_id, peer_id, error: Some(error), - handler, }) .await; return; diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 6810abec591..4e6f92d21f6 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -58,7 +58,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index dcc7ab1c09d..811faec1c86 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -158,6 +158,15 @@ pub trait ConnectionHandler: Send + 'static { >, >; + /// Gracefully close the [`ConnectionHandler`]. + /// + /// Implementations may transfer one or more events to their [`NetworkBehaviour`] implementation + /// by emitting [`Poll::Ready`] with [`Self::ToBehaviour`]. Implementations should eventually + /// return [`Poll::Ready(None)`] to signal successful closing. + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(None) + } + /// Adds a closure that turns the input event into something else. fn map_in_event(self, map: TMap) -> MapInEvent where diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index 05cbcdf7b8c..4b43454bb0b 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -61,7 +61,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 9b739f33780..990f443f849 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -827,7 +827,6 @@ where connected, error, remaining_established_connection_ids, - handler, .. } => { if let Some(error) = error.as_ref() { @@ -854,7 +853,6 @@ where peer_id, connection_id: id, endpoint: &endpoint, - handler, remaining_established: num_established as usize, })); return Some(SwarmEvent::ConnectionClosed { diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 6f39d56da91..30a9e7636c4 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -118,7 +118,7 @@ where self.next_action.take().map_or(Poll::Pending, Poll::Ready) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) @@ -320,9 +320,8 @@ where peer_id, connection_id, endpoint, - handler, remaining_established, - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { let mut other_closed_connections = self .on_connection_established @@ -370,7 +369,6 @@ where peer_id, connection_id, endpoint, - handler, remaining_established, })); } @@ -458,7 +456,7 @@ where .handle_established_outbound_connection(connection_id, peer, addr, role_override) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/swarm/tests/listener.rs b/swarm/tests/listener.rs index 71d92cb0e1f..98f55800a75 100644 --- a/swarm/tests/listener.rs +++ b/swarm/tests/listener.rs @@ -105,7 +105,7 @@ impl NetworkBehaviour for Behaviour { ) { } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::NewListener(NewListener { listener_id }) => { assert!(self.listeners.contains(&listener_id)); diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index fa3f6c69dd0..15f3357a930 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -506,7 +506,7 @@ fn custom_out_event_no_type_parameters() { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) From 93ffdddd47edb8ecb58e3ab6c93ae04e74c67c06 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Jun 2023 22:35:46 +0200 Subject: [PATCH 02/15] Fill in PR number --- swarm/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 3e2de9851c8..fa05c573ad0 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -68,7 +68,7 @@ - Remove `handler` field from `ConnectionClosed`. If you need to transfer state from a `ConnectionHandler` to its `NetworkBehaviour` when a connection closes, use `ConnectionHandler::poll_close`. - See [PR XXXX. + See [PR 4076. [PR 3292]: https://github.com/libp2p/rust-libp2p/pull/3292 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 @@ -88,7 +88,7 @@ [PR 3956]: https://github.com/libp2p/rust-libp2p/pull/3956 [PR 4020]: https://github.com/libp2p/rust-libp2p/pull/4020 [PR 4037]: https://github.com/libp2p/rust-libp2p/pull/4037 -[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX +[PR 4076]: https://github.com/libp2p/rust-libp2p/pull/4076 ## 0.42.2 From 36fa04b9b00f2c2a77a5a9de31de250df2509dab Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Jun 2023 22:37:20 +0200 Subject: [PATCH 03/15] Fix docs --- swarm/src/connection.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 6f6695c5e76..85944100381 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -204,8 +204,7 @@ where self.handler.on_behaviour_event(event); } - /// Begins an orderly shutdown of the connection, returning the connection - /// handler and a `Future` that resolves when connection shutdown is complete. + /// Begins an orderly shutdown of the connection, returning a stream of final events a `Future` that resolves when connection shutdown is complete. pub(crate) fn close( self, ) -> ( From 852c798c9fcb4282df42348980d9571cea82a207 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Jun 2023 22:53:21 +0200 Subject: [PATCH 04/15] Add test --- swarm/tests/connection_close.rs | 159 ++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 swarm/tests/connection_close.rs diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs new file mode 100644 index 00000000000..4862f143858 --- /dev/null +++ b/swarm/tests/connection_close.rs @@ -0,0 +1,159 @@ +use libp2p_core::upgrade::DeniedUpgrade; +use libp2p_core::{Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_swarm::handler::ConnectionEvent; +use libp2p_swarm::{ + ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, + KeepAlive, NetworkBehaviour, PollParameters, SubstreamProtocol, Swarm, SwarmEvent, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, +}; +use libp2p_swarm_test::SwarmExt; +use std::task::{Context, Poll}; +use void::Void; + +#[async_std::test] +async fn sends_remaining_events_to_behaviour_on_connection_close() { + let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(3)); + let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(3)); + + swarm2.listen().await; + swarm1.connect(&mut swarm2).await; + + swarm1.disconnect_peer_id(*swarm2.local_peer_id()).unwrap(); + + match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { + ([SwarmEvent::ConnectionClosed { .. }], [_, SwarmEvent::ConnectionClosed { .. }]) => { + assert_eq!(swarm1.behaviour().state, 0); + assert_eq!(swarm2.behaviour().state, 0); + } + (e1, e2) => panic!("Unexpected events: {:?} {:?}", e1, e2), + } +} + +struct HandlerWithState { + precious_state: u64, +} + +struct Behaviour { + state: u64, +} + +impl Behaviour { + fn new(state: u64) -> Self { + Behaviour { state } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = HandlerWithState; + type ToSwarm = (); + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(HandlerWithState { + precious_state: self.state, + }) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(HandlerWithState { + precious_state: self.state, + }) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionClosed(_) => { + assert_eq!(self.state, 0); + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + assert_eq!(self.state, event); + self.state -= 1; + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>> { + Poll::Pending + } +} + +impl ConnectionHandler for HandlerWithState { + type FromBehaviour = Void; + type ToBehaviour = u64; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Yes + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + Poll::Pending + } + + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + if self.precious_state > 0 { + let state = self.precious_state; + self.precious_state -= 1; + + return Poll::Ready(Some(state)); + } + + Poll::Ready(None) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + void::unreachable(event) + } + + fn on_connection_event( + &mut self, + _: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } +} From 7d69e1017a785a28732ff6b02dfafd0718828909 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Jun 2023 22:54:19 +0200 Subject: [PATCH 05/15] Fix clippy warnings --- protocols/rendezvous/src/client.rs | 2 +- swarm/tests/connection_close.rs | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 363e31965eb..a7073142f54 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -226,7 +226,7 @@ impl NetworkBehaviour for Behaviour { .on_connection_handler_event(peer_id, connection_id, event); } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.external_addresses.on_swarm_event(&event); self.inner.on_swarm_event(event); diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index 4862f143858..763e10d2cea 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -73,11 +73,8 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionClosed(_) => { - assert_eq!(self.state, 0); - } - _ => {} + if let FromSwarm::ConnectionClosed(_) = event { + assert_eq!(self.state, 0); } } From b5573d566be37934472cbc08406f65bd68e0f8bd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Jun 2023 22:55:47 +0200 Subject: [PATCH 06/15] Update swarm/src/connection.rs --- swarm/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 85944100381..a2303cfdf47 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -204,7 +204,7 @@ where self.handler.on_behaviour_event(event); } - /// Begins an orderly shutdown of the connection, returning a stream of final events a `Future` that resolves when connection shutdown is complete. + /// Begins an orderly shutdown of the connection, returning a stream of final events and a `Future` that resolves when connection shutdown is complete. pub(crate) fn close( self, ) -> ( From 889ee4ec538854b264bea744fc0dc8d776489aa3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Jun 2023 16:28:50 +0200 Subject: [PATCH 07/15] Fix doc string --- swarm/src/handler.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 811faec1c86..5f569663399 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -160,9 +160,8 @@ pub trait ConnectionHandler: Send + 'static { /// Gracefully close the [`ConnectionHandler`]. /// - /// Implementations may transfer one or more events to their [`NetworkBehaviour`] implementation - /// by emitting [`Poll::Ready`] with [`Self::ToBehaviour`]. Implementations should eventually - /// return [`Poll::Ready(None)`] to signal successful closing. + /// The contract for this function is equivalent to a [`Stream`](futures::Stream). + /// It will be polled to completion _in parallel_ with closing the underlying physical connection. fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(None) } From 70092a3d1c15460f62e3dfb1988bbd6750a4f66e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Jun 2023 16:31:57 +0200 Subject: [PATCH 08/15] Document IO behaviour --- swarm/src/handler.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 5f569663399..953689bff62 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -162,6 +162,9 @@ pub trait ConnectionHandler: Send + 'static { /// /// The contract for this function is equivalent to a [`Stream`](futures::Stream). /// It will be polled to completion _in parallel_ with closing the underlying physical connection. + /// Performing IO on the connection will likely fail. + /// + /// To signal completion, [`Poll::Ready(None)`] should be returned. fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(None) } From 092d6af18ab85c6445d64b1e2097512e7d5489e4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Jun 2023 16:36:42 +0200 Subject: [PATCH 09/15] Fix failing test --- swarm/tests/connection_close.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index 763e10d2cea..e734352c633 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -22,7 +22,7 @@ async fn sends_remaining_events_to_behaviour_on_connection_close() { swarm1.disconnect_peer_id(*swarm2.local_peer_id()).unwrap(); match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { - ([SwarmEvent::ConnectionClosed { .. }], [_, SwarmEvent::ConnectionClosed { .. }]) => { + ([SwarmEvent::ConnectionClosed { .. }], [SwarmEvent::ConnectionClosed { .. }]) => { assert_eq!(swarm1.behaviour().state, 0); assert_eq!(swarm2.behaviour().state, 0); } From eb5e8e66f74799a4398aab677d82a6443de7bc46 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 21 Sep 2023 18:45:27 +1000 Subject: [PATCH 10/15] Fix new compile errors --- Cargo.lock | 2 +- Cargo.toml | 2 +- misc/connection-limits/src/lib.rs | 2 +- misc/memory-connection-limits/src/lib.rs | 2 +- misc/memory-connection-limits/tests/util.rs | 2 +- protocols/upnp/src/behaviour.rs | 2 +- swarm/CHANGELOG.md | 11 ++++++----- swarm/Cargo.toml | 2 +- 8 files changed, 13 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96d9696995e..06cd39db717 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3039,7 +3039,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.43.4" +version = "0.44.0" dependencies = [ "async-std", "either", diff --git a/Cargo.toml b/Cargo.toml index 9b703d9c4ee..2a0078a61e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,7 +100,7 @@ libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" } libp2p-upnp = { version = "0.1.0", path = "protocols/upnp" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } libp2p-server = { version = "0.12.3", path = "misc/server" } -libp2p-swarm = { version = "0.43.4", path = "swarm" } +libp2p-swarm = { version = "0.44.0", path = "swarm" } libp2p-swarm-derive = { version = "0.33.0", path = "swarm-derive" } libp2p-swarm-test = { version = "0.2.0", path = "swarm-test" } libp2p-tcp = { version = "0.40.0", path = "transports/tcp" } diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index 2b5b7c47491..29da49f56cd 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -579,7 +579,7 @@ mod tests { ))) } - fn on_swarm_event(&mut self, _event: FromSwarm) {} + fn on_swarm_event(&mut self, _event: FromSwarm) {} fn on_connection_handler_event( &mut self, diff --git a/misc/memory-connection-limits/src/lib.rs b/misc/memory-connection-limits/src/lib.rs index 33e40b11843..f705861253e 100644 --- a/misc/memory-connection-limits/src/lib.rs +++ b/misc/memory-connection-limits/src/lib.rs @@ -181,7 +181,7 @@ impl NetworkBehaviour for Behaviour { Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, _: FromSwarm) {} + fn on_swarm_event(&mut self, _: FromSwarm) {} fn on_connection_handler_event( &mut self, diff --git a/misc/memory-connection-limits/tests/util.rs b/misc/memory-connection-limits/tests/util.rs index a2fd7c20fed..7057a4d5605 100644 --- a/misc/memory-connection-limits/tests/util.rs +++ b/misc/memory-connection-limits/tests/util.rs @@ -107,7 +107,7 @@ impl NetworkBehaviour Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, _: FromSwarm) {} + fn on_swarm_event(&mut self, _: FromSwarm) {} fn on_connection_handler_event( &mut self, diff --git a/protocols/upnp/src/behaviour.rs b/protocols/upnp/src/behaviour.rs index f582e96e1e7..e63074434ad 100644 --- a/protocols/upnp/src/behaviour.rs +++ b/protocols/upnp/src/behaviour.rs @@ -252,7 +252,7 @@ impl NetworkBehaviour for Behaviour { Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::NewListenAddr(NewListenAddr { listener_id, diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f910d7a1b7c..dab9871f7b1 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.44.0 - unreleased + +- Remove `handler` field from `ConnectionClosed`. + If you need to transfer state from a `ConnectionHandler` to its `NetworkBehaviour` when a connection closes, use `ConnectionHandler::poll_close`. + See [PR 4076](https://github.com/libp2p/rust-libp2p/pull/4076). + ## 0.43.4 - unreleased - Implement `Debug` for event structs. @@ -102,10 +108,6 @@ - Add ability to `downcast_ref` ConnectionDenied errors. See [PR 4020]. -- Remove `handler` field from `ConnectionClosed`. - If you need to transfer state from a `ConnectionHandler` to its `NetworkBehaviour` when a connection closes, use `ConnectionHandler::poll_close`. - See [PR 4076. - [PR 3292]: https://github.com/libp2p/rust-libp2p/pull/3292 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 @@ -124,7 +126,6 @@ [PR 3956]: https://github.com/libp2p/rust-libp2p/pull/3956 [PR 4020]: https://github.com/libp2p/rust-libp2p/pull/4020 [PR 4037]: https://github.com/libp2p/rust-libp2p/pull/4037 -[PR 4076]: https://github.com/libp2p/rust-libp2p/pull/4076 ## 0.42.2 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 86f4c158387..095a0abf94c 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = { workspace = true } description = "The libp2p swarm" -version = "0.43.4" +version = "0.44.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 94fd012972cfa3b232173c68b3f06124a5be1b9e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 21 Sep 2023 18:47:18 +1000 Subject: [PATCH 11/15] Bump swarm-derive version --- Cargo.lock | 2 +- Cargo.toml | 2 +- swarm-derive/CHANGELOG.md | 5 +++++ swarm-derive/Cargo.toml | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06cd39db717..05b286eaf33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3072,7 +3072,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" -version = "0.33.0" +version = "0.34.0" dependencies = [ "heck", "proc-macro-warning", diff --git a/Cargo.toml b/Cargo.toml index 2a0078a61e2..736796b286a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,7 +101,7 @@ libp2p-upnp = { version = "0.1.0", path = "protocols/upnp" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } libp2p-server = { version = "0.12.3", path = "misc/server" } libp2p-swarm = { version = "0.44.0", path = "swarm" } -libp2p-swarm-derive = { version = "0.33.0", path = "swarm-derive" } +libp2p-swarm-derive = { version = "0.34.0", path = "swarm-derive" } libp2p-swarm-test = { version = "0.2.0", path = "swarm-test" } libp2p-tcp = { version = "0.40.0", path = "transports/tcp" } libp2p-tls = { version = "0.2.1", path = "transports/tls" } diff --git a/swarm-derive/CHANGELOG.md b/swarm-derive/CHANGELOG.md index 3a33771b099..fc228b80a62 100644 --- a/swarm-derive/CHANGELOG.md +++ b/swarm-derive/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.34.0 - unreleased + +- Adapt to interface changes in `libp2p-swarm`. + See [PR 4706](https://github.com/libp2p/rust-libp2p/pull/4076). + ## 0.33.0 - Raise MSRV to 1.65. diff --git a/swarm-derive/Cargo.toml b/swarm-derive/Cargo.toml index 75a3ac29eee..5e064a3f6ef 100644 --- a/swarm-derive/Cargo.toml +++ b/swarm-derive/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm-derive" edition = "2021" rust-version = { workspace = true } description = "Procedural macros of libp2p-swarm" -version = "0.33.0" +version = "0.34.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 95c113a2c49e5daf94857b5a1f443575f68855a9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 23 Oct 2023 10:34:03 +1100 Subject: [PATCH 12/15] Update docs --- swarm/src/handler.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index f3d57f96e7b..662bb84902b 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -161,8 +161,11 @@ pub trait ConnectionHandler: Send + 'static { /// Gracefully close the [`ConnectionHandler`]. /// /// The contract for this function is equivalent to a [`Stream`](futures::Stream). - /// It will be polled to completion _in parallel_ with closing the underlying physical connection. - /// Performing IO on the connection will likely fail. + /// When a connection is being shut down, we will first poll this function to completion. + /// Following that, the physical connection will be shut down. + /// + /// This is also called when the shutdown was initiated due to an error on the connection. + /// We therefore cannot guarantee that performing IO within here will succeed. /// /// To signal completion, [`Poll::Ready(None)`] should be returned. fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { From 79a5a689ad3217b8149fab525a986196ee3062fb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 23 Oct 2023 10:35:23 +1100 Subject: [PATCH 13/15] Call `poll_close` to completion before terminating connection --- swarm/src/connection/pool/task.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index bc925da6153..d6cf6ae0580 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -189,19 +189,17 @@ pub(crate) async fn new_for_established_connection( command_receiver.close(); let (remaining_events, closing_muxer) = connection.close(); - let (_, muxer_close_error) = future::join( - events.send_all(&mut remaining_events.map(|event| { + let _ = events + .send_all(&mut remaining_events.map(|event| { Ok(EstablishedConnectionEvent::Notify { id: connection_id, event, peer_id, }) - })), - closing_muxer, - ) - .await; + })) + .await; - let error = muxer_close_error.err().map(ConnectionError::IO); + let error = closing_muxer.await.err().map(ConnectionError::IO); let _ = events .send(EstablishedConnectionEvent::Closed { From 68403aee8d0b0cf8200e7847d7080d1864f714cc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 23 Oct 2023 10:37:59 +1100 Subject: [PATCH 14/15] Remove unused import --- swarm/src/connection/pool/task.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index d6cf6ae0580..f2c6928cd27 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -32,7 +32,6 @@ use crate::{ }; use futures::{ channel::{mpsc, oneshot}, - future, future::{poll_fn, Either, Future}, SinkExt, StreamExt, }; From bbc84c52dc7c06ee852fa81d4566eaf1af03ee34 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 25 Oct 2023 11:00:34 +1100 Subject: [PATCH 15/15] Fix compile error --- swarm/tests/connection_close.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index e734352c633..343910f36ba 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -4,8 +4,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::handler::ConnectionEvent; use libp2p_swarm::{ ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, - KeepAlive, NetworkBehaviour, PollParameters, SubstreamProtocol, Swarm, SwarmEvent, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + KeepAlive, NetworkBehaviour, SubstreamProtocol, Swarm, SwarmEvent, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; use libp2p_swarm_test::SwarmExt; use std::task::{Context, Poll}; @@ -88,11 +88,7 @@ impl NetworkBehaviour for Behaviour { self.state -= 1; } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>> { + fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { Poll::Pending } }