Skip to content

Commit

Permalink
feat(relay): hide internals of Connection
Browse files Browse the repository at this point in the history
Relayed connections to other peers are created from streams to the relay itself. Internally, such a connection has different states. These however are not relevant to the user and should be encapsulated to allow for more backwards-compatible changes. The only interface exposed is `AsyncRead` and `AsyncWrite`.

Resolves: #3255.

Pull-Request: #3829.
  • Loading branch information
tcoratger committed May 2, 2023
1 parent 30d2c75 commit c728824
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 36 deletions.
4 changes: 4 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
- Raise MSRV to 1.65.
See [PR 3715].

- Hide internals of `Connection` and expose only `AsyncRead` and `AsyncWrite`.
See [PR 3829].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3829]: https://github.com/libp2p/rust-libp2p/pull/3829

## 0.15.2

Expand Down
78 changes: 49 additions & 29 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use libp2p_swarm::{
};
use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
use transport::Transport;
Expand Down Expand Up @@ -387,32 +386,43 @@ impl NetworkBehaviour for Behaviour {
}
}

/// A [`NegotiatedSubstream`] acting as a [`Connection`].
pub enum Connection {
/// Represents a connection to another peer via a relay.
///
/// Internally, this uses a stream to the relay.
pub struct Connection {
state: ConnectionState,
}

enum ConnectionState {
InboundAccepting {
accept: BoxFuture<'static, Result<Connection, Error>>,
accept: BoxFuture<'static, Result<ConnectionState, Error>>,
},
Operational {
read_buffer: Bytes,
substream: NegotiatedSubstream,
/// "Drop notifier" pattern to signal to the transport that the connection has been dropped.
///
/// This is flagged as "dead-code" by the compiler because we never read from it here.
/// However, it is actual use is to trigger the `Canceled` error in the `Transport` when this `Sender` is dropped.
#[allow(dead_code)]
drop_notifier: oneshot::Sender<void::Void>,
},
}

impl Unpin for Connection {}
impl Unpin for ConnectionState {}

impl Connection {
impl ConnectionState {
pub(crate) fn new_inbound(
circuit: inbound_stop::Circuit,
drop_notifier: oneshot::Sender<void::Void>,
) -> Self {
Connection::InboundAccepting {
ConnectionState::InboundAccepting {
accept: async {
let (substream, read_buffer) = circuit
.accept()
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?;
Ok(Connection::Operational {
Ok(ConnectionState::Operational {
read_buffer,
substream,
drop_notifier,
Expand All @@ -427,7 +437,7 @@ impl Connection {
read_buffer: Bytes,
drop_notifier: oneshot::Sender<void::Void>,
) -> Self {
Connection::Operational {
ConnectionState::Operational {
substream,
read_buffer,
drop_notifier,
Expand All @@ -442,35 +452,41 @@ impl AsyncWrite for Connection {
buf: &[u8],
) -> Poll<Result<usize, Error>> {
loop {
match self.deref_mut() {
Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?;
match &mut self.state {
ConnectionState::InboundAccepting { accept } => {
*self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
}
Connection::Operational { substream, .. } => {
ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_write(cx, buf);
}
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
loop {
match self.deref_mut() {
Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?;
match &mut self.state {
ConnectionState::InboundAccepting { accept } => {
*self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
}
Connection::Operational { substream, .. } => {
ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_flush(cx);
}
}
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
loop {
match self.deref_mut() {
Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?;
match &mut self.state {
ConnectionState::InboundAccepting { accept } => {
*self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
}
Connection::Operational { substream, .. } => {
ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_close(cx);
}
}
Expand All @@ -483,11 +499,13 @@ impl AsyncWrite for Connection {
bufs: &[IoSlice],
) -> Poll<Result<usize, Error>> {
loop {
match self.deref_mut() {
Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?;
match &mut self.state {
ConnectionState::InboundAccepting { accept } => {
*self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
}
Connection::Operational { substream, .. } => {
ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_write_vectored(cx, bufs);
}
}
Expand All @@ -502,11 +520,13 @@ impl AsyncRead for Connection {
buf: &mut [u8],
) -> Poll<Result<usize, Error>> {
loop {
match self.deref_mut() {
Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?;
match &mut self.state {
ConnectionState::InboundAccepting { accept } => {
*self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
}
Connection::Operational {
ConnectionState::Operational {
read_buffer,
substream,
..
Expand Down
13 changes: 6 additions & 7 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,11 @@ impl Handler {

let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);
let connection = super::Connection::new_inbound(inbound_circuit, tx);
let connection = super::ConnectionState::new_inbound(inbound_circuit, tx);

pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection {
stream: connection,
// stream: connection,
stream: super::Connection { state: connection },
src_peer_id,
relay_peer_id: self.remote_peer_id,
relay_addr: self.remote_addr.clone(),
Expand Down Expand Up @@ -271,11 +272,9 @@ impl Handler {
OutboundOpenInfo::Connect { send_back },
) => {
let (tx, rx) = oneshot::channel();
match send_back.send(Ok(super::Connection::new_outbound(
substream,
read_buffer,
tx,
))) {
match send_back.send(Ok(super::Connection {
state: super::ConnectionState::new_outbound(substream, read_buffer, tx),
})) {
Ok(()) => {
self.alive_lend_out_substreams.push(rx);
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
Expand Down

0 comments on commit c728824

Please sign in to comment.