Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

remote-ext: improve state download performance on slow connections #14746

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub use sp_io::TestExternalities;
use sp_runtime::{traits::Block as BlockT, StateVersion};
use spinners::{Spinner, Spinners};
use std::{
cmp::max,
cmp::{max, min},
fs,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
Expand Down Expand Up @@ -360,7 +360,8 @@ where
const PARALLEL_REQUESTS: usize = 4;
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const INITIAL_BATCH_SIZE: usize = 5000;
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2);
const INITIAL_BATCH_SIZE: usize = 10;
// nodes by default will not return more than 1000 keys per request
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
const KEYS_PAGE_MAX_RETRIES: usize = 12;
Expand Down Expand Up @@ -521,6 +522,7 @@ where
.insert(method, params.clone())
.map_err(|_| "Invalid batch method and/or params")?
}
let request_started = Instant::now();
let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
Ok(batch_response) => batch_response,
Err(e) => {
Expand All @@ -530,20 +532,39 @@ where

log::debug!(
target: LOG_TARGET,
"Batch request failed, trying again with smaller batch size. {}",
"Batch request failed, resetting batch size to 1. Error: {}",
e.to_string()
);

return Self::get_storage_data_dynamic_batch_size(
client,
payloads,
max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
bar,
)
.await
// Request timed out or server errored. Try to get things moving again by starting
// again with just 1 item.
return Self::get_storage_data_dynamic_batch_size(client, payloads, 1, bar).await
},
};

// Request succeeded. Decide whether to increase or decrease the batch size for the next
// request, depending on if the elapsed time was greater than or less than the target.
let request_duration = request_started.elapsed();
let next_batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
} else {
// Increase the batch size by *at most* the number of remaining payloads
min(
payloads.len(),
// Increase the batch size by *at least* 1
max(
batch_size + 1,
(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
),
)
};

log::debug!(
target: LOG_TARGET,
"Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
request_duration, Self::REQUEST_DURATION_TARGET, batch_size, next_batch_size
);

// Collect the data from this batch
let mut data: Vec<Option<StorageData>> = vec![];
let batch_response_len = batch_response.len();
Expand All @@ -560,7 +581,7 @@ where
let mut rest = Self::get_storage_data_dynamic_batch_size(
client,
remaining_payloads,
max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
next_batch_size,
bar,
)
.await?;
Expand Down Expand Up @@ -1345,7 +1366,7 @@ mod remote_tests {
.execute_with(|| {});
}

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn can_create_snapshot() {
const CACHE: &'static str = "can_create_snapshot";
init_logger();
Expand All @@ -1369,7 +1390,7 @@ mod remote_tests {
.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
.collect::<Vec<_>>();

let snap: Snapshot<Block> = Builder::<Block>::new().load_snapshot(CACHE.into()).unwrap();
let snap: Snapshot<Block> = Snapshot::load(&PathBuf::from(CACHE)).unwrap();
assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0));

assert!(to_delete.len() == 1);
Expand Down Expand Up @@ -1401,7 +1422,7 @@ mod remote_tests {
.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
.collect::<Vec<_>>();

let snap: Snapshot<Block> = Builder::<Block>::new().load_snapshot(CACHE.into()).unwrap();
let snap: Snapshot<Block> = Snapshot::load(&PathBuf::from(CACHE)).unwrap();
assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0));

assert!(to_delete.len() == 1);
Expand Down