Make public addresses go first in authority discovery DHT records (paritytech#3757)

Make sure the public addresses explicitly set by the operator go first in
the authority discovery DHT records.

Also update `Discovery` behavior to eliminate duplicates in the returned
addresses.

This PR should improve the situation with paritytech#3519.

Obsoletes paritytech#3657.
dmitry-markin authored and dharjeezy committed Mar 24, 2024
1 parent 89389c8 commit 47d9f7b
Showing 9 changed files with 99 additions and 27 deletions.
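
As a rough illustration of the behaviour this commit introduces (not part of the diff below): the worker now places the operator-supplied public addresses ahead of the discovered external ones and drops duplicates. A minimal sketch under those assumptions; the helper name and the addresses are made up, only `Multiaddr` and `LinkedHashSet` come from the changed code:

```rust
use libp2p::Multiaddr;
use linked_hash_set::LinkedHashSet;

/// Hypothetical helper mirroring the new ordering rule: operator-set public
/// addresses come first, discovered external addresses follow, and duplicates
/// keep their first-seen position.
fn ordered_addresses(
    public: &LinkedHashSet<Multiaddr>,
    external: impl IntoIterator<Item = Multiaddr>,
) -> Vec<Multiaddr> {
    public
        .iter()
        .cloned()
        // An external address that repeats a public one is skipped.
        .chain(external.into_iter().filter(|a| !public.contains(a)))
        .collect()
}
```

A record built from one public DNS address plus two discovered external addresses, one of them identical to the public entry, would therefore list the DNS address first and contain the duplicate only once.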
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
@@ -55,6 +55,7 @@ fn build_authority_discovery_service<Block: BlockT>(
prometheus_registry: Option<Registry>,
) -> AuthorityDiscoveryService {
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();
let authority_discovery_role = sc_authority_discovery::Role::Discover;
let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move {
match e {
@@ -65,6 +66,7 @@ fn build_authority_discovery_service<Block: BlockT>(
let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
sc_authority_discovery::WorkerConfig {
publish_non_global_ips: auth_disc_publish_non_global_ips,
public_addresses: auth_disc_public_addresses,
// Require that authority discovery records are signed.
strict_record_validation: true,
..Default::default()
2 changes: 2 additions & 0 deletions polkadot/node/service/src/lib.rs
@@ -807,6 +807,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(

let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);

let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
@@ -1061,6 +1062,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
sc_authority_discovery::WorkerConfig {
publish_non_global_ips: auth_disc_publish_non_global_ips,
public_addresses: auth_disc_public_addresses,
// Require that authority discovery records are signed.
strict_record_validation: true,
..Default::default()
2 changes: 2 additions & 0 deletions substrate/bin/node/cli/src/service.rs
@@ -422,6 +422,7 @@ pub fn new_full_base(

let shared_voter_state = rpc_setup;
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
let auth_disc_public_addresses = config.network.public_addresses.clone();
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");

@@ -610,6 +611,7 @@ pub fn new_full_base(
sc_authority_discovery::new_worker_and_service_with_config(
sc_authority_discovery::WorkerConfig {
publish_non_global_ips: auth_disc_publish_non_global_ips,
public_addresses: auth_disc_public_addresses,
..Default::default()
},
client.clone(),
1 change: 1 addition & 0 deletions substrate/client/authority-discovery/Cargo.toml
@@ -29,6 +29,7 @@ multihash = { version = "0.18.1", default-features = false, features = [
"sha2",
"std",
] }
linked_hash_set = "0.1.4"
log = { workspace = true, default-features = true }
prost = "0.12"
rand = "0.8.5"
5 changes: 5 additions & 0 deletions substrate/client/authority-discovery/src/lib.rs
@@ -80,6 +80,10 @@ pub struct WorkerConfig {
/// Defaults to `true` to avoid the surprise factor.
pub publish_non_global_ips: bool,

/// Public addresses set by the node operator to always publish first in the authority
/// discovery DHT record.
pub public_addresses: Vec<Multiaddr>,

/// Reject authority discovery records that are not signed by their network identity (PeerId)
///
/// Defaults to `false` to provide compatibility with old versions
@@ -104,6 +108,7 @@ impl Default for WorkerConfig {
// `authority_discovery_dht_event_received`.
max_query_interval: Duration::from_secs(10 * 60),
publish_non_global_ips: true,
public_addresses: Vec::new(),
strict_record_validation: false,
}
}
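
For code that constructs the worker directly, the new field slots into the usual override-plus-defaults pattern. A sketch only; the function name and the address literal are illustrative, while `WorkerConfig` and its field names come from the diff above:

```rust
use sc_authority_discovery::WorkerConfig;

// Sketch: set the new `public_addresses` field and let every other option
// fall back to the defaults shown above.
fn example_worker_config() -> WorkerConfig {
    WorkerConfig {
        publish_non_global_ips: false,
        // Illustrative value; in the node services this comes from
        // `config.network.public_addresses.clone()`.
        public_addresses: vec!["/dns/validator.example.com/tcp/30333"
            .parse()
            .expect("valid multiaddr")],
        strict_record_validation: true,
        ..Default::default()
    }
}
```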
92 changes: 71 additions & 21 deletions substrate/client/authority-discovery/src/worker.rs
@@ -35,6 +35,7 @@ use addr_cache::AddrCache;
use codec::{Decode, Encode};
use ip_network::IpNetwork;
use libp2p::{core::multiaddr, identity::PublicKey, multihash::Multihash, Multiaddr, PeerId};
use linked_hash_set::LinkedHashSet;
use multihash_codetable::{Code, MultihashDigest};

use log::{debug, error, log_enabled};
@@ -120,14 +121,22 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {

/// Interval to be proactive, publishing own addresses.
publish_interval: ExpIncInterval,

/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
/// have changed.
publish_if_changed_interval: ExpIncInterval,

/// List of keys onto which addresses have been published at the latest publication.
/// Used to check whether they have changed.
latest_published_keys: HashSet<AuthorityId>,

/// Same value as in the configuration.
publish_non_global_ips: bool,

/// Public addresses set by the node operator to always publish first in the authority
/// discovery DHT record.
public_addresses: LinkedHashSet<Multiaddr>,

/// Same value as in the configuration.
strict_record_validation: bool,

@@ -136,6 +145,7 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,

/// Set of in-flight lookups.
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

@@ -224,6 +234,29 @@
None => None,
};

let public_addresses = {
let local_peer_id: Multihash = network.local_peer_id().into();

config
.public_addresses
.into_iter()
.map(|mut address| {
if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
if peer_id != local_peer_id {
error!(
target: LOG_TARGET,
"Discarding invalid local peer ID in public address {address}.",
);
}
// Always discard `/p2p/...` protocol for proper address comparison (local
// peer id will be added before publishing).
address.pop();
}
address
})
.collect()
};

Worker {
from_service: from_service.fuse(),
client,
@@ -233,6 +266,7 @@
publish_if_changed_interval,
latest_published_keys: HashSet::new(),
publish_non_global_ips: config.publish_non_global_ips,
public_addresses,
strict_record_validation: config.strict_record_validation,
query_interval,
pending_lookups: Vec::new(),
@@ -304,32 +338,48 @@
}

fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
let peer_id: Multihash = self.network.local_peer_id().into();
let publish_non_global_ips = self.publish_non_global_ips;
let addresses = self.network.external_addresses().into_iter().filter(move |a| {
if publish_non_global_ips {
return true
}
let addresses = self
.public_addresses
.clone()
.into_iter()
.chain(self.network.external_addresses().into_iter().filter_map(|mut address| {
// Make sure the reported external address does not contain `/p2p/...` protocol.
if let Some(multiaddr::Protocol::P2p(_)) = address.iter().last() {
address.pop();
}

a.iter().all(|p| match p {
// The `ip_network` library is used because its `is_global()` method is stable,
// while `is_global()` in the standard library currently isn't.
multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
_ => true,
if self.public_addresses.contains(&address) {
// Already added above.
None
} else {
Some(address)
}
}))
.filter(move |address| {
if publish_non_global_ips {
return true
}

address.iter().all(|protocol| match protocol {
// The `ip_network` library is used because its `is_global()` method is stable,
// while `is_global()` in the standard library currently isn't.
multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
_ => true,
})
})
});
.collect::<Vec<_>>();

debug!(target: LOG_TARGET, "Authority DHT record peer_id='{:?}' addresses='{:?}'", peer_id, addresses.clone().collect::<Vec<_>>());
let peer_id = self.network.local_peer_id();
debug!(
target: LOG_TARGET,
"Authority DHT record peer_id='{peer_id}' addresses='{addresses:?}'",
);

// The address must include the peer id if not already set.
addresses.map(move |a| {
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
a
} else {
a.with(multiaddr::Protocol::P2p(peer_id))
}
})
// The address must include the peer id.
let peer_id: Multihash = peer_id.into();
addresses.into_iter().map(move |a| a.with(multiaddr::Protocol::P2p(peer_id)))
}

/// Publish own public addresses.
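
The constructor above also normalises operator input: a trailing `/p2p/<peer id>` is stripped (with an error logged if it names a different peer), and `addresses_to_publish` appends the local peer ID again just before the record is built. A standalone restatement of that step, assuming libp2p 0.51 where `Protocol::P2p` carries a `Multihash`; the function name and the log line are illustrative, not the worker's actual method:

```rust
use libp2p::{multiaddr::Protocol, multihash::Multihash, Multiaddr, PeerId};

/// Simplified sketch of the normalisation applied to an operator-supplied
/// address: drop a trailing `/p2p/...` so it compares equal to reported
/// external addresses, then re-append the local peer ID for publishing.
fn normalise_public_address(mut address: Multiaddr, local_peer_id: PeerId) -> Multiaddr {
    let local_peer_id: Multihash = local_peer_id.into();
    if let Some(Protocol::P2p(peer_id)) = address.iter().last() {
        if peer_id != local_peer_id {
            // The worker logs an error here; the suffix is dropped either way.
            eprintln!("public address {address} names a foreign peer ID");
        }
        address.pop();
    }
    address.with(Protocol::P2p(local_peer_id))
}
```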
2 changes: 1 addition & 1 deletion substrate/client/network/Cargo.toml
@@ -29,7 +29,7 @@ futures = "0.3.21"
futures-timer = "3.0.2"
ip_network = "0.4.1"
libp2p = { version = "0.51.4", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "request-response", "tcp", "tokio", "websocket", "yamux"] }
linked_hash_set = "0.1.3"
linked_hash_set = "0.1.4"
log = { workspace = true, default-features = true }
mockall = "0.11.3"
parking_lot = "0.12.1"
19 changes: 14 additions & 5 deletions substrate/client/network/src/discovery.rs
@@ -72,6 +72,7 @@ use libp2p::{
},
PeerId,
};
use linked_hash_set::LinkedHashSet;
use log::{debug, info, trace, warn};
use sp_core::hexdisplay::HexDisplay;
use std::{
@@ -550,14 +551,20 @@
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };

let mut list = self
// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
// discovery DHT records).
let mut list: LinkedHashSet<_> = self
.permanent_addresses
.iter()
.filter_map(|(p, a)| (*p == peer_id).then_some(a.clone()))
.collect::<Vec<_>>();
.collect();

if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
list.extend(ephemeral_addresses.clone());
ephemeral_addresses.iter().for_each(|address| {
list.insert_if_absent(address.clone());
});
}

{
@@ -583,12 +590,14 @@ impl NetworkBehaviour for DiscoveryBehaviour {
});
}

list.extend(list_to_filter);
list_to_filter.into_iter().for_each(|address| {
list.insert_if_absent(address);
});
}

trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);

Ok(list)
Ok(list.into_iter().collect())
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
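
The dedup relies on `insert_if_absent` from the `linked_hash_set` crate: a value is only added if it is not already present, so the first-seen position of an address wins and reserved-node addresses keep their priority over later DHT results. A tiny self-contained illustration with made-up addresses:

```rust
use linked_hash_set::LinkedHashSet;

fn main() {
    let mut list: LinkedHashSet<&str> = LinkedHashSet::new();

    // Reserved-node (permanent) address goes in first.
    list.insert_if_absent("/ip4/10.0.0.1/tcp/30333");
    // The same address arriving later from the DHT is ignored; a new one is
    // appended after the existing entries.
    list.insert_if_absent("/ip4/10.0.0.1/tcp/30333");
    list.insert_if_absent("/dns/validator.example.com/tcp/30333");

    let ordered: Vec<_> = list.into_iter().collect();
    assert_eq!(ordered, ["/ip4/10.0.0.1/tcp/30333", "/dns/validator.example.com/tcp/30333"]);
}
```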
