Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{core/,swarm/}: Dial with handler and return handler on error and closed #2191

Merged
merged 43 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a332591
core/: Return handler on connection error and closed
mxinden Aug 12, 2021
46904a6
swarm/: Inject handler on connection error and closed
mxinden Aug 12, 2021
d756be1
swarm/src/behaviour: Provide handler with Dial and DialAddr
mxinden Aug 12, 2021
595623f
Merge branch 'libp2p/master' into handler
mxinden Aug 18, 2021
705842f
swarm/src/behaviour: Add default trait para on NetworkBehaviourAction
mxinden Aug 18, 2021
d744b4a
core/src/connection/manager: Fully close a task on disconnect
mxinden Aug 18, 2021
5c2aef6
core/: Remove DisconnectedPeer::set_connected and Pool::add
mxinden Aug 18, 2021
ce0d278
core/src/connection: Report ConnectionLimit through task
mxinden Aug 18, 2021
425e777
core/: Emit Event::Failed on aborted pending connection
mxinden Aug 19, 2021
5ff6397
core/tests: Adjust to type changes
mxinden Aug 19, 2021
7d9285f
core/CHANGELOG: Add entry for ConnectionLimit change
mxinden Aug 19, 2021
682f6be
protocols/*: Update
mxinden Aug 19, 2021
9262c03
Merge branch 'libp2p/master' into handler
mxinden Aug 19, 2021
62c5e13
protocols/*: Update
mxinden Aug 19, 2021
174693a
swarm-derive: Adjust to changes
mxinden Aug 20, 2021
a78de13
core/: Fix ConectionClose and PendingAborted reporting
mxinden Aug 20, 2021
d4960b7
*: Format with rustfmt
mxinden Aug 20, 2021
b73139a
core/src/connection: Remove outdated doc comment
mxinden Aug 20, 2021
aa02e5f
swarm/src/toggle: Fix TODOs
mxinden Aug 20, 2021
2c9f0d3
protocols/: Remove unused imports
mxinden Aug 23, 2021
6d7c73a
Merge branch 'libp2p/master' into handler
mxinden Aug 23, 2021
a56980e
core/src/network/event: Use NoZeroU32
mxinden Aug 24, 2021
1c3ed2e
swarm/src/protocols_handler: Rename to into_protocols_handler
mxinden Aug 24, 2021
32fc84e
swarm/src/behaviour: Introduce NetworkBehaviour::inject_listen_failure
mxinden Aug 24, 2021
7ea4908
swarm/src/lib: Inject handler on DialPeerCondition false
mxinden Aug 24, 2021
fbd4681
core/src/connection: Assume manager to always close handler
mxinden Aug 25, 2021
6787e77
swarm-derive: Add comments
mxinden Aug 25, 2021
b864133
swarm: Add documentation
mxinden Aug 25, 2021
b2bf380
*: Format with rustfmt
mxinden Aug 25, 2021
7d342b6
swar/src/behaviour: Link to NotifyHandler not SendEvent
mxinden Aug 25, 2021
5853890
*: Update changelogs
mxinden Aug 25, 2021
4d0faf9
swarm-derive: Fix typo
mxinden Aug 25, 2021
7a45e7b
Apply suggestions from code review
mxinden Aug 26, 2021
cef949c
core/src/network: Revert map_err
mxinden Aug 26, 2021
ceb77e5
core/src/network: Use custom method on DialAttemptsRemaining
mxinden Aug 26, 2021
814ff4b
swarm: Add doc example for carrying state in handler
mxinden Aug 26, 2021
90df72a
Merge branch 'libp2p/master' into handler
mxinden Aug 26, 2021
60c7261
swarm/src/lib: Remove use_handler_to_carry_state
mxinden Aug 30, 2021
a2f1819
core/tests/network_dial_error: Use get_attempts
mxinden Aug 30, 2021
b905545
swarm/src/behaviour.rs: Use heading for doc example
mxinden Aug 30, 2021
99f81d0
core/tests: Format with rustfmt
mxinden Aug 30, 2021
1c5eb8e
Merge branch 'master' into handler
mxinden Aug 30, 2021
c09198e
protocols/gossipsub/src/behaviour.rs: Remove unnecesary assignment
mxinden Aug 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,15 @@ where
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(|address| DialError::InvalidAddress {
address,
handler: opts.handler,
})?;
let addr = match p2p_addr(opts.peer, opts.address) {
Ok(address) => address,
Err(address) => {
return Err(DialError::InvalidAddress {
address,
handler: opts.handler,
})
}
};
mxinden marked this conversation as resolved.
Show resolved Hide resolved

let result = match transport.dial(addr.clone()) {
Ok(fut) => {
Expand Down
8 changes: 4 additions & 4 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ pub enum DialAttemptsRemaining<THandler> {
None(THandler),
}

impl<THandler> From<&DialAttemptsRemaining<THandler>> for u32 {
fn from(attempts_remaining: &DialAttemptsRemaining<THandler>) -> Self {
match attempts_remaining {
impl<THandler> DialAttemptsRemaining<THandler> {
fn get_attempts(&self) -> u32 {
match self {
DialAttemptsRemaining::Some(attempts) => (*attempts).into(),
DialAttemptsRemaining::None(_) => 0,
}
Expand Down Expand Up @@ -268,7 +268,7 @@ where
error,
} => f
.debug_struct("DialError")
.field("attempts_remaining", &Into::<u32>::into(attempts_remaining))
.field("attempts_remaining", &attempts_remaining.get_attempts())
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
Expand Down
3 changes: 1 addition & 2 deletions swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ wasm-timer = "0.2"
void = "1"

[dev-dependencies]
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../transports/noise" }
libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] }
quickcheck = "0.9.0"
rand = "0.7.2"
198 changes: 186 additions & 12 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,17 @@ pub enum NetworkBehaviourAction<
///
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
///
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
/// can be included in the handler, and thus directly send on connection success or extracted by
/// the [`NetworkBehaviour`] on connection failure. See [`NetworkBehaviourAction::DialPeer`] for
/// example.
DialAddress {
/// The address to dial.
address: Multiaddr,
/// The handler to be used to handle the connection to the peer.
///
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
/// connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected
/// peer can be included in the handler, and thus directly send on connection success or
/// extracted by the [`NetworkBehaviour`] on connection failure.
handler: THandler,
},

Expand All @@ -305,18 +306,191 @@ pub enum NetworkBehaviourAction<
///
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
///
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
/// can be included in the handler, and thus directly send on connection success or extracted by
/// the [`NetworkBehaviour`] on connection failure.
///
/// Example showcasing usage of handler to carry state:
mxinden marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```rust
/// # use futures::executor::block_on;
/// # use futures::stream::StreamExt;
/// # use libp2p::core::connection::ConnectionId;
/// # use libp2p::core::identity;
/// # use libp2p::core::transport::{MemoryTransport, Transport};
/// # use libp2p::core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
/// # use libp2p::core::PeerId;
/// # use libp2p::plaintext::PlainText2Config;
/// # use libp2p::swarm::{
/// # DialError, DialPeerCondition, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream,
/// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
/// # ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent,
/// # };
/// # use libp2p::yamux;
/// # use std::collections::VecDeque;
/// # use std::task::{Context, Poll};
/// # use void::Void;
/// #
/// # let local_key = identity::Keypair::generate_ed25519();
/// # let local_public_key = local_key.public();
/// # let local_peer_id = PeerId::from(local_public_key.clone());
/// #
/// # let transport = MemoryTransport::default()
/// # .upgrade(upgrade::Version::V1)
/// # .authenticate(PlainText2Config { local_public_key })
/// # .multiplex(yamux::YamuxConfig::default())
/// # .boxed();
/// #
/// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id);
/// #
/// // Super precious message that we should better not lose.
/// let message = PreciousMessage("My precious message".to_string());
///
/// // Unfortunately this peer is offline, thus sending our message to it will fail.
/// let offline_peer = PeerId::random();
///
/// // Let's send it anyways. We should get it back in case connecting to the peer fails.
/// swarm.behaviour_mut().send(offline_peer, message);
///
/// block_on(async {
/// // As expected, sending failed. But great news, we got our message back.
/// matches!(
/// swarm.next().await.expect("Infinite stream"),
/// SwarmEvent::Behaviour(PreciousMessage(_))
/// );
/// });
///
/// # #[derive(Default)]
/// # struct MyBehaviour {
/// # outbox_to_swarm: VecDeque<NetworkBehaviourAction<PreciousMessage, MyHandler>>,
/// # }
/// #
/// # impl MyBehaviour {
/// # fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) {
/// # self.outbox_to_swarm
/// # .push_back(NetworkBehaviourAction::DialPeer {
/// # peer_id,
/// # condition: DialPeerCondition::Always,
/// # handler: MyHandler { message: Some(msg) },
/// # });
/// # }
/// # }
/// #
/// impl NetworkBehaviour for MyBehaviour {
/// # type ProtocolsHandler = MyHandler;
/// # type OutEvent = PreciousMessage;
/// #
/// # fn new_handler(&mut self) -> Self::ProtocolsHandler {
/// # MyHandler { message: None }
/// # }
/// #
/// #
/// # fn inject_event(
/// # &mut self,
/// # _: PeerId,
/// # _: ConnectionId,
/// # _: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
/// # ) {
/// # unreachable!();
/// # }
/// #
/// fn inject_dial_failure(
/// &mut self,
/// _: &PeerId,
/// handler: Self::ProtocolsHandler,
/// _: DialError,
/// ) {
/// // As expected, sending the message failed. But lucky us, we got the handler back, thus
/// // the precious message is not lost and we can return it back to the user.
/// let msg = handler.message.unwrap();
/// self.outbox_to_swarm
/// .push_back(NetworkBehaviourAction::GenerateEvent(msg))
/// }
/// #
/// # fn poll(
/// # &mut self,
/// # _: &mut Context<'_>,
/// # _: &mut impl PollParameters,
/// # ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
/// # if let Some(action) = self.outbox_to_swarm.pop_front() {
/// # return Poll::Ready(action);
/// # }
/// # Poll::Pending
/// # }
/// }
///
/// # struct MyHandler {
/// # message: Option<PreciousMessage>,
/// # }
/// #
/// # impl ProtocolsHandler for MyHandler {
/// # type InEvent = Void;
/// # type OutEvent = Void;
/// # type Error = Void;
/// # type InboundProtocol = DeniedUpgrade;
/// # type OutboundProtocol = DeniedUpgrade;
/// # type InboundOpenInfo = ();
/// # type OutboundOpenInfo = Void;
/// #
/// # fn listen_protocol(
/// # &self,
/// # ) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
/// # SubstreamProtocol::new(DeniedUpgrade, ())
/// # }
/// #
/// # fn inject_fully_negotiated_inbound(
/// # &mut self,
/// # _: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::InboundOpenInfo,
/// # ) {
/// # }
/// #
/// # fn inject_fully_negotiated_outbound(
/// # &mut self,
/// # _: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::OutboundOpenInfo,
/// # ) {
/// # }
/// #
/// # fn inject_event(&mut self, _event: Self::InEvent) {}
/// #
/// # fn inject_dial_upgrade_error(
/// # &mut self,
/// # _: Self::OutboundOpenInfo,
/// # _: ProtocolsHandlerUpgrErr<Void>,
/// # ) {
/// # }
/// #
/// # fn connection_keep_alive(&self) -> KeepAlive {
/// # KeepAlive::Yes
/// # }
/// #
/// # fn poll(
/// # &mut self,
/// # _: &mut Context<'_>,
/// # ) -> Poll<
/// # ProtocolsHandlerEvent<
/// # Self::OutboundProtocol,
/// # Self::OutboundOpenInfo,
/// # Self::OutEvent,
/// # Self::Error,
/// # >,
/// # > {
/// # todo!("If `Self::message.is_some()` send the message to the remote.")
/// # }
/// # }
/// # #[derive(Debug, PartialEq, Eq)]
/// # struct PreciousMessage(String);
/// ```
DialPeer {
/// The peer to try reach.
peer_id: PeerId,
/// The condition for initiating a new dialing attempt.
condition: DialPeerCondition,
/// The handler to be used to handle the connection to the peer.
///
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
/// connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected
/// peer can be included in the handler, and thus directly send on connection success or
/// extracted by the [`NetworkBehaviour`] on connection failure.
handler: THandler,
},

Expand Down
23 changes: 14 additions & 9 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,8 +1326,9 @@ mod tests {
use crate::protocols_handler::DummyProtocolsHandler;
use crate::test::{CallTraceBehaviour, MockBehaviour};
use futures::{executor, future};
use libp2p_core::{identity, multiaddr, transport, upgrade};
use libp2p_noise as noise;
use libp2p::core::{identity, multiaddr, transport, upgrade};
use libp2p::plaintext;
use libp2p::yamux;

// Test execution state.
// Connection => Disconnecting => Connecting.
Expand All @@ -1343,17 +1344,16 @@ mod tests {
O: Send + 'static,
{
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&id_keys)
.unwrap();
let local_public_key = id_keys.public();
let transport = transport::MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_mplex::MplexConfig::new())
.authenticate(plaintext::PlainText2Config {
local_public_key: local_public_key.clone(),
})
.multiplex(yamux::YamuxConfig::default())
.boxed();
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
SwarmBuilder::new(transport, behaviour, local_public_key.into()).build()
}

fn swarms_connected<TBehaviour>(
Expand Down Expand Up @@ -1704,4 +1704,9 @@ mod tests {
}
}))
}

/// [`NetworkBehaviourAction::DialAddress`] and [`NetworkBehaviourAction::DialPeer`] require a
/// handler. This handler can be used to carry state. See corresponding doc comments.
#[test]
fn use_handler_to_carry_state() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a left-over from adding a dedicated test but then resorting to a doc-test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups. Thanks for the pointer. Removed in 60c7261.

}