From dc3a7378cfa94ac280ef5b14a79da800a7f5de99 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 09:52:16 +0200 Subject: [PATCH] feat(swarm): replace address scoring with explicit candidates Previously, a `NetworkBehaviour` could report an `AddressScore` for an external address. This score was a `u32` and addresses would be ranked amongst those. In reality, an address is either confirmed to be publicly reachable (via a protocol such as AutoNAT) or merely represents a candidate that might be an external address. In a way, addresses are guilty (private) until proven innocent (publicly reachable). When a `NetworkBehaviour` reports an address candidate, we perform address translation on it to potentially correct for ephemeral ports of TCP. These candidates are then injected back into the `NetworkBehaviour`. Protocols such as AutoNAT can use these addresses as a source for probing their NAT status. Once confirmed, they can emit a `ToSwarm::ExternalAddrConfirmed` event which again will be passed to all `NetworkBehaviour`s. This simplified approach will allow us implement Kademlia's client-mode (https://github.com/libp2p/rust-libp2p/issues/2032) without additional configuration options: As soon as an address is reported as publicly reachable, we can activate server-mode for that connection. Related: https://github.com/libp2p/rust-libp2p/pull/3877. Related: https://github.com/libp2p/rust-libp2p/issues/3953. Related: https://github.com/libp2p/rust-libp2p/issues/2032. Related: https://github.com/libp2p/go-libp2p/issues/2229. Co-authored-by: Max Inden Pull-Request: #3954. --- src/behaviour.rs | 33 ++++++++++++++++--------- src/behaviour/as_client.rs | 24 ++++-------------- tests/test_client.rs | 50 ++++---------------------------------- tests/test_server.rs | 16 ++++++------ 4 files changed, 38 insertions(+), 85 deletions(-) diff --git a/src/behaviour.rs b/src/behaviour.rs index 17681341489..926445c9e49 100644 --- a/src/behaviour.rs +++ b/src/behaviour.rs @@ -36,10 +36,10 @@ use libp2p_request_response::{ }; use libp2p_swarm::{ behaviour::{ - AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, - ExpiredListenAddr, FromSwarm, + AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr, + ExternalAddrExpired, FromSwarm, }, - ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, + ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, NewExternalAddrCandidate, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::{ @@ -214,7 +214,7 @@ pub struct Behaviour { probe_id: ProbeId, listen_addresses: ListenAddresses, - external_addresses: ExternalAddresses, + other_candidates: HashSet, } impl Behaviour { @@ -240,7 +240,7 @@ impl Behaviour { pending_actions: VecDeque::new(), probe_id: ProbeId(0), listen_addresses: Default::default(), - external_addresses: Default::default(), + other_candidates: Default::default(), } } @@ -279,6 +279,12 @@ impl Behaviour { self.servers.retain(|p| p != peer); } + /// Explicitly probe the provided address for external reachability. + pub fn probe_address(&mut self, candidate: Multiaddr) { + self.other_candidates.insert(candidate); + self.as_client().on_new_address(); + } + fn as_client(&mut self) -> AsClient { AsClient { inner: &mut self.inner, @@ -294,7 +300,7 @@ impl Behaviour { last_probe: &mut self.last_probe, schedule_probe: &mut self.schedule_probe, listen_addresses: &self.listen_addresses, - external_addresses: &self.external_addresses, + other_candidates: &self.other_candidates, } } @@ -532,7 +538,6 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); - self.external_addresses.on_swarm_event(&event); match event { FromSwarm::ConnectionEstablished(connection_established) => { @@ -561,14 +566,17 @@ impl NetworkBehaviour for Behaviour { })); self.as_client().on_expired_address(addr); } - FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) => { + FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }) => { self.inner - .on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr })); + .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr })); self.as_client().on_expired_address(addr); } - external_addr @ FromSwarm::NewExternalAddr(_) => { - self.inner.on_swarm_event(external_addr); - self.as_client().on_new_address(); + FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => { + self.inner + .on_swarm_event(FromSwarm::NewExternalAddrCandidate( + NewExternalAddrCandidate { addr }, + )); + self.probe_address(addr.to_owned()); } listen_failure @ FromSwarm::ListenFailure(_) => { self.inner.on_swarm_event(listen_failure) @@ -580,6 +588,7 @@ impl NetworkBehaviour for Behaviour { listener_closed @ FromSwarm::ListenerClosed(_) => { self.inner.on_swarm_event(listener_closed) } + confirmed @ FromSwarm::ExternalAddrConfirmed(_) => self.inner.on_swarm_event(confirmed), } } diff --git a/src/behaviour/as_client.rs b/src/behaviour/as_client.rs index e0c0b2e9e0a..e57523afaf8 100644 --- a/src/behaviour/as_client.rs +++ b/src/behaviour/as_client.rs @@ -30,9 +30,7 @@ use instant::Instant; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; -use libp2p_swarm::{ - AddressScore, ConnectionId, ExternalAddresses, ListenAddresses, PollParameters, ToSwarm, -}; +use libp2p_swarm::{ConnectionId, ListenAddresses, PollParameters, ToSwarm}; use rand::{seq::SliceRandom, thread_rng}; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -97,13 +95,13 @@ pub(crate) struct AsClient<'a> { pub(crate) last_probe: &'a mut Option, pub(crate) schedule_probe: &'a mut Delay, pub(crate) listen_addresses: &'a ListenAddresses, - pub(crate) external_addresses: &'a ExternalAddresses, + pub(crate) other_candidates: &'a HashSet, } impl<'a> HandleInnerEvent for AsClient<'a> { fn handle_event( &mut self, - params: &mut impl PollParameters, + _: &mut impl PollParameters, event: request_response::Event, ) -> VecDeque { match event { @@ -147,19 +145,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { } if let Ok(address) = response.result { - // Update observed address score if it is finite. - #[allow(deprecated)] - // TODO: Fix once we report `AddressScore` through `FromSwarm` event. - let score = params - .external_addresses() - .find_map(|r| (r.addr == address).then_some(r.score)) - .unwrap_or(AddressScore::Finite(0)); - if let AddressScore::Finite(finite_score) = score { - actions.push_back(ToSwarm::ReportObservedAddr { - address, - score: AddressScore::Finite(finite_score + 1), - }); - } + actions.push_back(ToSwarm::ExternalAddrConfirmed(address)); } actions @@ -201,7 +187,7 @@ impl<'a> AsClient<'a> { self.schedule_probe.reset(self.config.retry_interval); let addresses = self - .external_addresses + .other_candidates .iter() .chain(self.listen_addresses.iter()) .cloned() diff --git a/tests/test_client.rs b/tests/test_client.rs index 221833c9377..1911d1a6b2d 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -24,7 +24,7 @@ use libp2p_autonat::{ }; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; -use libp2p_swarm::{AddressScore, Swarm, SwarmEvent}; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt as _; use std::time::Duration; @@ -70,48 +70,6 @@ async fn test_auto_probe() { assert!(client.behaviour().public_address().is_none()); assert_eq!(client.behaviour().confidence(), 0); - // Test Private NAT Status - - // Artificially add a faulty address. - let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap(); - client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite); - - let id = match client.next_behaviour_event().await { - Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => { - assert_eq!(peer, server_id); - probe_id - } - other => panic!("Unexpected behaviour event: {other:?}."), - }; - - match client.next_behaviour_event().await { - Event::OutboundProbe(OutboundProbeEvent::Error { - probe_id, - peer, - error, - }) => { - assert_eq!(peer.unwrap(), server_id); - assert_eq!(probe_id, id); - assert_eq!( - error, - OutboundProbeError::Response(ResponseError::DialError) - ); - } - other => panic!("Unexpected behaviour event: {other:?}."), - } - - match client.next_behaviour_event().await { - Event::StatusChanged { old, new } => { - assert_eq!(old, NatStatus::Unknown); - assert_eq!(new, NatStatus::Private); - } - other => panic!("Unexpected behaviour event: {other:?}."), - } - - assert_eq!(client.behaviour().confidence(), 0); - assert_eq!(client.behaviour().nat_status(), NatStatus::Private); - assert!(client.behaviour().public_address().is_none()); - // Test new public listening address client.listen().await; @@ -142,12 +100,14 @@ async fn test_auto_probe() { } SwarmEvent::Behaviour(Event::StatusChanged { old, new }) => { // Expect to flip status to public - assert_eq!(old, NatStatus::Private); + assert_eq!(old, NatStatus::Unknown); assert!(matches!(new, NatStatus::Public(_))); assert!(new.is_public()); break; } SwarmEvent::IncomingConnection { .. } + | SwarmEvent::ConnectionEstablished { .. } + | SwarmEvent::Dialing { .. } | SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {} other => panic!("Unexpected swarm event: {other:?}."), @@ -198,7 +158,7 @@ async fn test_confidence() { client.listen().await; } else { let unreachable_addr = "/ip4/127.0.0.1/tcp/42".parse().unwrap(); - client.add_external_address(unreachable_addr, AddressScore::Infinite); + client.behaviour_mut().probe_address(unreachable_addr); } for i in 0..MAX_CONFIDENCE + 1 { diff --git a/tests/test_server.rs b/tests/test_server.rs index 5f6a9a269d4..0f14c6edb27 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -24,7 +24,7 @@ use libp2p_autonat::{ use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::DialError; -use libp2p_swarm::{AddressScore, Swarm, SwarmEvent}; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt as _; use std::{num::NonZeroU32, time::Duration}; @@ -131,10 +131,9 @@ async fn test_dial_back() { async fn test_dial_error() { let (mut server, server_id, server_addr) = new_server_swarm(None).await; let (mut client, client_id) = new_client_swarm(server_id, server_addr).await; - client.add_external_address( - "/ip4/127.0.0.1/tcp/12345".parse().unwrap(), - AddressScore::Infinite, - ); + client + .behaviour_mut() + .probe_address("/ip4/127.0.0.1/tcp/12345".parse().unwrap()); async_std::task::spawn(client.loop_on_next()); let request_probe_id = match server.next_behaviour_event().await { @@ -274,10 +273,9 @@ async fn test_dial_multiple_addr() { let (mut client, client_id) = new_client_swarm(server_id, server_addr.clone()).await; client.listen().await; - client.add_external_address( - "/ip4/127.0.0.1/tcp/12345".parse().unwrap(), - AddressScore::Infinite, - ); + client + .behaviour_mut() + .probe_address("/ip4/127.0.0.1/tcp/12345".parse().unwrap()); async_std::task::spawn(client.loop_on_next()); let dial_addresses = match server.next_behaviour_event().await {