Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): allow behaviours to share addresses of peers #4371

Merged
merged 28 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1026947
swarm: Add NewExternalAddrOfPeer
StemCll Nov 28, 2023
c6a323c
deprecate request-response add_address
StemCll Nov 29, 2023
03fd927
swarm: add peer addresses
StemCll Nov 29, 2023
6110643
Move peer cache to peer addresses
StemCll Dec 10, 2023
3f227c7
Add pop function to peer address
StemCll Dec 10, 2023
959a131
Add CR changes
StemCll Dec 11, 2023
c306434
Remove public get_mut from peer addresses
StemCll Dec 14, 2023
6619f0d
identify: emit when identified new peer; add changelog entries and v…
StemCll Dec 17, 2023
1bda774
Revert version changes
StemCll Dec 25, 2023
32076ae
Extend swarm changelog entry; fix peer_addresses put logic
StemCll Dec 26, 2023
2277bd6
Add additional test for remove
StemCll Dec 26, 2023
8e1dc1e
Add connection test to request response
StemCll Dec 27, 2023
056d51e
Rever toml file format changes
StemCll Dec 27, 2023
b8f8bc5
Documentation fixes
thomaseizinger Dec 27, 2023
f5f162c
Update protocols/identify/CHANGELOG.md
thomaseizinger Dec 27, 2023
ff81f51
Revert autonat version bump
StemCll Dec 27, 2023
496c185
Rename put to add in PeerAddresses
StemCll Jan 1, 2024
db18158
Don't filter new external addresses
StemCll Jan 1, 2024
7fa87fb
Fix clippy warn
StemCll Jan 1, 2024
af0a91c
Merge branch 'master' into feat/swarm/report_remote_address
StemCll Jan 2, 2024
41b533e
Merge branch 'master' into feat/swarm/report_remote_address
StemCll Jan 17, 2024
31723f2
Review suggestions resolved
StemCll Jan 18, 2024
5059d51
Merge branch 'master' into feat/swarm/report_remote_address
StemCll Jan 20, 2024
da6294b
Re-work `PeerAddresses` to be limited in how many addresses we store
thomaseizinger Jan 21, 2024
6fae501
Allow new event in dctur test
thomaseizinger Jan 21, 2024
1866abd
Fix clippy warnings
thomaseizinger Jan 21, 2024
b1bedf7
Merge branch 'master' into feat/swarm/report_remote_address
thomaseizinger Jan 24, 2024
49d53d1
Merge branch 'master' into feat/swarm/report_remote_address
mergify[bot] Jan 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ libp2p-dcutr = { version = "0.11.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.41.1", path = "transports/dns" }
libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.46.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.44.1", path = "protocols/identify" }
libp2p-identify = { version = "0.44.2", path = "protocols/identify" }
libp2p-identity = { version = "0.2.8" }
libp2p-kad = { version = "0.45.3", path = "protocols/kad" }
libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" }
Expand All @@ -97,10 +97,10 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-quic = { version = "0.10.2", path = "transports/quic" }
libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-server = { version = "0.12.5", path = "misc/server" }
libp2p-swarm = { version = "0.44.1", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm = { version = "0.44.2", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" }
libp2p-tcp = { version = "0.41.0", path = "transports/tcp" }
libp2p-tls = { version = "0.3.0", path = "transports/tls" }
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name = "libp2p-autonat"
edition = "2021"
rust-version = { workspace = true }
description = "NAT and firewall detection for libp2p"
version = "0.12.0"
authors = ["David Craven <david@craven.ch>", "Elena Frank <elena.frank@protonmail.com>"]
version = "0.12.0"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
Expand Down
1 change: 1 addition & 0 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ impl Behaviour {
pub fn add_server(&mut self, peer: PeerId, address: Option<Multiaddr>) {
self.servers.insert(peer);
if let Some(addr) = address {
#[allow(deprecated)]
StemCll marked this conversation as resolved.
Show resolved Hide resolved
self.inner.add_address(&peer, addr);
}
}
Expand Down
5 changes: 5 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.44.2

- Emit `ToSwarm::NewExternalAddrOfPeer` for all external addresses of remote peers.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).

## 0.44.1

- Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated.
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-identify"
edition = "2021"
rust-version = { workspace = true }
description = "Nodes identifcation protocol for libp2p"
version = "0.44.1"
version = "0.44.2"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
70 changes: 36 additions & 34 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm,
NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;

use std::collections::hash_map::Entry;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
task::Context,
task::Poll,
time::Duration,
Expand Down Expand Up @@ -200,9 +199,9 @@ impl Behaviour {
.or_default()
.insert(conn, addr);

if let Some(entry) = self.discovered_peers.get_mut(&peer_id) {
if let Some(cache) = self.discovered_peers.0.as_mut() {
for addr in failed_addresses {
entry.remove(addr);
cache.remove(&peer_id, addr);
}
}
}
Expand Down Expand Up @@ -274,7 +273,21 @@ impl NetworkBehaviour for Behaviour {

let observed = info.observed_addr.clone();
self.events
.push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
.push_back(ToSwarm::GenerateEvent(Event::Received {
peer_id,
info: info.clone(),
}));

if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
for address in &info.listen_addrs {
if discovered_peers.add(peer_id, address.clone()) {
self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
peer_id,
addr: address.clone(),
});
}
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
Expand Down Expand Up @@ -387,11 +400,11 @@ impl NetworkBehaviour for Behaviour {
self.our_observed_addresses.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
if let DialError::Transport(errors) = error {
for (addr, _error) in errors {
entry.remove(addr);
}
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
(peer_id, self.discovered_peers.0.as_mut(), error)
{
for (addr, _error) in errors {
cache.remove(&peer_id, addr);
}
}
}
Expand Down Expand Up @@ -445,42 +458,31 @@ fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
true
}

struct PeerCache(Option<LruCache<PeerId, HashSet<Multiaddr>>>);
struct PeerCache(Option<PeerAddresses>);

impl PeerCache {
fn disabled() -> Self {
Self(None)
}

fn enabled(size: NonZeroUsize) -> Self {
Self(Some(LruCache::new(size)))
}

fn get_mut(&mut self, peer: &PeerId) -> Option<&mut HashSet<Multiaddr>> {
self.0.as_mut()?.get_mut(peer)
Self(Some(PeerAddresses::new(size)))
}

fn put(&mut self, peer: PeerId, addresses: impl Iterator<Item = Multiaddr>) {
let cache = match self.0.as_mut() {
None => return,
Some(cache) => cache,
};
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

let addresses = addresses.filter_map(|a| a.with_p2p(peer).ok());
cache.put(peer, HashSet::from_iter(addresses));
if let Some(cache) = self.0.as_mut() {
for address in addresses {
cache.add(peer, address);
}
}
}

fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let cache = match self.0.as_mut() {
None => return Vec::new(),
Some(cache) => cache,
};

cache
.get(peer)
.cloned()
.map(Vec::from_iter)
.unwrap_or_default()
if let Some(cache) = self.0.as_mut() {
cache.get(peer).collect()
} else {
Vec::new()
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.26.2

- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).

## 0.26.1

- Derive `PartialOrd` and `Ord` for `{Out,In}boundRequestId`.
Expand Down
6 changes: 3 additions & 3 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.26.1"
version = "0.26.2"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -19,7 +19,7 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
serde = { version = "1.0", optional = true}
serde = { version = "1.0", optional = true }
serde_json = { version = "1.0.108", optional = true }
smallvec = "1.11.2"
tracing = "0.1.37"
Expand All @@ -40,7 +40,7 @@ libp2p-yamux = { workspace = true }
rand = "0.8"
libp2p-swarm-test = { path = "../../swarm-test" }
futures_ringbuf = "0.4.0"
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Passing arguments to the docsrs builder in order to properly document cfg's.
Expand Down
29 changes: 12 additions & 17 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
dial_opts::DialOpts,
ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler,
PeerAddresses, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -357,7 +357,7 @@ where
/// reachable addresses, if any.
connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
/// Externally managed addresses via `add_address` and `remove_address`.
addresses: HashMap<PeerId, HashSet<Multiaddr>>,
addresses: PeerAddresses,
/// Requests that have not yet been sent and are waiting for a connection
/// to be established.
pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
Expand Down Expand Up @@ -406,7 +406,7 @@ where
pending_events: VecDeque::new(),
connected: HashMap::new(),
pending_outbound_requests: HashMap::new(),
addresses: HashMap::new(),
addresses: PeerAddresses::default(),
}
}

Expand Down Expand Up @@ -470,20 +470,14 @@ where
///
/// Returns true if the address was added, false otherwise (i.e. if the
/// address is already in the list).
#[deprecated(note = "Use `Swarm::add_peer_address` instead.")]
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> bool {
self.addresses.entry(*peer).or_default().insert(address)
self.addresses.add(*peer, address)
}

/// Removes an address of a peer previously added via `add_address`.
/// Removes an address of a peer previously added via [`Behaviour::add_address`].
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
let mut last = false;
if let Some(addresses) = self.addresses.get_mut(peer) {
addresses.retain(|a| a != address);
last = addresses.is_empty();
}
if last {
self.addresses.remove(peer);
}
self.addresses.remove(peer, address);
}

/// Checks whether a peer is currently connected.
Expand Down Expand Up @@ -764,9 +758,9 @@ where
if let Some(connections) = self.connected.get(&peer) {
addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone()))
}
if let Some(more) = self.addresses.get(&peer) {
addresses.extend(more.iter().cloned());
}

let cached_addrs = self.addresses.get(&peer);
addresses.extend(cached_addrs);

Ok(addresses)
}
Expand Down Expand Up @@ -797,6 +791,7 @@ where
}

fn on_swarm_event(&mut self, event: FromSwarm) {
self.addresses.on_swarm_event(&event);
match event {
FromSwarm::ConnectionEstablished(_) => {}
FromSwarm::ConnectionClosed(connection_closed) => {
Expand Down
60 changes: 60 additions & 0 deletions protocols/request-response/tests/peer_address.rs
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use libp2p_core::ConnectedPoint;
use libp2p_request_response as request_response;
use libp2p_request_response::ProtocolSupport;
use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use serde::{Deserialize, Serialize};
use std::iter;
use tracing_subscriber::EnvFilter;

#[async_std::test]
async fn dial_succeeds_after_adding_peers_address() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
let config = request_response::Config::default();

let mut swarm = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), config.clone())
});

let mut swarm2 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), config.clone())
});

let peer_id2 = *swarm2.local_peer_id();

let (listen_addr, _) = swarm2.listen().with_memory_addr_external().await;

swarm.add_peer_address(peer_id2, listen_addr.clone());

swarm.dial(peer_id2).unwrap();

async_std::task::spawn(swarm2.loop_on_next());

let (connected_peer_id, connected_address) = swarm
.wait(|event| match event {
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
let address = match endpoint {
ConnectedPoint::Dialer { address, .. } => Some(address),
_ => None,
};
Some((peer_id, address))
}
_ => None,
})
.await;
let expected_address = listen_addr.with_p2p(peer_id2).unwrap();

assert_eq!(connected_peer_id, peer_id2);
assert_eq!(expected_address, connected_address.unwrap());
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Ping(Vec<u8>);
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Pong(Vec<u8>);
Loading
Loading