diff --git a/core/src/connection.rs b/core/src/connection.rs index 961eb4abc0e..4ad1e9aa003 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -22,7 +22,7 @@ mod error; pub(crate) mod handler; mod listeners; mod substream; - +pub mod util; pub(crate) mod pool; pub use error::{ diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index c8a8a4c9456..ca3c88d2e3d 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -969,7 +969,12 @@ impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { - notifier.send(task::PendingConnectionCommand::Abort); + match notifier.send(task::PendingConnectionCommand::Abort) { + Ok(()) => {} + Err(e) => { + log::debug!("Failed to pending connection: {:?}", e); + } + } } } } @@ -1357,3 +1362,49 @@ impl<'a, K: 'a, V: 'a> EntryExt<'a, K, V> for hash_map::Entry<'a, K, V> { } } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::{ + connection::PendingOutboundConnectionError, + transport::dummy::DummyTransport, + }; + use futures::{executor::block_on, future::poll_fn}; + + #[test] + fn aborting_pending_connection_surfaces_error() { + let mut pool = Pool::new( + PeerId::random(), + PoolConfig::default(), + ConnectionLimits::default(), + ); + let target_peer = PeerId::random(); + + pool.add_outgoing( + DummyTransport::default(), + ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()].into_iter(), + Some(target_peer), + crate::connection::util::TestHandler(), + ) + .expect("Failed to add an outgoing connection to test pool."); + + // Disconnect from the peer, thus aborting all pending connections. + pool.disconnect(&target_peer); + + // Poll the pool until the pending connection is aborted. + block_on(poll_fn(|cx| match pool.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(PoolEvent::PendingOutboundConnectionError { + error: PendingOutboundConnectionError::Aborted, + .. + }) => { + return Poll::Ready(()); + } + Poll::Ready(_) => { + panic!("We should see an aborted error, nothing else.") + } + })); + } +} diff --git a/core/src/connection/util.rs b/core/src/connection/util.rs new file mode 100644 index 00000000000..55986eafa04 --- /dev/null +++ b/core/src/connection/util.rs @@ -0,0 +1,38 @@ +use std::{io, task::{Poll, Context}}; + +use multiaddr::Multiaddr; + +use crate::muxing::StreamMuxerBox; + +use super::{ConnectionHandler, Substream, SubstreamEndpoint, ConnectionHandlerEvent}; + + +#[derive(Debug)] +pub struct TestHandler(); + +impl ConnectionHandler for TestHandler { + type InEvent = (); + type OutEvent = (); + type Error = io::Error; + type Substream = Substream; + type OutboundOpenInfo = (); + + fn inject_substream( + &mut self, + _: Self::Substream, + _: SubstreamEndpoint, + ) { + } + + fn inject_event(&mut self, _: Self::InEvent) {} + + fn inject_address_change(&mut self, _: &Multiaddr) {} + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll, Self::Error>> + { + Poll::Pending + } +} \ No newline at end of file diff --git a/core/tests/aborted_connection.rs b/core/tests/aborted_connection.rs deleted file mode 100644 index 23c2eacc626..00000000000 --- a/core/tests/aborted_connection.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use std::task::Context; - -use libp2p_core::{ - connection::{ - pool::{ConnectionLimits, Pool, PoolConfig}, - ConnectionHandler, - }, - transport::dummy::DummyTransport, - Multiaddr, PeerId, -}; - -mod util; - -#[test] -fn aborted_connection() { - let pool = Pool::new( - PeerId::random(), - PoolConfig::default(), - ConnectionLimits::default(), - ); - let target_peer = PeerId::random(); - pool.add_outgoing( - DummyTransport::default(), - ["/ip4/127.0.0.1/tcp/1234".parse::().unwrap()], - Some(target_peer), - util::TestHandler(), - ); - pool.disconnect(&target_peer); - let (cx, handle) = Context::new(); - pool.poll(cx); -} diff --git a/core/tests/concurrent_dialing.rs b/core/tests/concurrent_dialing.rs index 08b1b5782e0..13b0e8fe5a2 100644 --- a/core/tests/concurrent_dialing.rs +++ b/core/tests/concurrent_dialing.rs @@ -27,12 +27,13 @@ use libp2p_core::{ multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, ConnectedPoint, + connection::util::TestHandler, }; use quickcheck::*; use rand07::Rng; use std::num::NonZeroU8; use std::task::Poll; -use util::{test_network, TestHandler}; +use util::{test_network}; #[test] fn concurrent_dialing() { diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index bee42b53055..bc2ad402c06 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -23,13 +23,13 @@ mod util; use futures::{future::poll_fn, ready}; use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ - connection::PendingConnectionError, + connection::{PendingConnectionError, util::TestHandler}, network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent}, PeerId, }; use quickcheck::*; use std::task::Poll; -use util::{test_network, TestHandler}; +use util::test_network; #[test] fn max_outgoing() { diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 827db92e444..5aba2efeca3 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -23,14 +23,14 @@ mod util; use futures::prelude::*; use libp2p_core::multiaddr::multiaddr; use libp2p_core::{ - connection::PendingConnectionError, + connection::{PendingConnectionError, util::TestHandler}, multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, PeerId, }; use rand::seq::SliceRandom; use std::{io, task::Poll}; -use util::{test_network, TestHandler}; +use util::test_network; #[test] fn deny_incoming_connec() { diff --git a/core/tests/util.rs b/core/tests/util.rs index 9592daca9eb..b423da2d25f 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -2,7 +2,7 @@ use futures::prelude::*; use libp2p_core::{ - connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint}, + connection::util::TestHandler, identity, muxing::{StreamMuxer, StreamMuxerBox}, network::{Network, NetworkConfig}, @@ -32,36 +32,6 @@ pub fn test_network(cfg: NetworkConfig) -> TestNetwork { TestNetwork::new(transport, local_public_key.into(), cfg) } -#[derive(Debug)] -pub struct TestHandler(); - -impl ConnectionHandler for TestHandler { - type InEvent = (); - type OutEvent = (); - type Error = io::Error; - type Substream = Substream; - type OutboundOpenInfo = (); - - fn inject_substream( - &mut self, - _: Self::Substream, - _: SubstreamEndpoint, - ) { - } - - fn inject_event(&mut self, _: Self::InEvent) {} - - fn inject_address_change(&mut self, _: &Multiaddr) {} - - fn poll( - &mut self, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> - { - Poll::Pending - } -} - pub struct CloseMuxer { state: CloseMuxerState, }