From 54ab711eed8cdefc5eedef9f3f96115eadc50e21 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 7 Dec 2022 13:14:49 +1100 Subject: [PATCH 1/7] feat(swarm)!: box up `ConnectionHandler::Error` --- examples/file-sharing.rs | 11 ++--------- misc/metrics/src/identify.rs | 4 ++-- misc/metrics/src/lib.rs | 4 ++-- misc/metrics/src/swarm.rs | 4 ++-- protocols/rendezvous/tests/harness.rs | 20 ++++++++++---------- swarm/src/connection.rs | 4 ++-- swarm/src/connection/error.rs | 18 ++++++------------ swarm/src/connection/pool.rs | 2 +- swarm/src/connection/pool/task.rs | 2 +- swarm/src/lib.rs | 18 +++++++----------- 10 files changed, 35 insertions(+), 52 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index ef7d1fdf57d..67e31f16240 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -208,7 +208,6 @@ mod network { use super::*; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; - use libp2p::core::either::EitherError; use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}; use libp2p::identity; use libp2p::identity::ed25519; @@ -219,7 +218,7 @@ mod network { ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; - use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; + use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; @@ -404,13 +403,7 @@ mod network { } } - async fn handle_event( - &mut self, - event: SwarmEvent< - ComposedEvent, - EitherError, io::Error>, - >, - ) { + async fn handle_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( KademliaEvent::OutboundQueryProgressed { diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 688c67a6190..a9b89f40734 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -193,8 +193,8 @@ impl super::Recorder for Metrics { } } -impl super::Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { if let libp2p_swarm::SwarmEvent::ConnectionClosed { peer_id, num_established, diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 351887260df..6ebab878e0e 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -142,8 +142,8 @@ impl Recorder for Metrics { } } -impl Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { self.swarm.record(event); #[cfg(feature = "identify")] diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index c4fa8712d14..c4e8ed13e40 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -139,8 +139,8 @@ impl Metrics { } } -impl super::Recorder> for Metrics { - fn record(&self, event: &libp2p_swarm::SwarmEvent) { +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { match event { libp2p_swarm::SwarmEvent::Behaviour(_) => {} libp2p_swarm::SwarmEvent::ConnectionEstablished { endpoint, .. } => { diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 523f34c76db..ffa3e3f257e 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -64,11 +64,11 @@ fn get_rand_memory_address() -> Multiaddr { .unwrap() } -pub async fn await_event_or_timeout( - swarm: &mut (impl Stream> + FusedStream + Unpin), -) -> SwarmEvent +pub async fn await_event_or_timeout( + swarm: &mut (impl Stream> + FusedStream + Unpin), +) -> SwarmEvent where - SwarmEvent: Debug, + SwarmEvent: Debug, { tokio::time::timeout( Duration::from_secs(30), @@ -80,13 +80,13 @@ where .expect("network behaviour to emit an event within 30 seconds") } -pub async fn await_events_or_timeout( - swarm_1: &mut (impl Stream> + FusedStream + Unpin), - swarm_2: &mut (impl Stream> + FusedStream + Unpin), -) -> (SwarmEvent, SwarmEvent) +pub async fn await_events_or_timeout( + swarm_1: &mut (impl Stream> + FusedStream + Unpin), + swarm_2: &mut (impl Stream> + FusedStream + Unpin), +) -> (SwarmEvent, SwarmEvent) where - SwarmEvent: Debug, - SwarmEvent: Debug, + SwarmEvent: Debug, + SwarmEvent: Debug, { tokio::time::timeout( Duration::from_secs(30), diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 272f78f3a6b..3e907de9e15 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -165,7 +165,7 @@ where pub fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, ConnectionError>> { + ) -> Poll, ConnectionError>> { let Self { requested_substreams, muxing, @@ -202,7 +202,7 @@ where return Poll::Ready(Ok(Event::Handler(event))); } Poll::Ready(ConnectionHandlerEvent::Close(err)) => { - return Poll::Ready(Err(ConnectionError::Handler(err))); + return Poll::Ready(Err(ConnectionError::Handler(Box::new(err)))); } } diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index db51ebca874..56596182fa7 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -25,7 +25,7 @@ use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. #[derive(Debug)] -pub enum ConnectionError { +pub enum ConnectionError { /// An I/O error occurred on the connection. // TODO: Eventually this should also be a custom error? IO(io::Error), @@ -34,13 +34,10 @@ pub enum ConnectionError { KeepAliveTimeout, /// The connection handler produced an error. - Handler(THandlerErr), + Handler(Box), } -impl fmt::Display for ConnectionError -where - THandlerErr: fmt::Display, -{ +impl fmt::Display for ConnectionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err), @@ -52,20 +49,17 @@ where } } -impl std::error::Error for ConnectionError -where - THandlerErr: std::error::Error + 'static, -{ +impl std::error::Error for ConnectionError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { ConnectionError::IO(err) => Some(err), ConnectionError::KeepAliveTimeout => None, - ConnectionError::Handler(err) => Some(err), + ConnectionError::Handler(err) => Some(err.as_ref()), } } } -impl From for ConnectionError { +impl From for ConnectionError { fn from(error: io::Error) -> Self { ConnectionError::IO(error) } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8729b2e36e1..fcea609dc4f 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -256,7 +256,7 @@ where connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option::Error>>, + error: Option, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, handler: THandler::Handler, diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 8e1129d8cae..b6273509a57 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -92,7 +92,7 @@ pub enum EstablishedConnectionEvent { Closed { id: ConnectionId, peer_id: PeerId, - error: Option>, + error: Option, handler: THandler, }, } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0c5cadcf021..b1dc3d44672 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -169,13 +169,9 @@ type THandlerInEvent = type THandlerOutEvent = < as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent; -/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. -type THandlerErr = - < as IntoConnectionHandler>::Handler as ConnectionHandler>::Error; - /// Event generated by the `Swarm`. #[derive(Debug)] -pub enum SwarmEvent { +pub enum SwarmEvent { /// Event generated by the `NetworkBehaviour`. Behaviour(TBehaviourOutEvent), /// A connection to the given peer has been opened. @@ -203,7 +199,7 @@ pub enum SwarmEvent { num_established: u32, /// Reason for the disconnection, if it was not a successful /// active close. - cause: Option>, + cause: Option, }, /// A new connection arrived on a listener and is in the process of protocol negotiation. /// @@ -813,7 +809,7 @@ where fn handle_pool_event( &mut self, event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, - ) -> Option>> { + ) -> Option> { match event { PoolEvent::ConnectionEstablished { peer_id, @@ -986,7 +982,7 @@ where as Transport>::ListenerUpgrade, io::Error, >, - ) -> Option>> { + ) -> Option> { match event { TransportEvent::Incoming { listener_id: _, @@ -1090,7 +1086,7 @@ where fn handle_behaviour_event( &mut self, event: NetworkBehaviourAction, - ) -> Option>> { + ) -> Option> { match event { NetworkBehaviourAction::GenerateEvent(event) => { return Some(SwarmEvent::Behaviour(event)) @@ -1176,7 +1172,7 @@ where fn poll_next_event( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll> { // We use a `this` variable because the compiler can't mutably borrow multiple times // across a `Deref`. let this = &mut *self; @@ -1371,7 +1367,7 @@ impl Stream for Swarm where TBehaviour: NetworkBehaviour, { - type Item = SwarmEvent, THandlerErr>; + type Item = SwarmEvent>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().poll_next_event(cx).map(Some) From 2893ec51ef303bea816d830620c4eb1374e97148 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 7 Dec 2022 13:35:47 +1100 Subject: [PATCH 2/7] Remove `ConnectionHandler::Error` associated type Now that we are always boxing the error, it makes more sense to have handlers directly return the most specific error without additional wrapping. --- protocols/dcutr/src/handler/direct.rs | 11 +--- protocols/dcutr/src/handler/relayed.rs | 63 ++++++++++--------- protocols/gossipsub/src/handler.rs | 34 ++++------ protocols/identify/src/handler.rs | 14 +---- protocols/kad/src/handler.rs | 13 +--- protocols/ping/src/handler.rs | 6 +- protocols/relay/src/v2/client/handler.rs | 16 +---- protocols/relay/src/v2/relay/handler.rs | 16 +---- protocols/rendezvous/src/substream_handler.rs | 11 +--- protocols/request-response/src/handler.rs | 6 +- swarm/src/behaviour/toggle.rs | 11 +--- swarm/src/connection.rs | 10 +-- swarm/src/dummy.rs | 11 +--- swarm/src/handler.rs | 58 ++++++----------- swarm/src/handler/either.rs | 13 +--- swarm/src/handler/map_in.rs | 11 +--- swarm/src/handler/map_out.rs | 11 +--- swarm/src/handler/multi.rs | 11 +--- swarm/src/handler/one_shot.rs | 13 +--- swarm/src/handler/pending.rs | 11 +--- swarm/src/handler/select.rs | 15 ++--- swarm/src/keep_alive.rs | 11 +--- 22 files changed, 108 insertions(+), 268 deletions(-) diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs index f0fdf5930ac..5bcb7e8a2dc 100644 --- a/protocols/dcutr/src/handler/direct.rs +++ b/protocols/dcutr/src/handler/direct.rs @@ -52,7 +52,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = void::Void; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; @@ -94,14 +93,8 @@ impl ConnectionHandler for Handler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if !self.reported { self.reported = true; return Poll::Ready(ConnectionHandlerEvent::Custom( diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 301f2ee3d82..12cd7793b4c 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -143,7 +143,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::OutEvent, - ::Error, >, >, /// Inbound connect, accepted by the behaviour, pending completion. @@ -247,15 +246,18 @@ impl Handler { }, )); } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| { - e.map_err(|e| match e { - EitherError::A(e) => EitherError::A(e), - EitherError::B(v) => void::unreachable(v), - }) - })); + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(p), + )) => { + self.queued_events + .push_back(ConnectionHandlerEvent::close(p)); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { + self.queued_events + .push_back(ConnectionHandlerEvent::close(e)); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(never))) => { + void::unreachable(never) } } } @@ -289,10 +291,22 @@ impl Handler { }, )); } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B))); + + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + e @ ConnectionHandlerUpgrErr::Timer => { + self.queued_events + .push_back(ConnectionHandlerEvent::close(e)); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(p), + )) => { + self.queued_events + .push_back(ConnectionHandlerEvent::close(p)); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + self.queued_events + .push_back(ConnectionHandlerEvent::close(e)); } } } @@ -301,9 +315,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Command; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; type InboundProtocol = upgrade::EitherUpgrade; type OutboundProtocol = protocol::outbound::Upgrade; type OutboundOpenInfo = u8; // Number of upgrade attempts. @@ -364,18 +375,12 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::close(err)); } // Return queued events. @@ -391,11 +396,7 @@ impl ConnectionHandler for Handler { Event::InboundConnectNegotiated(addresses), )); } - Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close( - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))), - )) - } + Err(e) => return Poll::Ready(ConnectionHandlerEvent::close(e)), } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 68bcf912975..660cb4810f6 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -248,7 +248,6 @@ impl GossipsubHandler { impl ConnectionHandler for GossipsubHandler { type InEvent = GossipsubHandlerIn; type OutEvent = HandlerEvent; - type Error = GossipsubHandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; type OutboundOpenInfo = crate::rpc_proto::Rpc; @@ -283,14 +282,8 @@ impl ConnectionHandler for GossipsubHandler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Handle any upgrade errors if let Some(error) = self.upgrade_errors.pop_front() { let reported_error = match error { @@ -327,7 +320,7 @@ impl ConnectionHandler for GossipsubHandler { // If there was a fatal error, close the connection. if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); + return Poll::Ready(ConnectionHandlerEvent::close(error)); } } @@ -342,7 +335,7 @@ impl ConnectionHandler for GossipsubHandler { if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, end the connection. - return Poll::Ready(ConnectionHandlerEvent::Close( + return Poll::Ready(ConnectionHandlerEvent::close( GossipsubHandlerError::MaxInboundSubstreams, )); } @@ -353,7 +346,7 @@ impl ConnectionHandler for GossipsubHandler { && !self.outbound_substream_establishing { if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - return Poll::Ready(ConnectionHandlerEvent::Close( + return Poll::Ready(ConnectionHandlerEvent::close( GossipsubHandlerError::MaxOutboundSubstreams, )); } @@ -478,13 +471,13 @@ impl ConnectionHandler for GossipsubHandler { } Err(e) => { error!("Error sending message: {}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + return Poll::Ready(ConnectionHandlerEvent::close(e)); } } } Poll::Ready(Err(e)) => { error!("Outbound substream error while sending output: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + return Poll::Ready(ConnectionHandlerEvent::close(e)); } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -506,7 +499,7 @@ impl ConnectionHandler for GossipsubHandler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(e)) + return Poll::Ready(ConnectionHandlerEvent::close(e)) } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -528,13 +521,10 @@ impl ConnectionHandler for GossipsubHandler { } Poll::Ready(Err(e)) => { warn!("Outbound substream error while closing: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close( - io::Error::new( - io::ErrorKind::BrokenPipe, - "Failed to close outbound substream", - ) - .into(), - )); + return Poll::Ready(ConnectionHandlerEvent::close(io::Error::new( + io::ErrorKind::BrokenPipe, + "Failed to close outbound substream", + ))); } Poll::Pending => { self.keep_alive = KeepAlive::No; diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0de54f0a006..fb65973ac8e 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -36,7 +36,7 @@ use libp2p_swarm::{ }; use log::warn; use smallvec::SmallVec; -use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{pin::Pin, task::Context, task::Poll, time::Duration}; pub struct Proto { initial_delay: Duration, @@ -74,12 +74,7 @@ pub struct Handler { inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< - [ConnectionHandlerEvent< - EitherUpgrade>, - (), - Event, - io::Error, - >; 4], + [ConnectionHandlerEvent>, (), Event>; 4], >, /// Future that fires when we need to identify the node again. @@ -197,7 +192,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Push; type OutEvent = Event; - type Error = io::Error; type InboundProtocol = SelectUpgrade>; type OutboundProtocol = EitherUpgrade>; type OutboundOpenInfo = (); @@ -224,9 +218,7 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent, - > { + ) -> Poll> { if !self.events.is_empty() { return Poll::Ready(self.events.remove(0)); } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 345762d4b54..dd53d50ca60 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -632,7 +632,6 @@ where { type InEvent = KademliaHandlerIn; type OutEvent = KademliaHandlerEvent; - type Error = io::Error; // TODO: better error type? type InboundProtocol = upgrade::EitherUpgrade; type OutboundProtocol = KademliaProtocolConfig; // Message of the request to send to the remote, and user data if we expect an answer. @@ -749,14 +748,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let ProtocolStatus::Confirmed = self.protocol_status { self.protocol_status = ProtocolStatus::Reported; return Poll::Ready(ConnectionHandlerEvent::Custom( @@ -844,7 +837,6 @@ where KademliaProtocolConfig, (KadRequestMsg, Option), KademliaHandlerEvent, - io::Error, >; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -986,7 +978,6 @@ where KademliaProtocolConfig, (KadRequestMsg, Option), KademliaHandlerEvent, - io::Error, >; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 67d4b66c0a0..1cc287e0ad5 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -256,7 +256,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = Void; type OutEvent = crate::Result; - type Error = Failure; type InboundProtocol = ReadyUpgrade<&'static [u8]>; type OutboundProtocol = ReadyUpgrade<&'static [u8]>; type OutboundOpenInfo = (); @@ -279,8 +278,7 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, (), crate::Result, Self::Error>> - { + ) -> Poll, (), crate::Result>> { match self.state { State::Inactive { reported: true } => { return Poll::Pending; // nothing to do on this connection @@ -325,7 +323,7 @@ impl ConnectionHandler for Handler { if self.failures > 1 || self.config.max_failures.get() > 1 { if self.failures >= self.config.max_failures.get() { log::debug!("Too many failures ({}). Closing connection.", self.failures); - return Poll::Ready(ConnectionHandlerEvent::Close(error)); + return Poll::Ready(ConnectionHandlerEvent::close(error)); } return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error))); diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 5d01cf9dbce..961d26d06df 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -186,7 +186,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::OutEvent, - ::Error, >, >, @@ -509,9 +508,6 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; type InboundProtocol = inbound_stop::Upgrade; type OutboundProtocol = outbound_hop::Upgrade; type OutboundOpenInfo = OutboundOpenInfo; @@ -554,18 +550,12 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::close(err)); } // Return queued events. diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index ef8b40755b2..6d06ee5a3c8 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -390,7 +390,6 @@ pub struct Handler { ::OutboundProtocol, ::OutboundOpenInfo, ::OutEvent, - ::Error, >, >, @@ -623,9 +622,6 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type InEvent = In; type OutEvent = Event; - type Error = ConnectionHandlerUpgrErr< - EitherError, - >; type InboundProtocol = inbound_hop::Upgrade; type OutboundProtocol = outbound_stop::Upgrade; type OutboundOpenInfo = OutboundOpenInfo; @@ -743,18 +739,12 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::close(err)); } // Return queued events. diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index f57dfded6c9..6556f20bf36 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -357,7 +357,6 @@ where { type InEvent = InEvent; type OutEvent = OutEvent; - type Error = Void; type InboundProtocol = PassthroughProtocol; type OutboundProtocol = PassthroughProtocol; type InboundOpenInfo = (); @@ -447,14 +446,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let Some(open_info) = self.new_substreams.pop_front() { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: TOutboundSubstreamHandler::upgrade(open_info), diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 317da87dc0a..c21b322ecc0 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -277,7 +277,6 @@ where { type InEvent = RequestProtocol; type OutEvent = RequestResponseHandlerEvent; - type Error = ConnectionHandlerUpgrErr; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; type OutboundOpenInfo = RequestId; @@ -329,12 +328,11 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, RequestId, Self::OutEvent, Self::Error>> - { + ) -> Poll, RequestId, Self::OutEvent>> { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::close(err)); } // Drain pending events. diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 81255a40274..b1097005ee7 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -229,7 +229,6 @@ where { type InEvent = TInner::InEvent; type OutEvent = TInner::OutEvent; - type Error = TInner::Error; type InboundProtocol = EitherUpgrade, SendWrapper>; type OutboundProtocol = TInner::OutboundProtocol; @@ -268,14 +267,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let Some(inner) = self.inner.as_mut() { inner.poll(cx) } else { diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 3e907de9e15..b71c5f53896 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -202,7 +202,7 @@ where return Poll::Ready(Ok(Event::Handler(event))); } Poll::Ready(ConnectionHandlerEvent::Close(err)) => { - return Poll::Ready(Err(ConnectionError::Handler(Box::new(err)))); + return Poll::Ready(Err(ConnectionError::Handler(err))); } } @@ -745,7 +745,6 @@ mod tests { impl ConnectionHandler for MockConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -793,12 +792,7 @@ mod tests { &mut self, _: &mut Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, + ConnectionHandlerEvent, > { if self.outbound_requested { self.outbound_requested = false; diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 4ec58581c2e..bb8d4a22151 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -58,7 +58,6 @@ pub struct ConnectionHandler; impl crate::handler::ConnectionHandler for ConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -79,14 +78,8 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { Poll::Pending } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 8d34509c085..b434af381b6 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -96,8 +96,6 @@ pub trait ConnectionHandler: Send + 'static { type InEvent: fmt::Debug + Send + 'static; /// Custom event that can be produced by the handler and that will be returned to the outside. type OutEvent: fmt::Debug + Send + 'static; - /// The type of errors returned by [`ConnectionHandler::poll`]. - type Error: error::Error + fmt::Debug + Send + 'static; /// The inbound upgrade for the protocol(s) used by the handler. type InboundProtocol: InboundUpgradeSend; /// The outbound upgrade for the protocol(s) used by the handler. @@ -238,14 +236,7 @@ pub trait ConnectionHandler: Send + 'static { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - >; + ) -> Poll>; /// Adds a closure that turns the input event into something else. fn map_in_event(self, map: TMap) -> MapInEvent @@ -426,8 +417,8 @@ impl SubstreamProtocol { } /// Event produced by a handler. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ConnectionHandlerEvent { +#[derive(Debug)] +pub enum ConnectionHandlerEvent { /// Request a new outbound substream to be opened with the remote. OutboundSubstreamRequest { /// The protocol(s) to apply on the substream. @@ -442,22 +433,33 @@ pub enum ConnectionHandlerEvent), /// Other event. Custom(TCustom), } /// Event produced by a handler. -impl - ConnectionHandlerEvent +impl + ConnectionHandlerEvent { + /// Close the connection for the specified reason. + /// + /// You are encouraged to supply a downcast-friendly type here IF you later want to inspect, + /// why a connection was closed. This works best if you define a custom error type for your + /// handler and directly supply instances of this error to this function. This will allow + /// you and other users to directly downcast to a type from your module, thus giving them a + /// similar experience as if we were to track this with a type parameter. + pub fn close(error: impl error::Error + Send + 'static) -> Self { + Self::Close(Box::new(error)) + } + /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a /// `TOutboundOpenInfo` to something else. pub fn map_outbound_open_info( self, map: F, - ) -> ConnectionHandlerEvent + ) -> ConnectionHandlerEvent where F: FnOnce(TOutboundOpenInfo) -> I, { @@ -474,10 +476,7 @@ impl /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) /// to something else. - pub fn map_protocol( - self, - map: F, - ) -> ConnectionHandlerEvent + pub fn map_protocol(self, map: F) -> ConnectionHandlerEvent where F: FnOnce(TConnectionUpgrade) -> I, { @@ -496,7 +495,7 @@ impl pub fn map_custom( self, map: F, - ) -> ConnectionHandlerEvent + ) -> ConnectionHandlerEvent where F: FnOnce(TCustom) -> I, { @@ -508,23 +507,6 @@ impl ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(val), } } - - /// If this is a `Close` event, maps the content to something else. - pub fn map_close( - self, - map: F, - ) -> ConnectionHandlerEvent - where - F: FnOnce(TErr) -> I, - { - match self { - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } - } - ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val), - ConnectionHandlerEvent::Close(val) => ConnectionHandlerEvent::Close(map(val)), - } - } } /// Error that can happen on an outbound substream opening attempt. diff --git a/swarm/src/handler/either.rs b/swarm/src/handler/either.rs index e6d16ed1133..dba1eb2f4f4 100644 --- a/swarm/src/handler/either.rs +++ b/swarm/src/handler/either.rs @@ -99,7 +99,6 @@ where { type InEvent = Either; type OutEvent = Either; - type Error = Either; type InboundProtocol = EitherUpgrade, SendWrapper>; type OutboundProtocol = @@ -140,23 +139,15 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { let event = match self { Either::Left(handler) => futures::ready!(handler.poll(cx)) .map_custom(Either::Left) - .map_close(Either::Left) .map_protocol(|p| EitherUpgrade::A(SendWrapper(p))) .map_outbound_open_info(Either::Left), Either::Right(handler) => futures::ready!(handler.poll(cx)) .map_custom(Either::Right) - .map_close(Either::Right) .map_protocol(|p| EitherUpgrade::B(SendWrapper(p))) .map_outbound_open_info(Either::Right), }; diff --git a/swarm/src/handler/map_in.rs b/swarm/src/handler/map_in.rs index 326a6f8f4f9..f3bf633ce5c 100644 --- a/swarm/src/handler/map_in.rs +++ b/swarm/src/handler/map_in.rs @@ -53,7 +53,6 @@ where { type InEvent = TNewIn; type OutEvent = TConnectionHandler::OutEvent; - type Error = TConnectionHandler::Error; type InboundProtocol = TConnectionHandler::InboundProtocol; type OutboundProtocol = TConnectionHandler::OutboundProtocol; type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; @@ -77,14 +76,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { self.inner.poll(cx) } diff --git a/swarm/src/handler/map_out.rs b/swarm/src/handler/map_out.rs index 87306dc48c6..a3f1abf8e7e 100644 --- a/swarm/src/handler/map_out.rs +++ b/swarm/src/handler/map_out.rs @@ -48,7 +48,6 @@ where { type InEvent = TConnectionHandler::InEvent; type OutEvent = TNewOut; - type Error = TConnectionHandler::Error; type InboundProtocol = TConnectionHandler::InboundProtocol; type OutboundProtocol = TConnectionHandler::OutboundProtocol; type InboundOpenInfo = TConnectionHandler::InboundOpenInfo; @@ -70,14 +69,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { self.inner.poll(cx).map(|ev| match ev { ConnectionHandlerEvent::Custom(ev) => ConnectionHandlerEvent::Custom((self.map)(ev)), ConnectionHandlerEvent::Close(err) => ConnectionHandlerEvent::Close(err), diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index c1f937c1cb3..e48e538abf9 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -93,7 +93,6 @@ where { type InEvent = (K, ::InEvent); type OutEvent = (K, ::OutEvent); - type Error = ::Error; type InboundProtocol = Upgrade::InboundProtocol>; type OutboundProtocol = ::OutboundProtocol; type InboundOpenInfo = Info::InboundOpenInfo>; @@ -293,14 +292,8 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { // Calling `gen_range(0, 0)` (see below) would panic, so we have return early to avoid // that situation. if self.handlers.is_empty() { diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index e8cd03ebed8..82aa0ad36bf 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -123,7 +123,6 @@ where { type InEvent = TOutbound; type OutEvent = TEvent; - type Error = ConnectionHandlerUpgrErr<::Error>; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; type OutboundOpenInfo = (); @@ -144,16 +143,10 @@ where fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { if let Some(err) = self.pending_error.take() { - return Poll::Ready(ConnectionHandlerEvent::Close(err)); + return Poll::Ready(ConnectionHandlerEvent::close(err)); } if !self.events_out.is_empty() { diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs index 2efa949dd71..99a88732033 100644 --- a/swarm/src/handler/pending.rs +++ b/swarm/src/handler/pending.rs @@ -42,7 +42,6 @@ impl PendingConnectionHandler { impl ConnectionHandler for PendingConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = PendingUpgrade; type OutboundProtocol = PendingUpgrade; type OutboundOpenInfo = Void; @@ -63,14 +62,8 @@ impl ConnectionHandler for PendingConnectionHandler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { Poll::Pending } diff --git a/swarm/src/handler/select.rs b/swarm/src/handler/select.rs index 65508c0b6a5..7619341be37 100644 --- a/swarm/src/handler/select.rs +++ b/swarm/src/handler/select.rs @@ -333,7 +333,6 @@ where { type InEvent = EitherOutput; type OutEvent = EitherOutput; - type Error = EitherError; type InboundProtocol = SelectUpgrade< SendWrapper<::InboundProtocol>, SendWrapper<::InboundProtocol>, @@ -374,20 +373,14 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { match self.proto1.poll(cx) { Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::First(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(EitherError::A(event))); + return Poll::Ready(ConnectionHandlerEvent::Close(event)); } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { @@ -404,7 +397,7 @@ where return Poll::Ready(ConnectionHandlerEvent::Custom(EitherOutput::Second(event))); } Poll::Ready(ConnectionHandlerEvent::Close(event)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(EitherError::B(event))); + return Poll::Ready(ConnectionHandlerEvent::Close(event)); } Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index bd1ed812b8b..5a23d2b6e14 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -63,7 +63,6 @@ pub struct ConnectionHandler; impl crate::handler::ConnectionHandler for ConnectionHandler { type InEvent = Void; type OutEvent = Void; - type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); @@ -84,14 +83,8 @@ impl crate::handler::ConnectionHandler for ConnectionHandler { fn poll( &mut self, _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { + ) -> Poll> + { Poll::Pending } From 840b45077a85184de57cc7e2775c6397f2bab981 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 7 Dec 2022 14:08:27 +1100 Subject: [PATCH 3/7] Fix rustdoc test --- swarm/src/behaviour.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 6240e570888..10895bd867e 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -557,7 +557,6 @@ pub enum NetworkBehaviourAction< /// # impl ConnectionHandler for MyHandler { /// # type InEvent = Void; /// # type OutEvent = Void; - /// # type Error = Void; /// # type InboundProtocol = DeniedUpgrade; /// # type OutboundProtocol = DeniedUpgrade; /// # type InboundOpenInfo = (); @@ -603,8 +602,7 @@ pub enum NetworkBehaviourAction< /// # ConnectionHandlerEvent< /// # Self::OutboundProtocol, /// # Self::OutboundOpenInfo, - /// # Self::OutEvent, - /// # Self::Error, + /// # Self::OutEvent /// # >, /// # > { /// # todo!("If `Self::message.is_some()` send the message to the remote.") From b9e1dd2e9dd2ce4d47d175c96fa01c8d499c8a00 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 16 Dec 2022 17:58:13 +1100 Subject: [PATCH 4/7] Introduce `CloseReason` --- protocols/dcutr/src/handler/relayed.rs | 41 ++++++++++++---- protocols/gossipsub/src/handler.rs | 44 +++++++++++------ protocols/ping/src/handler.rs | 6 ++- protocols/relay/src/v2/client/handler.rs | 9 ++-- protocols/relay/src/v2/relay/handler.rs | 8 ++-- protocols/request-response/src/handler.rs | 9 ++-- swarm/src/connection/error.rs | 9 ++-- swarm/src/handler.rs | 58 ++++++++++++++++++----- swarm/src/handler/one_shot.rs | 10 ++-- 9 files changed, 140 insertions(+), 54 deletions(-) diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 12cd7793b4c..0e08bee3ee6 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -28,8 +28,8 @@ use libp2p_core::multiaddr::Multiaddr; use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError}; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, @@ -250,11 +250,17 @@ impl Handler { NegotiationError::ProtocolError(p), )) => { self.queued_events - .push_back(ConnectionHandlerEvent::close(p)); + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + p, + ))); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => { self.queued_events - .push_back(ConnectionHandlerEvent::close(e)); + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(never))) => { void::unreachable(never) @@ -296,17 +302,26 @@ impl Handler { // the remote peer and results in closing the connection. e @ ConnectionHandlerUpgrErr::Timer => { self.queued_events - .push_back(ConnectionHandlerEvent::close(e)); + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::ProtocolError(p), )) => { self.queued_events - .push_back(ConnectionHandlerEvent::close(p)); + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + p, + ))); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { self.queued_events - .push_back(ConnectionHandlerEvent::close(e)); + .push_back(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))); } } } @@ -380,7 +395,10 @@ impl ConnectionHandler for Handler { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + err, + ))); } // Return queued events. @@ -396,7 +414,12 @@ impl ConnectionHandler for Handler { Event::InboundConnectNegotiated(addresses), )); } - Err(e) => return Poll::Ready(ConnectionHandlerEvent::close(e)), + Err(e) => { + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "dcutr-relay", + e, + ))) + } } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 660cb4810f6..fe455e6dfa1 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -27,9 +27,9 @@ use futures::StreamExt; use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, - SubstreamProtocol, + CloseReason, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, + ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + KeepAlive, SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; use log::{error, trace, warn}; @@ -320,7 +320,10 @@ impl ConnectionHandler for GossipsubHandler { // If there was a fatal error, close the connection. if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::close(error)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + error, + ))); } } @@ -335,9 +338,10 @@ impl ConnectionHandler for GossipsubHandler { if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, end the connection. - return Poll::Ready(ConnectionHandlerEvent::close( + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", GossipsubHandlerError::MaxInboundSubstreams, - )); + ))); } // determine if we need to create the stream @@ -346,9 +350,10 @@ impl ConnectionHandler for GossipsubHandler { && !self.outbound_substream_establishing { if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - return Poll::Ready(ConnectionHandlerEvent::close( + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", GossipsubHandlerError::MaxOutboundSubstreams, - )); + ))); } let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); @@ -471,13 +476,18 @@ impl ConnectionHandler for GossipsubHandler { } Err(e) => { error!("Error sending message: {}", e); - return Poll::Ready(ConnectionHandlerEvent::close(e)); + return Poll::Ready(ConnectionHandlerEvent::Close( + CloseReason::new("gossipsub", e), + )); } } } Poll::Ready(Err(e)) => { error!("Outbound substream error while sending output: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::close(e)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + e, + ))); } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -499,7 +509,10 @@ impl ConnectionHandler for GossipsubHandler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - return Poll::Ready(ConnectionHandlerEvent::close(e)) + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + e, + ))) } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -521,9 +534,12 @@ impl ConnectionHandler for GossipsubHandler { } Poll::Ready(Err(e)) => { warn!("Outbound substream error while closing: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::close(io::Error::new( - io::ErrorKind::BrokenPipe, - "Failed to close outbound substream", + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "gossipsub", + io::Error::new( + io::ErrorKind::BrokenPipe, + "Failed to close outbound substream", + ), ))); } Poll::Pending => { diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 1cc287e0ad5..acbe0e9e633 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -25,7 +25,7 @@ use futures_timer::Delay; use libp2p_core::upgrade::ReadyUpgrade; use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, @@ -323,7 +323,9 @@ impl ConnectionHandler for Handler { if self.failures > 1 || self.config.max_failures.get() > 1 { if self.failures >= self.config.max_failures.get() { log::debug!("Too many failures ({}). Closing connection.", self.failures); - return Poll::Ready(ConnectionHandlerEvent::close(error)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "ping", error, + ))); } return Poll::Ready(ConnectionHandlerEvent::Custom(Err(error))); diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 961d26d06df..a4b7f235e7c 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -32,8 +32,8 @@ use libp2p_core::either::EitherError; use libp2p_core::multiaddr::Protocol; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, }; use libp2p_swarm::{ dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, @@ -555,7 +555,10 @@ impl ConnectionHandler for Handler { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "relay-client", + err, + ))); } // Return queued events. diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 6d06ee5a3c8..fc8228c09d3 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -34,8 +34,8 @@ use libp2p_core::connection::ConnectionId; use libp2p_core::either::EitherError; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, SendWrapper, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, SendWrapper, }; use libp2p_swarm::{ dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, @@ -744,7 +744,9 @@ impl ConnectionHandler for Handler { // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "relay", err, + ))); } // Return queued events. diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index c21b322ecc0..f138f8ba3de 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -24,8 +24,8 @@ use crate::codec::RequestResponseCodec; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, + CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, + FullyNegotiatedOutbound, ListenUpgradeError, }; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; @@ -332,7 +332,10 @@ where // Check for a pending (fatal) error. if let Some(err) = self.pending_error.take() { // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "request-response", + err, + ))); } // Drain pending events. diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 56596182fa7..1c9ba40e47e 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::handler::CloseReason; use crate::transport::TransportError; use crate::Multiaddr; use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; @@ -33,8 +34,8 @@ pub enum ConnectionError { /// The connection keep-alive timeout expired. KeepAliveTimeout, - /// The connection handler produced an error. - Handler(Box), + /// The connection handler actively closed the connection. + Handler(CloseReason), } impl fmt::Display for ConnectionError { @@ -44,7 +45,7 @@ impl fmt::Display for ConnectionError { ConnectionError::KeepAliveTimeout => { write!(f, "Connection closed due to expired keep-alive timeout.") } - ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), + ConnectionError::Handler(_) => Ok(()), // `CloseReason` prints enough context. } } } @@ -54,7 +55,7 @@ impl std::error::Error for ConnectionError { match self { ConnectionError::IO(err) => Some(err), ConnectionError::KeepAliveTimeout => None, - ConnectionError::Handler(err) => Some(err.as_ref()), + ConnectionError::Handler(err) => Some(err), } } } diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index b434af381b6..b4e6648f44c 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -433,7 +433,7 @@ pub enum ConnectionHandlerEvent /// connection, while allowing other [`ConnectionHandler`]s to continue using /// the connection, return [`KeepAlive::No`] in /// [`ConnectionHandler::connection_keep_alive`]. - Close(Box), + Close(CloseReason), /// Other event. Custom(TCustom), @@ -443,17 +443,6 @@ pub enum ConnectionHandlerEvent impl ConnectionHandlerEvent { - /// Close the connection for the specified reason. - /// - /// You are encouraged to supply a downcast-friendly type here IF you later want to inspect, - /// why a connection was closed. This works best if you define a custom error type for your - /// handler and directly supply instances of this error to this function. This will allow - /// you and other users to directly downcast to a type from your module, thus giving them a - /// similar experience as if we were to track this with a type parameter. - pub fn close(error: impl error::Error + Send + 'static) -> Self { - Self::Close(Box::new(error)) - } - /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a /// `TOutboundOpenInfo` to something else. pub fn map_outbound_open_info( @@ -509,6 +498,51 @@ impl } } +#[derive(Debug)] +pub struct CloseReason { + component_name: &'static str, + source: Box, +} + +impl CloseReason { + /// Construct a new [`CloseReason`]. + /// + /// The first parameter should be a meaningful identifier for the component / protocol that is closing the connection. + /// The given `source` is returned from [`Error::source`](error::Error::source) and can be printed by iterating the source of this error. + /// + /// # Example + /// + /// ```rust + /// # use libp2p_swarm::handler::CloseReason; + /// + /// # fn main() { + /// let source = std::io::Error::from(std::io::ErrorKind::UnexpectedEof); // Imagine this being the result of reading from a stream. + /// let reason = CloseReason::new("ping", source); + /// + /// assert_eq!(reason.to_string(), "connection closed by 'ping' protocol") + /// # } + /// + /// ``` + pub fn new(component_name: &'static str, source: impl error::Error + Send + 'static) -> Self { + Self { + component_name, + source: Box::new(source), + } + } +} + +impl fmt::Display for CloseReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "connection closed by '{}' protocol", self.component_name) + } +} + +impl error::Error for CloseReason { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + Some(self.source.as_ref()) + } +} + /// Error that can happen on an outbound substream opening attempt. #[derive(Debug)] pub enum ConnectionHandlerUpgrErr { diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 82aa0ad36bf..423d35c4c1e 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, - DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, - SubstreamProtocol, + CloseReason, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, + ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + KeepAlive, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; use instant::Instant; @@ -146,7 +146,9 @@ where ) -> Poll> { if let Some(err) = self.pending_error.take() { - return Poll::Ready(ConnectionHandlerEvent::close(err)); + return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new( + "oneshot", err, + ))); } if !self.events_out.is_empty() { From 32597a61266bcd04ef54047128af7d4ef1c761a3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 20 Dec 2022 14:20:01 +1100 Subject: [PATCH 5/7] Reference anyhow in docs --- swarm/src/handler.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 358f90dee37..b8d212c70c4 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -498,6 +498,11 @@ impl } } +/// Encapsulates the reason for why a specific [`ConnectionHandler`] closed a connection. +/// +/// This type implements [`Error`](error::Error) and exposes the underlying reason through its [`Error::source`](error::Error::source) function. +/// When printing this error, you will need to iterate the entire chain of causes. +/// Error handling libraries like `anyhow` do this in case you don't want to do it yourself. #[derive(Debug)] pub struct CloseReason { component_name: &'static str, From 7aa991eecfd88afa95aa30a8baf341e580912e65 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 20 Dec 2022 14:21:36 +1100 Subject: [PATCH 6/7] Unify docs --- swarm/src/handler.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index b8d212c70c4..e420f63a7a9 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -499,10 +499,6 @@ impl } /// Encapsulates the reason for why a specific [`ConnectionHandler`] closed a connection. -/// -/// This type implements [`Error`](error::Error) and exposes the underlying reason through its [`Error::source`](error::Error::source) function. -/// When printing this error, you will need to iterate the entire chain of causes. -/// Error handling libraries like `anyhow` do this in case you don't want to do it yourself. #[derive(Debug)] pub struct CloseReason { component_name: &'static str, @@ -515,6 +511,8 @@ impl CloseReason { /// The first parameter should be a meaningful identifier for the component / protocol that is closing the connection. /// The given `source` is returned from [`Error::source`](error::Error::source) and can be printed by iterating the source of this error. /// + /// If you don't want to iterate over the sources yourself, you can use an error handling library like `anyhow` or `eyre`. + /// /// # Example /// /// ```rust From 78abccb72effc00df68149a698a1e7bfb3c34bc6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 20 Dec 2022 14:22:06 +1100 Subject: [PATCH 7/7] Update variable name --- swarm/src/handler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index e420f63a7a9..6bf25c8ca8a 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -501,7 +501,7 @@ impl /// Encapsulates the reason for why a specific [`ConnectionHandler`] closed a connection. #[derive(Debug)] pub struct CloseReason { - component_name: &'static str, + protocol: &'static str, source: Box, } @@ -526,9 +526,9 @@ impl CloseReason { /// # } /// /// ``` - pub fn new(component_name: &'static str, source: impl error::Error + Send + 'static) -> Self { + pub fn new(protocol: &'static str, source: impl error::Error + Send + 'static) -> Self { Self { - component_name, + protocol, source: Box::new(source), } } @@ -536,7 +536,7 @@ impl CloseReason { impl fmt::Display for CloseReason { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "connection closed by '{}' protocol", self.component_name) + write!(f, "connection closed by '{}' protocol", self.protocol) } }