diff --git a/Cargo.toml b/Cargo.toml index 2efc9db53e6..3764ced864d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } libp2p-relay = { version = "0.4.0", path = "protocols/relay", optional = true } libp2p-request-response = { version = "0.13.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.31.0", path = "swarm" } -libp2p-swarm-derive = { version = "0.24.0", path = "swarm-derive" } +libp2p-swarm-derive = { version = "0.25.0", path = "swarm-derive" } libp2p-uds = { version = "0.30.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.30.0", path = "transports/wasm-ext", default-features = false, optional = true } libp2p-yamux = { version = "0.34.0", path = "muxers/yamux", optional = true } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index e252497f4eb..3b1ec3c43e1 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -22,11 +22,19 @@ - Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]). +- Report `ConnectionLimit` error through `ConnectionError` and thus through + `NetworkEvent::ConnectionClosed` instead of previously through + `PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError, + DialError}` (see [PR 2191]). + +- Report abortion of pending connection through `DialError`, + `UnknownPeerDialError` or `IncomingConnectionError` (see [PR 2191]). [PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145 [PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142 [PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137 [PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 +[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191 [PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195 # 0.29.0 [2021-07-12] diff --git a/core/src/connection.rs b/core/src/connection.rs index 335e2046c2d..9e39ae21807 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -229,10 +229,10 @@ where self.handler.inject_event(event); } - /// Begins an orderly shutdown of the connection, returning a - /// `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> Close { - self.muxing.close().0 + /// Begins an orderly shutdown of the connection, returning the connection + /// handler and a `Future` that resolves when connection shutdown is complete. + pub fn close(self) -> (THandler, Close) { + (self.handler, self.muxing.close().0) } /// Polls the connection for events produced by the associated handler diff --git a/core/src/connection/error.rs b/core/src/connection/error.rs index 66da0670c98..ec4f7ff6e61 100644 --- a/core/src/connection/error.rs +++ b/core/src/connection/error.rs @@ -29,6 +29,10 @@ pub enum ConnectionError { // TODO: Eventually this should also be a custom error? IO(io::Error), + /// The connection was dropped because the connection limit + /// for a peer has been reached. + ConnectionLimit(ConnectionLimit), + /// The connection handler produced an error. Handler(THandlerErr), } @@ -41,6 +45,9 @@ where match self { ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err), ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), + ConnectionError::ConnectionLimit(l) => { + write!(f, "Connection error: Connection limit: {}.", l) + } } } } @@ -53,6 +60,7 @@ where match self { ConnectionError::IO(err) => Some(err), ConnectionError::Handler(err) => Some(err), + ConnectionError::ConnectionLimit(..) => None, } } } @@ -63,14 +71,13 @@ pub enum PendingConnectionError { /// An error occurred while negotiating the transport protocol(s). Transport(TransportError), + /// Pending connection attempt has been aborted. + Aborted, + /// The peer identity obtained on the connection did not /// match the one that was expected or is otherwise invalid. InvalidPeerId, - /// The connection was dropped because the connection limit - /// for a peer has been reached. - ConnectionLimit(ConnectionLimit), - /// An I/O error occurred on the connection. // TODO: Eventually this should also be a custom error? IO(io::Error), @@ -83,15 +90,13 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err), + PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."), PendingConnectionError::Transport(err) => { write!(f, "Pending connection: Transport error: {}", err) } PendingConnectionError::InvalidPeerId => { write!(f, "Pending connection: Invalid peer ID.") } - PendingConnectionError::ConnectionLimit(l) => { - write!(f, "Connection error: Connection limit: {}.", l) - } } } } @@ -105,7 +110,7 @@ where PendingConnectionError::IO(err) => Some(err), PendingConnectionError::Transport(err) => Some(err), PendingConnectionError::InvalidPeerId => None, - PendingConnectionError::ConnectionLimit(..) => None, + PendingConnectionError::Aborted => None, } } } diff --git a/core/src/connection/manager.rs b/core/src/connection/manager.rs index 2d99bd1809b..51c08024202 100644 --- a/core/src/connection/manager.rs +++ b/core/src/connection/manager.rs @@ -20,8 +20,8 @@ use super::{ handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler, - PendingConnectionError, Substream, + Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit, + IntoConnectionHandler, PendingConnectionError, Substream, }; use crate::{muxing::StreamMuxer, Executor}; use fnv::FnvHashMap; @@ -192,6 +192,7 @@ pub enum Event<'a, H: IntoConnectionHandler, TE> { /// The error that occurred, if any. If `None`, the connection /// has been actively closed. error: Option>>, + handler: H::Handler, }, /// A connection has been established. @@ -350,7 +351,7 @@ impl Manager { new_endpoint: new, } } - task::Event::Closed { id, error } => { + task::Event::Closed { id, error, handler } => { let id = ConnectionId(id); let task = task.remove(); match task.state { @@ -358,6 +359,7 @@ impl Manager { id, connected, error, + handler, }, TaskState::Pending => unreachable!( "`Event::Closed` implies (2) occurred on that task and thus (3)." @@ -437,7 +439,7 @@ impl<'a, I> EstablishedEntry<'a, I> { /// /// When the connection is ultimately closed, [`Event::ConnectionClosed`] /// is emitted by [`Manager::poll`]. - pub fn start_close(mut self) { + pub fn start_close(mut self, error: Option) { // Clone the sender so that we are guaranteed to have // capacity for the close command (every sender gets a slot). match self @@ -445,7 +447,7 @@ impl<'a, I> EstablishedEntry<'a, I> { .get_mut() .sender .clone() - .try_send(task::Command::Close) + .try_send(task::Command::Close(error)) { Ok(()) => {} Err(e) => assert!(e.is_disconnected(), "No capacity for close command."), @@ -460,17 +462,6 @@ impl<'a, I> EstablishedEntry<'a, I> { } } - /// Instantly removes the entry from the manager, dropping - /// the command channel to the background task of the connection, - /// which will thus drop the connection asap without an orderly - /// close or emitting another event. - pub fn remove(self) -> Connected { - match self.task.remove().state { - TaskState::Established(c) => c, - TaskState::Pending => unreachable!("By Entry::new()"), - } - } - /// Returns the connection ID. pub fn id(&self) -> ConnectionId { ConnectionId(*self.task.key()) diff --git a/core/src/connection/manager/task.rs b/core/src/connection/manager/task.rs index cf217cc8f78..0ed331c6625 100644 --- a/core/src/connection/manager/task.rs +++ b/core/src/connection/manager/task.rs @@ -23,8 +23,8 @@ use crate::{ connection::{ self, handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler, - PendingConnectionError, Substream, + Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit, + IntoConnectionHandler, PendingConnectionError, Substream, }, muxing::StreamMuxer, Multiaddr, @@ -43,7 +43,7 @@ pub enum Command { NotifyHandler(T), /// Gracefully close the connection (active close) before /// terminating the task. - Close, + Close(Option), } /// Events that a task can emit to its manager. @@ -71,6 +71,7 @@ pub enum Event { Closed { id: TaskId, error: Option>>, + handler: H::Handler, }, } @@ -159,7 +160,11 @@ where }, /// The connection is closing (active close). - Closing(Close), + Closing { + closing_muxer: Close, + handler: H::Handler, + error: Option, + }, /// The task is terminating with a final event for the `Manager`. Terminating(Event), @@ -204,7 +209,16 @@ where Poll::Pending => {} Poll::Ready(None) => { // The manager has dropped the task; abort. - return Poll::Ready(()); + // Don't accept any further commands and terminate the + // task with a final event. + this.commands.get_mut().close(); + let event = Event::Failed { + id, + handler, + error: PendingConnectionError::Aborted, + }; + this.state = State::Terminating(event); + continue 'poll; } Poll::Ready(Some(_)) => { panic!("Task received command while the connection is pending.") @@ -243,15 +257,20 @@ where Poll::Ready(Some(Command::NotifyHandler(event))) => { connection.inject_event(event) } - Poll::Ready(Some(Command::Close)) => { + Poll::Ready(Some(Command::Close(error))) => { // Don't accept any further commands. this.commands.get_mut().close(); // Discard the event, if any, and start a graceful close. - this.state = State::Closing(connection.close()); + let (handler, closing_muxer) = connection.close(); + this.state = State::Closing { + handler, + closing_muxer, + error, + }; continue 'poll; } Poll::Ready(None) => { - // The manager has dropped the task or disappeared; abort. + // The manager has disappeared; abort. return Poll::Ready(()); } } @@ -306,10 +325,12 @@ where Poll::Ready(Err(error)) => { // Don't accept any further commands. this.commands.get_mut().close(); + let (handler, _closing_muxer) = connection.close(); // Terminate the task with the error, dropping the connection. let event = Event::Closed { id, error: Some(error), + handler, }; this.state = State::Terminating(event); } @@ -317,13 +338,18 @@ where } } - State::Closing(mut closing) => { + State::Closing { + handler, + error, + mut closing_muxer, + } => { // Try to gracefully close the connection. - match closing.poll_unpin(cx) { + match closing_muxer.poll_unpin(cx) { Poll::Ready(Ok(())) => { let event = Event::Closed { id: this.id, - error: None, + error: error.map(|limit| ConnectionError::ConnectionLimit(limit)), + handler, }; this.state = State::Terminating(event); } @@ -331,11 +357,16 @@ where let event = Event::Closed { id: this.id, error: Some(ConnectionError::IO(e)), + handler, }; this.state = State::Terminating(event); } Poll::Pending => { - this.state = State::Closing(closing); + this.state = State::Closing { + handler, + error, + closing_muxer, + }; return Poll::Pending; } } diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 9fcbba3254b..11861600c21 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -26,6 +26,7 @@ use crate::{ IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream, }, muxing::StreamMuxer, + network::DialError, ConnectedPoint, PeerId, }; use either::Either; @@ -53,12 +54,6 @@ pub struct Pool { /// The pending connections that are currently being negotiated. pending: FnvHashMap)>, - - /// Established connections that have been closed in the context of - /// a [`Pool::disconnect`] in order to emit a `ConnectionClosed` - /// event for each. Every `ConnectionEstablished` event must be - /// paired with (eventually) a `ConnectionClosed`. - disconnected: Vec, } impl fmt::Debug for Pool { @@ -101,6 +96,7 @@ pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> { pool: &'a mut Pool, /// The remaining number of established connections to the same peer. num_established: u32, + handler: THandler::Handler, }, /// A connection attempt failed. @@ -113,7 +109,7 @@ pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> { error: PendingConnectionError, /// The handler that was supposed to handle the connection, /// if the connection failed before the handler was consumed. - handler: Option, + handler: THandler, /// The (expected) peer of the failed connection. peer: Option, /// A reference to the pool that managed the connection. @@ -199,7 +195,6 @@ impl Pool { manager: Manager::new(manager_config), established: Default::default(), pending: Default::default(), - disconnected: Vec::new(), } } @@ -245,7 +240,7 @@ impl Pool { future: TFut, handler: THandler, info: OutgoingInfo<'_>, - ) -> Result + ) -> Result> where TFut: Future>> + Send @@ -257,7 +252,9 @@ impl Pool { TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { - self.counters.check_max_pending_outgoing()?; + if let Err(limit) = self.counters.check_max_pending_outgoing() { + return Err(DialError::ConnectionLimit { limit, handler }); + }; let endpoint = info.to_connected_point(); Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned())) } @@ -379,45 +376,24 @@ impl Pool { /// (Forcefully) close all connections to the given peer. /// /// All connections to the peer, whether pending or established are - /// dropped asap and no more events from these connections are emitted + /// closed asap and no more events from these connections are emitted /// by the pool effective immediately. - /// - /// > **Note**: Established connections are dropped without performing - /// > an orderly close. See [`EstablishedConnection::start_close`] for - /// > performing such an orderly close. pub fn disconnect(&mut self, peer: &PeerId) { if let Some(conns) = self.established.get(peer) { - // Count upwards because we push to / pop from the end. See also `Pool::poll`. - let mut num_established = 0; - for (&id, endpoint) in conns.iter() { + for (&id, _endpoint) in conns.iter() { if let Some(manager::Entry::Established(e)) = self.manager.entry(id) { - let connected = e.remove(); - self.disconnected.push(Disconnected { - id, - connected, - num_established, - }); - num_established += 1; + e.start_close(None); } - self.counters.dec_established(endpoint); } } - self.established.remove(peer); - let mut aborted = Vec::new(); for (&id, (_endpoint, peer2)) in &self.pending { if Some(peer) == peer2.as_ref() { if let Some(manager::Entry::Pending(e)) = self.manager.entry(id) { e.abort(); - aborted.push(id); } } } - for id in aborted { - if let Some((endpoint, _)) = self.pending.remove(&id) { - self.counters.dec_pending(&endpoint); - } - } } /// Counts the number of established connections to the given peer. @@ -503,28 +479,6 @@ impl Pool { &'a mut self, cx: &mut Context<'_>, ) -> Poll> { - // Drain events resulting from forced disconnections. - // - // Note: The `Disconnected` entries in `self.disconnected` - // are inserted in ascending order of the remaining `num_established` - // connections. Thus we `pop()` them off from the end to emit the - // events in an order that properly counts down `num_established`. - // See also `Pool::disconnect`. - if let Some(Disconnected { - id, - connected, - num_established, - }) = self.disconnected.pop() - { - return Poll::Ready(PoolEvent::ConnectionClosed { - id, - connected, - num_established, - error: None, - pool: self, - }); - } - // Poll the connection `Manager`. loop { let item = match self.manager.poll(cx) { @@ -540,7 +494,7 @@ impl Pool { id, endpoint, error, - handler: Some(handler), + handler, peer, pool: self, }); @@ -550,6 +504,7 @@ impl Pool { id, connected, error, + handler, } => { let num_established = if let Some(conns) = self.established.get_mut(&connected.peer_id) { @@ -569,6 +524,7 @@ impl Pool { error, num_established, pool: self, + handler, }); } manager::Event::ConnectionEstablished { entry } => { @@ -578,30 +534,16 @@ impl Pool { // Check general established connection limit. if let Err(e) = self.counters.check_max_established(&endpoint) { - let connected = entry.remove(); - return Poll::Ready(PoolEvent::PendingConnectionError { - id, - endpoint: connected.endpoint, - error: PendingConnectionError::ConnectionLimit(e), - handler: None, - peer, - pool: self, - }); + entry.start_close(Some(e)); + continue; } // Check per-peer established connection limit. let current = num_peer_established(&self.established, &entry.connected().peer_id); if let Err(e) = self.counters.check_max_established_per_peer(current) { - let connected = entry.remove(); - return Poll::Ready(PoolEvent::PendingConnectionError { - id, - endpoint: connected.endpoint, - error: PendingConnectionError::ConnectionLimit(e), - handler: None, - peer, - pool: self, - }); + entry.start_close(Some(e)); + continue; } // Peer ID checks must already have happened. See `add_pending`. @@ -790,7 +732,7 @@ impl EstablishedConnection<'_, TInEvent> { /// /// Has no effect if the connection is already closing. pub fn start_close(self) { - self.entry.start_close() + self.entry.start_close(None) } } @@ -1064,15 +1006,3 @@ impl ConnectionLimits { self } } - -/// Information about a former established connection to a peer -/// that was dropped via [`Pool::disconnect`]. -struct Disconnected { - /// The unique identifier of the dropped connection. - id: ConnectionId, - /// Information about the dropped connection. - connected: Connected, - /// The remaining number of established connections - /// to the same peer. - num_established: u32, -} diff --git a/core/src/network.rs b/core/src/network.rs index 784c1e01ca7..831a99c4b01 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -22,7 +22,7 @@ mod event; pub mod peer; pub use crate::connection::{ConnectionCounters, ConnectionLimits}; -pub use event::{IncomingConnection, NetworkEvent}; +pub use event::{DialAttemptsRemaining, IncomingConnection, NetworkEvent}; pub use peer::Peer; use crate::{ @@ -45,7 +45,7 @@ use std::{ collections::hash_map, convert::TryFrom as _, error, fmt, - num::NonZeroUsize, + num::{NonZeroU32, NonZeroUsize}, pin::Pin, task::{Context, Poll}, }; @@ -202,7 +202,7 @@ where &mut self, address: &Multiaddr, handler: THandler, - ) -> Result + ) -> Result> where TTrans: Transport, TTrans::Error: Send + 'static, @@ -235,15 +235,11 @@ where Ok(f) => { let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); - self.pool - .add_outgoing(f, handler, info) - .map_err(DialError::ConnectionLimit) + self.pool.add_outgoing(f, handler, info) } Err(err) => { let f = future::err(PendingConnectionError::Transport(err)); - self.pool - .add_outgoing(f, handler, info) - .map_err(DialError::ConnectionLimit) + self.pool.add_outgoing(f, handler, info) } } } @@ -445,12 +441,14 @@ where connected, error, num_established, + handler, .. }) => NetworkEvent::ConnectionClosed { id, connected, num_established, error, + handler, }, Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => { NetworkEvent::ConnectionEvent { connection, event } @@ -470,7 +468,10 @@ where } /// Initiates a connection attempt to a known peer. - fn dial_peer(&mut self, opts: DialingOpts) -> Result + fn dial_peer( + &mut self, + opts: DialingOpts, + ) -> Result> where TTrans: Transport, TTrans::Dial: Send + 'static, @@ -502,7 +503,7 @@ fn dial_peer_impl( pool: &mut Pool, dialing: &mut FnvHashMap>, opts: DialingOpts, -) -> Result +) -> Result> where THandler: IntoConnectionHandler + Send + 'static, ::Error: error::Error + Send + 'static, @@ -517,7 +518,15 @@ where // Ensure the address to dial encapsulates the `p2p` protocol for the // targeted peer, so that the transport has a "fully qualified" address // to work with. - let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?; + let addr = match p2p_addr(opts.peer, opts.address) { + Ok(address) => address, + Err(address) => { + return Err(DialError::InvalidAddress { + address, + handler: opts.handler, + }) + } + }; let result = match transport.dial(addr.clone()) { Ok(fut) => { @@ -527,7 +536,6 @@ where peer_id: Some(&opts.peer), }; pool.add_outgoing(fut, opts.handler, info) - .map_err(DialError::ConnectionLimit) } Err(err) => { let fut = future::err(PendingConnectionError::Transport(err)); @@ -536,7 +544,6 @@ where peer_id: Some(&opts.peer), }; pool.add_outgoing(fut, opts.handler, info) - .map_err(DialError::ConnectionLimit) } }; @@ -563,7 +570,7 @@ fn on_connection_failed<'a, TTrans, THandler>( id: ConnectionId, endpoint: ConnectedPoint, error: PendingConnectionError, - handler: Option, + handler: THandler, ) -> ( Option>, NetworkEvent<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>, @@ -591,24 +598,17 @@ where let num_remain = u32::try_from(attempt.remaining.len()).unwrap(); let failed_addr = attempt.current.1.clone(); - let (opts, attempts_remaining) = if num_remain > 0 { - if let Some(handler) = handler { - let next_attempt = attempt.remaining.remove(0); - let opts = DialingOpts { - peer: peer_id, - handler, - address: next_attempt, - remaining: attempt.remaining, - }; - (Some(opts), num_remain) - } else { - // The error is "fatal" for the dialing attempt, since - // the handler was already consumed. All potential - // remaining connection attempts are thus void. - (None, 0) - } + let (opts, attempts_remaining) = if let Some(num_remain) = NonZeroU32::new(num_remain) { + let next_attempt = attempt.remaining.remove(0); + let opts = DialingOpts { + peer: peer_id, + handler, + address: next_attempt, + remaining: attempt.remaining, + }; + (Some(opts), DialAttemptsRemaining::Some(num_remain)) } else { - (None, 0) + (None, DialAttemptsRemaining::None(handler)) }; ( @@ -628,6 +628,7 @@ where NetworkEvent::UnknownPeerDialError { multiaddr: address, error, + handler, }, ), ConnectedPoint::Listener { @@ -639,6 +640,7 @@ where local_addr, send_back_addr, error, + handler, }, ), } @@ -751,13 +753,42 @@ fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result { } /// Possible (synchronous) errors when dialing a peer. -#[derive(Clone, Debug)] -pub enum DialError { +#[derive(Clone)] +pub enum DialError { /// The dialing attempt is rejected because of a connection limit. - ConnectionLimit(ConnectionLimit), + ConnectionLimit { + limit: ConnectionLimit, + handler: THandler, + }, /// The address being dialed is invalid, e.g. if it refers to a different /// remote peer than the one being dialed. - InvalidAddress(Multiaddr), + InvalidAddress { + address: Multiaddr, + handler: THandler, + }, + /// The dialing attempt is rejected because the peer being dialed is the local peer. + LocalPeerId { handler: THandler }, +} + +impl fmt::Debug for DialError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self { + DialError::ConnectionLimit { limit, handler: _ } => f + .debug_struct("DialError::ConnectionLimit") + .field("limit", limit) + .finish(), + DialError::InvalidAddress { + address, + handler: _, + } => f + .debug_struct("DialError::InvalidAddress") + .field("address", address) + .finish(), + DialError::LocalPeerId { handler: _ } => { + f.debug_struct("DialError::LocalPeerId").finish() + } + } + } } #[cfg(test)] diff --git a/core/src/network/event.rs b/core/src/network/event.rs index 7b4158265d9..cea5bbddc21 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -92,6 +92,7 @@ where send_back_addr: Multiaddr, /// The error that happened. error: PendingConnectionError, + handler: THandler, }, /// A new connection to a peer has been established. @@ -124,12 +125,13 @@ where error: Option::Error>>, /// The remaining number of established connections to the same peer. num_established: u32, + handler: THandler::Handler, }, /// A dialing attempt to an address of a peer failed. DialError { /// The number of remaining dialing attempts. - attempts_remaining: u32, + attempts_remaining: DialAttemptsRemaining, /// Id of the peer we were trying to dial. peer_id: PeerId, @@ -148,6 +150,8 @@ where /// The error that happened. error: PendingConnectionError, + + handler: THandler, }, /// An established connection produced an event. @@ -169,6 +173,20 @@ where }, } +pub enum DialAttemptsRemaining { + Some(NonZeroU32), + None(THandler), +} + +impl DialAttemptsRemaining { + pub fn get_attempts(&self) -> u32 { + match self { + DialAttemptsRemaining::Some(attempts) => (*attempts).into(), + DialAttemptsRemaining::None(_) => 0, + } + } +} + impl fmt::Debug for NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler> where @@ -221,6 +239,7 @@ where local_addr, send_back_addr, error, + handler: _, } => f .debug_struct("IncomingConnectionError") .field("local_addr", local_addr) @@ -249,7 +268,7 @@ where error, } => f .debug_struct("DialError") - .field("attempts_remaining", attempts_remaining) + .field("attempts_remaining", &attempts_remaining.get_attempts()) .field("peer_id", peer_id) .field("multiaddr", multiaddr) .field("error", error) diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index 7e904ce2c79..1eda5dde9e0 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -22,8 +22,8 @@ use super::{DialError, DialingOpts, Network}; use crate::{ connection::{ handler::THandlerInEvent, pool::Pool, ConnectedPoint, ConnectionHandler, ConnectionId, - ConnectionLimit, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, - PendingConnection, Substream, + EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, + Substream, }, Multiaddr, PeerId, StreamMuxer, Transport, }; @@ -163,7 +163,7 @@ where address: Multiaddr, remaining: I, handler: THandler, - ) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError> + ) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError> where I: IntoIterator, { @@ -171,12 +171,7 @@ where Peer::Connected(p) => (p.peer_id, p.network), Peer::Dialing(p) => (p.peer_id, p.network), Peer::Disconnected(p) => (p.peer_id, p.network), - Peer::Local => { - return Err(DialError::ConnectionLimit(ConnectionLimit { - current: 0, - limit: 0, - })) - } + Peer::Local => return Err(DialError::LocalPeerId { handler }), }; let id = network.dial_peer(DialingOpts { diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index 65e61c4b3c4..d5156664ffb 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -23,7 +23,7 @@ mod util; use futures::{future::poll_fn, ready}; use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ - connection::PendingConnectionError, + connection::ConnectionError, network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent}, PeerId, }; @@ -53,9 +53,9 @@ fn max_outgoing() { .dial(Multiaddr::empty(), Vec::new(), TestHandler()) .expect_err("Unexpected dialing success.") { - DialError::ConnectionLimit(err) => { - assert_eq!(err.current, outgoing_limit); - assert_eq!(err.limit, outgoing_limit); + DialError::ConnectionLimit { limit, handler: _ } => { + assert_eq!(limit.current, outgoing_limit); + assert_eq!(limit.limit, outgoing_limit); } e => panic!("Unexpected error: {:?}", e), } @@ -111,8 +111,8 @@ fn max_established_incoming() { network1.accept(connection, TestHandler()).unwrap(); } NetworkEvent::ConnectionEstablished { .. } => {} - NetworkEvent::IncomingConnectionError { - error: PendingConnectionError::ConnectionLimit(err), + NetworkEvent::ConnectionClosed { + error: Some(ConnectionError::ConnectionLimit(err)), .. } => { assert_eq!(err.limit, limit); diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 224d7950eac..bed4c06e023 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -65,11 +65,12 @@ fn deny_incoming_connec() { match swarm2.poll(cx) { Poll::Ready(NetworkEvent::DialError { - attempts_remaining: 0, + attempts_remaining, peer_id, multiaddr, error: PendingConnectionError::Transport(_), }) => { + assert_eq!(0u32, attempts_remaining.get_attempts()); assert_eq!(&peer_id, swarm1.local_peer_id()); assert_eq!( multiaddr, @@ -201,10 +202,10 @@ fn multiple_addresses_err() { .with(Protocol::P2p(target.clone().into())); assert_eq!(multiaddr, expected); if addresses.is_empty() { - assert_eq!(attempts_remaining, 0); + assert_eq!(attempts_remaining.get_attempts(), 0); return Poll::Ready(Ok(())); } else { - assert_eq!(attempts_remaining, addresses.len() as u32); + assert_eq!(attempts_remaining.get_attempts(), addresses.len() as u32); } } Poll::Ready(_) => unreachable!(), diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index c5c4558ca7f..630479a2f9c 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -230,7 +230,7 @@ enum PendingConnectionError { InvalidPeerId, TransportErrorMultiaddrNotSupported, TransportErrorOther, - ConnectionLimit, + Aborted, Io, } @@ -248,8 +248,8 @@ impl From<&libp2p_core::connection::PendingConnectionError libp2p_core::connection::PendingConnectionError::Transport( libp2p_core::transport::TransportError::Other(_), ) => PendingConnectionError::TransportErrorOther, - libp2p_core::connection::PendingConnectionError::ConnectionLimit(_) => { - PendingConnectionError::ConnectionLimit + libp2p_core::connection::PendingConnectionError::Aborted => { + PendingConnectionError::Aborted } libp2p_core::connection::PendingConnectionError::IO(_) => PendingConnectionError::Io, } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index eb5a7cb30b2..25b235e3364 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -29,7 +29,7 @@ use fnv::FnvHashSet; use libp2p_core::{connection::ConnectionId, PeerId}; use libp2p_swarm::{ DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, - PollParameters, ProtocolsHandler, + PollParameters, }; use log::warn; use smallvec::SmallVec; @@ -40,7 +40,12 @@ use std::{collections::VecDeque, iter}; /// Network behaviour that handles the floodsub protocol. pub struct Floodsub { /// Events that need to be yielded to the outside when polling. - events: VecDeque>, + events: VecDeque< + NetworkBehaviourAction< + FloodsubEvent, + OneShotHandler, + >, + >, config: FloodsubConfig, @@ -101,9 +106,11 @@ impl Floodsub { } if self.target_peers.insert(peer_id) { + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler, }); } } @@ -302,9 +309,11 @@ impl NetworkBehaviour for Floodsub { // We can be disconnected by the remote in case of inactivity for example, so we always // try to reconnect. if self.target_peers.contains(id) { + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: *id, condition: DialPeerCondition::Disconnected, + handler, }); } } @@ -426,12 +435,7 @@ impl NetworkBehaviour for Floodsub { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { + ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 803f79c924b..e3adfdcfc1a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -41,8 +41,8 @@ use libp2p_core::{ multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, }; use libp2p_swarm::{ - DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, - ProtocolsHandler, + DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; use crate::backoff::BackoffStorage; @@ -193,7 +193,7 @@ impl From for PublishConfig { } type GossipsubNetworkBehaviourAction = - NetworkBehaviourAction, GossipsubEvent>; + NetworkBehaviourAction>; /// Network behaviour that handles the gossipsub protocol. /// @@ -425,8 +425,8 @@ where impl Gossipsub where - D: DataTransform, - F: TopicSubscriptionFilter, + D: DataTransform + Send + 'static, + F: TopicSubscriptionFilter + Send + 'static, { /// Lists the hashes of the topics we are currently subscribed to. pub fn topics(&self) -> impl Iterator { @@ -1043,9 +1043,11 @@ where if !self.peer_topics.contains_key(peer_id) { // Connect to peer debug!("Connecting to explicit peer {:?}", peer_id); + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: *peer_id, condition: DialPeerCondition::Disconnected, + handler, }); } } @@ -1493,9 +1495,11 @@ where self.px_peers.insert(peer_id); // dial peer + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler, }); } } @@ -2969,6 +2973,7 @@ where peer_id: &PeerId, connection_id: &ConnectionId, endpoint: &ConnectedPoint, + _: ::Handler, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { @@ -3169,47 +3174,12 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { + ) -> Poll> { if let Some(event) = self.events.pop_front() { - return Poll::Ready(match event { - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event: send_event, - } => { - // clone send event reference if others references are present - let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone()); - NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler, - } - } - NetworkBehaviourAction::GenerateEvent(e) => { - NetworkBehaviourAction::GenerateEvent(e) - } - NetworkBehaviourAction::DialAddress { address } => { - NetworkBehaviourAction::DialAddress { address } - } - NetworkBehaviourAction::DialPeer { peer_id, condition } => { - NetworkBehaviourAction::DialPeer { peer_id, condition } - } - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } - } - NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - } => NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }, - }); + return Poll::Ready(event.map_in(|e: Arc| { + // clone send event reference if others references are present + Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone()) + })); } // update scores @@ -3396,7 +3366,7 @@ impl fmt::Debug for Gossipsub) -> fmt::Result { f.debug_struct("Gossipsub") .field("config", &self.config) - .field("events", &self.events) + .field("events", &self.events.len()) .field("control_pool", &self.control_pool) .field("publish_config", &self.publish_config) .field("topic_peers", &self.topic_peers) diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index ead3e6d89a9..5794f2e0054 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -747,7 +747,7 @@ mod tests { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let send_events: Vec<&NetworkBehaviourAction, GossipsubEvent>> = gs + let send_events: Vec<_> = gs .events .iter() .filter(|e| match e { @@ -1336,13 +1336,14 @@ mod tests { //add peer as explicit peer gs.add_explicit_peer(&peer); - let dial_events: Vec<&NetworkBehaviourAction, GossipsubEvent>> = gs + let dial_events: Vec<_> = gs .events .iter() .filter(|e| match e { NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler: _, } => peer_id == &peer, _ => false, }) @@ -1388,6 +1389,7 @@ mod tests { NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler: _, } => peer_id == peer, _ => false, }) @@ -1406,6 +1408,7 @@ mod tests { NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler: _, } => peer_id == peer, _ => false, }) @@ -1819,6 +1822,7 @@ mod tests { NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler: _, } => Some(peer_id.clone()), _ => None, }) diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index caa737f669c..d75dfb72054 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -27,8 +27,9 @@ use libp2p_core::{ ConnectedPoint, Multiaddr, PeerId, PublicKey, }; use libp2p_swarm::{ - AddressScore, DialPeerCondition, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, ProtocolsHandler, ProtocolsHandlerUpgrErr, + AddressScore, DialError, DialPeerCondition, IntoProtocolsHandler, NegotiatedSubstream, + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, + ProtocolsHandlerUpgrErr, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -52,7 +53,7 @@ pub struct Identify { /// Pending replies to send. pending_replies: VecDeque, /// Pending events to be emitted when polled. - events: VecDeque>, + events: VecDeque>, /// Peers to which an active push with current information about /// the local peer should be sent. pending_push: HashSet, @@ -173,9 +174,11 @@ impl Identify { for p in peers { if self.pending_push.insert(p) { if !self.connected.contains_key(&p) { + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: p, condition: DialPeerCondition::Disconnected, + handler, }); } } @@ -213,13 +216,14 @@ impl NetworkBehaviour for Identify { peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint, + _: ::Handler, ) { if let Some(addrs) = self.connected.get_mut(peer_id) { addrs.remove(conn); } } - fn inject_dial_failure(&mut self, peer_id: &PeerId) { + fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler, _: DialError) { if !self.connected.contains_key(peer_id) { self.pending_push.remove(peer_id); } @@ -292,12 +296,7 @@ impl NetworkBehaviour for Identify { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { + ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index c7cd51b95b7..74b5616ff91 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -43,7 +43,8 @@ use libp2p_core::{ ConnectedPoint, Multiaddr, PeerId, }; use libp2p_swarm::{ - DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + DialError, DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -98,7 +99,7 @@ pub struct Kademlia { connection_idle_timeout: Duration, /// Queued events to return when the behaviour is being polled. - queued_events: VecDeque, KademliaEvent>>, + queued_events: VecDeque>>, /// The currently known addresses of the local node. local_addrs: HashSet, @@ -394,6 +395,7 @@ impl KademliaConfig { impl Kademlia where for<'a> TStore: RecordStore<'a>, + TStore: Send + 'static, { /// Creates a new `Kademlia` network behaviour with a default configuration. pub fn new(id: PeerId, store: TStore) -> Self { @@ -561,10 +563,12 @@ where RoutingUpdate::Failed } kbucket::InsertResult::Pending { disconnected } => { + let handler = self.new_handler(); self.queued_events .push_back(NetworkBehaviourAction::DialPeer { peer_id: disconnected.into_preimage(), condition: DialPeerCondition::Disconnected, + handler, }); RoutingUpdate::Pending } @@ -1140,10 +1144,12 @@ where // // Only try dialing peer if not currently connected. if !self.connected_peers.contains(disconnected.preimage()) { + let handler = self.new_handler(); self.queued_events .push_back(NetworkBehaviourAction::DialPeer { peer_id: disconnected.into_preimage(), condition: DialPeerCondition::Disconnected, + handler, }) } } @@ -1859,9 +1865,32 @@ where } } - fn inject_dial_failure(&mut self, peer_id: &PeerId) { - for query in self.queries.iter_mut() { - query.on_failure(peer_id); + fn inject_dial_failure( + &mut self, + peer_id: &PeerId, + _: Self::ProtocolsHandler, + error: DialError, + ) { + match error { + DialError::Banned + | DialError::ConnectionLimit(_) + | DialError::InvalidAddress(_) + | DialError::UnreachableAddr(_) + | DialError::LocalPeerId + | DialError::NoAddresses => { + for query in self.queries.iter_mut() { + query.on_failure(peer_id); + } + } + DialError::DialPeerConditionFalse( + DialPeerCondition::Disconnected | DialPeerCondition::NotDialing, + ) => { + // We might (still) be connected, or about to be connected, thus do not report the + // failure to the queries. + } + DialError::DialPeerConditionFalse(DialPeerCondition::Always) => { + unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse."); + } } } @@ -2156,7 +2185,7 @@ where &mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters, - ) -> Poll, Self::OutEvent>> { + ) -> Poll> { let now = Instant::now(); // Calculate the available capacity for queries triggered by background jobs. @@ -2254,10 +2283,12 @@ where }); } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id, event)); + let handler = self.new_handler(); self.queued_events .push_back(NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, + handler, }); } } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 2a170e2d839..215a00be21b 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -287,12 +287,7 @@ impl NetworkBehaviour for Mdns { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { + ) -> Poll> { while let Poll::Ready(event) = Pin::new(&mut self.if_watch).poll(cx) { let socket = self.recv_socket.get_ref(); match event { diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index d4e3828f430..70345dd0472 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -49,7 +49,6 @@ pub use handler::{PingConfig, PingFailure, PingResult, PingSuccess}; use libp2p_core::{connection::ConnectionId, PeerId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use std::{collections::VecDeque, task::Context, task::Poll}; -use void::Void; /// `Ping` is a [`NetworkBehaviour`] that responds to inbound pings and /// periodically sends outbound pings on every established connection. @@ -103,7 +102,7 @@ impl NetworkBehaviour for Ping { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { if let Some(e) = self.events.pop_back() { Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)) } else { diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 9b17eca2c51..35f97647efc 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -29,7 +29,8 @@ use libp2p_core::connection::{ConnectedPoint, ConnectionId, ListenerId}; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::PeerId; use libp2p_swarm::{ - DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + DialError, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; @@ -45,7 +46,7 @@ pub struct Relay { /// [`Self::listeners`] or [`Self::listener_any_relay`]. outbox_to_listeners: VecDeque<(PeerId, BehaviourToListenerMsg)>, /// Events that need to be yielded to the outside when polling. - outbox_to_swarm: VecDeque>, + outbox_to_swarm: VecDeque>, /// List of peers the network is connected to. connected_peers: HashMap>, @@ -301,7 +302,20 @@ impl NetworkBehaviour for Relay { } } - fn inject_dial_failure(&mut self, peer_id: &PeerId) { + fn inject_dial_failure( + &mut self, + peer_id: &PeerId, + _: Self::ProtocolsHandler, + error: DialError, + ) { + if let DialError::DialPeerConditionFalse( + DialPeerCondition::Disconnected | DialPeerCondition::NotDialing, + ) = error + { + // Return early. The dial, that this dial was canceled for, might still succeed. + return; + } + if let Entry::Occupied(o) = self.listeners.entry(*peer_id) { if matches!(o.get(), RelayListener::Connecting { .. }) { // By removing the entry, the channel to the listener is dropped and thus the @@ -340,6 +354,7 @@ impl NetworkBehaviour for Relay { peer: &PeerId, connection: &ConnectionId, _: &ConnectedPoint, + _: ::Handler, ) { // Remove connection from the set of connections for the given peer. In case the set is // empty it will be removed in `inject_disconnected`. @@ -472,10 +487,12 @@ impl NetworkBehaviour for Relay { src_connection_id: connection, }, ); + let handler = self.new_handler(); self.outbox_to_swarm .push_back(NetworkBehaviourAction::DialPeer { peer_id: dest_id, condition: DialPeerCondition::NotDialing, + handler, }); } else { self.outbox_to_swarm @@ -562,7 +579,7 @@ impl NetworkBehaviour for Relay { &mut self, cx: &mut Context<'_>, poll_parameters: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { if !self.outbox_to_listeners.is_empty() { let relay_peer_id = self.outbox_to_listeners[0].0; @@ -668,6 +685,7 @@ impl NetworkBehaviour for Relay { return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id: relay_peer_id, condition: DialPeerCondition::Disconnected, + handler: self.new_handler(), }); } } @@ -734,6 +752,7 @@ impl NetworkBehaviour for Relay { return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id: relay_peer_id, condition: DialPeerCondition::Disconnected, + handler: self.new_handler(), }); } } diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 0829ec87d7b..29a0cfd35ed 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -1147,11 +1147,12 @@ enum CombinedEvent { } impl CombinedBehaviour { - fn poll( + fn poll( &mut self, _: &mut Context, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll::ProtocolsHandler>> + { if !self.events.is_empty() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 8c73e766a8c..6f71ec9f408 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -25,6 +25,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" +env_logger = "0.9.0" libp2p-noise = { path = "../../transports/noise" } libp2p-tcp = { path = "../../transports/tcp" } libp2p-yamux = { path = "../../muxers/yamux" } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index a2277e4c8df..ef3913c1efb 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -68,7 +68,8 @@ use futures::channel::oneshot; use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::{ - DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + DialError, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; use smallvec::SmallVec; use std::{ @@ -303,7 +304,7 @@ impl RequestResponseConfig { /// A request/response protocol for some message codec. pub struct RequestResponse where - TCodec: RequestResponseCodec, + TCodec: RequestResponseCodec + Clone + Send + 'static, { /// The supported inbound protocols. inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -320,8 +321,8 @@ where /// Pending events to return from `poll`. pending_events: VecDeque< NetworkBehaviourAction< - RequestProtocol, RequestResponseEvent, + RequestResponseHandler, >, >, /// The currently connected peers, their pending outbound and inbound responses and their known, @@ -336,7 +337,7 @@ where impl RequestResponse where - TCodec: RequestResponseCodec + Clone, + TCodec: RequestResponseCodec + Clone + Send + 'static, { /// Creates a new `RequestResponse` behaviour for the given /// protocols, codec and configuration. @@ -403,10 +404,12 @@ where }; if let Some(request) = self.try_send_request(peer, request) { + let handler = self.new_handler(); self.pending_events .push_back(NetworkBehaviourAction::DialPeer { peer_id: *peer, condition: DialPeerCondition::Disconnected, + handler, }); self.pending_outbound_requests .entry(*peer) @@ -639,6 +642,7 @@ where peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint, + _: ::Handler, ) { let connections = self .connected @@ -682,7 +686,7 @@ where self.connected.remove(peer); } - fn inject_dial_failure(&mut self, peer: &PeerId) { + fn inject_dial_failure(&mut self, peer: &PeerId, _: Self::ProtocolsHandler, _: DialError) { // If there are pending outgoing requests when a dial failure occurs, // it is implied that we are not connected to the peer, since pending // outgoing requests are drained when a connection is established and @@ -863,12 +867,7 @@ where &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - RequestProtocol, - RequestResponseEvent, - >, - > { + ) -> Poll> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index c882f41b211..2b8693bc437 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -40,11 +40,13 @@ use super::{ ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, }; -use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; +use crate::handler::{RequestResponseHandler, RequestResponseHandlerEvent}; use codec::{Codec, Message, ProtocolWrapper, Type}; use futures::ready; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; -use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p_swarm::{ + DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; use lru::LruCache; use std::{cmp::max, num::NonZeroU16}; use std::{ @@ -57,7 +59,7 @@ pub type ResponseChannel = super::ResponseChannel>; /// A wrapper around [`RequestResponse`] which adds request limits per peer. pub struct Throttled where - C: RequestResponseCodec + Send, + C: RequestResponseCodec + Clone + Send + 'static, C::Protocol: Sync, { /// A random id used for logging. @@ -439,8 +441,15 @@ where self.behaviour.inject_connection_established(p, id, end) } - fn inject_connection_closed(&mut self, peer: &PeerId, id: &ConnectionId, end: &ConnectedPoint) { - self.behaviour.inject_connection_closed(peer, id, end); + fn inject_connection_closed( + &mut self, + peer: &PeerId, + id: &ConnectionId, + end: &ConnectedPoint, + handler: ::Handler, + ) { + self.behaviour + .inject_connection_closed(peer, id, end, handler); if let Some(info) = self.peer_info.get_mut(peer) { if let Some(grant) = &mut info.recv_budget.grant { log::debug! { "{:08x}: resending credit grant {} to {} after connection closed", @@ -484,8 +493,13 @@ where self.behaviour.inject_disconnected(p) } - fn inject_dial_failure(&mut self, p: &PeerId) { - self.behaviour.inject_dial_failure(p) + fn inject_dial_failure( + &mut self, + p: &PeerId, + handler: Self::ProtocolsHandler, + error: DialError, + ) { + self.behaviour.inject_dial_failure(p, handler, error) } fn inject_event( @@ -501,7 +515,7 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>, Self::OutEvent>> { + ) -> Poll> { loop { if let Some(ev) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); @@ -737,12 +751,18 @@ where RequestResponseEvent::ResponseSent { peer, request_id }, )) } - NetworkBehaviourAction::DialAddress { address } => { - NetworkBehaviourAction::DialAddress { address } - } - NetworkBehaviourAction::DialPeer { peer_id, condition } => { - NetworkBehaviourAction::DialPeer { peer_id, condition } + NetworkBehaviourAction::DialAddress { address, handler } => { + NetworkBehaviourAction::DialAddress { address, handler } } + NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + } => NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 626f4effef3..884a378bcce 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -39,6 +39,7 @@ use std::{io, iter}; #[test] fn is_response_outbound() { + let _ = env_logger::try_init(); let ping = Ping("ping".to_string().into_bytes()); let offline_peer = PeerId::random(); diff --git a/swarm-derive/CHANGELOG.md b/swarm-derive/CHANGELOG.md index 94ea25f6b10..335d0fb28e8 100644 --- a/swarm-derive/CHANGELOG.md +++ b/swarm-derive/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.25.0 [unreleased] + +- Update to latest `libp2p-swarm` changes (see [PR 2191]). + +[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191 + # 0.24.0 [2021-07-12] - Handle `NetworkBehaviourAction::CloseConnection`. See [PR 2110] for details. diff --git a/swarm-derive/Cargo.toml b/swarm-derive/Cargo.toml index e63b7a71dd8..2da7fc8a34b 100644 --- a/swarm-derive/Cargo.toml +++ b/swarm-derive/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-swarm-derive" edition = "2018" description = "Procedural macros of libp2p-core" -version = "0.24.0" +version = "0.25.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 3f92e549fa9..eef08d15c07 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -57,6 +57,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let connection_id = quote! {::libp2p::core::connection::ConnectionId}; let connected_point = quote! {::libp2p::core::ConnectedPoint}; let listener_id = quote! {::libp2p::core::connection::ListenerId}; + let dial_error = quote! {::libp2p::swarm::DialError}; let poll_parameters = quote! {::libp2p::swarm::PollParameters}; @@ -223,15 +224,33 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { // Build the list of statements to put in the body of `inject_connection_closed()`. let inject_connection_closed_stmts = { - data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { - if is_ignored(&field) { - return None; - } - Some(match field.ident { - Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint); }, - None => quote!{ self.#field_n.inject_connection_closed(peer_id, connection_id, endpoint); }, + data_struct + .fields + .iter() + .enumerate() + // The outmost handler belongs to the last behaviour. + .rev() + .filter(|f| !is_ignored(&f.1)) + .enumerate() + .map(move |(enum_n, (field_n, field))| { + let handler = if field_n == 0 { + // Given that the iterator is reversed, this is the innermost handler only. + quote! { let handler = handlers } + } else { + quote! { + let (handlers, handler) = handlers.into_inner() + } + }; + let inject = match field.ident { + Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint, handler) }, + None => quote!{ self.#enum_n.inject_connection_closed(peer_id, connection_id, endpoint, handler) }, + }; + + quote! { + #handler; + #inject; + } }) - }) }; // Build the list of statements to put in the body of `inject_addr_reach_failure()`. @@ -255,15 +274,63 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .fields .iter() .enumerate() - .filter_map(move |(field_n, field)| { - if is_ignored(&field) { - return None; + // The outmost handler belongs to the last behaviour. + .rev() + .filter(|f| !is_ignored(&f.1)) + .enumerate() + .map(move |(enum_n, (field_n, field))| { + let handler = if field_n == 0 { + // Given that the iterator is reversed, this is the innermost handler only. + quote! { let handler = handlers } + } else { + quote! { + let (handlers, handler) = handlers.into_inner() + } + }; + + let inject = match field.ident { + Some(ref i) => { + quote! { self.#i.inject_dial_failure(peer_id, handler, error.clone()) } + } + None => { + quote! { self.#enum_n.inject_dial_failure(peer_id, handler, error.clone()) } + } + }; + + quote! { + #handler; + #inject; } + }) + }; - Some(match field.ident { - Some(ref i) => quote! { self.#i.inject_dial_failure(peer_id); }, - None => quote! { self.#field_n.inject_dial_failure(peer_id); }, - }) + // Build the list of statements to put in the body of `inject_listen_failure()`. + let inject_listen_failure_stmts = { + data_struct + .fields + .iter() + .enumerate() + .rev() + .filter(|f| !is_ignored(&f.1)) + .enumerate() + .map(move |(enum_n, (field_n, field))| { + let handler = if field_n == 0 { + quote! { let handler = handlers } + } else { + quote! { + let (handlers, handler) = handlers.into_inner() + } + }; + + let inject = match field.ident { + Some(ref i) => quote! { self.#i.inject_listen_failure(local_addr, send_back_addr, handler) }, + None => quote! { self.#enum_n.inject_listen_failure(local_addr, send_back_addr, handler) }, + }; + + quote! { + #handler; + #inject; + } }) }; @@ -426,6 +493,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { ref mut ev @ None => *ev = Some(field_info), } } + // ph_ty = Some(quote! ) ph_ty.unwrap_or(quote! {()}) // TODO: `!` instead }; @@ -456,7 +524,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } } - out_handler.unwrap_or(quote! {()}) // TODO: incorrect + out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`. }; // The method to use to poll. @@ -500,6 +568,42 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { wrapped_event = quote!{ #either_ident::First(#wrapped_event) }; } + // `DialPeer` and `DialAddress` each provide a handler of the specific + // behaviour triggering the event. Though in order for the final handler + // to be able to handle protocols of all behaviours, the provided + // handler needs to be combined with handlers of all other behaviours. + let provided_handler_and_new_handlers = { + let mut out_handler = None; + + for (f_n, f) in data_struct.fields.iter().enumerate() { + if is_ignored(&f) { + continue; + } + + let f_name = match f.ident { + Some(ref i) => quote! { self.#i }, + None => quote! { self.#f_n }, + }; + + let builder = if field_n == f_n { + // The behaviour that triggered the event. Thus, instead of + // creating a new handler, use the provided handler. + quote! { provided_handler } + } else { + quote! { #f_name.new_handler() } + }; + + match out_handler { + Some(h) => { + out_handler = Some(quote! { #into_protocols_handler::select(#h, #builder) }) + } + ref mut h @ None => *h = Some(builder), + } + } + + out_handler.unwrap_or(quote! {()}) // TODO: See test `empty`. + }; + let generate_event_match_arm = if event_process { quote! { std::task::Poll::Ready(#network_behaviour_action::GenerateEvent(event)) => { @@ -518,11 +622,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { loop { match #trait_to_impl::poll(&mut #field_name, cx, poll_params) { #generate_event_match_arm - std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => { - return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }); + std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler: provided_handler }) => { + return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler: #provided_handler_and_new_handlers }); } - std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }) => { - return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }); + std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler: provided_handler }) => { + return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler: #provided_handler_and_new_handlers }); } std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => { return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { @@ -578,7 +682,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_address_change_stmts);* } - fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) { + fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, handlers: ::Handler) { #(#inject_connection_closed_stmts);* } @@ -586,10 +690,14 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_addr_reach_failure_stmts);* } - fn inject_dial_failure(&mut self, peer_id: &#peer_id) { + fn inject_dial_failure(&mut self, peer_id: &#peer_id, handlers: Self::ProtocolsHandler, error: #dial_error) { #(#inject_dial_failure_stmts);* } + fn inject_listen_failure(&mut self, local_addr: &#multiaddr, send_back_addr: &#multiaddr, handlers: Self::ProtocolsHandler) { + #(#inject_listen_failure_stmts);* + } + fn inject_new_listener(&mut self, id: #listener_id) { #(#inject_new_listener_stmts);* } @@ -629,10 +737,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } } - fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action<<::Handler as #protocols_handler>::InEvent, Self::OutEvent>> { + fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action> { use libp2p::futures::prelude::*; #(#poll_stmts)* - let f: std::task::Poll<#network_behaviour_action<<::Handler as #protocols_handler>::InEvent, Self::OutEvent>> = #poll_method; + let f: std::task::Poll<#network_behaviour_action> = #poll_method; f } } diff --git a/swarm-derive/tests/test.rs b/swarm-derive/tests/test.rs index 78a9ed985f9..ef457c83776 100644 --- a/swarm-derive/tests/test.rs +++ b/swarm-derive/tests/test.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p::swarm::SwarmEvent; +use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; use libp2p_swarm_derive::*; /// Small utility to check that a type implements `NetworkBehaviour`. @@ -149,11 +149,16 @@ fn custom_polling() { } impl Foo { - fn foo( + fn foo( &mut self, _: &mut std::task::Context, _: &mut impl libp2p::swarm::PollParameters, - ) -> std::task::Poll> { + ) -> std::task::Poll< + libp2p::swarm::NetworkBehaviourAction< + ::OutEvent, + ::ProtocolsHandler, + >, + > { std::task::Poll::Pending } } @@ -207,11 +212,16 @@ fn custom_event_and_polling() { } impl Foo { - fn foo( + fn foo( &mut self, _: &mut std::task::Context, _: &mut impl libp2p::swarm::PollParameters, - ) -> std::task::Poll> { + ) -> std::task::Poll< + libp2p::swarm::NetworkBehaviourAction< + ::OutEvent, + ::ProtocolsHandler, + >, + > { std::task::Poll::Pending } } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 4eacf3dfe45..5894591c3bd 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -19,10 +19,34 @@ - Implement `ProtocolsHandler` on `either::Either`representing either of two `ProtocolsHandler` implementations (see [PR 2192]). +- Require implementation to provide handler in + `NetworkBehaviourAction::DialPeer` and `NetworkBehaviourAction::DialAddress`. + Note that the handler is returned to the `NetworkBehaviour` on connection + failure and connection closing. Thus it can be used to carry state, which + otherwise would have to be tracked in the `NetworkBehaviour` itself. E.g. a + message destined to an unconnected peer can be included in the handler, and + thus directly send on connection success or extracted by the + `NetworkBehaviour` on connection failure (see [PR 2191]). + +- Include handler in `NetworkBehaviour::inject_dial_failure`, + `NetworkBehaviour::inject_connection_closed`, + `NetworkBehaviour::inject_listen_failure` (see [PR 2191]). + +- Include error in `NetworkBehaviour::inject_dial_failure` and call + `NetworkBehaviour::inject_dial_failure` on `DialPeerCondition` evaluating to + false. To emulate the previous behaviour, return early within + `inject_dial_failure` on `DialError::DialPeerConditionFalse`. See [PR 2191]. + +- Make `NetworkBehaviourAction` generic over `NetworkBehaviour::OutEvent` and + `NetworkBehaviour::ProtocolsHandler`. In most cases, change your generic type + parameters to `NetworkBehaviourAction`. See [PR 2191]. + [PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150 [PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182 [PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 [PR 2192]: https://github.com/libp2p/rust-libp2p/pull/2192 +[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191 # 0.30.0 [2021-07-12] diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 33b482f2a0f..b8e5d77a242 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -20,7 +20,6 @@ wasm-timer = "0.2" void = "1" [dev-dependencies] -libp2p-mplex = { path = "../muxers/mplex" } -libp2p-noise = { path = "../transports/noise" } +libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] } quickcheck = "0.9.0" rand = "0.7.2" diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index a21c7a023b8..085439aa1b1 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -19,13 +19,17 @@ // DEALINGS IN THE SOFTWARE. use crate::protocols_handler::{IntoProtocolsHandler, ProtocolsHandler}; -use crate::{AddressRecord, AddressScore}; +use crate::{AddressRecord, AddressScore, DialError}; use libp2p_core::{ connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, }; use std::{error, task::Context, task::Poll}; +/// Custom event that can be received by the [`ProtocolsHandler`]. +type THandlerInEvent = + <::Handler as ProtocolsHandler>::InEvent; + /// A behaviour for the network. Allows customizing the swarm. /// /// This trait has been designed to be composable. Multiple implementations can be combined into @@ -65,16 +69,20 @@ pub trait NetworkBehaviour: Send + 'static { /// Creates a new `ProtocolsHandler` for a connection with a peer. /// - /// Every time an incoming connection is opened, and every time we start dialing a node, this - /// method is called. + /// Every time an incoming connection is opened, and every time another [`NetworkBehaviour`] + /// emitted a dial request, this method is called. /// /// The returned object is a handler for that specific connection, and will be moved to a /// background task dedicated to that connection. /// - /// The network behaviour (ie. the implementation of this trait) and the handlers it has - /// spawned (ie. the objects returned by `new_handler`) can communicate by passing messages. - /// Messages sent from the handler to the behaviour are injected with `inject_event`, and - /// the behaviour can send a message to the handler by making `poll` return `SendEvent`. + /// The network behaviour (ie. the implementation of this trait) and the handlers it has spawned + /// (ie. the objects returned by `new_handler`) can communicate by passing messages. Messages + /// sent from the handler to the behaviour are injected with [`NetworkBehaviour::inject_event`], + /// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`] + /// return [`NetworkBehaviourAction::NotifyHandler`]. + /// + /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and + /// connection closing. fn new_handler(&mut self) -> Self::ProtocolsHandler; /// Addresses that this behaviour is aware of for this specific peer, and that may allow @@ -112,7 +120,14 @@ pub trait NetworkBehaviour: Send + 'static { /// A call to this method is always paired with an earlier call to /// `inject_connection_established` with the same peer ID, connection ID and /// endpoint. - fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {} + fn inject_connection_closed( + &mut self, + _: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: ::Handler, + ) { + } /// Informs the behaviour that the [`ConnectedPoint`] of an existing connection has changed. fn inject_address_change( @@ -153,7 +168,26 @@ pub trait NetworkBehaviour: Send + 'static { /// /// The `peer_id` is guaranteed to be in a disconnected state. In other words, /// `inject_connected` has not been called, or `inject_disconnected` has been called since then. - fn inject_dial_failure(&mut self, _peer_id: &PeerId) {} + fn inject_dial_failure( + &mut self, + _peer_id: &PeerId, + _handler: Self::ProtocolsHandler, + _error: DialError, + ) { + } + + /// Indicates to the behaviour that an error happened on an incoming connection during its + /// initial handshake. + /// + /// This can include, for example, an error during the handshake of the encryption layer, or the + /// connection unexpectedly closed. + fn inject_listen_failure( + &mut self, + _local_addr: &Multiaddr, + _send_back_addr: &Multiaddr, + _handler: Self::ProtocolsHandler, + ) { + } /// Indicates to the behaviour that a new listener was created. fn inject_new_listener(&mut self, _id: ListenerId) {} @@ -182,8 +216,11 @@ pub trait NetworkBehaviour: Send + 'static { /// /// This API mimics the API of the `Stream` trait. The method may register the current task in /// order to wake it up at a later point in time. - fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) - -> Poll::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>; + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll>; } /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to. @@ -228,15 +265,35 @@ pub trait NetworkBehaviourEventProcess { /// in whose context it is executing. /// /// [`Swarm`]: super::Swarm +// +// Note: `TInEvent` is needed to be able to implement +// [`NetworkBehaviourAction::map_in`], mapping the handler `InEvent` leaving the +// handler itself untouched. #[derive(Debug)] -pub enum NetworkBehaviourAction { +pub enum NetworkBehaviourAction< + TOutEvent, + THandler: IntoProtocolsHandler, + TInEvent = THandlerInEvent, +> { /// Instructs the `Swarm` to return an event when it is being polled. GenerateEvent(TOutEvent), /// Instructs the swarm to dial the given multiaddress optionally including a [`PeerId`]. + /// + /// On success, [`NetworkBehaviour::inject_connection_established`] is invoked. + /// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. + /// + /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure + /// and connection closing. Thus it can be used to carry state, which otherwise would have to be + /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer + /// can be included in the handler, and thus directly send on connection success or extracted by + /// the [`NetworkBehaviour`] on connection failure. See [`NetworkBehaviourAction::DialPeer`] for + /// example. DialAddress { /// The address to dial. address: Multiaddr, + /// The handler to be used to handle the connection to the peer. + handler: THandler, }, /// Instructs the swarm to dial a known `PeerId`. @@ -247,13 +304,194 @@ pub enum NetworkBehaviourAction { /// If we were already trying to dial this node, the addresses that are not yet in the queue of /// addresses to try are added back to this queue. /// - /// On success, [`NetworkBehaviour::inject_connected`] is invoked. + /// On success, [`NetworkBehaviour::inject_connection_established`] is invoked. /// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. + /// + /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure + /// and connection closing. Thus it can be used to carry state, which otherwise would have to be + /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer + /// can be included in the handler, and thus directly send on connection success or extracted by + /// the [`NetworkBehaviour`] on connection failure. + /// + /// # Example + /// + /// ```rust + /// # use futures::executor::block_on; + /// # use futures::stream::StreamExt; + /// # use libp2p::core::connection::ConnectionId; + /// # use libp2p::core::identity; + /// # use libp2p::core::transport::{MemoryTransport, Transport}; + /// # use libp2p::core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; + /// # use libp2p::core::PeerId; + /// # use libp2p::plaintext::PlainText2Config; + /// # use libp2p::swarm::{ + /// # DialError, DialPeerCondition, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream, + /// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, + /// # ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, + /// # }; + /// # use libp2p::yamux; + /// # use std::collections::VecDeque; + /// # use std::task::{Context, Poll}; + /// # use void::Void; + /// # + /// # let local_key = identity::Keypair::generate_ed25519(); + /// # let local_public_key = local_key.public(); + /// # let local_peer_id = PeerId::from(local_public_key.clone()); + /// # + /// # let transport = MemoryTransport::default() + /// # .upgrade(upgrade::Version::V1) + /// # .authenticate(PlainText2Config { local_public_key }) + /// # .multiplex(yamux::YamuxConfig::default()) + /// # .boxed(); + /// # + /// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id); + /// # + /// // Super precious message that we should better not lose. + /// let message = PreciousMessage("My precious message".to_string()); + /// + /// // Unfortunately this peer is offline, thus sending our message to it will fail. + /// let offline_peer = PeerId::random(); + /// + /// // Let's send it anyways. We should get it back in case connecting to the peer fails. + /// swarm.behaviour_mut().send(offline_peer, message); + /// + /// block_on(async { + /// // As expected, sending failed. But great news, we got our message back. + /// matches!( + /// swarm.next().await.expect("Infinite stream"), + /// SwarmEvent::Behaviour(PreciousMessage(_)) + /// ); + /// }); + /// + /// # #[derive(Default)] + /// # struct MyBehaviour { + /// # outbox_to_swarm: VecDeque>, + /// # } + /// # + /// # impl MyBehaviour { + /// # fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) { + /// # self.outbox_to_swarm + /// # .push_back(NetworkBehaviourAction::DialPeer { + /// # peer_id, + /// # condition: DialPeerCondition::Always, + /// # handler: MyHandler { message: Some(msg) }, + /// # }); + /// # } + /// # } + /// # + /// impl NetworkBehaviour for MyBehaviour { + /// # type ProtocolsHandler = MyHandler; + /// # type OutEvent = PreciousMessage; + /// # + /// # fn new_handler(&mut self) -> Self::ProtocolsHandler { + /// # MyHandler { message: None } + /// # } + /// # + /// # + /// # fn inject_event( + /// # &mut self, + /// # _: PeerId, + /// # _: ConnectionId, + /// # _: <::Handler as ProtocolsHandler>::OutEvent, + /// # ) { + /// # unreachable!(); + /// # } + /// # + /// fn inject_dial_failure( + /// &mut self, + /// _: &PeerId, + /// handler: Self::ProtocolsHandler, + /// _: DialError, + /// ) { + /// // As expected, sending the message failed. But lucky us, we got the handler back, thus + /// // the precious message is not lost and we can return it back to the user. + /// let msg = handler.message.unwrap(); + /// self.outbox_to_swarm + /// .push_back(NetworkBehaviourAction::GenerateEvent(msg)) + /// } + /// # + /// # fn poll( + /// # &mut self, + /// # _: &mut Context<'_>, + /// # _: &mut impl PollParameters, + /// # ) -> Poll> { + /// # if let Some(action) = self.outbox_to_swarm.pop_front() { + /// # return Poll::Ready(action); + /// # } + /// # Poll::Pending + /// # } + /// } + /// + /// # struct MyHandler { + /// # message: Option, + /// # } + /// # + /// # impl ProtocolsHandler for MyHandler { + /// # type InEvent = Void; + /// # type OutEvent = Void; + /// # type Error = Void; + /// # type InboundProtocol = DeniedUpgrade; + /// # type OutboundProtocol = DeniedUpgrade; + /// # type InboundOpenInfo = (); + /// # type OutboundOpenInfo = Void; + /// # + /// # fn listen_protocol( + /// # &self, + /// # ) -> SubstreamProtocol { + /// # SubstreamProtocol::new(DeniedUpgrade, ()) + /// # } + /// # + /// # fn inject_fully_negotiated_inbound( + /// # &mut self, + /// # _: >::Output, + /// # _: Self::InboundOpenInfo, + /// # ) { + /// # } + /// # + /// # fn inject_fully_negotiated_outbound( + /// # &mut self, + /// # _: >::Output, + /// # _: Self::OutboundOpenInfo, + /// # ) { + /// # } + /// # + /// # fn inject_event(&mut self, _event: Self::InEvent) {} + /// # + /// # fn inject_dial_upgrade_error( + /// # &mut self, + /// # _: Self::OutboundOpenInfo, + /// # _: ProtocolsHandlerUpgrErr, + /// # ) { + /// # } + /// # + /// # fn connection_keep_alive(&self) -> KeepAlive { + /// # KeepAlive::Yes + /// # } + /// # + /// # fn poll( + /// # &mut self, + /// # _: &mut Context<'_>, + /// # ) -> Poll< + /// # ProtocolsHandlerEvent< + /// # Self::OutboundProtocol, + /// # Self::OutboundOpenInfo, + /// # Self::OutEvent, + /// # Self::Error, + /// # >, + /// # > { + /// # todo!("If `Self::message.is_some()` send the message to the remote.") + /// # } + /// # } + /// # #[derive(Debug, PartialEq, Eq)] + /// # struct PreciousMessage(String); + /// ``` DialPeer { /// The peer to try reach. peer_id: PeerId, /// The condition for initiating a new dialing attempt. condition: DialPeerCondition, + /// The handler to be used to handle the connection to the peer. + handler: THandler, }, /// Instructs the `Swarm` to send an event to the handler dedicated to a @@ -314,17 +552,28 @@ pub enum NetworkBehaviourAction { }, } -impl NetworkBehaviourAction { +impl + NetworkBehaviourAction +{ /// Map the handler event. - pub fn map_in(self, f: impl FnOnce(TInEvent) -> E) -> NetworkBehaviourAction { + pub fn map_in( + self, + f: impl FnOnce(TInEventOld) -> TInEventNew, + ) -> NetworkBehaviourAction { match self { NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::DialAddress { address } => { - NetworkBehaviourAction::DialAddress { address } - } - NetworkBehaviourAction::DialPeer { peer_id, condition } => { - NetworkBehaviourAction::DialPeer { peer_id, condition } + NetworkBehaviourAction::DialAddress { address, handler } => { + NetworkBehaviourAction::DialAddress { address, handler } } + NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + } => NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, @@ -346,17 +595,25 @@ impl NetworkBehaviourAction { }, } } +} +impl NetworkBehaviourAction { /// Map the event the swarm will return. - pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction { + pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction { match self { NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)), - NetworkBehaviourAction::DialAddress { address } => { - NetworkBehaviourAction::DialAddress { address } - } - NetworkBehaviourAction::DialPeer { peer_id, condition } => { - NetworkBehaviourAction::DialPeer { peer_id, condition } + NetworkBehaviourAction::DialAddress { address, handler } => { + NetworkBehaviourAction::DialAddress { address, handler } } + NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + } => NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + }, NetworkBehaviourAction::NotifyHandler { peer_id, handler, @@ -380,6 +637,60 @@ impl NetworkBehaviourAction { } } +impl NetworkBehaviourAction +where + THandlerOld: IntoProtocolsHandler, + ::Handler: ProtocolsHandler, +{ + /// Map the handler. + pub fn map_handler( + self, + f: impl FnOnce(THandlerOld) -> THandlerNew, + ) -> NetworkBehaviourAction + where + THandlerNew: IntoProtocolsHandler, + ::Handler: ProtocolsHandler, + { + match self { + NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), + NetworkBehaviourAction::DialAddress { address, handler } => { + NetworkBehaviourAction::DialAddress { + address, + handler: f(handler), + } + } + NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + } => NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler: f(handler), + }, + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: event, + }, + NetworkBehaviourAction::ReportObservedAddr { address, score } => { + NetworkBehaviourAction::ReportObservedAddr { address, score } + } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }, + } + } +} + /// The options w.r.t. which connection handler to notify of an event. #[derive(Debug, Clone)] pub enum NotifyHandler { @@ -392,7 +703,6 @@ pub enum NotifyHandler { /// The available conditions under which a new dialing attempt to /// a peer is initiated when requested by [`NetworkBehaviourAction::DialPeer`]. #[derive(Debug, Copy, Clone)] -#[non_exhaustive] pub enum DialPeerCondition { /// A new dialing attempt is initiated _only if_ the peer is currently /// considered disconnected, i.e. there is no established connection diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0bc718690be..dcccd46af79 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -82,8 +82,8 @@ use libp2p_core::{ }, muxing::StreamMuxerBox, network::{ - self, peer::ConnectedPeer, ConnectionLimits, Network, NetworkConfig, NetworkEvent, - NetworkInfo, + self, peer::ConnectedPeer, ConnectionLimits, DialAttemptsRemaining, Network, NetworkConfig, + NetworkEvent, NetworkInfo, }, transport::{self, TransportError}, upgrade::ProtocolName, @@ -331,19 +331,40 @@ where /// Initiates a new dialing attempt to the given address. pub fn dial_addr(&mut self, addr: Multiaddr) -> Result<(), DialError> { - let handler = self - .behaviour - .new_handler() + let handler = self.behaviour.new_handler(); + self.dial_addr_with_handler(addr, handler) + .map_err(|e| DialError::from_network_dial_error(e)) + .map_err(|(e, _)| e) + } + + fn dial_addr_with_handler( + &mut self, + addr: Multiaddr, + handler: ::ProtocolsHandler, + ) -> Result<(), network::DialError>>> { + let handler = handler .into_node_handler_builder() .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); - Ok(self.network.dial(&addr, handler).map(|_id| ())?) + + self.network.dial(&addr, handler).map(|_id| ()) } /// Initiates a new dialing attempt to the given peer. pub fn dial(&mut self, peer_id: &PeerId) -> Result<(), DialError> { + let handler = self.behaviour.new_handler(); + self.dial_with_handler(peer_id, handler) + } + + fn dial_with_handler( + &mut self, + peer_id: &PeerId, + handler: ::ProtocolsHandler, + ) -> Result<(), DialError> { if self.banned_peers.contains(peer_id) { - self.behaviour.inject_dial_failure(peer_id); - return Err(DialError::Banned); + let error = DialError::Banned; + self.behaviour + .inject_dial_failure(peer_id, handler, error.clone()); + return Err(error); } let self_listening = &self.listened_addrs; @@ -353,31 +374,31 @@ where .into_iter() .filter(|a| !self_listening.contains(a)); - let result = if let Some(first) = addrs.next() { - let handler = self - .behaviour - .new_handler() - .into_node_handler_builder() - .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); - self.network - .peer(*peer_id) - .dial(first, addrs, handler) - .map(|_| ()) - .map_err(DialError::from) - } else { - Err(DialError::NoAddresses) + let first = match addrs.next() { + Some(first) => first, + None => { + let error = DialError::NoAddresses; + self.behaviour + .inject_dial_failure(peer_id, handler, error.clone()); + return Err(error); + } }; - if let Err(error) = &result { - log::debug!( - "New dialing attempt to peer {:?} failed: {:?}.", - peer_id, - error - ); - self.behaviour.inject_dial_failure(&peer_id); + let handler = handler + .into_node_handler_builder() + .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); + match self.network.peer(*peer_id).dial(first, addrs, handler) { + Ok(_connection_id) => Ok(()), + Err(error) => { + let (error, handler) = DialError::from_network_dial_error(error); + self.behaviour.inject_dial_failure( + &peer_id, + handler.into_protocols_handler(), + error.clone(), + ); + Err(error) + } } - - result } /// Returns an iterator that produces the list of addresses we're listening on. @@ -568,6 +589,7 @@ where connected, error, num_established, + handler, }) => { if let Some(error) = error.as_ref() { log::debug!("Connection {:?} closed: {:?}", connected, error); @@ -576,8 +598,12 @@ where } let peer_id = connected.peer_id; let endpoint = connected.endpoint; - this.behaviour - .inject_connection_closed(&peer_id, &id, &endpoint); + this.behaviour.inject_connection_closed( + &peer_id, + &id, + &endpoint, + handler.into_protocols_handler(), + ); if num_established == 0 { this.behaviour.inject_disconnected(&peer_id); } @@ -668,8 +694,14 @@ where local_addr, send_back_addr, error, + handler, }) => { log::debug!("Incoming connection failed: {:?}", error); + this.behaviour.inject_listen_failure( + &local_addr, + &send_back_addr, + handler.into_protocols_handler(), + ); return Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, @@ -682,19 +714,34 @@ where error, attempts_remaining, }) => { - log::debug!( - "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.", - peer_id, multiaddr, error, attempts_remaining); this.behaviour .inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error); - if attempts_remaining == 0 { - this.behaviour.inject_dial_failure(&peer_id); + + let num_remaining: u32; + match attempts_remaining { + DialAttemptsRemaining::Some(n) => { + num_remaining = n.into(); + } + DialAttemptsRemaining::None(handler) => { + num_remaining = 0; + this.behaviour.inject_dial_failure( + &peer_id, + handler.into_protocols_handler(), + DialError::UnreachableAddr(multiaddr.clone()), + ); + } } + + log::debug!( + "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.", + peer_id, multiaddr, error, num_remaining, + ); + return Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address: multiaddr, error, - attempts_remaining, + attempts_remaining: num_remaining, }); } Poll::Ready(NetworkEvent::UnknownPeerDialError { @@ -761,44 +808,48 @@ where Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { return Poll::Ready(SwarmEvent::Behaviour(event)) } - Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => { - let _ = Swarm::dial_addr(&mut *this, address); + Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => { + let _ = Swarm::dial_addr_with_handler(&mut *this, address, handler); } - Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => { - if this.banned_peers.contains(&peer_id) { - this.behaviour.inject_dial_failure(&peer_id); + Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id, + condition, + handler, + }) => { + let condition_matched = match condition { + DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id), + DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id), + DialPeerCondition::Always => true, + }; + if condition_matched { + if Swarm::dial_with_handler(this, &peer_id, handler).is_ok() { + return Poll::Ready(SwarmEvent::Dialing(peer_id)); + } } else { - let condition_matched = match condition { - DialPeerCondition::Disconnected => { - this.network.is_disconnected(&peer_id) - } - DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id), - DialPeerCondition::Always => true, - }; - if condition_matched { - if Swarm::dial(this, &peer_id).is_ok() { - return Poll::Ready(SwarmEvent::Dialing(peer_id)); - } - } else { - // Even if the condition for a _new_ dialing attempt is not met, - // we always add any potentially new addresses of the peer to an - // ongoing dialing attempt, if there is one. - log::trace!( - "Condition for new dialing attempt to {:?} not met: {:?}", - peer_id, - condition - ); - let self_listening = &this.listened_addrs; - if let Some(mut peer) = this.network.peer(peer_id).into_dialing() { - let addrs = this.behaviour.addresses_of_peer(peer.id()); - let mut attempt = peer.some_attempt(); - for a in addrs { - if !self_listening.contains(&a) { - attempt.add_address(a); - } + // Even if the condition for a _new_ dialing attempt is not met, + // we always add any potentially new addresses of the peer to an + // ongoing dialing attempt, if there is one. + log::trace!( + "Condition for new dialing attempt to {:?} not met: {:?}", + peer_id, + condition + ); + let self_listening = &this.listened_addrs; + if let Some(mut peer) = this.network.peer(peer_id).into_dialing() { + let addrs = this.behaviour.addresses_of_peer(peer.id()); + let mut attempt = peer.some_attempt(); + for a in addrs { + if !self_listening.contains(&a) { + attempt.add_address(a); } } } + + this.behaviour.inject_dial_failure( + &peer_id, + handler, + DialError::DialPeerConditionFalse(condition), + ); } } Poll::Ready(NetworkBehaviourAction::NotifyHandler { @@ -1148,8 +1199,8 @@ where } } -/// The possible failures of [`Swarm::dial`]. -#[derive(Debug)] +/// The possible failures of dialing. +#[derive(Debug, Clone)] pub enum DialError { /// The peer is currently banned. Banned, @@ -1158,16 +1209,27 @@ pub enum DialError { ConnectionLimit(ConnectionLimit), /// The address given for dialing is invalid. InvalidAddress(Multiaddr), + /// Tried to dial an address but it ended up being unreachaable. + UnreachableAddr(Multiaddr), + /// The peer being dialed is the local peer and thus the dial was aborted. + LocalPeerId, /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses /// for the peer to dial. NoAddresses, + /// The provided [`DialPeerCondition`] evaluated to false and thus the dial was aborted. + DialPeerConditionFalse(DialPeerCondition), } -impl From for DialError { - fn from(err: network::DialError) -> DialError { - match err { - network::DialError::ConnectionLimit(l) => DialError::ConnectionLimit(l), - network::DialError::InvalidAddress(a) => DialError::InvalidAddress(a), +impl DialError { + fn from_network_dial_error(error: network::DialError) -> (Self, THandler) { + match error { + network::DialError::ConnectionLimit { limit, handler } => { + (DialError::ConnectionLimit(limit), handler) + } + network::DialError::InvalidAddress { address, handler } => { + (DialError::InvalidAddress(address), handler) + } + network::DialError::LocalPeerId { handler } => (DialError::LocalPeerId, handler), } } } @@ -1177,8 +1239,17 @@ impl fmt::Display for DialError { match self { DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), + DialError::LocalPeerId => write!(f, "Dial error: tried to dial local peer id."), DialError::InvalidAddress(a) => write!(f, "Dial error: invalid address: {}", a), + DialError::UnreachableAddr(a) => write!(f, "Dial error: unreachable address: {}", a), DialError::Banned => write!(f, "Dial error: peer is banned."), + DialError::DialPeerConditionFalse(c) => { + write!( + f, + "Dial error: condition {:?} for dialing peer was false.", + c + ) + } } } } @@ -1188,8 +1259,11 @@ impl error::Error for DialError { match self { DialError::ConnectionLimit(err) => Some(err), DialError::InvalidAddress(_) => None, + DialError::UnreachableAddr(_) => None, + DialError::LocalPeerId => None, DialError::NoAddresses => None, DialError::Banned => None, + DialError::DialPeerConditionFalse(_) => None, } } } @@ -1241,12 +1315,7 @@ impl NetworkBehaviour for DummyBehaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { + ) -> Poll> { Poll::Pending } } @@ -1257,8 +1326,9 @@ mod tests { use crate::protocols_handler::DummyProtocolsHandler; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::{executor, future}; - use libp2p_core::{identity, multiaddr, transport, upgrade}; - use libp2p_noise as noise; + use libp2p::core::{identity, multiaddr, transport, upgrade}; + use libp2p::plaintext; + use libp2p::yamux; // Test execution state. // Connection => Disconnecting => Connecting. @@ -1274,17 +1344,16 @@ mod tests { O: Send + 'static, { let id_keys = identity::Keypair::generate_ed25519(); - let pubkey = id_keys.public(); - let noise_keys = noise::Keypair::::new() - .into_authentic(&id_keys) - .unwrap(); + let local_public_key = id_keys.public(); let transport = transport::MemoryTransport::default() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(libp2p_mplex::MplexConfig::new()) + .authenticate(plaintext::PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(yamux::YamuxConfig::default()) .boxed(); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - SwarmBuilder::new(transport, behaviour, pubkey.into()).build() + SwarmBuilder::new(transport, behaviour, local_public_key.into()).build() } fn swarms_connected( @@ -1320,17 +1389,15 @@ mod tests { <::Handler as ProtocolsHandler>::OutEvent: Clone { for s in &[swarm1, swarm2] { - if s.behaviour.inject_connection_closed.len() < num_connections { - assert_eq!(s.behaviour.inject_disconnected.len(), 0); - } else { - assert_eq!(s.behaviour.inject_disconnected.len(), 1); - } assert_eq!(s.behaviour.inject_connection_established.len(), 0); assert_eq!(s.behaviour.inject_connected.len(), 0); } [swarm1, swarm2] .iter() .all(|s| s.behaviour.inject_connection_closed.len() == num_connections) + && [swarm1, swarm2] + .iter() + .all(|s| s.behaviour.inject_disconnected.len() == 1) } /// Establishes multiple connections between two peers, diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index edb383282cd..8254968c6e8 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -65,6 +65,10 @@ where self.substream_upgrade_protocol_override = version; self } + + pub(crate) fn into_protocols_handler(self) -> TIntoProtoHandler { + self.handler + } } impl IntoConnectionHandler @@ -130,6 +134,12 @@ where substream_upgrade_protocol_override: Option, } +impl NodeHandlerWrapper { + pub(crate) fn into_protocols_handler(self) -> TProtoHandler { + self.handler + } +} + struct SubstreamUpgrade { user_data: Option, timeout: Delay, diff --git a/swarm/src/protocols_handler/select.rs b/swarm/src/protocols_handler/select.rs index b5891c25d1f..cce84928b5e 100644 --- a/swarm/src/protocols_handler/select.rs +++ b/swarm/src/protocols_handler/select.rs @@ -45,6 +45,10 @@ impl IntoProtocolsHandlerSelect { pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { IntoProtocolsHandlerSelect { proto1, proto2 } } + + pub fn into_inner(self) -> (TProto1, TProto2) { + (self.proto1, self.proto2) + } } impl IntoProtocolsHandler for IntoProtocolsHandlerSelect @@ -87,6 +91,10 @@ impl ProtocolsHandlerSelect { pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { ProtocolsHandlerSelect { proto1, proto2 } } + + pub fn into_inner(self) -> (TProto1, TProto2) { + (self.proto1, self.proto2) + } } impl ProtocolsHandler for ProtocolsHandlerSelect diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 5cb05d7baf3..457701899a8 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, }; use libp2p_core::{ @@ -45,7 +45,7 @@ where /// The next action to return from `poll`. /// /// An action is only returned once. - pub next_action: Option>, + pub next_action: Option>, } impl MockBehaviour @@ -84,7 +84,7 @@ where &mut self, _: &mut Context, _: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { self.next_action.take().map_or(Poll::Pending, Poll::Ready) } } @@ -202,10 +202,16 @@ where self.inner.inject_disconnected(peer); } - fn inject_connection_closed(&mut self, p: &PeerId, c: &ConnectionId, e: &ConnectedPoint) { + fn inject_connection_closed( + &mut self, + p: &PeerId, + c: &ConnectionId, + e: &ConnectedPoint, + handler: ::Handler, + ) { self.inject_connection_closed .push((p.clone(), c.clone(), e.clone())); - self.inner.inject_connection_closed(p, c, e); + self.inner.inject_connection_closed(p, c, e, handler); } fn inject_event( @@ -228,9 +234,14 @@ where self.inner.inject_addr_reach_failure(p, a, e); } - fn inject_dial_failure(&mut self, p: &PeerId) { + fn inject_dial_failure( + &mut self, + p: &PeerId, + handler: Self::ProtocolsHandler, + error: DialError, + ) { self.inject_dial_failure.push(p.clone()); - self.inner.inject_dial_failure(p); + self.inner.inject_dial_failure(p, handler, error); } fn inject_new_listener(&mut self, id: ListenerId) { @@ -268,12 +279,11 @@ where self.inner.inject_listener_closed(l, r); } - fn poll(&mut self, cx: &mut Context, args: &mut impl PollParameters) -> - Poll::Handler as ProtocolsHandler>::InEvent, - Self::OutEvent - >> - { + fn poll( + &mut self, + cx: &mut Context, + args: &mut impl PollParameters, + ) -> Poll> { self.poll += 1; self.inner.poll(cx, args) } diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index 5a86a4824ed..575d4e46809 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -24,7 +24,8 @@ use crate::protocols_handler::{ }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; use crate::{ - NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, + DialError, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, + PollParameters, }; use either::Either; use libp2p_core::{ @@ -113,9 +114,12 @@ where peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint, + handler: ::Handler, ) { if let Some(inner) = self.inner.as_mut() { - inner.inject_connection_closed(peer_id, connection, endpoint) + if let Some(handler) = handler.inner { + inner.inject_connection_closed(peer_id, connection, endpoint, handler) + } } } @@ -153,9 +157,29 @@ where } } - fn inject_dial_failure(&mut self, peer_id: &PeerId) { + fn inject_dial_failure( + &mut self, + peer_id: &PeerId, + handler: Self::ProtocolsHandler, + error: DialError, + ) { + if let Some(inner) = self.inner.as_mut() { + if let Some(handler) = handler.inner { + inner.inject_dial_failure(peer_id, handler, error) + } + } + } + + fn inject_listen_failure( + &mut self, + local_addr: &Multiaddr, + send_back_addr: &Multiaddr, + handler: Self::ProtocolsHandler, + ) { if let Some(inner) = self.inner.as_mut() { - inner.inject_dial_failure(peer_id) + if let Some(handler) = handler.inner { + inner.inject_listen_failure(local_addr, send_back_addr, handler) + } } } @@ -201,11 +225,15 @@ where } } - fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) - -> Poll::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> - { + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { if let Some(inner) = self.inner.as_mut() { - inner.poll(cx, params) + inner + .poll(cx, params) + .map(|action| action.map_handler(|h| ToggleIntoProtoHandler { inner: Some(h) })) } else { Poll::Pending }