Skip to content

Commit

Permalink
swarm/: Report aborted connections (#2517)
Browse files Browse the repository at this point in the history
Disconnect pending connections with `Swarm::disconnect` and eport aborted
connections via `SwarmEvent::OutgoingConnectionError`.

Co-authored-by: Jack Maloney <git@jmmaloney4.xyz>
Co-authored-by: Marco Munizaga <git@marcopolo.io>
  • Loading branch information
3 people committed Feb 15, 2022
1 parent e66f04f commit 146ed5f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 33 deletions.
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

- Update to `libp2p-core` `v0.32.0`.

- Disconnect pending connections with `Swarm::disconnect`. See [PR 2517].

- Report aborted connections via `SwarmEvent::OutgoingConnectionError`. See [PR 2517].

[PR 2492]: https://github.com/libp2p/rust-libp2p/pull/2492
[PR 2517]: https://github.com/libp2p/rust-libp2p/pull/2517

# 0.33.0 [2022-01-27]

Expand Down
1 change: 1 addition & 0 deletions swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ void = "1"

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.9"
libp2p = { path = "../", default-features = false, features = ["identify", "ping", "plaintext", "yamux"] }
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../transports/noise" }
Expand Down
39 changes: 16 additions & 23 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ struct PendingConnectionInfo<THandler> {
handler: THandler,
endpoint: PendingPoint,
/// When dropped, notifies the task which then knows to terminate.
_drop_notifier: oneshot::Sender<Void>,
abort_notifier: Option<oneshot::Sender<Void>>,
}

impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
Expand Down Expand Up @@ -340,10 +340,7 @@ where
/// Returns `None` if the pool has no connection with the given ID.
pub fn get(&mut self, id: ConnectionId) -> Option<PoolConnection<'_, THandler>> {
if let hash_map::Entry::Occupied(entry) = self.pending.entry(id) {
Some(PoolConnection::Pending(PendingConnection {
entry,
counters: &mut self.counters,
}))
Some(PoolConnection::Pending(PendingConnection { entry }))
} else {
self.established
.iter_mut()
Expand Down Expand Up @@ -406,11 +403,7 @@ where
.entry(pending_connection)
.expect_occupied("Iterating pending connections");

PendingConnection {
entry,
counters: &mut self.counters,
}
.abort();
PendingConnection { entry }.abort();
}
}

Expand Down Expand Up @@ -501,13 +494,13 @@ where

let connection_id = self.next_connection_id();

let (drop_notifier, drop_receiver) = oneshot::channel();
let (abort_notifier, abort_receiver) = oneshot::channel();

self.spawn(
task::new_for_pending_outgoing_connection(
connection_id,
dial,
drop_receiver,
abort_receiver,
self.pending_connection_events_tx.clone(),
)
.boxed(),
Expand All @@ -521,8 +514,8 @@ where
PendingConnectionInfo {
peer_id: peer,
handler,
endpoint: endpoint,
_drop_notifier: drop_notifier,
endpoint,
abort_notifier: Some(abort_notifier),
},
);
Ok(connection_id)
Expand Down Expand Up @@ -550,13 +543,13 @@ where

let connection_id = self.next_connection_id();

let (drop_notifier, drop_receiver) = oneshot::channel();
let (abort_notifier, abort_receiver) = oneshot::channel();

self.spawn(
task::new_for_pending_incoming_connection(
connection_id,
future,
drop_receiver,
abort_receiver,
self.pending_connection_events_tx.clone(),
)
.boxed(),
Expand All @@ -569,7 +562,7 @@ where
peer_id: None,
handler,
endpoint: endpoint.into(),
_drop_notifier: drop_notifier,
abort_notifier: Some(abort_notifier),
},
);
Ok(connection_id)
Expand Down Expand Up @@ -685,7 +678,7 @@ where
peer_id: expected_peer_id,
handler,
endpoint,
_drop_notifier,
abort_notifier: _,
} = self
.pending
.remove(&id)
Expand Down Expand Up @@ -854,7 +847,7 @@ where
peer_id,
handler,
endpoint,
_drop_notifier,
abort_notifier: _,
}) = self.pending.remove(&id)
{
self.counters.dec_pending(&endpoint);
Expand Down Expand Up @@ -911,14 +904,14 @@ pub enum PoolConnection<'a, THandler: IntoConnectionHandler> {
/// A pending connection in a pool.
pub struct PendingConnection<'a, THandler: IntoConnectionHandler> {
entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>,
counters: &'a mut ConnectionCounters,
}

impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> {
/// Aborts the connection attempt, closing the connection.
pub fn abort(self) {
self.counters.dec_pending(&self.entry.get().endpoint);
self.entry.remove();
pub fn abort(mut self) {
if let Some(notifier) = self.entry.get_mut().abort_notifier.take() {
drop(notifier);
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions swarm/src/connection/pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use libp2p_core::muxing::StreamMuxer;
use std::pin::Pin;
use void::Void;

/// Commands that can be sent to a task.
/// Commands that can be sent to a task driving an established connection.
#[derive(Debug)]
pub enum Command<T> {
/// Notify the connection handler of an event.
Expand Down Expand Up @@ -104,12 +104,12 @@ pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> {
pub async fn new_for_pending_outgoing_connection<TTrans>(
connection_id: ConnectionId,
dial: ConcurrentDial<TTrans>,
drop_receiver: oneshot::Receiver<Void>,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where
TTrans: Transport,
{
match futures::future::select(drop_receiver, Box::pin(dial)).await {
match futures::future::select(abort_receiver, Box::pin(dial)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events
.send(PendingConnectionEvent::PendingFailed {
Expand Down Expand Up @@ -142,13 +142,13 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
connection_id: ConnectionId,
future: TFut,
drop_receiver: oneshot::Receiver<Void>,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where
TTrans: Transport,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
{
match futures::future::select(drop_receiver, Box::pin(future)).await {
match futures::future::select(abort_receiver, Box::pin(future)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events
.send(PendingConnectionEvent::PendingFailed {
Expand Down
47 changes: 42 additions & 5 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,14 @@ where
/// with [`ProtocolsHandler::connection_keep_alive`] or directly with
/// [`ProtocolsHandlerEvent::Close`].
pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
if self.pool.is_connected(peer_id) {
self.pool.disconnect(peer_id);
return Ok(());
}
let was_connected = self.pool.is_connected(peer_id);
self.pool.disconnect(peer_id);

Err(())
if was_connected {
Ok(())
} else {
Err(())
}
}

/// Checks whether there is an established connection to a peer.
Expand Down Expand Up @@ -2422,4 +2424,39 @@ mod tests {
}))
.unwrap();
}

#[test]
fn aborting_pending_connection_surfaces_error() {
let _ = env_logger::try_init();

let mut dialer = new_test_swarm::<_, ()>(DummyProtocolsHandler::default()).build();
let mut listener = new_test_swarm::<_, ()>(DummyProtocolsHandler::default()).build();

let listener_peer_id = *listener.local_peer_id();
listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
let listener_address = match block_on(listener.next()).unwrap() {
SwarmEvent::NewListenAddr { address, .. } => address,
e => panic!("Unexpected network event: {:?}", e),
};

dialer
.dial(
DialOpts::peer_id(listener_peer_id)
.addresses(vec![listener_address])
.build(),
)
.unwrap();

dialer
.disconnect_peer_id(listener_peer_id)
.expect_err("Expect peer to not yet be connected.");

match block_on(dialer.next()).unwrap() {
SwarmEvent::OutgoingConnectionError {
error: DialError::Aborted,
..
} => {}
e => panic!("Unexpected swarm event {:?}.", e),
}
}
}

0 comments on commit 146ed5f

Please sign in to comment.