diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index 8f8892b7810..42d3c0ef613 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -231,7 +231,7 @@ where Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionClosed(_) => {} FromSwarm::ConnectionEstablished(_) => {} diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index f58e880cf91..b873da76be7 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -303,7 +303,7 @@ impl NetworkBehaviour for Behaviour { Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, @@ -583,7 +583,7 @@ mod tests { ))) } - fn on_swarm_event(&mut self, _event: FromSwarm) {} + fn on_swarm_event(&mut self, _event: FromSwarm) {} fn on_connection_handler_event( &mut self, diff --git a/misc/memory-connection-limits/src/lib.rs b/misc/memory-connection-limits/src/lib.rs index b1e68d80083..01ff04552e7 100644 --- a/misc/memory-connection-limits/src/lib.rs +++ b/misc/memory-connection-limits/src/lib.rs @@ -181,7 +181,7 @@ impl NetworkBehaviour for Behaviour { Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, _: FromSwarm) {} + fn on_swarm_event(&mut self, _: FromSwarm) {} fn on_connection_handler_event( &mut self, diff --git a/misc/memory-connection-limits/tests/util.rs b/misc/memory-connection-limits/tests/util.rs index 8d9d73af187..f40ce319929 100644 --- a/misc/memory-connection-limits/tests/util.rs +++ b/misc/memory-connection-limits/tests/util.rs @@ -107,7 +107,7 @@ impl NetworkBehaviour Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, _: FromSwarm) {} + fn on_swarm_event(&mut self, _: FromSwarm) {} fn on_connection_handler_event( &mut self, diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index e0e311e3666..d43ee224fc9 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -362,16 +362,14 @@ impl Behaviour { peer_id, connection_id, endpoint, - handler, remaining_established, - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { self.inner .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, endpoint, - handler, remaining_established, })); @@ -539,7 +537,7 @@ impl NetworkBehaviour for Behaviour { .handle_established_outbound_connection(connection_id, peer, addr, role_override) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); match event { diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index 86a16387b09..e7ecdd3c6ad 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -167,7 +167,7 @@ impl Behaviour { connection_id, endpoint: connected_point, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if !connected_point.is_relayed() { let connections = self @@ -358,7 +358,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.external_addresses.on_swarm_event(&event); match event { diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 5f80f63c38e..af0d3373ec1 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -307,7 +307,7 @@ impl Floodsub { peer_id, remaining_established, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if remaining_established > 0 { // we only care about peer disconnections @@ -482,7 +482,7 @@ impl NetworkBehaviour for Floodsub { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 4db3342365f..2a3a13ea6e7 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3091,7 +3091,7 @@ where endpoint, remaining_established, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { @@ -3433,7 +3433,7 @@ where Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 47410d890ce..dba5db4c01d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -21,7 +21,6 @@ // Collection of tests for the gossipsub network behaviour use super::*; -use crate::protocol::ProtocolConfig; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::ValidationError; @@ -267,13 +266,10 @@ where for connection_id in peer_connections.connections.clone() { active_connections = active_connections.checked_sub(1).unwrap(); - let dummy_handler = Handler::new(ProtocolConfig::default()); - gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id: *peer_id, connection_id, endpoint: &fake_endpoint, - handler: dummy_handler, remaining_established: active_connections, })); } diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 8a3e545a229..d58bcb4f5eb 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -335,7 +335,7 @@ impl NetworkBehaviour for Behaviour { Ok(self.discovered_peers.get(&peer)) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { let listen_addr_changed = self.listen_addresses.on_swarm_event(&event); let external_addr_changed = self.external_addresses.on_swarm_event(&event); diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 9d19546399d..0b187955e39 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -2009,7 +2009,7 @@ where remaining_established, connection_id, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { self.connections.remove(&connection_id); @@ -2526,7 +2526,7 @@ where } } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); let external_addresses_changed = self.external_addresses.on_swarm_event(&event); diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index bda0910c45c..a460d56ad18 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -275,7 +275,7 @@ where void::unreachable(ev) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses .write() .unwrap_or_else(|e| e.into_inner()) diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index 0647ce81130..a4dc354fac0 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -106,7 +106,7 @@ impl NetworkBehaviour for Behaviour { Ok(Handler::default()) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { self.connected.insert(peer_id); @@ -115,7 +115,6 @@ impl NetworkBehaviour for Behaviour { peer_id, connection_id: _, endpoint: _, - handler: _, remaining_established, }) => { if remaining_established == 0 { diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs index e1d4c817d0a..c699f706d87 100644 --- a/protocols/perf/src/server/behaviour.rs +++ b/protocols/perf/src/server/behaviour.rs @@ -75,7 +75,7 @@ impl NetworkBehaviour for Behaviour { Ok(Handler::default()) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) => {} FromSwarm::ConnectionClosed(_) => {} diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index b25adad9e4e..3e3d14477b5 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -149,7 +149,7 @@ impl NetworkBehaviour for Behaviour { } } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 7de9cfced88..2857d23655c 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -252,7 +252,7 @@ impl Behaviour { peer_id, connection_id, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(peer_id) { peer.get_mut().remove(&connection_id); @@ -332,7 +332,7 @@ impl NetworkBehaviour for Behaviour { ))) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.external_addresses.on_swarm_event(&event); match event { diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 770f552cb79..df668ec1798 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -125,7 +125,7 @@ impl Behaviour { connection_id, endpoint, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { if !endpoint.is_relayed() { match self.directly_connected_peers.entry(peer_id) { @@ -192,7 +192,7 @@ impl NetworkBehaviour for Behaviour { Ok(Either::Left(handler)) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 876ced1ee96..ec573e5ae4d 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -223,7 +223,7 @@ impl NetworkBehaviour for Behaviour { .on_connection_handler_event(peer_id, connection_id, event); } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { let changed = self.external_addresses.on_swarm_event(&event); self.inner.on_swarm_event(event); diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 2e2e4c0ee1d..8911f2cea01 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -230,7 +230,7 @@ impl NetworkBehaviour for Behaviour { } } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { self.inner.on_swarm_event(event); } } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index db6859dd244..42aa12774ea 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -610,7 +610,7 @@ where connection_id, remaining_established, .. - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { let connections = self .connected @@ -764,7 +764,7 @@ where Ok(handler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) => {} FromSwarm::ConnectionClosed(connection_closed) => { diff --git a/protocols/upnp/src/behaviour.rs b/protocols/upnp/src/behaviour.rs index cd153020f63..3d83545b952 100644 --- a/protocols/upnp/src/behaviour.rs +++ b/protocols/upnp/src/behaviour.rs @@ -252,7 +252,7 @@ impl NetworkBehaviour for Behaviour { Ok(dummy::ConnectionHandler) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::NewListenAddr(NewListenAddr { listener_id, diff --git a/swarm-derive/CHANGELOG.md b/swarm-derive/CHANGELOG.md index 9230edf6f26..fc228b80a62 100644 --- a/swarm-derive/CHANGELOG.md +++ b/swarm-derive/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.34.0 - unreleased +- Adapt to interface changes in `libp2p-swarm`. + See [PR 4706](https://github.com/libp2p/rust-libp2p/pull/4076). ## 0.33.0 diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 02f2df037c0..cbc81876c55 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -285,26 +285,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result quote! { self.#i.on_swarm_event(#from_swarm::ConnectionClosed(#connection_closed { peer_id, connection_id, endpoint, - handler, remaining_established, })); }, @@ -313,14 +302,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result syn::Result) { + fn on_swarm_event(&mut self, event: #from_swarm) { match event { #from_swarm::ConnectionEstablished( #connection_established { peer_id, connection_id, endpoint, failed_addresses, other_established }) @@ -850,7 +837,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result { #(#on_address_change_stmts)* } #from_swarm::ConnectionClosed( - #connection_closed { peer_id, connection_id, endpoint, handler: handlers, remaining_established }) + #connection_closed { peer_id, connection_id, endpoint, remaining_established }) => { #(#on_connection_closed_stmts)* } #from_swarm::DialFailure( #dial_failure { peer_id, connection_id, error }) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 02b0bf70c31..814d37cda8f 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.44.0 - unreleased +- Remove `handler` field from `ConnectionClosed`. + If you need to transfer state from a `ConnectionHandler` to its `NetworkBehaviour` when a connection closes, use `ConnectionHandler::poll_close`. + See [PR 4076](https://github.com/libp2p/rust-libp2p/pull/4076). - Remove deprecated `PollParameters` from `NetworkBehaviour::poll` function. See [PR 4490](https://github.com/libp2p/rust-libp2p/pull/4490). - Add `PeerCondition::DisconnectedAndNotDialing` variant, combining pre-existing conditions. diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 4866b9cc29e..c89796f8e25 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -191,7 +191,7 @@ pub trait NetworkBehaviour: 'static { ) -> Result, ConnectionDenied>; /// Informs the behaviour about an event from the [`Swarm`](crate::Swarm). - fn on_swarm_event(&mut self, event: FromSwarm); + fn on_swarm_event(&mut self, event: FromSwarm); /// Informs the behaviour about an event generated by the [`ConnectionHandler`] /// dedicated to the peer identified by `peer_id`. for the behaviour. @@ -389,7 +389,7 @@ pub enum CloseConnection { /// Enumeration with the list of the possible events /// to pass to [`on_swarm_event`](NetworkBehaviour::on_swarm_event). #[derive(Debug)] -pub enum FromSwarm<'a, Handler> { +pub enum FromSwarm<'a> { /// Informs the behaviour about a newly established connection to a peer. ConnectionEstablished(ConnectionEstablished<'a>), /// Informs the behaviour about a closed connection to a peer. @@ -397,7 +397,7 @@ pub enum FromSwarm<'a, Handler> { /// This event is always paired with an earlier /// [`FromSwarm::ConnectionEstablished`] with the same peer ID, connection ID /// and endpoint. - ConnectionClosed(ConnectionClosed<'a, Handler>), + ConnectionClosed(ConnectionClosed<'a>), /// Informs the behaviour that the [`ConnectedPoint`] of an existing /// connection has changed. AddressChange(AddressChange<'a>), @@ -446,11 +446,10 @@ pub struct ConnectionEstablished<'a> { /// [`FromSwarm::ConnectionEstablished`] with the same peer ID, connection ID /// and endpoint. #[derive(Debug)] -pub struct ConnectionClosed<'a, Handler> { +pub struct ConnectionClosed<'a> { pub peer_id: PeerId, pub connection_id: ConnectionId, pub endpoint: &'a ConnectedPoint, - pub handler: Handler, pub remaining_established: usize, } @@ -540,106 +539,3 @@ pub struct ExternalAddrConfirmed<'a> { pub struct ExternalAddrExpired<'a> { pub addr: &'a Multiaddr, } - -impl<'a, Handler> FromSwarm<'a, Handler> { - fn map_handler( - self, - map_handler: impl FnOnce(Handler) -> NewHandler, - ) -> FromSwarm<'a, NewHandler> { - self.maybe_map_handler(|h| Some(map_handler(h))) - .expect("To return Some as all closures return Some.") - } - - fn maybe_map_handler( - self, - map_handler: impl FnOnce(Handler) -> Option, - ) -> Option> { - match self { - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - handler, - remaining_established, - }) => Some(FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - handler: map_handler(handler)?, - remaining_established, - })), - FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint, - failed_addresses, - other_established, - }) => Some(FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint, - failed_addresses, - other_established, - })), - FromSwarm::AddressChange(AddressChange { - peer_id, - connection_id, - old, - new, - }) => Some(FromSwarm::AddressChange(AddressChange { - peer_id, - connection_id, - old, - new, - })), - FromSwarm::DialFailure(DialFailure { - peer_id, - error, - connection_id, - }) => Some(FromSwarm::DialFailure(DialFailure { - peer_id, - error, - connection_id, - })), - FromSwarm::ListenFailure(ListenFailure { - local_addr, - send_back_addr, - connection_id, - error, - }) => Some(FromSwarm::ListenFailure(ListenFailure { - local_addr, - send_back_addr, - connection_id, - error, - })), - FromSwarm::NewListener(NewListener { listener_id }) => { - Some(FromSwarm::NewListener(NewListener { listener_id })) - } - FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => { - Some(FromSwarm::NewListenAddr(NewListenAddr { - listener_id, - addr, - })) - } - FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => { - Some(FromSwarm::ExpiredListenAddr(ExpiredListenAddr { - listener_id, - addr, - })) - } - FromSwarm::ListenerError(ListenerError { listener_id, err }) => { - Some(FromSwarm::ListenerError(ListenerError { listener_id, err })) - } - FromSwarm::ListenerClosed(ListenerClosed { - listener_id, - reason, - }) => Some(FromSwarm::ListenerClosed(ListenerClosed { - listener_id, - reason, - })), - FromSwarm::NewExternalAddrCandidate(e) => Some(FromSwarm::NewExternalAddrCandidate(e)), - FromSwarm::ExternalAddrExpired(e) => Some(FromSwarm::ExternalAddrExpired(e)), - FromSwarm::ExternalAddrConfirmed(e) => Some(FromSwarm::ExternalAddrConfirmed(e)), - } - } -} diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 0e92c54aaf6..25da83fa11f 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -122,16 +122,10 @@ where Ok(handler) } - fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { + fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { match self { - Either::Left(b) => b.on_swarm_event(event.map_handler(|h| match h { - Either::Left(h) => h, - Either::Right(_) => unreachable!(), - })), - Either::Right(b) => b.on_swarm_event(event.map_handler(|h| match h { - Either::Right(h) => h, - Either::Left(_) => unreachable!(), - })), + Either::Left(b) => b.on_swarm_event(event), + Either::Right(b) => b.on_swarm_event(event), } } diff --git a/swarm/src/behaviour/external_addresses.rs b/swarm/src/behaviour/external_addresses.rs index 307f0f938dd..14cdb301fbd 100644 --- a/swarm/src/behaviour/external_addresses.rs +++ b/swarm/src/behaviour/external_addresses.rs @@ -25,7 +25,7 @@ impl ExternalAddresses { /// Feed a [`FromSwarm`] event to this struct. /// /// Returns whether the event changed our set of external addresses. - pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { + pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { match event { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => { if let Some(pos) = self @@ -81,7 +81,6 @@ impl ExternalAddresses { #[cfg(test)] mod tests { use super::*; - use crate::dummy; use libp2p_core::multiaddr::Protocol; use once_cell::sync::Lazy; use rand::Rng; @@ -129,13 +128,9 @@ mod tests { while addresses.as_slice().len() < MAX_LOCAL_EXTERNAL_ADDRS { let random_address = Multiaddr::empty().with(Protocol::Memory(rand::thread_rng().gen_range(0..1000))); - addresses.on_swarm_event( - &FromSwarm::<'_, dummy::ConnectionHandler>::ExternalAddrConfirmed( - ExternalAddrConfirmed { - addr: &random_address, - }, - ), - ); + addresses.on_swarm_event(&FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { + addr: &random_address, + })); } addresses.on_swarm_event(&new_external_addr2()); @@ -158,19 +153,19 @@ mod tests { ); } - fn new_external_addr1() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn new_external_addr1() -> FromSwarm<'static> { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr: &MEMORY_ADDR_1000, }) } - fn new_external_addr2() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn new_external_addr2() -> FromSwarm<'static> { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr: &MEMORY_ADDR_2000, }) } - fn expired_external_addr1() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn expired_external_addr1() -> FromSwarm<'static> { FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr: &MEMORY_ADDR_1000, }) diff --git a/swarm/src/behaviour/listen_addresses.rs b/swarm/src/behaviour/listen_addresses.rs index 8882db64a50..6076f5e7923 100644 --- a/swarm/src/behaviour/listen_addresses.rs +++ b/swarm/src/behaviour/listen_addresses.rs @@ -17,7 +17,7 @@ impl ListenAddresses { /// Feed a [`FromSwarm`] event to this struct. /// /// Returns whether the event changed our set of listen addresses. - pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { + pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { match event { FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => { self.addresses.insert((*addr).clone()) @@ -33,7 +33,6 @@ impl ListenAddresses { #[cfg(test)] mod tests { use super::*; - use crate::dummy; use libp2p_core::{multiaddr::Protocol, transport::ListenerId}; use once_cell::sync::Lazy; @@ -60,14 +59,14 @@ mod tests { assert!(!changed) } - fn new_listen_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn new_listen_addr() -> FromSwarm<'static> { FromSwarm::NewListenAddr(NewListenAddr { listener_id: ListenerId::next(), addr: &MEMORY_ADDR, }) } - fn expired_listen_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { + fn expired_listen_addr() -> FromSwarm<'static> { FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id: ListenerId::next(), addr: &MEMORY_ADDR, diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index cd3713b201f..e1da71a0450 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -157,11 +157,9 @@ where }) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { if let Some(behaviour) = &mut self.inner { - if let Some(event) = event.maybe_map_handler(|h| h.inner) { - behaviour.on_swarm_event(event); - } + behaviour.on_swarm_event(event); } } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index aa353a912ca..ee2729e0c82 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -41,8 +41,8 @@ use crate::{ }; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::FutureExt; use futures::StreamExt; +use futures::{stream, FutureExt}; use futures_timer::Delay; use instant::Instant; use libp2p_core::connection::ConnectedPoint; @@ -214,10 +214,23 @@ where self.handler.on_behaviour_event(event); } - /// Begins an orderly shutdown of the connection, returning the connection - /// handler and a `Future` that resolves when connection shutdown is complete. - pub(crate) fn close(self) -> (THandler, impl Future>) { - (self.handler, self.muxing.close()) + /// Begins an orderly shutdown of the connection, returning a stream of final events and a `Future` that resolves when connection shutdown is complete. + pub(crate) fn close( + self, + ) -> ( + impl futures::Stream, + impl Future>, + ) { + let Connection { + mut handler, + muxing, + .. + } = self; + + ( + stream::poll_fn(move |cx| handler.poll_close(cx)), + muxing.close(), + ) } /// Polls the handler and the substream, forwarding events from the former to the latter and diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index b6100989a04..8a2f1cb6b20 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -260,7 +260,6 @@ pub(crate) enum PoolEvent { error: Option>, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, - handler: THandler, }, /// An outbound connection attempt failed. @@ -576,12 +575,7 @@ where old_endpoint, }); } - Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { - id, - peer_id, - error, - handler, - })) => { + Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, peer_id, error })) => { let connections = self .established .get_mut(&peer_id) @@ -599,7 +593,6 @@ where connected: Connected { endpoint, peer_id }, error, remaining_established_connection_ids, - handler, }); } } diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 175da668bda..f2c6928cd27 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -87,7 +87,6 @@ pub(crate) enum EstablishedConnectionEvent { id: ConnectionId, peer_id: PeerId, error: Option>, - handler: THandler, }, } @@ -187,15 +186,25 @@ pub(crate) async fn new_for_established_connection( Command::NotifyHandler(event) => connection.on_behaviour_event(event), Command::Close => { command_receiver.close(); - let (handler, closing_muxer) = connection.close(); + let (remaining_events, closing_muxer) = connection.close(); + + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; let error = closing_muxer.await.err().map(ConnectionError::IO); + let _ = events .send(EstablishedConnectionEvent::Closed { id: connection_id, peer_id, error, - handler, }) .await; return; @@ -227,14 +236,24 @@ pub(crate) async fn new_for_established_connection( } Err(error) => { command_receiver.close(); - let (handler, _closing_muxer) = connection.close(); + let (remaining_events, _closing_muxer) = connection.close(); + + let _ = events + .send_all(&mut remaining_events.map(|event| { + Ok(EstablishedConnectionEvent::Notify { + id: connection_id, + event, + peer_id, + }) + })) + .await; + // Terminate the task with the error, dropping the connection. let _ = events .send(EstablishedConnectionEvent::Closed { id: connection_id, peer_id, error: Some(error), - handler, }) .await; return; diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 3122825e2c0..1005c4be035 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -54,7 +54,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index aadb435242d..f1f0547f6d2 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -157,6 +157,20 @@ pub trait ConnectionHandler: Send + 'static { >, >; + /// Gracefully close the [`ConnectionHandler`]. + /// + /// The contract for this function is equivalent to a [`Stream`](futures::Stream). + /// When a connection is being shut down, we will first poll this function to completion. + /// Following that, the physical connection will be shut down. + /// + /// This is also called when the shutdown was initiated due to an error on the connection. + /// We therefore cannot guarantee that performing IO within here will succeed. + /// + /// To signal completion, [`Poll::Ready(None)`] should be returned. + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(None) + } + /// Adds a closure that turns the input event into something else. fn map_in_event(self, map: TMap) -> MapInEvent where diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 3482865f50e..65384a54c35 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -852,7 +852,6 @@ where connected, error, remaining_established_connection_ids, - handler, .. } => { if let Some(error) = error.as_ref() { @@ -879,7 +878,6 @@ where peer_id, connection_id: id, endpoint: &endpoint, - handler, remaining_established: num_established as usize, })); return Some(SwarmEvent::ConnectionClosed { diff --git a/swarm/src/test.rs b/swarm/src/test.rs index a4520545998..9a192444aec 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -114,7 +114,7 @@ where self.next_action.take().map_or(Poll::Pending, Poll::Ready) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) @@ -316,9 +316,8 @@ where peer_id, connection_id, endpoint, - handler, remaining_established, - }: ConnectionClosed<::ConnectionHandler>, + }: ConnectionClosed, ) { let mut other_closed_connections = self .on_connection_established @@ -366,7 +365,6 @@ where peer_id, connection_id, endpoint, - handler, remaining_established, })); } @@ -454,7 +452,7 @@ where .handle_established_outbound_connection(connection_id, peer, addr, role_override) } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs new file mode 100644 index 00000000000..a44518fa4ad --- /dev/null +++ b/swarm/tests/connection_close.rs @@ -0,0 +1,152 @@ +use libp2p_core::upgrade::DeniedUpgrade; +use libp2p_core::{Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_swarm::handler::ConnectionEvent; +use libp2p_swarm::{ + ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, + NetworkBehaviour, SubstreamProtocol, Swarm, SwarmEvent, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, +}; +use libp2p_swarm_test::SwarmExt; +use std::task::{Context, Poll}; +use void::Void; + +#[async_std::test] +async fn sends_remaining_events_to_behaviour_on_connection_close() { + let mut swarm1 = Swarm::new_ephemeral(|_| Behaviour::new(3)); + let mut swarm2 = Swarm::new_ephemeral(|_| Behaviour::new(3)); + + swarm2.listen().await; + swarm1.connect(&mut swarm2).await; + + swarm1.disconnect_peer_id(*swarm2.local_peer_id()).unwrap(); + + match libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await { + ([SwarmEvent::ConnectionClosed { .. }], [SwarmEvent::ConnectionClosed { .. }]) => { + assert_eq!(swarm1.behaviour().state, 0); + assert_eq!(swarm2.behaviour().state, 0); + } + (e1, e2) => panic!("Unexpected events: {:?} {:?}", e1, e2), + } +} + +struct HandlerWithState { + precious_state: u64, +} + +struct Behaviour { + state: u64, +} + +impl Behaviour { + fn new(state: u64) -> Self { + Behaviour { state } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = HandlerWithState; + type ToSwarm = (); + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(HandlerWithState { + precious_state: self.state, + }) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(HandlerWithState { + precious_state: self.state, + }) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::ConnectionClosed(_) = event { + assert_eq!(self.state, 0); + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + assert_eq!(self.state, event); + self.state -= 1; + } + + fn poll(&mut self, _: &mut Context<'_>) -> Poll>> { + Poll::Pending + } +} + +impl ConnectionHandler for HandlerWithState { + type FromBehaviour = Void; + type ToBehaviour = u64; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn connection_keep_alive(&self) -> bool { + true + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + Poll::Pending + } + + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + if self.precious_state > 0 { + let state = self.precious_state; + self.precious_state -= 1; + + return Poll::Ready(Some(state)); + } + + Poll::Ready(None) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + void::unreachable(event) + } + + fn on_connection_event( + &mut self, + _: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } +} diff --git a/swarm/tests/listener.rs b/swarm/tests/listener.rs index 6faee330ab1..8d22acc90e2 100644 --- a/swarm/tests/listener.rs +++ b/swarm/tests/listener.rs @@ -105,7 +105,7 @@ impl NetworkBehaviour for Behaviour { ) { } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::NewListener(NewListener { listener_id }) => { assert!(self.listeners.contains(&listener_id)); diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index 5f6c03451a6..f13917dcd6c 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -501,7 +501,7 @@ fn custom_out_event_no_type_parameters() { Poll::Pending } - fn on_swarm_event(&mut self, event: FromSwarm) { + fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_)