Skip to content

Commit

Permalink
{core,swarm}: Remove Network abstraction (#2492)
Browse files Browse the repository at this point in the history
This commit removes the `Network` abstraction, thus managing `Listeners`
and the connection `Pool` in `Swarm` directly. This is done under the
assumption that noone uses the `Network` abstraction directly, but
instead everyone always uses it through `Swarm`. Both `Listeners` and
`Pool` are moved from `libp2p-core` into `libp2p-swarm`. Given that they
are no longer exposed via `Network`, they can be treated as an
implementation detail of `libp2p-swarm` and `Swarm`.

This change does not include any behavioural changes.

This change has the followin benefits:

- Removal of `NetworkEvent`, which was mostly an isomorphism of
  `SwarmEvent`.
- Removal of the never-directly-used `Network` abstraction.
- Removal of now obsolete verbose `Peer` (`core/src/network/peer.rs`)
  construct.
- Removal of `libp2p-core` `DialOpts`, which is a direct mapping of
  `libp2p-swarm` `DialOpts`.
- Allowing breaking changes to the connection handling and `Swarm` API
  interface without a breaking change in `libp2p-core` and thus a
  without a breaking change in `/transport` protocols.

This change enables the following potential future changes:

- Removal of `NodeHandler` and `ConnectionHandler`. Thus allowing to
  rename `ProtocolsHandler` into `ConnectionHandler`.
- Moving `NetworkBehaviour` and `ProtocolsHandler` into `libp2p-core`,
  having `libp2p-xxx` protocol crates only depend on `libp2p-core` and
  thus allowing general breaking changes to `Swarm` without breaking all
  `libp2p-xxx` crates.
  • Loading branch information
mxinden committed Feb 13, 2022
1 parent 861e15d commit 7fc342e
Show file tree
Hide file tree
Showing 29 changed files with 1,324 additions and 3,203 deletions.
5 changes: 5 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# 0.32.0 [unreleased]

- Remove `Network`. `libp2p-core` is from now on an auxiliary crate only. Users
that have previously used `Network` only, will need to use `Swarm` instead. See
[PR 2492].

- Update to `multiaddr` `v0.14.0`.

- Update to `multihash` `v0.16.0`.
Expand All @@ -10,6 +14,7 @@

[PR 2456]: https://github.com/libp2p/rust-libp2p/pull/2456
[RUSTSEC-2022-0009]: https://rustsec.org/advisories/RUSTSEC-2022-0009.html
[PR 2492]: https://github.com/libp2p/rust-libp2p/pull/2492

# 0.31.0 [2022-01-27]

Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ criterion = "0.3"
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../transports/noise" }
libp2p-tcp = { path = "../transports/tcp" }
serde_json = "1.0"
rmp-serde = "1.0"
multihash = { version = "0.16", default-features = false, features = ["arb"] }
quickcheck = "0.9.0"
rand07 = { package = "rand", version = "0.7" }
rmp-serde = "1.0"
serde_json = "1.0"

[build-dependencies]
prost-build = "0.9"
Expand Down
229 changes: 28 additions & 201 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod error;
pub(crate) mod handler;
mod listeners;
mod substream;

pub(crate) mod pool;

pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersEvent, ListenersStream};
pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use substream::{Close, Substream, SubstreamEndpoint};

use crate::multiaddr::{Multiaddr, Protocol};
use crate::muxing::StreamMuxer;
use crate::PeerId;
use std::hash::Hash;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use substream::{Muxing, SubstreamEvent};

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
Expand All @@ -53,7 +31,34 @@ impl ConnectionId {
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
ConnectionId(id)
Self(id)
}
}

impl std::ops::Add<usize> for ConnectionId {
type Output = Self;

fn add(self, other: usize) -> Self {
Self(self.0 + other)
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

Expand Down Expand Up @@ -236,181 +241,3 @@ impl ConnectedPoint {
}
}
}

/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected {
/// The connected endpoint, including network address information.
pub endpoint: ConnectedPoint,
/// Information obtained from the transport.
pub peer_id: PeerId,
}

/// Event generated by a [`Connection`].
#[derive(Debug, Clone)]
pub enum Event<T> {
/// Event generated by the [`ConnectionHandler`].
Handler(T),
/// Address of the remote has changed.
AddressChange(Multiaddr),
}

/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
pub struct Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Node that handles the muxing.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
/// Handler that processes substreams.
handler: THandler,
}

impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("muxing", &self.muxing)
.field("handler", &self.handler)
.finish()
}
}

impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
}

impl<TMuxer, THandler> Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Builds a new `Connection` from the given substream multiplexer
/// and connection handler.
pub fn new(muxer: TMuxer, handler: THandler) -> Self {
Connection {
muxing: Muxing::new(muxer),
handler,
}
}

/// Returns a reference to the `ConnectionHandler`
pub fn handler(&self) -> &THandler {
&self.handler
}

/// Returns a mutable reference to the `ConnectionHandler`
pub fn handler_mut(&mut self) -> &mut THandler {
&mut self.handler
}

/// Notifies the connection handler of an event.
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
}

/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
pub fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop {
let mut io_pending = false;

// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
match self.muxing.poll(cx) {
Poll::Pending => io_pending = true,
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => self
.handler
.inject_substream(substream, SubstreamEndpoint::Listener),
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
user_data,
substream,
})) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

// Poll the handler for new events.
match self.handler.poll(cx) {
Poll::Pending => {
if io_pending {
return Poll::Pending; // Nothing to do
}
}
Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
}
}
}

/// Borrowed information about an incoming connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct IncomingInfo<'a> {
/// Local connection address.
pub local_addr: &'a Multiaddr,
/// Address used to send back data to the remote.
pub send_back_addr: &'a Multiaddr,
}

impl<'a> IncomingInfo<'a> {
/// Builds the [`PendingPoint`] corresponding to the incoming connection.
pub fn to_pending_point(&self) -> PendingPoint {
PendingPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
/// Builds the [`ConnectedPoint`] corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}

/// Information about a connection limit.
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
/// The maximum number of connections.
pub limit: u32,
/// The current number of connections.
pub current: u32,
}

impl fmt::Display for ConnectionLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.current, self.limit)
}
}

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}
4 changes: 1 addition & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,16 @@ pub mod connection;
pub mod either;
pub mod identity;
pub mod muxing;
pub mod network;
pub mod peer_record;
pub mod signed_envelope;
pub mod transport;
pub mod upgrade;

pub use connection::{Connected, ConnectedPoint, Endpoint};
pub use connection::{ConnectedPoint, Endpoint};
pub use identity::PublicKey;
pub use multiaddr::Multiaddr;
pub use multihash;
pub use muxing::StreamMuxer;
pub use network::{DialOpts, Network};
pub use peer_id::PeerId;
pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope;
Expand Down
Loading

0 comments on commit 7fc342e

Please sign in to comment.