Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/relay: Limit inbound streams #2698

Merged
merged 11 commits into from
Jun 8, 2022
11 changes: 11 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# 0.9.1 - unreleased

- Respond to at most one incoming reservation request. Deny <= 8 incoming
circuit requests with one per peer. And deny new circuits before accepting new
circuits. See [PR 2698].

- Expose explicits errors via `UpgradeError` instead of generic `io::Error`. See
[PR 2698].

[PR 2698]: https://github.com/libp2p/rust-libp2p/pull/2698/

# 0.9.0

- Update to `libp2p-core` `v0.33.0`.
Expand Down
4 changes: 2 additions & 2 deletions protocols/relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-relay"
edition = "2021"
rust-version = "1.56.1"
description = "Communications relaying for libp2p"
version = "0.9.0"
version = "0.9.1"
authors = ["Parity Technologies <admin@parity.io>", "Max Inden <mail@max-inden.de>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -21,12 +21,12 @@ libp2p-core = { version = "0.33.0", path = "../../core", default-features = fals
libp2p-swarm = { version = "0.36.0", path = "../../swarm" }
log = "0.4"
pin-project = "1"
prost-codec = { version = "0.1", path = "../../misc/prost-codec" }
prost = "0.10"
rand = "0.8.4"
smallvec = "1.6.1"
static_assertions = "1"
thiserror = "1.0"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
void = "1"

[build-dependencies]
Expand Down
11 changes: 7 additions & 4 deletions protocols/relay/src/v2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use libp2p_swarm::{
NotifyHandler, PollParameters,
};
use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, IoSlice};
use std::io::{Error, ErrorKind, IoSlice};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -84,7 +84,7 @@ pub enum Event {
/// Denying an inbound circuit request failed.
InboundCircuitReqDenyFailed {
src_peer_id: PeerId,
error: std::io::Error,
error: inbound_stop::UpgradeError,
},
}

Expand Down Expand Up @@ -320,7 +320,7 @@ impl NetworkBehaviour for Client {
/// A [`NegotiatedSubstream`] acting as a [`RelayedConnection`].
pub enum RelayedConnection {
InboundAccepting {
accept: BoxFuture<'static, Result<RelayedConnection, std::io::Error>>,
accept: BoxFuture<'static, Result<RelayedConnection, Error>>,
},
Operational {
read_buffer: Bytes,
Expand All @@ -338,7 +338,10 @@ impl RelayedConnection {
) -> Self {
RelayedConnection::InboundAccepting {
accept: async {
let (substream, read_buffer) = circuit.accept().await?;
let (substream, read_buffer) = circuit
.accept()
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?;
Ok(RelayedConnection::Operational {
read_buffer,
substream,
Expand Down
75 changes: 52 additions & 23 deletions protocols/relay/src/v2/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::debug;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::task::{Context, Poll};
use std::time::Duration;

/// The maximum number of circuits being denied concurrently.
///
/// Circuits to be denied exceeding the limit are dropped.
const MAX_NUMBER_DENYING_CIRCUIT: usize = 8;

pub enum In {
Reserve {
to_listener: mpsc::Sender<transport::ToListenerMsg>,
Expand Down Expand Up @@ -100,7 +105,7 @@ pub enum Event {
/// Denying an inbound circuit request failed.
InboundCircuitReqDenyFailed {
src_peer_id: PeerId,
error: std::io::Error,
error: inbound_stop::UpgradeError,
},
}

Expand Down Expand Up @@ -196,7 +201,8 @@ pub struct Handler {
/// eventually.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<void::Void>>,

circuit_deny_futs: FuturesUnordered<BoxFuture<'static, (PeerId, Result<(), std::io::Error>)>>,
circuit_deny_futs:
HashMap<PeerId, BoxFuture<'static, Result<(), protocol::inbound_stop::UpgradeError>>>,

/// Futures that try to send errors to the transport.
///
Expand Down Expand Up @@ -251,12 +257,27 @@ impl ConnectionHandler for Handler {
}
Reservation::None => {
let src_peer_id = inbound_circuit.src_peer_id();
self.circuit_deny_futs.push(
inbound_circuit
.deny(Status::NoReservation)
.map(move |result| (src_peer_id, result))
.boxed(),
)

if self.circuit_deny_futs.len() == MAX_NUMBER_DENYING_CIRCUIT
&& !self.circuit_deny_futs.contains_key(&src_peer_id)
{
log::warn!(
"Dropping inbound circuit request to be denied from {:?} due to exceeding limit.",
src_peer_id,
);
} else if self
.circuit_deny_futs
.insert(
src_peer_id,
inbound_circuit.deny(Status::NoReservation).boxed(),
)
.is_some()
{
log::warn!(
"Dropping existing inbound circuit request to be denied from {:?} in favor of new one.",
src_peer_id
)
}
}
}
}
Expand Down Expand Up @@ -537,20 +558,28 @@ impl ConnectionHandler for Handler {
}

// Deny incoming circuit requests.
if let Poll::Ready(Some((src_peer_id, result))) = self.circuit_deny_futs.poll_next_unpin(cx)
{
match result {
Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::InboundCircuitReqDenied { src_peer_id },
))
}
Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::InboundCircuitReqDenyFailed { src_peer_id, error },
))
}
}
let maybe_event =
self.circuit_deny_futs
.iter_mut()
.find_map(|(src_peer_id, fut)| match fut.poll_unpin(cx) {
Poll::Ready(Ok(())) => Some((
*src_peer_id,
Event::InboundCircuitReqDenied {
src_peer_id: *src_peer_id,
},
)),
Poll::Ready(Err(error)) => Some((
*src_peer_id,
Event::InboundCircuitReqDenyFailed {
src_peer_id: *src_peer_id,
error,
},
)),
Poll::Pending => None,
});
if let Some((src_peer_id, event)) = maybe_event {
self.circuit_deny_futs.remove(&src_peer_id);
return Poll::Ready(ConnectionHandlerEvent::Custom(event));
}

// Send errors to transport.
Expand Down
65 changes: 23 additions & 42 deletions protocols/relay/src/v2/protocol/inbound_hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{upgrade, Multiaddr, PeerId};
use libp2p_swarm::NegotiatedSubstream;
use prost::Message;
use std::convert::TryInto;
use std::io::Cursor;
use std::iter;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error;
use unsigned_varint::codec::UviBytes;

pub struct Upgrade {
pub reservation_duration: Duration,
Expand All @@ -54,23 +51,19 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(MAX_MESSAGE_SIZE);
let mut substream = Framed::new(substream, codec);
let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE));

async move {
let msg: bytes::BytesMut = substream
.next()
.await
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;

let HopMessage {
r#type,
peer,
reservation: _,
limit: _,
status: _,
} = HopMessage::decode(Cursor::new(msg))?;
} = substream
.next()
.await
.ok_or(FatalUpgradeError::StreamClosed)??;

let r#type =
hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?;
Expand Down Expand Up @@ -103,28 +96,22 @@ pub enum UpgradeError {
Fatal(#[from] FatalUpgradeError),
}

impl From<prost::DecodeError> for UpgradeError {
fn from(error: prost::DecodeError) -> Self {
Self::Fatal(error.into())
}
}

impl From<std::io::Error> for UpgradeError {
fn from(error: std::io::Error) -> Self {
impl From<prost_codec::Error> for UpgradeError {
fn from(error: prost_codec::Error) -> Self {
Self::Fatal(error.into())
}
}

#[derive(Debug, Error)]
pub enum FatalUpgradeError {
#[error("Failed to decode message: {0}.")]
Decode(
#[error("Failed to encode or decode")]
Codec(
#[from]
#[source]
prost::DecodeError,
prost_codec::Error,
),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Stream closed")]
StreamClosed,
#[error("Failed to parse response type field.")]
ParseTypeField,
#[error("Failed to parse peer id.")]
Expand All @@ -141,14 +128,14 @@ pub enum Req {
}

pub struct ReservationReq {
substream: Framed<NegotiatedSubstream, UviBytes<Cursor<Vec<u8>>>>,
substream: Framed<NegotiatedSubstream, prost_codec::Codec<HopMessage>>,
reservation_duration: Duration,
max_circuit_duration: Duration,
max_circuit_bytes: u64,
}

impl ReservationReq {
pub async fn accept(self, addrs: Vec<Multiaddr>) -> Result<(), std::io::Error> {
pub async fn accept(self, addrs: Vec<Multiaddr>) -> Result<(), UpgradeError> {
let msg = HopMessage {
r#type: hop_message::Type::Status.into(),
peer: None,
Expand All @@ -175,7 +162,7 @@ impl ReservationReq {
self.send(msg).await
}

pub async fn deny(self, status: Status) -> Result<(), std::io::Error> {
pub async fn deny(self, status: Status) -> Result<(), UpgradeError> {
let msg = HopMessage {
r#type: hop_message::Type::Status.into(),
peer: None,
Expand All @@ -187,11 +174,8 @@ impl ReservationReq {
self.send(msg).await
}

async fn send(mut self, msg: HopMessage) -> Result<(), std::io::Error> {
let mut encoded_msg = Vec::with_capacity(msg.encoded_len());
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
self.substream.send(Cursor::new(encoded_msg)).await?;
async fn send(mut self, msg: HopMessage) -> Result<(), UpgradeError> {
self.substream.send(msg).await?;
self.substream.flush().await?;
self.substream.close().await?;

Expand All @@ -201,15 +185,15 @@ impl ReservationReq {

pub struct CircuitReq {
dst: PeerId,
substream: Framed<NegotiatedSubstream, UviBytes<Cursor<Vec<u8>>>>,
substream: Framed<NegotiatedSubstream, prost_codec::Codec<HopMessage>>,
}

impl CircuitReq {
pub fn dst(&self) -> PeerId {
self.dst
}

pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> {
pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
let msg = HopMessage {
r#type: hop_message::Type::Status.into(),
peer: None,
Expand All @@ -234,7 +218,7 @@ impl CircuitReq {
Ok((io, read_buffer.freeze()))
}

pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> {
pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> {
let msg = HopMessage {
r#type: hop_message::Type::Status.into(),
peer: None,
Expand All @@ -243,14 +227,11 @@ impl CircuitReq {
status: Some(status.into()),
};
self.send(msg).await?;
self.substream.close().await
self.substream.close().await.map_err(Into::into)
}

async fn send(&mut self, msg: HopMessage) -> Result<(), std::io::Error> {
let mut encoded_msg = Vec::with_capacity(msg.encoded_len());
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
self.substream.send(Cursor::new(encoded_msg)).await?;
async fn send(&mut self, msg: HopMessage) -> Result<(), prost_codec::Error> {
self.substream.send(msg).await?;
self.substream.flush().await?;

Ok(())
Expand Down
Loading