Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm)!: Remove ConnectionHandler::Error in favor of CloseReason #3201

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,14 @@ mod network {
use super::*;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use libp2p::core::either::EitherError;
use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName};
use libp2p::identity;
use libp2p::identity::ed25519;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel};
use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent};
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use std::collections::{hash_map, HashMap, HashSet};
use std::iter;

Expand Down Expand Up @@ -401,13 +400,7 @@ mod network {
}
}

async fn handle_event(
&mut self,
event: SwarmEvent<
ComposedEvent,
EitherError<ConnectionHandlerUpgrErr<io::Error>, io::Error>,
>,
) {
async fn handle_event(&mut self, event: SwarmEvent<ComposedEvent>) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ impl Recorder<libp2p_relay::v2::relay::Event> for Metrics {
}
}

impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
self.swarm.record(event);

#[cfg(feature = "identify")]
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ impl Metrics {
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv>) {
match event {
libp2p_swarm::SwarmEvent::Behaviour(_) => {}
libp2p_swarm::SwarmEvent::ConnectionEstablished {
Expand Down
11 changes: 2 additions & 9 deletions protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type InEvent = void::Void;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<std::io::Error>;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
Expand Down Expand Up @@ -94,14 +93,8 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>
{
if !self.reported {
self.reported = true;
return Poll::Ready(ConnectionHandlerEvent::Custom(
Expand Down
86 changes: 55 additions & 31 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
CloseReason, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound,
FullyNegotiatedOutbound, ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
Expand Down Expand Up @@ -143,7 +143,6 @@ pub struct Handler {
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutEvent,
<Self as ConnectionHandler>::Error,
>,
>,
/// Inbound connect, accepted by the behaviour, pending completion.
Expand Down Expand Up @@ -247,15 +246,24 @@ impl Handler {
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| {
e.map_err(|e| match e {
EitherError::A(e) => EitherError::A(e),
EitherError::B(v) => void::unreachable(v),
})
}));
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(p),
)) => {
self.queued_events
.push_back(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
p,
)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about trait CloseReasonExt to make this ConnectionHandlerEvent::Close(p.closeReason("dcutr-relay"))?

Copy link
Contributor Author

@thomaseizinger thomaseizinger Dec 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do:

p.into_close_reason_for("dcutr-relay")

perhaps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to start bikesheding but what about impl From<(&str, Box<dyn Error + Send + 'static>)> for CloseReason and have ConnectionHandlerEvent::Close(("dcutr-relay", p).into())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I am not a fan of From<(.., ..)> implementations. I generally tend to avoid to implement From unless I can provide a function somewhere that accepts impl Into and thus allows us to avoid .into() calls somewhere. To me, calling .into yourself is a bit of a code smell.

}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
self.queued_events
.push_back(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
e,
)));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(never))) => {
void::unreachable(never)
}
}
}
Expand Down Expand Up @@ -289,10 +297,31 @@ impl Handler {
},
));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B)));

// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
e @ ConnectionHandlerUpgrErr::Timer => {
self.queued_events
.push_back(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
e,
)));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(p),
)) => {
self.queued_events
.push_back(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
p,
)));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => {
self.queued_events
.push_back(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
e,
)));
}
}
}
Expand All @@ -301,9 +330,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type InEvent = Command;
type OutEvent = Event;
type Error = ConnectionHandlerUpgrErr<
EitherError<protocol::inbound::UpgradeError, protocol::outbound::UpgradeError>,
>;
type InboundProtocol = upgrade::EitherUpgrade<protocol::inbound::Upgrade, DeniedUpgrade>;
type OutboundProtocol = protocol::outbound::Upgrade;
type OutboundOpenInfo = u8; // Number of upgrade attempts.
Expand Down Expand Up @@ -364,18 +390,15 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>
{
// Check for a pending (fatal) error.
if let Some(err) = self.pending_error.take() {
// The handler will not be polled again by the `Swarm`.
return Poll::Ready(ConnectionHandlerEvent::Close(err));
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
err,
)));
}

// Return queued events.
Expand All @@ -392,9 +415,10 @@ impl ConnectionHandler for Handler {
));
}
Err(e) => {
return Poll::Ready(ConnectionHandlerEvent::Close(
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))),
))
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"dcutr-relay",
e,
)))
}
}
}
Expand Down
54 changes: 30 additions & 24 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use futures::StreamExt;
use instant::Instant;
use libp2p_core::upgrade::{NegotiationError, UpgradeError};
use libp2p_swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive,
SubstreamProtocol,
CloseReason, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
KeepAlive, SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
use log::{error, trace, warn};
Expand Down Expand Up @@ -248,7 +248,6 @@ impl GossipsubHandler {
impl ConnectionHandler for GossipsubHandler {
type InEvent = GossipsubHandlerIn;
type OutEvent = HandlerEvent;
type Error = GossipsubHandlerError;
type InboundOpenInfo = ();
type InboundProtocol = ProtocolConfig;
type OutboundOpenInfo = crate::rpc_proto::Rpc;
Expand Down Expand Up @@ -283,14 +282,8 @@ impl ConnectionHandler for GossipsubHandler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>
{
// Handle any upgrade errors
if let Some(error) = self.upgrade_errors.pop_front() {
let reported_error = match error {
Expand Down Expand Up @@ -327,7 +320,10 @@ impl ConnectionHandler for GossipsubHandler {

// If there was a fatal error, close the connection.
if let Some(error) = reported_error {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"gossipsub",
error,
)));
}
}

Expand All @@ -342,9 +338,10 @@ impl ConnectionHandler for GossipsubHandler {

if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, end the connection.
return Poll::Ready(ConnectionHandlerEvent::Close(
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"gossipsub",
GossipsubHandlerError::MaxInboundSubstreams,
));
)));
}

// determine if we need to create the stream
Expand All @@ -353,9 +350,10 @@ impl ConnectionHandler for GossipsubHandler {
&& !self.outbound_substream_establishing
{
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
return Poll::Ready(ConnectionHandlerEvent::Close(
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"gossipsub",
GossipsubHandlerError::MaxOutboundSubstreams,
));
)));
}
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
Expand Down Expand Up @@ -478,13 +476,18 @@ impl ConnectionHandler for GossipsubHandler {
}
Err(e) => {
error!("Error sending message: {}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
return Poll::Ready(ConnectionHandlerEvent::Close(
CloseReason::new("gossipsub", e),
));
}
}
}
Poll::Ready(Err(e)) => {
error!("Outbound substream error while sending output: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"gossipsub",
e,
)));
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -506,7 +509,10 @@ impl ConnectionHandler for GossipsubHandler {
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => {
return Poll::Ready(ConnectionHandlerEvent::Close(e))
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"gossipsub",
e,
)))
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -528,13 +534,13 @@ impl ConnectionHandler for GossipsubHandler {
}
Poll::Ready(Err(e)) => {
warn!("Outbound substream error while closing: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(
return Poll::Ready(ConnectionHandlerEvent::Close(CloseReason::new(
"gossipsub",
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close outbound substream",
)
.into(),
));
),
)));
}
Poll::Pending => {
self.keep_alive = KeepAlive::No;
Expand Down
10 changes: 3 additions & 7 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p_swarm::{
use log::warn;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
use std::{pin::Pin, task::Context, task::Poll, time::Duration};

pub struct Proto {
initial_delay: Duration,
Expand Down Expand Up @@ -101,8 +101,7 @@ pub struct Handler {
inbound_identify_push: Option<BoxFuture<'static, Result<Info, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<EitherUpgrade<Identify, Push<OutboundPush>>, (), Event, io::Error>;
4],
[ConnectionHandlerEvent<EitherUpgrade<Identify, Push<OutboundPush>>, (), Event>; 4],
>,

/// Streams awaiting `BehaviourInfo` to then send identify requests.
Expand Down Expand Up @@ -274,7 +273,6 @@ impl Handler {
impl ConnectionHandler for Handler {
type InEvent = InEvent;
type OutEvent = Event;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<Identify, Push<InboundPush>>;
type OutboundProtocol = EitherUpgrade<Identify, Push<OutboundPush>>;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -333,9 +331,7 @@ impl ConnectionHandler for Handler {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event, Self::Error>,
> {
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Event>> {
if !self.events.is_empty() {
return Poll::Ready(self.events.remove(0));
}
Expand Down
Loading