[kad] Provide a targeted store operation. #1988

Merged Mar 9, 2021 (8 commits; changes shown from 7 commits)
13 changes: 13 additions & 0 deletions protocols/kad/CHANGELOG.md
@@ -1,5 +1,18 @@
# 0.29.0 [unreleased]

- Add `KademliaCaching` and `KademliaConfig::set_caching` to configure
  whether Kademlia should track, during lookups, the closest nodes to a key
  that did not return a record, exposed via `GetRecordOk::cache_candidates`.
  As before, if a lookup uses a quorum of 1, these candidates are
  automatically sent the found record. With a lookup quorum > 1, the
  candidates can instead be used with `Kademlia::put_record_to` after
  selecting one of the returned records to cache. Matching the previous
  behaviour, caching is enabled by default with a `max_peers` of 1, i.e.
  only the closest node to the key that did not return a record is tracked.

- Add `Kademlia::put_record_to` for storing a record at specific nodes,
e.g. for write-back caching after a successful read with quorum > 1.

- Update `libp2p-swarm`.

# 0.28.1 [2021-02-15]
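To make the changelog entry above concrete, here is a minimal usage sketch of the new surface. It is not part of the PR: the keypair and key bytes are illustrative, and it assumes the crate versions this PR targets (where `PeerId` is `Copy`).

```rust
use libp2p::identity;
use libp2p::kad::{
    record::{store::MemoryStore, Key},
    Kademlia, KademliaCaching, KademliaConfig, Quorum,
};
use libp2p::PeerId;

fn main() {
    let local_key = identity::Keypair::generate_ed25519();
    let local_id = PeerId::from(local_key.public());

    // Track up to 3 cache candidates per lookup instead of the default 1.
    let mut cfg = KademliaConfig::default();
    cfg.set_caching(KademliaCaching::Enabled { max_peers: 3 });

    let mut kademlia = Kademlia::with_config(local_id, MemoryStore::new(local_id), cfg);

    // With a lookup quorum of 1, a found record is still automatically
    // written back to the tracked candidates, as before this change.
    // Driving the query to completion requires polling a `Swarm`,
    // which is omitted here.
    let _qid = kademlia.get_record(&Key::new(b"example-key"), Quorum::One);
}
```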
174 changes: 143 additions & 31 deletions protocols/kad/src/behaviour.rs
@@ -47,8 +47,8 @@ use libp2p_swarm::{
};
use log::{info, debug, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, time::Duration};
use std::collections::{HashSet, VecDeque};
use std::{borrow::Cow, error, time::Duration};
use std::collections::{HashSet, VecDeque, BTreeMap};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
@@ -100,6 +100,9 @@ pub struct Kademlia<TStore> {
/// The currently known addresses of the local node.
local_addrs: HashSet<Multiaddr>,

/// See [`KademliaConfig::caching`].
caching: KademliaCaching,

/// The record storage.
store: TStore,
}
@@ -143,6 +146,24 @@ pub struct KademliaConfig {
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
kbucket_inserts: KademliaBucketInserts,
caching: KademliaCaching,
}

/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum KademliaCaching {
/// Caching is disabled and the peers closest to records being looked up
/// that do not return a record are not tracked, i.e.
/// [`GetRecordOk::cache_candidates`] is always empty.
Disabled,
/// The closest peers to the key being looked up that did not return a record
/// are tracked, up to `max_peers`, and returned in [`GetRecordOk::cache_candidates`].
/// Furthermore, if [`Kademlia::get_record`] is used with a quorum of 1, the
/// found record is automatically sent to (i.e. cached at) these peers. For lookups with a
/// quorum > 1, the write-back operation must be performed explicitly, if
/// desired and after choosing a record from the results, via [`Kademlia::put_record_to`].
Enabled { max_peers: u16 },
}

impl Default for KademliaConfig {
@@ -158,6 +179,7 @@ impl Default for KademliaConfig {
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
kbucket_inserts: KademliaBucketInserts::OnConnected,
caching: KademliaCaching::Enabled { max_peers: 1 },
}
}
}
@@ -319,6 +341,17 @@ impl KademliaConfig {
self.kbucket_inserts = inserts;
self
}

/// Sets the [`KademliaCaching`] strategy to use for successful lookups.
///
/// The default is [`KademliaCaching::Enabled`] with a `max_peers` of 1.
/// Hence, with default settings and a lookup quorum of 1, a successful lookup
/// will result in the record being cached at the closest node to the key that
/// did not return the record, i.e. the standard Kademlia behaviour.
pub fn set_caching(&mut self, c: KademliaCaching) -> &mut Self {
self.caching = c;
self
}
}

impl<TStore> Kademlia<TStore>
@@ -366,7 +399,8 @@ where
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
connection_idle_timeout: config.connection_idle_timeout,
local_addrs: HashSet::new()
local_addrs: HashSet::new(),
caching: config.caching,
}
}

@@ -589,7 +623,12 @@

let done = records.len() >= quorum.get();
let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
let info = QueryInfo::GetRecord {
key: key.clone(),
records,
quorum,
cache_candidates: BTreeMap::new(),
};
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)
@@ -602,7 +641,8 @@ where
id
}

/// Stores a record in the DHT.
/// Stores a record in the DHT, locally as well as at the nodes
/// closest to the key, as per the XOR distance metric.
///
/// Returns `Ok` if a record has been stored locally, providing the
/// `QueryId` of the initial query that replicates the record in the DHT.
Expand Down Expand Up @@ -638,6 +678,54 @@ where
Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
}

/// Stores a record at specific peers, without storing it locally.
///
/// The given [`Quorum`] is understood in the context of the total
/// number of distinct peers given.
///
/// If the record's expiration is `None`, the configured record TTL is used.
///
/// > **Note**: This is not a regular Kademlia DHT operation. It may be
/// > used to selectively update or store a record at specific peers
/// > for the purpose of e.g. making sure these peers have the latest
/// > "version" of a record or to "cache" a record at further peers
/// > to increase the lookup success rate on the DHT for other peers.
/// >
/// > In particular, if lookups are performed with a quorum > 1, multiple
/// > possibly different records may be returned and the standard Kademlia
/// > procedure of "caching" (i.e. storing) a found record at the closest
/// > node to the key that _did not_ return it cannot be employed
/// > transparently. In that case, client code can explicitly choose
/// > which record to store at which peers for analogous write-back
/// > caching or for other reasons.
pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
where
I: ExactSizeIterator<Item = PeerId>
{
let quorum = if peers.len() > 0 {
quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
} else {
// If no peers are given, we just let the query fail immediately
// due to the fact that the quorum must be at least one, instead of
// introducing a new kind of error.
NonZeroUsize::new(1).expect("1 > 0")
};
record.expires = record.expires.or_else(||
self.record_ttl.map(|ttl| Instant::now() + ttl));
let context = PutRecordContext::Custom;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
success: Vec::new(),
get_closest_peers_stats: QueryStats::empty()
}
};
let inner = QueryInner::new(info);
self.queries.add_fixed(peers, inner)
}
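As a companion to the documentation above, a sketch of calling the new method directly (not from the PR; `write_back` and its inputs are hypothetical). Note that the quorum is evaluated against the peers given, so `Quorum::All` below means every listed peer must confirm the store.

```rust
use libp2p::kad::{kbucket, record::store::MemoryStore, Kademlia, Quorum, Record};
use libp2p::PeerId;
use std::collections::BTreeMap;

/// Hypothetical helper: cache a chosen record at the cache candidates
/// tracked during a lookup (e.g. `GetRecordOk::cache_candidates`).
fn write_back(
    kademlia: &mut Kademlia<MemoryStore>,
    chosen: Record,
    candidates: &BTreeMap<kbucket::Distance, PeerId>,
) {
    if candidates.is_empty() {
        return; // An empty peer list would make the query fail immediately.
    }
    // `values()` is an `ExactSizeIterator`, and `PeerId` is `Copy` in this
    // version, so `copied()` satisfies `ExactSizeIterator<Item = PeerId>`.
    kademlia.put_record_to(chosen, candidates.values().copied(), Quorum::All);
}
```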

/// Removes the record with the given key from _local_ storage,
/// if the local node is the publisher of the record.
///
@@ -1083,10 +1171,10 @@
}
}

QueryInfo::GetRecord { key, records, quorum, cache_at } => {
QueryInfo::GetRecord { key, records, quorum, cache_candidates } => {
let results = if records.len() >= quorum.get() { // [not empty]
if let Some(cache_key) = cache_at {
// Cache the record at the closest node to the key that
if quorum.get() == 1 && !cache_candidates.is_empty() {
// Cache the record at the closest node(s) to the key that
// did not return the record.
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
@@ -1101,9 +1189,9 @@
}
};
let inner = QueryInner::new(info);
self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
self.queries.add_fixed(cache_candidates.values().copied(), inner);
}
Ok(GetRecordOk { records })
Ok(GetRecordOk { records, cache_candidates })
} else if records.is_empty() {
Err(GetRecordError::NotFound {
key,
@@ -1153,7 +1241,7 @@
}
};
match context {
PutRecordContext::Publish =>
PutRecordContext::Publish | PutRecordContext::Custom =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: get_closest_peers_stats.merge(result.stats),
@@ -1252,7 +1340,7 @@
}
});
match context {
PutRecordContext::Publish =>
PutRecordContext::Publish | PutRecordContext::Custom =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
@@ -1722,7 +1810,7 @@
} => {
if let Some(query) = self.queries.get_mut(&user_data) {
if let QueryInfo::GetRecord {
key, records, quorum, cache_at
key, records, quorum, cache_candidates
} = &mut query.inner.info {
if let Some(record) = record {
records.push(PeerRecord{ peer: Some(source), record });
@@ -1744,19 +1832,19 @@
);
}
}
-} else if quorum.get() == 1 {
-    // It is a "standard" Kademlia query, for which the
-    // closest node to the key that did *not* return the
-    // value is tracked in order to cache the record on
-    // that node if the query turns out to be successful.
-    let source_key = kbucket::Key::from(source);
-    if let Some(cache_key) = cache_at {
-        let key = kbucket::Key::new(key.clone());
-        if source_key.distance(&key) < cache_key.distance(&key) {
-            *cache_at = Some(source_key)
-        }
-    } else {
-        *cache_at = Some(source_key)
-    }
+} else {
+    log::trace!("Record with key {:?} not found at {}", key, source);
+    if let KademliaCaching::Enabled { max_peers } = self.caching {
+        let source_key = kbucket::Key::from(source);
+        let target_key = kbucket::Key::from(key.clone());
+        let distance = source_key.distance(&target_key);
+        cache_candidates.insert(distance, source);
+        if cache_candidates.len() > max_peers as usize {
+            // TODO: `pop_last()` would be nice once stabilised.
+            // See https://github.com/rust-lang/rust/issues/62924.
+            let last = *cache_candidates.keys().next_back().expect("len > 0");
+            cache_candidates.remove(&last);
+        }
+    }
}
}
@@ -2063,7 +2151,18 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
pub records: Vec<PeerRecord>
/// The records found, including the peer that returned them.
pub records: Vec<PeerRecord>,
/// If caching is enabled, these are the peers closest
/// _to the record key_ (not the local node) that were queried but
/// did not return the record, sorted by distance to the record key
/// from closest to farthest. How many of these are tracked is configured
/// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of
/// 1, these peers will be sent the record as a means of caching.
/// If the lookup used a quorum > 1, you may wish to use these
/// candidates with [`Kademlia::put_record_to`] after selecting
/// one of the returned records.
pub cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
}
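A sketch of consuming these fields from the event loop for a lookup issued with a quorum > 1 (not from the PR; the selection policy of taking the first returned record is a placeholder, and the same version assumptions as the earlier sketches apply).

```rust
use libp2p::kad::{
    record::store::MemoryStore, GetRecordOk, Kademlia, KademliaEvent, QueryResult, Quorum,
};

// Hypothetical handler: choose one of the returned records and cache it
// at the peers that were tracked as cache candidates during the lookup.
fn on_kad_event(kademlia: &mut Kademlia<MemoryStore>, event: KademliaEvent) {
    if let KademliaEvent::QueryResult {
        result: QueryResult::GetRecord(Ok(GetRecordOk { records, cache_candidates })),
        ..
    } = event
    {
        if cache_candidates.is_empty() {
            return;
        }
        // Taking the first record is a placeholder; an application might
        // instead pick the newest or most frequently returned record.
        if let Some(chosen) = records.into_iter().next() {
            kademlia.put_record_to(
                chosen.record,
                cache_candidates.values().copied(),
                Quorum::All,
            );
        }
    }
}
```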

/// The error result of [`Kademlia::get_record`].
@@ -2319,17 +2418,32 @@ impl QueryInner {
/// The context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
/// The context is a [`Kademlia::start_providing`] operation.
Publish,
/// The context is periodic republishing of provider announcements
/// initiated earlier via [`Kademlia::start_providing`].
Republish,
}

/// The context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
/// The context is a [`Kademlia::put_record`] operation.
Publish,
/// The context is periodic republishing of records stored
/// earlier via [`Kademlia::put_record`].
Republish,
/// The context is periodic replication (i.e. without extending
/// the record TTL) of stored records received earlier from another peer.
Replicate,
/// The context is an automatic write-back caching operation of a
/// record found via [`Kademlia::get_record`] at the closest node
/// to the key queried that did not return a record. This only
/// occurs after a lookup quorum of 1 as per standard Kademlia.
Cache,
/// The context is a custom store operation targeting specific
/// peers initiated by [`Kademlia::put_record_to`].
Custom,
}

/// Information about a running query.
@@ -2389,11 +2503,9 @@ pub enum QueryInfo {
records: Vec<PeerRecord>,
/// The number of records to look for.
quorum: NonZeroUsize,
/// The closest peer to `key` that did not return a record.
///
/// When a record is found in a standard Kademlia query (quorum == 1),
/// it is cached at this peer as soon as a record is found.
cache_at: Option<kbucket::Key<PeerId>>,
/// The peers closest to the `key` that were queried but did not return a record,
/// i.e. the peers that are candidates for caching the record.
cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
},
}

12 changes: 9 additions & 3 deletions protocols/kad/src/behaviour/test.rs
@@ -643,7 +643,9 @@ fn get_record() {

let record = Record::new(random_multihash(), vec![4,5,6]);

swarms[1].store.put(record.clone()).unwrap();
let expected_cache_candidate = *Swarm::local_peer_id(&swarms[1]);

swarms[2].store.put(record.clone()).unwrap();
let qid = swarms[0].get_record(&record.key, Quorum::One);

block_on(
@@ -653,12 +655,16 @@
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { records })),
result: QueryResult::GetRecord(Ok(GetRecordOk {
records, cache_candidates
})),
..
})) => {
assert_eq!(id, qid);
assert_eq!(records.len(), 1);
assert_eq!(records.first().unwrap().record, record);
assert_eq!(cache_candidates.len(), 1);
assert_eq!(cache_candidates.values().next(), Some(&expected_cache_candidate));
return Poll::Ready(());
}
// Ignore any other event.
@@ -699,7 +705,7 @@ fn get_record_many() {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { records })),
result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })),
..
})) => {
assert_eq!(id, qid);
9 changes: 8 additions & 1 deletion protocols/kad/src/lib.rs
@@ -39,7 +39,14 @@ mod dht_proto {
}

pub use addresses::Addresses;
pub use behaviour::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, Quorum};
pub use behaviour::{
Kademlia,
KademliaBucketInserts,
KademliaConfig,
KademliaCaching,
KademliaEvent,
Quorum
};
pub use behaviour::{
QueryRef,
QueryMut,