
core/: Report pending connection being aborted #2329

Closed
mxinden opened this issue Nov 8, 2021 · 15 comments · Fixed by #2517
Labels
difficulty:moderate getting-started Issues that can be tackled if you don't know the internals of libp2p very well help wanted

Comments

@mxinden
Member

mxinden commented Nov 8, 2021

Today when aborting a pending connection, we remove the pending connection from Pool::pending directly:

/// Aborts the connection attempt, closing the connection.
pub fn abort(self) {
    self.counters.dec_pending(&self.entry.get().endpoint);
    self.entry.remove();
}

Later on, when the task reports that it has been aborted, the Pool looks up the connection in Pool::pending, finds no entry, and thus drops the error instead of reporting it.

if let Some(PendingConnectionInfo {
    peer_id,
    handler,
    endpoint,
    _drop_notifier,
}) = self.pending.remove(&id)

We should instead report the pending connection being aborted by emitting an event.
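To make the failure mode concrete, here is a heavily simplified, std-only model of it. `ToyPool`, `Info`, `TaskEvent`, and `PoolEvent` are hypothetical stand-ins for `Pool`, `PendingConnectionInfo`, `PendingConnectionEvent`, and the pool's output; counters, handlers, and the async task are omitted. Because `abort` removes the entry before the task's report arrives, the later lookup finds nothing and the report is lost.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `PendingConnectionInfo` (handler, endpoint, ... omitted).
struct Info {
    peer: &'static str,
}

/// Hypothetical stand-in for the event a pending-connection task sends back.
enum TaskEvent {
    Aborted { id: u64 },
}

/// Hypothetical stand-in for what the pool would surface to its caller.
#[derive(Debug, PartialEq)]
enum PoolEvent {
    PendingAborted { id: u64, peer: &'static str },
}

struct ToyPool {
    pending: HashMap<u64, Info>,
}

impl ToyPool {
    /// Mirrors the current behaviour: the entry is removed immediately.
    fn abort(&mut self, id: u64) {
        self.pending.remove(&id);
    }

    /// Mirrors the `if let Some(..) = self.pending.remove(&id)` check:
    /// an event for an id that is no longer pending is silently dropped.
    fn handle(&mut self, event: TaskEvent) -> Option<PoolEvent> {
        match event {
            TaskEvent::Aborted { id } => {
                if let Some(Info { peer }) = self.pending.remove(&id) {
                    return Some(PoolEvent::PendingAborted { id, peer });
                }
                None
            }
        }
    }
}
```

With an aborted id, `handle` returns `None`; only an id that is still in `pending` produces a `PendingAborted` event.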

@mxinden mxinden added difficulty:moderate help wanted getting-started Issues that can be tackled if you don't know the internals of libp2p very well labels Nov 8, 2021
@JerryHue
Contributor

@mxinden, hello! Could I try to look into this?

@mxinden
Member Author

mxinden commented Nov 15, 2021

@JerryHue sure thing. Help here is very much appreciated.

A unit test should be fairly simple: check for a PendingConnectionError::Aborted after aborting a pending connection.
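A rough sketch of the test's shape, using only the standard library: a thread stands in for the pending-connection task, and `std::sync::mpsc` channels stand in for the `oneshot` and `mpsc` channels from `futures` used in `pool/task.rs`. `Command`, `PendingConnectionError`, `run_pending_task`, and `abort_reports_aborted` are hypothetical simplifications, not libp2p's real types; a real test would drive a `Pool` with a transport whose dial never completes.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical command the pool sends to a pending-connection task.
#[derive(Debug)]
enum Command {
    Abort,
}

/// Hypothetical subset of `PendingConnectionError`.
#[derive(Debug, PartialEq)]
enum PendingConnectionError {
    Aborted,
}

/// Simplified task: the "dial" never completes, so the task only waits
/// for a command and reports the abort back to the pool.
fn run_pending_task(
    id: u64,
    commands: mpsc::Receiver<Command>,
    events: mpsc::Sender<(u64, Result<(), PendingConnectionError>)>,
) {
    match commands.recv() {
        Ok(Command::Abort) => {
            let _ = events.send((id, Err(PendingConnectionError::Aborted)));
        }
        // The pool dropped the command channel: nobody is listening any more.
        Err(mpsc::RecvError) => {}
    }
}

/// Test body: abort a pending connection and return what the task reported.
fn abort_reports_aborted() -> (u64, Result<(), PendingConnectionError>) {
    let (command_tx, command_rx) = mpsc::channel();
    let (event_tx, event_rx) = mpsc::channel();
    let task = thread::spawn(move || run_pending_task(7, command_rx, event_tx));

    command_tx.send(Command::Abort).expect("task is still running");
    let event = event_rx.recv().expect("task reports before exiting");
    task.join().expect("task did not panic");
    event
}
```

In a `#[test]` function, the check would be `assert_eq!(abort_reports_aborted(), (7, Err(PendingConnectionError::Aborted)));`.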

@MarcoPolo
Contributor

@JerryHue have you started working on this? If not, do you mind if I pick this up? Thanks!

@JerryHue
Contributor

Hello, @MarcoPolo! I will start working on this today. If I haven't made much progress by the end of the day, I will hand it over to you. Is that alright?

@MarcoPolo
Contributor

Of course!

@JerryHue
Contributor

@MarcoPolo, hello! I read through the code, but I feel this might be a bit too much for me, especially since I am not familiar with this part of the core (I have only worked with PeerIds, which were arguably an easier concept for me).

Sorry for the inconvenience, and thank you for the prompt responses, @mxinden and @MarcoPolo.

@MarcoPolo
Contributor

@mxinden could you assign this to me please?

@jmmaloney4
Contributor

@MarcoPolo are you working on this? Do you mind if I take a stab?

@MarcoPolo
Contributor

Yep! Please take it!

@MarcoPolo MarcoPolo removed their assignment Dec 3, 2021
@jmmaloney4
Contributor

Hi @mxinden, I have a few questions about the code.

Will both of these be needed?

pub fn abort(self) {
    self.counters.dec_pending(&self.entry.get().endpoint);
    self.entry.remove();
}

}) = self.pending.remove(&id)
{
    self.counters.dec_pending(&endpoint);

I'm assuming the ones in abort should be removed, and the PendingConnection should be removed from the hashmap and have the counter decremented in the Pool::poll method instead. Does this sound correct?

A follow-up: if the above is correct and I remove both of those actions from the PendingConnection::abort method, how does the connection get aborted? Those are the only two things the abort method does. I see that here:

let event = match self.pending_connection_events_rx.poll_next_unpin(cx) {

We pull events from a queue, and I think we want there to be a PendingConnectionEvent::PendingFailed event with a PendingConnectionError::Aborted in the error field, which we then return from the Pool::poll method. I hope I'm putting the pieces together correctly.
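The flow described here (pull task events off a queue and turn a `PendingFailed` carrying an `Aborted` error into something the pool's poll method returns) can be sketched synchronously with `std::sync::mpsc::Receiver::try_recv`. All type names are hypothetical simplifications, and the handler is modelled as a plain `String`; the real `Pool::poll` is asynchronous and calls `poll_next_unpin` on a `futures` channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, TryRecvError};

/// Hypothetical subset of the task events.
enum PendingConnectionEvent {
    PendingFailed { id: u64, error: PendingConnectionError },
}

#[derive(Debug, PartialEq)]
enum PendingConnectionError {
    Aborted,
}

/// Hypothetical event returned from the pool's poll method.
#[derive(Debug, PartialEq)]
enum PoolEvent {
    PendingConnectionFailed { id: u64, handler: String, error: PendingConnectionError },
}

struct ToyPool {
    /// Pending entries keep the handler so it can be handed back on failure.
    pending: HashMap<u64, String>,
    events_rx: mpsc::Receiver<PendingConnectionEvent>,
}

impl ToyPool {
    /// Drains queued task events and returns the first one that maps to a pool event.
    fn poll(&mut self) -> Option<PoolEvent> {
        loop {
            let event = match self.events_rx.try_recv() {
                Ok(event) => event,
                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => return None,
            };
            match event {
                PendingConnectionEvent::PendingFailed { id, error } => {
                    // The entry is removed only now, once the task has reported back.
                    if let Some(handler) = self.pending.remove(&id) {
                        return Some(PoolEvent::PendingConnectionFailed { id, handler, error });
                    }
                    // Unknown id: nothing to report, keep draining.
                }
            }
        }
    }
}
```

Keeping the handler in the pending entry until this point is what allows the returned error to carry it.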

So, where does (or should) this PendingConnectionError::Aborted come from? It seems the only method that calls PendingConnection::abort is Pool::disconnect, here:

PendingConnection {
    entry,
    counters: &mut self.counters,
}
.abort();

And I can't find anything in that method that seems to generate an event. Some guidance would be appreciated. Thanks! -Jack

@MarcoPolo
Contributor

Thanks for looking into this @jmmaloney4! Some thoughts:

  1. When we abort a pending connection we drop the PendingConnectionInfo object. This drops the notifier which closes the oneshot channel.
  2. The task gets notified that the notifier dropped and adds the aborted event here (and the equivalent inbound version here)
  3. So the aborted event is already being propagated to the pool, but it isn’t getting returned by the Pool’s poll method because there’s no else branch on the if let.
  4. PendingOutboundConnectionError needs a handler and an Option, so we’ll need to preserve the handler/peer info to send back a proper error.
  5. We should make sure there are no weird race conditions in the Pool’s poll if we remove the entry (I’m thinking about this case specifically, but maybe there are others; then again, this may be a non-issue since the task will stop advancing as soon as it gets the cancellation signal).
  6. PendingConnection is a public struct (should it be? That’s a different question.)
  7. PendingConnection.abort is also public.
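Points 1 and 2 rely on a channel property: once the sending half is dropped, the receiving half observes that the channel is closed. The sketch below shows the same mechanism with `std::sync::mpsc`, swapped in for `futures::channel::oneshot` (whose receiver resolves to `Err(Canceled)` instead). `Info`, `task_outcome`, and `abort_by_dropping` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

/// Hypothetical pending entry; dropping it drops the notifier.
struct Info {
    _drop_notifier: mpsc::Sender<()>,
}

/// What a task observes on its receiver: a closed channel means "abort".
fn task_outcome(drop_receiver: &mpsc::Receiver<()>) -> &'static str {
    match drop_receiver.recv() {
        Ok(()) => "unexpected message",
        Err(mpsc::RecvError) => "aborted: notifier dropped",
    }
}

/// Removes (and thereby drops) the entry, then reports what the task sees.
fn abort_by_dropping() -> &'static str {
    let (tx, rx) = mpsc::channel();
    let mut pending = HashMap::new();
    pending.insert(1u64, Info { _drop_notifier: tx });

    // Equivalent of `self.entry.remove()`: the `Info` and its sender are dropped here.
    drop(pending.remove(&1));

    task_outcome(&rx)
}
```

Because the removed entry takes the handler with it, nothing is left for the pool to attach to the error when the task's report arrives (point 4).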

As long as we can send back the proper PoolError, we should be set. The current blocker is that we lose the handler/Option when we drop the entry.

I’d advise against changing the public interface of PendingConnection.

There are a couple of different solutions, so feel free to pick your own, but I personally would send a special value to the task itself, telling it that we want to abort this pending connection, via the field currently called _drop_notifier (and rename that field to drop_or_abort_notifier). I would not have the abort method remove the entry or decrement the counter. I think that should just work.
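The proposal can be modelled roughly as follows: the pending entry stores an `Option` of the abort sender, `abort` takes the sender out and sends a command, and the entry (with its handler) stays in place until the task reports back, at which point the pool removes it, decrements the counter, and emits the error. `Command`, `Info`, `ToyPool`, `on_task_aborted`, and the integer `pending_count` are hypothetical stand-ins for libp2p's types and `ConnectionCounters`, and `std::sync::mpsc` replaces the `oneshot` channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

/// Hypothetical command sent over the renamed notifier channel.
#[derive(Debug, PartialEq)]
enum Command {
    Abort,
}

struct Info {
    handler: String,
    /// `Some` until an abort has been requested.
    abort_notifier: Option<mpsc::Sender<Command>>,
}

#[derive(Debug, PartialEq)]
enum PoolEvent {
    PendingAborted { id: u64, handler: String },
}

struct ToyPool {
    pending: HashMap<u64, Info>,
    pending_count: usize,
}

impl ToyPool {
    /// Only signals the task; neither removes the entry nor touches the counter.
    fn abort(&mut self, id: u64) {
        if let Some(info) = self.pending.get_mut(&id) {
            if let Some(notifier) = info.abort_notifier.take() {
                // The task may already have finished; ignore a closed channel.
                let _ = notifier.send(Command::Abort);
            }
        }
    }

    /// Called when the task reports that it was aborted.
    fn on_task_aborted(&mut self, id: u64) -> Option<PoolEvent> {
        let info = self.pending.remove(&id)?;
        self.pending_count -= 1;
        Some(PoolEvent::PendingAborted { id, handler: info.handler })
    }
}
```

Taking the sender out of the `Option` makes a second `abort` call a no-op rather than an error.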

Good luck! let me know if anything above is unclear and I can try to elaborate :)

@mxinden
Member Author

mxinden commented Dec 6, 2021

> There’s a couple different solutions so feel free to pick your own, but I personally would probably send a special value to the task itself telling it that we want to abort this pending connection via the field currently called _drop_notifier (and rename that field to drop_or_abort_notifier). Not have the abort method remove the entry or decrement the counter. And I think that should just work.

Agreed.

How about something along the lines of the below?

diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs
index 59d41908..2df0770d 100644
--- a/core/src/connection/pool.rs
+++ b/core/src/connection/pool.rs
@@ -136,8 +136,8 @@ struct PendingConnectionInfo<THandler> {
     /// Handler to handle connection once no longer pending but established.
     handler: THandler,
     endpoint: PendingPoint,
-    /// When dropped, notifies the task which then knows to terminate.
-    _drop_notifier: oneshot::Sender<Void>,
+    /// Channel to command the task to abort.
+    abort_notifier: Option<oneshot::Sender<task::PendingConnectionCommand>>,
 }
 
 impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
@@ -955,7 +955,6 @@ pub enum PoolConnection<'a, THandler: IntoConnectionHandler> {
 /// A pending connection in a pool.
 pub struct PendingConnection<'a, THandler: IntoConnectionHandler> {
     entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>,
-    counters: &'a mut ConnectionCounters,
 }
 
 impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> {
@@ -976,8 +975,9 @@ impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> {
 
     /// Aborts the connection attempt, closing the connection.
     pub fn abort(self) {
-        self.counters.dec_pending(&self.entry.get().endpoint);
-        self.entry.remove();
+        if let Some(notifier) = self.entry.get_mut().abort_notifier.take() {
+            let _ = notifier.send(task::PendingConnectionCommand::Abort);
+        }
     }
 }
 
diff --git a/core/src/connection/pool/task.rs b/core/src/connection/pool/task.rs
index 9062583f..86581166 100644
--- a/core/src/connection/pool/task.rs
+++ b/core/src/connection/pool/task.rs
@@ -41,9 +41,15 @@ use futures::{
 use std::pin::Pin;
 use void::Void;
 
-/// Commands that can be sent to a task.
+/// Commands that can be sent to a task driving a pending connection.
 #[derive(Debug)]
-pub enum Command<T> {
+pub enum PendingConnectionCommand {
+    Abort,
+}
+
+/// Commands that can be sent to a task driving an established connection.
+#[derive(Debug)]
+pub enum EstablishedConnectionCommand<T> {
     /// Notify the connection handler of an event.
     NotifyHandler(T),
     /// Gracefully close the connection (active close) before
@@ -103,13 +109,16 @@ pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> {
 pub async fn new_for_pending_outgoing_connection<TTrans>(
     connection_id: ConnectionId,
     dial: ConcurrentDial<TTrans>,
-    drop_receiver: oneshot::Receiver<Void>,
+    abort_receiver: oneshot::Receiver<PendingConnectionCommand>,
     mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
 ) where
     TTrans: Transport,
 {
-    match futures::future::select(drop_receiver, Box::pin(dial)).await {
+    match futures::future::select(abort_receiver, Box::pin(dial)).await {
         Either::Left((Err(oneshot::Canceled), _)) => {
+            unreachable!("Pool never drops channel to task.");
+        }
+        Either::Left((Ok(PendingConnectionCommand::Abort), _)) => {
             let _ = events
                 .send(PendingConnectionEvent::PendingFailed {
                     id: connection_id,
@@ -117,7 +126,6 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
                 })
                 .await;
         }
-        Either::Left((Ok(v), _)) => void::unreachable(v),
         Either::Right((Ok((address, output, errors)), _)) => {
             let _ = events
                 .send(PendingConnectionEvent::ConnectionEstablished {

@jmmaloney4
Contributor

Thanks for the pointers, @MarcoPolo @mxinden. I've now run into an issue: the PendingConnection::abort method takes self by value as an immutable binding, but calling self.entry.get_mut() requires a mutable one. I'm a bit of a Rust beginner, so I'm not sure whether there is a good way to handle this without modifying the public interface of PendingConnection. My intuition is that some kind of cell or smart pointer might do the trick, but I know these usually bring threading issues. What are your thoughts?

Also, when thinking about the potential race condition here:

let PendingConnectionInfo {
    peer_id: expected_peer_id,
    handler,
    endpoint,
    _drop_notifier,
} = self
    .pending
    .remove(&id)
    .expect("Entry in `self.pending` for previously pending connection.");

If the pending connection has already been removed in the abort handler, I believe the proper behavior is simply to continue aborting the connection. I think this can be handled by swapping the .expect() for an if let guard, as here:

if let Some(PendingConnectionInfo {
    peer_id,
    handler,
    endpoint,
    _drop_notifier,
}) = self.pending.remove(&id)

I think the other case that could happen is that the abort message beats the successfully connected message. What then? The PendingConnection will have been removed from the list. Is it enough to just not handle the successfully connected message? Or do we need to do some cleanup of the connection and inform the other end that it has been aborted?

@MarcoPolo
Contributor

I think you can just put mut in front of the self param like so:

diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs
index 163da217..a6d49dc4 100644
--- a/core/src/connection/pool.rs
+++ b/core/src/connection/pool.rs
@@ -972,7 +972,7 @@ impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> {
     }
 
     /// Aborts the connection attempt, closing the connection.
-    pub fn abort(self) {
+    pub fn abort(mut self) {
         self.counters.dec_pending(&self.entry.get().endpoint);
         self.entry.remove();
     }

It doesn't change the signature because it already takes the owned object. We're just declaring we are going to mutate that object as well.
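The same idea in isolation: `mut` on a by-value `self` only makes the local binding mutable, so callers still see `fn abort(self)`. The sketch below uses the standard library's `hash_map::OccupiedEntry::get_mut`; `PendingConnection`, `Info`, `Command`, and `abort_pending` are hypothetical simplifications of the libp2p types (no counters, no generic handler), and `std::sync::mpsc` replaces the `oneshot` channel.

```rust
use std::collections::hash_map::{Entry, HashMap, OccupiedEntry};
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum Command {
    Abort,
}

struct Info {
    abort_notifier: Option<mpsc::Sender<Command>>,
}

/// Simplified `PendingConnection`: owns an occupied entry of the pending map.
struct PendingConnection<'a> {
    entry: OccupiedEntry<'a, u64, Info>,
}

impl PendingConnection<'_> {
    /// Still takes `self` by value; `mut` only lets the body call `get_mut`.
    fn abort(mut self) {
        if let Some(notifier) = self.entry.get_mut().abort_notifier.take() {
            let _ = notifier.send(Command::Abort);
        }
    }
}

/// Looks up a pending connection and aborts it, if present.
fn abort_pending(pending: &mut HashMap<u64, Info>, id: u64) -> bool {
    match pending.entry(id) {
        Entry::Occupied(entry) => {
            PendingConnection { entry }.abort();
            true
        }
        Entry::Vacant(_) => false,
    }
}
```

The entry stays in the map after `abort`; only its notifier has been taken and used.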

@jmmaloney4
Contributor

@MarcoPolo That makes sense. Thanks!


4 participants