From 7173d33858f2b4af3818270f8f41199a29c00429 Mon Sep 17 00:00:00 2001 From: Jack Maloney Date: Tue, 7 Dec 2021 01:03:06 -0600 Subject: [PATCH 1/8] Report aborted connection from Pool::poll --- core/src/connection/pool.rs | 57 +++++++++++++++----------------- core/src/connection/pool/task.rs | 35 +++++++++++++------- 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 59d41908f3a..a9274865b5b 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -50,7 +50,8 @@ use std::{ task::Context, task::Poll, }; -use void::Void; + +use self::task::PendingConnectionCommand; mod concurrent_dial; mod task; @@ -113,7 +114,7 @@ struct EstablishedConnectionInfo { peer_id: PeerId, endpoint: ConnectedPoint, /// Channel endpoint to send commands to the task. - sender: mpsc::Sender>, + sender: mpsc::Sender>, } impl EstablishedConnectionInfo { @@ -123,7 +124,11 @@ impl EstablishedConnectionInfo { pub fn start_close(&mut self) { // Clone the sender so that we are guaranteed to have // capacity for the close command (every sender gets a slot). - match self.sender.clone().try_send(task::Command::Close) { + match self + .sender + .clone() + .try_send(task::EstablishedConnectionCommand::Close) + { Ok(()) => {} Err(e) => assert!(e.is_disconnected(), "No capacity for close command."), }; @@ -137,7 +142,7 @@ struct PendingConnectionInfo { handler: THandler, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. - _drop_notifier: oneshot::Sender, + abort_notifier: Option>, } impl fmt::Debug for Pool { @@ -341,10 +346,7 @@ where /// Returns `None` if the pool has no connection with the given ID. pub fn get(&mut self, id: ConnectionId) -> Option> { if let hash_map::Entry::Occupied(entry) = self.pending.entry(id) { - Some(PoolConnection::Pending(PendingConnection { - entry, - counters: &mut self.counters, - })) + Some(PoolConnection::Pending(PendingConnection { entry })) } else { self.established .iter_mut() @@ -371,10 +373,7 @@ where /// Gets a pending outgoing connection by ID. pub fn get_outgoing(&mut self, id: ConnectionId) -> Option> { match self.pending.entry(id) { - hash_map::Entry::Occupied(entry) => Some(PendingConnection { - entry, - counters: &mut self.counters, - }), + hash_map::Entry::Occupied(entry) => Some(PendingConnection { entry }), hash_map::Entry::Vacant(_) => None, } } @@ -418,11 +417,7 @@ where .entry(pending_connection) .expect_occupied("Iterating pending connections"); - PendingConnection { - entry, - counters: &mut self.counters, - } - .abort(); + PendingConnection { entry }.abort(); } } @@ -548,13 +543,13 @@ where let connection_id = self.next_connection_id(); - let (drop_notifier, drop_receiver) = oneshot::channel(); + let (abort_notifier, abort_receiver) = oneshot::channel(); self.spawn( task::new_for_pending_outgoing_connection( connection_id, dial, - drop_receiver, + abort_receiver, self.pending_connection_events_tx.clone(), ) .boxed(), @@ -567,7 +562,7 @@ where peer_id: peer, handler, endpoint: PendingPoint::Dialer, - _drop_notifier: drop_notifier, + abort_notifier: Some(abort_notifier), }, ); Ok(connection_id) @@ -595,13 +590,13 @@ where let connection_id = self.next_connection_id(); - let (drop_notifier, drop_receiver) = oneshot::channel(); + let (abort_notifier, abort_receiver) = oneshot::channel(); self.spawn( task::new_for_pending_incoming_connection( connection_id, future, - drop_receiver, + abort_receiver, self.pending_connection_events_tx.clone(), ) .boxed(), @@ -614,7 +609,7 @@ where peer_id: None, handler, endpoint: endpoint.into(), - _drop_notifier: drop_notifier, + abort_notifier: Some(abort_notifier), }, ); Ok(connection_id) @@ -730,7 +725,7 @@ where peer_id: expected_peer_id, handler, endpoint, - _drop_notifier, + abort_notifier: _, } = self .pending .remove(&id) @@ -898,7 +893,7 @@ where peer_id, handler, endpoint, - _drop_notifier, + abort_notifier: _, }) = self.pending.remove(&id) { self.counters.dec_pending(&endpoint); @@ -955,7 +950,6 @@ pub enum PoolConnection<'a, THandler: IntoConnectionHandler> { /// A pending connection in a pool. pub struct PendingConnection<'a, THandler: IntoConnectionHandler> { entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo>, - counters: &'a mut ConnectionCounters, } impl PendingConnection<'_, THandler> { @@ -975,9 +969,10 @@ impl PendingConnection<'_, THandler> { } /// Aborts the connection attempt, closing the connection. - pub fn abort(self) { - self.counters.dec_pending(&self.entry.get().endpoint); - self.entry.remove(); + pub fn abort(mut self) { + if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { + notifier.send(PendingConnectionCommand::Abort); + } } } @@ -1024,13 +1019,13 @@ impl EstablishedConnection<'_, TInEvent> { /// of `notify_handler`, it only fails if the connection is now about /// to close. pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> { - let cmd = task::Command::NotifyHandler(event); + let cmd = task::EstablishedConnectionCommand::NotifyHandler(event); self.entry .get_mut() .sender .try_send(cmd) .map_err(|e| match e.into_inner() { - task::Command::NotifyHandler(event) => event, + task::EstablishedConnectionCommand::NotifyHandler(event) => event, _ => unreachable!("Expect failed send to return initial event."), }) } diff --git a/core/src/connection/pool/task.rs b/core/src/connection/pool/task.rs index 9062583fd79..d1d0e98b948 100644 --- a/core/src/connection/pool/task.rs +++ b/core/src/connection/pool/task.rs @@ -39,11 +39,16 @@ use futures::{ SinkExt, StreamExt, }; use std::pin::Pin; -use void::Void; -/// Commands that can be sent to a task. +/// Commands that can be sent to a task driving a pending connection. #[derive(Debug)] -pub enum Command { +pub enum PendingConnectionCommand { + Abort, +} + +/// Commands that can be sent to a task driving an established connection. +#[derive(Debug)] +pub enum EstablishedConnectionCommand { /// Notify the connection handler of an event. NotifyHandler(T), /// Gracefully close the connection (active close) before @@ -103,13 +108,16 @@ pub enum EstablishedConnectionEvent { pub async fn new_for_pending_outgoing_connection( connection_id: ConnectionId, dial: ConcurrentDial, - drop_receiver: oneshot::Receiver, + abort_receiver: oneshot::Receiver, mut events: mpsc::Sender>, ) where TTrans: Transport, { - match futures::future::select(drop_receiver, Box::pin(dial)).await { + match futures::future::select(abort_receiver, Box::pin(dial)).await { Either::Left((Err(oneshot::Canceled), _)) => { + unreachable!("Pool never drops channel to task."); + } + Either::Left((Ok(PendingConnectionCommand::Abort), _)) => { let _ = events .send(PendingConnectionEvent::PendingFailed { id: connection_id, @@ -117,7 +125,6 @@ pub async fn new_for_pending_outgoing_connection( }) .await; } - Either::Left((Ok(v), _)) => void::unreachable(v), Either::Right((Ok((address, output, errors)), _)) => { let _ = events .send(PendingConnectionEvent::ConnectionEstablished { @@ -141,14 +148,17 @@ pub async fn new_for_pending_outgoing_connection( pub async fn new_for_pending_incoming_connection( connection_id: ConnectionId, future: TFut, - drop_receiver: oneshot::Receiver, + abort_receiver: oneshot::Receiver, mut events: mpsc::Sender>, ) where TTrans: Transport, TFut: Future> + Send + 'static, { - match futures::future::select(drop_receiver, Box::pin(future)).await { + match futures::future::select(abort_receiver, Box::pin(future)).await { Either::Left((Err(oneshot::Canceled), _)) => { + unreachable!("Pool never drops channel to task."); + } + Either::Left((Ok(PendingConnectionCommand::Abort), _)) => { let _ = events .send(PendingConnectionEvent::PendingFailed { id: connection_id, @@ -156,7 +166,6 @@ pub async fn new_for_pending_incoming_connection( }) .await; } - Either::Left((Ok(v), _)) => void::unreachable(v), Either::Right((Ok(output), _)) => { let _ = events .send(PendingConnectionEvent::ConnectionEstablished { @@ -183,7 +192,7 @@ pub async fn new_for_established_connection( connection_id: ConnectionId, peer_id: PeerId, mut connection: crate::connection::Connection, - mut command_receiver: mpsc::Receiver>>, + mut command_receiver: mpsc::Receiver>>, mut events: mpsc::Sender>, ) where TMuxer: StreamMuxer, @@ -198,8 +207,10 @@ pub async fn new_for_established_connection( .await { Either::Left((Some(command), _)) => match command { - Command::NotifyHandler(event) => connection.inject_event(event), - Command::Close => { + EstablishedConnectionCommand::NotifyHandler(event) => { + connection.inject_event(event) + } + EstablishedConnectionCommand::Close => { command_receiver.close(); let (handler, closing_muxer) = connection.close(); From 48723f0e41fd4e23d9478e3bb21b99a8925be78d Mon Sep 17 00:00:00 2001 From: Jack Maloney Date: Fri, 17 Dec 2021 00:22:40 -0600 Subject: [PATCH 2/8] Don't `use ...` task::PendingConnectionCommand --- core/src/connection/pool.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index a9274865b5b..c8a8a4c9456 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -51,8 +51,6 @@ use std::{ task::Poll, }; -use self::task::PendingConnectionCommand; - mod concurrent_dial; mod task; @@ -142,7 +140,7 @@ struct PendingConnectionInfo { handler: THandler, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. - abort_notifier: Option>, + abort_notifier: Option>, } impl fmt::Debug for Pool { @@ -971,7 +969,7 @@ impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - notifier.send(PendingConnectionCommand::Abort); + notifier.send(task::PendingConnectionCommand::Abort); } } } From 16188bb676fc20a1f25d5c724146b0c269e142b8 Mon Sep 17 00:00:00 2001 From: Jack Maloney Date: Fri, 17 Dec 2021 00:27:27 -0600 Subject: [PATCH 3/8] First attempt on testing aborted connections --- core/tests/aborted_connection.rs | 49 ++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 core/tests/aborted_connection.rs diff --git a/core/tests/aborted_connection.rs b/core/tests/aborted_connection.rs new file mode 100644 index 00000000000..23c2eacc626 --- /dev/null +++ b/core/tests/aborted_connection.rs @@ -0,0 +1,49 @@ +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::task::Context; + +use libp2p_core::{ + connection::{ + pool::{ConnectionLimits, Pool, PoolConfig}, + ConnectionHandler, + }, + transport::dummy::DummyTransport, + Multiaddr, PeerId, +}; + +mod util; + +#[test] +fn aborted_connection() { + let pool = Pool::new( + PeerId::random(), + PoolConfig::default(), + ConnectionLimits::default(), + ); + let target_peer = PeerId::random(); + pool.add_outgoing( + DummyTransport::default(), + ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()], + Some(target_peer), + util::TestHandler(), + ); + pool.disconnect(&target_peer); + let (cx, handle) = Context::new(); + pool.poll(cx); +} From 40cdec370f10bd6dcd5d60d84e84f5289a937cc3 Mon Sep 17 00:00:00 2001 From: Jack Maloney Date: Thu, 6 Jan 2022 16:30:04 -0600 Subject: [PATCH 4/8] Fix tests and move tests::util::TestHandler to connection::util::TestHandler --- core/src/connection.rs | 2 +- core/src/connection/pool.rs | 53 +++++++++++++++++++++++++++++++- core/src/connection/util.rs | 38 +++++++++++++++++++++++ core/tests/aborted_connection.rs | 49 ----------------------------- core/tests/concurrent_dialing.rs | 3 +- core/tests/connection_limits.rs | 4 +-- core/tests/network_dial_error.rs | 4 +-- core/tests/util.rs | 32 +------------------ 8 files changed, 98 insertions(+), 87 deletions(-) create mode 100644 core/src/connection/util.rs delete mode 100644 core/tests/aborted_connection.rs diff --git a/core/src/connection.rs b/core/src/connection.rs index 961eb4abc0e..4ad1e9aa003 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -22,7 +22,7 @@ mod error; pub(crate) mod handler; mod listeners; mod substream; - +pub mod util; pub(crate) mod pool; pub use error::{ diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index c8a8a4c9456..ca3c88d2e3d 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -969,7 +969,12 @@ impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - notifier.send(task::PendingConnectionCommand::Abort); + match notifier.send(task::PendingConnectionCommand::Abort) { + Ok(()) => {} + Err(e) => { + log::debug!("Failed to pending connection: {:?}", e); + } + } } } } @@ -1357,3 +1362,49 @@ impl<'a, K: 'a, V: 'a> EntryExt<'a, K, V> for hash_map::Entry<'a, K, V> { } } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::{ + connection::PendingOutboundConnectionError, + transport::dummy::DummyTransport, + }; + use futures::{executor::block_on, future::poll_fn}; + + #[test] + fn aborting_pending_connection_surfaces_error() { + let mut pool = Pool::new( + PeerId::random(), + PoolConfig::default(), + ConnectionLimits::default(), + ); + let target_peer = PeerId::random(); + + pool.add_outgoing( + DummyTransport::default(), + ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()].into_iter(), + Some(target_peer), + crate::connection::util::TestHandler(), + ) + .expect("Failed to add an outgoing connection to test pool."); + + // Disconnect from the peer, thus aborting all pending connections. + pool.disconnect(&target_peer); + + // Poll the pool until the pending connection is aborted. + block_on(poll_fn(|cx| match pool.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(PoolEvent::PendingOutboundConnectionError { + error: PendingOutboundConnectionError::Aborted, + .. + }) => { + return Poll::Ready(()); + } + Poll::Ready(_) => { + panic!("We should see an aborted error, nothing else.") + } + })); + } +} diff --git a/core/src/connection/util.rs b/core/src/connection/util.rs new file mode 100644 index 00000000000..55986eafa04 --- /dev/null +++ b/core/src/connection/util.rs @@ -0,0 +1,38 @@ +use std::{io, task::{Poll, Context}}; + +use multiaddr::Multiaddr; + +use crate::muxing::StreamMuxerBox; + +use super::{ConnectionHandler, Substream, SubstreamEndpoint, ConnectionHandlerEvent}; + + +#[derive(Debug)] +pub struct TestHandler(); + +impl ConnectionHandler for TestHandler { + type InEvent = (); + type OutEvent = (); + type Error = io::Error; + type Substream = Substream; + type OutboundOpenInfo = (); + + fn inject_substream( + &mut self, + _: Self::Substream, + _: SubstreamEndpoint, + ) { + } + + fn inject_event(&mut self, _: Self::InEvent) {} + + fn inject_address_change(&mut self, _: &Multiaddr) {} + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll, Self::Error>> + { + Poll::Pending + } +} \ No newline at end of file diff --git a/core/tests/aborted_connection.rs b/core/tests/aborted_connection.rs deleted file mode 100644 index 23c2eacc626..00000000000 --- a/core/tests/aborted_connection.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use std::task::Context; - -use libp2p_core::{ - connection::{ - pool::{ConnectionLimits, Pool, PoolConfig}, - ConnectionHandler, - }, - transport::dummy::DummyTransport, - Multiaddr, PeerId, -}; - -mod util; - -#[test] -fn aborted_connection() { - let pool = Pool::new( - PeerId::random(), - PoolConfig::default(), - ConnectionLimits::default(), - ); - let target_peer = PeerId::random(); - pool.add_outgoing( - DummyTransport::default(), - ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()], - Some(target_peer), - util::TestHandler(), - ); - pool.disconnect(&target_peer); - let (cx, handle) = Context::new(); - pool.poll(cx); -} diff --git a/core/tests/concurrent_dialing.rs b/core/tests/concurrent_dialing.rs index 08b1b5782e0..13b0e8fe5a2 100644 --- a/core/tests/concurrent_dialing.rs +++ b/core/tests/concurrent_dialing.rs @@ -27,12 +27,13 @@ use libp2p_core::{ multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, ConnectedPoint, + connection::util::TestHandler, }; use quickcheck::*; use rand07::Rng; use std::num::NonZeroU8; use std::task::Poll; -use util::{test_network, TestHandler}; +use util::{test_network}; #[test] fn concurrent_dialing() { diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index bee42b53055..bc2ad402c06 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -23,13 +23,13 @@ mod util; use futures::{future::poll_fn, ready}; use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ - connection::PendingConnectionError, + connection::{PendingConnectionError, util::TestHandler}, network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent}, PeerId, }; use quickcheck::*; use std::task::Poll; -use util::{test_network, TestHandler}; +use util::test_network; #[test] fn max_outgoing() { diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 827db92e444..5aba2efeca3 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -23,14 +23,14 @@ mod util; use futures::prelude::*; use libp2p_core::multiaddr::multiaddr; use libp2p_core::{ - connection::PendingConnectionError, + connection::{PendingConnectionError, util::TestHandler}, multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, PeerId, }; use rand::seq::SliceRandom; use std::{io, task::Poll}; -use util::{test_network, TestHandler}; +use util::test_network; #[test] fn deny_incoming_connec() { diff --git a/core/tests/util.rs b/core/tests/util.rs index 9592daca9eb..b423da2d25f 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -2,7 +2,7 @@ use futures::prelude::*; use libp2p_core::{ - connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint}, + connection::util::TestHandler, identity, muxing::{StreamMuxer, StreamMuxerBox}, network::{Network, NetworkConfig}, @@ -32,36 +32,6 @@ pub fn test_network(cfg: NetworkConfig) -> TestNetwork { TestNetwork::new(transport, local_public_key.into(), cfg) } -#[derive(Debug)] -pub struct TestHandler(); - -impl ConnectionHandler for TestHandler { - type InEvent = (); - type OutEvent = (); - type Error = io::Error; - type Substream = Substream; - type OutboundOpenInfo = (); - - fn inject_substream( - &mut self, - _: Self::Substream, - _: SubstreamEndpoint, - ) { - } - - fn inject_event(&mut self, _: Self::InEvent) {} - - fn inject_address_change(&mut self, _: &Multiaddr) {} - - fn poll( - &mut self, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> - { - Poll::Pending - } -} - pub struct CloseMuxer { state: CloseMuxerState, } From d695ca63250fa1b7ca3bd220dffc72bb711f7902 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 10 Jan 2022 11:10:57 -0800 Subject: [PATCH 5/8] Revert "Fix tests and move tests::util::TestHandler to connection::util::TestHandler" This reverts commit 40cdec370f10bd6dcd5d60d84e84f5289a937cc3. --- core/src/connection.rs | 2 +- core/src/connection/pool.rs | 53 +------------------------------- core/src/connection/util.rs | 38 ----------------------- core/tests/aborted_connection.rs | 49 +++++++++++++++++++++++++++++ core/tests/concurrent_dialing.rs | 3 +- core/tests/connection_limits.rs | 4 +-- core/tests/network_dial_error.rs | 4 +-- core/tests/util.rs | 32 ++++++++++++++++++- 8 files changed, 87 insertions(+), 98 deletions(-) delete mode 100644 core/src/connection/util.rs create mode 100644 core/tests/aborted_connection.rs diff --git a/core/src/connection.rs b/core/src/connection.rs index 4ad1e9aa003..961eb4abc0e 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -22,7 +22,7 @@ mod error; pub(crate) mod handler; mod listeners; mod substream; -pub mod util; + pub(crate) mod pool; pub use error::{ diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index ca3c88d2e3d..c8a8a4c9456 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -969,12 +969,7 @@ impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - match notifier.send(task::PendingConnectionCommand::Abort) { - Ok(()) => {} - Err(e) => { - log::debug!("Failed to pending connection: {:?}", e); - } - } + notifier.send(task::PendingConnectionCommand::Abort); } } } @@ -1362,49 +1357,3 @@ impl<'a, K: 'a, V: 'a> EntryExt<'a, K, V> for hash_map::Entry<'a, K, V> { } } } - -#[cfg(test)] -mod tests { - use super::*; - - use crate::{ - connection::PendingOutboundConnectionError, - transport::dummy::DummyTransport, - }; - use futures::{executor::block_on, future::poll_fn}; - - #[test] - fn aborting_pending_connection_surfaces_error() { - let mut pool = Pool::new( - PeerId::random(), - PoolConfig::default(), - ConnectionLimits::default(), - ); - let target_peer = PeerId::random(); - - pool.add_outgoing( - DummyTransport::default(), - ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()].into_iter(), - Some(target_peer), - crate::connection::util::TestHandler(), - ) - .expect("Failed to add an outgoing connection to test pool."); - - // Disconnect from the peer, thus aborting all pending connections. - pool.disconnect(&target_peer); - - // Poll the pool until the pending connection is aborted. - block_on(poll_fn(|cx| match pool.poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(PoolEvent::PendingOutboundConnectionError { - error: PendingOutboundConnectionError::Aborted, - .. - }) => { - return Poll::Ready(()); - } - Poll::Ready(_) => { - panic!("We should see an aborted error, nothing else.") - } - })); - } -} diff --git a/core/src/connection/util.rs b/core/src/connection/util.rs deleted file mode 100644 index 55986eafa04..00000000000 --- a/core/src/connection/util.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::{io, task::{Poll, Context}}; - -use multiaddr::Multiaddr; - -use crate::muxing::StreamMuxerBox; - -use super::{ConnectionHandler, Substream, SubstreamEndpoint, ConnectionHandlerEvent}; - - -#[derive(Debug)] -pub struct TestHandler(); - -impl ConnectionHandler for TestHandler { - type InEvent = (); - type OutEvent = (); - type Error = io::Error; - type Substream = Substream; - type OutboundOpenInfo = (); - - fn inject_substream( - &mut self, - _: Self::Substream, - _: SubstreamEndpoint, - ) { - } - - fn inject_event(&mut self, _: Self::InEvent) {} - - fn inject_address_change(&mut self, _: &Multiaddr) {} - - fn poll( - &mut self, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> - { - Poll::Pending - } -} \ No newline at end of file diff --git a/core/tests/aborted_connection.rs b/core/tests/aborted_connection.rs new file mode 100644 index 00000000000..23c2eacc626 --- /dev/null +++ b/core/tests/aborted_connection.rs @@ -0,0 +1,49 @@ +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::task::Context; + +use libp2p_core::{ + connection::{ + pool::{ConnectionLimits, Pool, PoolConfig}, + ConnectionHandler, + }, + transport::dummy::DummyTransport, + Multiaddr, PeerId, +}; + +mod util; + +#[test] +fn aborted_connection() { + let pool = Pool::new( + PeerId::random(), + PoolConfig::default(), + ConnectionLimits::default(), + ); + let target_peer = PeerId::random(); + pool.add_outgoing( + DummyTransport::default(), + ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()], + Some(target_peer), + util::TestHandler(), + ); + pool.disconnect(&target_peer); + let (cx, handle) = Context::new(); + pool.poll(cx); +} diff --git a/core/tests/concurrent_dialing.rs b/core/tests/concurrent_dialing.rs index 13b0e8fe5a2..08b1b5782e0 100644 --- a/core/tests/concurrent_dialing.rs +++ b/core/tests/concurrent_dialing.rs @@ -27,13 +27,12 @@ use libp2p_core::{ multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, ConnectedPoint, - connection::util::TestHandler, }; use quickcheck::*; use rand07::Rng; use std::num::NonZeroU8; use std::task::Poll; -use util::{test_network}; +use util::{test_network, TestHandler}; #[test] fn concurrent_dialing() { diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index bc2ad402c06..bee42b53055 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -23,13 +23,13 @@ mod util; use futures::{future::poll_fn, ready}; use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ - connection::{PendingConnectionError, util::TestHandler}, + connection::PendingConnectionError, network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent}, PeerId, }; use quickcheck::*; use std::task::Poll; -use util::test_network; +use util::{test_network, TestHandler}; #[test] fn max_outgoing() { diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 5aba2efeca3..827db92e444 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -23,14 +23,14 @@ mod util; use futures::prelude::*; use libp2p_core::multiaddr::multiaddr; use libp2p_core::{ - connection::{PendingConnectionError, util::TestHandler}, + connection::PendingConnectionError, multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, PeerId, }; use rand::seq::SliceRandom; use std::{io, task::Poll}; -use util::test_network; +use util::{test_network, TestHandler}; #[test] fn deny_incoming_connec() { diff --git a/core/tests/util.rs b/core/tests/util.rs index b423da2d25f..9592daca9eb 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -2,7 +2,7 @@ use futures::prelude::*; use libp2p_core::{ - connection::util::TestHandler, + connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint}, identity, muxing::{StreamMuxer, StreamMuxerBox}, network::{Network, NetworkConfig}, @@ -32,6 +32,36 @@ pub fn test_network(cfg: NetworkConfig) -> TestNetwork { TestNetwork::new(transport, local_public_key.into(), cfg) } +#[derive(Debug)] +pub struct TestHandler(); + +impl ConnectionHandler for TestHandler { + type InEvent = (); + type OutEvent = (); + type Error = io::Error; + type Substream = Substream; + type OutboundOpenInfo = (); + + fn inject_substream( + &mut self, + _: Self::Substream, + _: SubstreamEndpoint, + ) { + } + + fn inject_event(&mut self, _: Self::InEvent) {} + + fn inject_address_change(&mut self, _: &Multiaddr) {} + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll, Self::Error>> + { + Poll::Pending + } +} + pub struct CloseMuxer { state: CloseMuxerState, } From 076b4862d6d098ef6b44f5e472607fe3e67b7e28 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 10 Jan 2022 11:12:51 -0800 Subject: [PATCH 6/8] Add debug msg --- core/src/connection/pool.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index c8a8a4c9456..dc6abad0753 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -969,7 +969,12 @@ impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - notifier.send(task::PendingConnectionCommand::Abort); + match notifier.send(task::PendingConnectionCommand::Abort) { + Ok(()) => {} + Err(e) => { + log::debug!("Failed to pending connection: {:?}", e); + } + } } } } From 1ffa811ea4387ed9ec7b15d030098fc67040502b Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 10 Jan 2022 11:13:53 -0800 Subject: [PATCH 7/8] Add test using public network object --- core/tests/aborted_connection.rs | 77 ++++++++++++++++---------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/core/tests/aborted_connection.rs b/core/tests/aborted_connection.rs index 23c2eacc626..5aaf91c547a 100644 --- a/core/tests/aborted_connection.rs +++ b/core/tests/aborted_connection.rs @@ -1,49 +1,50 @@ -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use std::task::Context; +mod util; + +use std::task::Poll; use libp2p_core::{ - connection::{ - pool::{ConnectionLimits, Pool, PoolConfig}, - ConnectionHandler, - }, + connection::{self, PendingOutboundConnectionError}, + network::{NetworkConfig, NetworkEvent}, transport::dummy::DummyTransport, - Multiaddr, PeerId, + Multiaddr, Network, PeerId, }; -mod util; +use futures::{executor::block_on, future::poll_fn}; +use multihash::Multihash; #[test] -fn aborted_connection() { - let pool = Pool::new( +fn aborting_pending_connection_surfaces_error() { + let mut network = Network::new( + DummyTransport::default(), PeerId::random(), - PoolConfig::default(), - ConnectionLimits::default(), + NetworkConfig::default(), ); + let target_peer = PeerId::random(); - pool.add_outgoing( - DummyTransport::default(), - ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()], - Some(target_peer), - util::TestHandler(), - ); - pool.disconnect(&target_peer); - let (cx, handle) = Context::new(); - pool.poll(cx); + let mut target_multiaddr = "/ip4/127.0.0.1/tcp/1234".parse::().unwrap(); + target_multiaddr.push(multiaddr::Protocol::P2p(target_peer.into())); + + let handler = util::TestHandler(); + network + .dial(&target_multiaddr, handler) + .expect("dial failed"); + + let dialing_peer = network + .peer(target_peer) + .into_dialing() + .expect("peer should be dialing"); + + dialing_peer.disconnect(); + block_on(poll_fn(|cx| match network.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(NetworkEvent::DialError { + error: PendingOutboundConnectionError::Aborted, + .. + }) => { + return Poll::Ready(()); + } + Poll::Ready(_) => { + panic!("We should see an aborted error, nothing else.") + } + })); } From 549861c878f69bc0222fddaa6c31500022c764e1 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 10 Jan 2022 11:23:15 -0800 Subject: [PATCH 8/8] Signal abort by closing channel. And s/EstablishedConnectionCommand/Command --- core/src/connection/pool.rs | 22 +++++++--------------- core/src/connection/pool/task.rs | 29 +++++++++-------------------- 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index dc6abad0753..2f5f881b4b3 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -50,6 +50,7 @@ use std::{ task::Context, task::Poll, }; +use void::Void; mod concurrent_dial; mod task; @@ -112,7 +113,7 @@ struct EstablishedConnectionInfo { peer_id: PeerId, endpoint: ConnectedPoint, /// Channel endpoint to send commands to the task. - sender: mpsc::Sender>, + sender: mpsc::Sender>, } impl EstablishedConnectionInfo { @@ -122,11 +123,7 @@ impl EstablishedConnectionInfo { pub fn start_close(&mut self) { // Clone the sender so that we are guaranteed to have // capacity for the close command (every sender gets a slot). - match self - .sender - .clone() - .try_send(task::EstablishedConnectionCommand::Close) - { + match self.sender.clone().try_send(task::Command::Close) { Ok(()) => {} Err(e) => assert!(e.is_disconnected(), "No capacity for close command."), }; @@ -140,7 +137,7 @@ struct PendingConnectionInfo { handler: THandler, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. - abort_notifier: Option>, + abort_notifier: Option>, } impl fmt::Debug for Pool { @@ -969,12 +966,7 @@ impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - match notifier.send(task::PendingConnectionCommand::Abort) { - Ok(()) => {} - Err(e) => { - log::debug!("Failed to pending connection: {:?}", e); - } - } + drop(notifier); } } } @@ -1022,13 +1014,13 @@ impl EstablishedConnection<'_, TInEvent> { /// of `notify_handler`, it only fails if the connection is now about /// to close. pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> { - let cmd = task::EstablishedConnectionCommand::NotifyHandler(event); + let cmd = task::Command::NotifyHandler(event); self.entry .get_mut() .sender .try_send(cmd) .map_err(|e| match e.into_inner() { - task::EstablishedConnectionCommand::NotifyHandler(event) => event, + task::Command::NotifyHandler(event) => event, _ => unreachable!("Expect failed send to return initial event."), }) } diff --git a/core/src/connection/pool/task.rs b/core/src/connection/pool/task.rs index d1d0e98b948..889847afdef 100644 --- a/core/src/connection/pool/task.rs +++ b/core/src/connection/pool/task.rs @@ -39,16 +39,11 @@ use futures::{ SinkExt, StreamExt, }; use std::pin::Pin; - -/// Commands that can be sent to a task driving a pending connection. -#[derive(Debug)] -pub enum PendingConnectionCommand { - Abort, -} +use void::Void; /// Commands that can be sent to a task driving an established connection. #[derive(Debug)] -pub enum EstablishedConnectionCommand { +pub enum Command { /// Notify the connection handler of an event. NotifyHandler(T), /// Gracefully close the connection (active close) before @@ -108,16 +103,13 @@ pub enum EstablishedConnectionEvent { pub async fn new_for_pending_outgoing_connection( connection_id: ConnectionId, dial: ConcurrentDial, - abort_receiver: oneshot::Receiver, + abort_receiver: oneshot::Receiver, mut events: mpsc::Sender>, ) where TTrans: Transport, { match futures::future::select(abort_receiver, Box::pin(dial)).await { Either::Left((Err(oneshot::Canceled), _)) => { - unreachable!("Pool never drops channel to task."); - } - Either::Left((Ok(PendingConnectionCommand::Abort), _)) => { let _ = events .send(PendingConnectionEvent::PendingFailed { id: connection_id, @@ -125,6 +117,7 @@ pub async fn new_for_pending_outgoing_connection( }) .await; } + Either::Left((Ok(v), _)) => void::unreachable(v), Either::Right((Ok((address, output, errors)), _)) => { let _ = events .send(PendingConnectionEvent::ConnectionEstablished { @@ -148,7 +141,7 @@ pub async fn new_for_pending_outgoing_connection( pub async fn new_for_pending_incoming_connection( connection_id: ConnectionId, future: TFut, - abort_receiver: oneshot::Receiver, + abort_receiver: oneshot::Receiver, mut events: mpsc::Sender>, ) where TTrans: Transport, @@ -156,9 +149,6 @@ pub async fn new_for_pending_incoming_connection( { match futures::future::select(abort_receiver, Box::pin(future)).await { Either::Left((Err(oneshot::Canceled), _)) => { - unreachable!("Pool never drops channel to task."); - } - Either::Left((Ok(PendingConnectionCommand::Abort), _)) => { let _ = events .send(PendingConnectionEvent::PendingFailed { id: connection_id, @@ -166,6 +156,7 @@ pub async fn new_for_pending_incoming_connection( }) .await; } + Either::Left((Ok(v), _)) => void::unreachable(v), Either::Right((Ok(output), _)) => { let _ = events .send(PendingConnectionEvent::ConnectionEstablished { @@ -192,7 +183,7 @@ pub async fn new_for_established_connection( connection_id: ConnectionId, peer_id: PeerId, mut connection: crate::connection::Connection, - mut command_receiver: mpsc::Receiver>>, + mut command_receiver: mpsc::Receiver>>, mut events: mpsc::Sender>, ) where TMuxer: StreamMuxer, @@ -207,10 +198,8 @@ pub async fn new_for_established_connection( .await { Either::Left((Some(command), _)) => match command { - EstablishedConnectionCommand::NotifyHandler(event) => { - connection.inject_event(event) - } - EstablishedConnectionCommand::Close => { + Command::NotifyHandler(event) => connection.inject_event(event), + Command::Close => { command_receiver.close(); let (handler, closing_muxer) = connection.close();