diff --git a/Cargo.lock b/Cargo.lock
index cc80e2542fb0..03cc4c4b528b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1096,17 +1096,6 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
-[[package]]
-name = "async-recursion"
-version = "1.0.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.38",
-]
-
 [[package]]
 name = "async-stream"
 version = "0.3.5"
@@ -5286,7 +5275,6 @@ dependencies = [
 name = "frame-remote-externalities"
 version = "0.10.0-dev"
 dependencies = [
- "async-recursion",
  "futures",
  "indicatif",
  "jsonrpsee",
diff --git a/substrate/utils/frame/remote-externalities/Cargo.toml b/substrate/utils/frame/remote-externalities/Cargo.toml
index ad6ab006da1d..7067aed238ac 100644
--- a/substrate/utils/frame/remote-externalities/Cargo.toml
+++ b/substrate/utils/frame/remote-externalities/Cargo.toml
@@ -23,7 +23,6 @@ sp-runtime = { path = "../../../primitives/runtime" }
 tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] }
 substrate-rpc-client = { path = "../rpc/client" }
 futures = "0.3"
-async-recursion = "1.0.4"
 indicatif = "0.17.3"
 spinners = "4.1.0"
 tokio-retry = "0.3.0"
diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs
index 072ea6ef5e59..71e9320ebeeb 100644
--- a/substrate/utils/frame/remote-externalities/src/lib.rs
+++ b/substrate/utils/frame/remote-externalities/src/lib.rs
@@ -20,7 +20,6 @@
 //! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate
 //! based chain, or a local state snapshot file.
 
-use async_recursion::async_recursion;
 use codec::{Compact, Decode, Encode};
 use indicatif::{ProgressBar, ProgressStyle};
 use jsonrpsee::{
@@ -44,7 +43,7 @@ use sp_runtime::{
 use sp_state_machine::TestExternalities;
 use spinners::{Spinner, Spinners};
 use std::{
-	cmp::max,
+	cmp::{max, min},
 	fs,
 	ops::{Deref, DerefMut},
 	path::{Path, PathBuf},
@@ -353,10 +352,11 @@ where
 	const PARALLEL_REQUESTS: usize = 4;
 	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
 	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
-	const INITIAL_BATCH_SIZE: usize = 5000;
+	const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
+	const INITIAL_BATCH_SIZE: usize = 10;
 	// nodes by default will not return more than 1000 keys per request
 	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
-	const KEYS_PAGE_MAX_RETRIES: usize = 12;
+	const MAX_RETRIES: usize = 12;
 	const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
 
 	async fn rpc_get_storage(
@@ -411,8 +411,8 @@ where
 		let keys = loop {
 			// This loop can hit the node with very rapid requests, occasionally causing it to
 			// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
-			let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL)
-				.take(Self::KEYS_PAGE_MAX_RETRIES);
+			let retry_strategy =
+				FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
 			let get_page_closure =
 				|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
 			let page = Retry::spawn(retry_strategy, get_page_closure).await?;
@@ -448,8 +448,6 @@ where
 	///
 	/// * `client` - An `Arc` wrapped `HttpClient` used for making the requests.
 	/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`
-	/// * `batch_size` - The initial batch size to use for the request. The batch size will be
-	///   adjusted dynamically in case of failure.
 	///
 	/// # Returns
 	///
@@ -485,80 +483,107 @@ where
 	/// 	}
 	/// }
 	/// ```
-	#[async_recursion]
 	async fn get_storage_data_dynamic_batch_size(
 		client: &HttpClient,
 		payloads: Vec<(String, ArrayParams)>,
-		batch_size: usize,
 		bar: &ProgressBar,
 	) -> Result<Vec<Option<StorageData>>, String> {
-		// All payloads have been processed
-		if payloads.is_empty() {
-			return Ok(vec![])
-		};
-
-		log::debug!(
-			target: LOG_TARGET,
-			"Remaining payloads: {} Batch request size: {}",
-			payloads.len(),
-			batch_size,
-		);
+		let mut all_data: Vec<Option<StorageData>> = vec![];
+		let mut start_index = 0;
+		let mut retries = 0usize;
+		let mut batch_size = Self::INITIAL_BATCH_SIZE;
+		let total_payloads = payloads.len();
+
+		while start_index < total_payloads {
+			log::debug!(
+				target: LOG_TARGET,
+				"Remaining payloads: {} Batch request size: {}",
+				total_payloads - start_index,
+				batch_size,
+			);
 
-		// Payloads to attempt to process this batch
-		let page = payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();
+			let end_index = usize::min(start_index + batch_size, total_payloads);
+			let page = &payloads[start_index..end_index];
 
-		// Build the batch request
-		let mut batch = BatchRequestBuilder::new();
-		for (method, params) in page.iter() {
-			batch
-				.insert(method, params.clone())
-				.map_err(|_| "Invalid batch method and/or params")?
-		}
-		let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
-			Ok(batch_response) => batch_response,
-			Err(e) => {
-				if batch_size < 2 {
-					return Err(e.to_string())
-				}
+			// Build the batch request
+			let mut batch = BatchRequestBuilder::new();
+			for (method, params) in page.iter() {
+				batch
+					.insert(method, params.clone())
+					.map_err(|_| "Invalid batch method and/or params")?;
+			}
 
-				log::debug!(
-					target: LOG_TARGET,
-					"Batch request failed, trying again with smaller batch size. {}",
-					e.to_string()
-				);
+			let request_started = Instant::now();
+			let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
+				Ok(batch_response) => {
+					retries = 0;
+					batch_response
+				},
+				Err(e) => {
+					if retries > Self::MAX_RETRIES {
+						return Err(e.to_string())
+					}
+
+					retries += 1;
+					let failure_log = format!(
+						"Batch request failed ({}/{} retries). Error: {}",
+						retries,
+						Self::MAX_RETRIES,
+						e.to_string()
+					);
+					// after 2 subsequent failures something very wrong is happening. log a warning
+					// and reset the batch size down to 1.
+					if retries >= 2 {
+						log::warn!("{}", failure_log);
+						batch_size = 1;
+					} else {
+						log::debug!("{}", failure_log);
+						// Decrease batch size by DECREASE_FACTOR
+						batch_size =
+							(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
+					}
+					continue
+				},
+			};
 
-				return Self::get_storage_data_dynamic_batch_size(
-					client,
-					payloads,
-					max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
-					bar,
+			let request_duration = request_started.elapsed();
+			batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
+				// Decrease batch size
+				max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
+			} else {
+				// Increase batch size, but not more than the remaining total payloads to process
+				min(
+					total_payloads - start_index,
+					max(
+						batch_size + 1,
+						(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
+					),
				)
-				.await
-			},
-		};
+			};
+
+			log::debug!(
+				target: LOG_TARGET,
+				"Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
+				request_duration,
+				Self::REQUEST_DURATION_TARGET,
+				end_index - start_index,
+				batch_size
+			);
 
-		// Collect the data from this batch
-		let mut data: Vec<Option<StorageData>> = vec![];
-		let batch_response_len = batch_response.len();
-		for item in batch_response.into_iter() {
-			match item {
-				Ok(x) => data.push(x),
-				Err(e) => return Err(e.message().to_string()),
+			let batch_response_len = batch_response.len();
+			for item in batch_response.into_iter() {
+				match item {
+					Ok(x) => all_data.push(x),
+					Err(e) => return Err(e.message().to_string()),
+				}
 			}
+			bar.inc(batch_response_len as u64);
+
+			// Update the start index for the next iteration
+			start_index = end_index;
 		}
-		bar.inc(batch_response_len as u64);
 
-		// Return this data joined with the remaining keys
-		let remaining_payloads = payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
-		let mut rest = Self::get_storage_data_dynamic_batch_size(
-			client,
-			remaining_payloads,
-			max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
-			bar,
-		)
-		.await?;
-		data.append(&mut rest);
-		Ok(data)
+		Ok(all_data)
 	}
 
 	/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
@@ -605,12 +630,7 @@ where
 		);
 		let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1));
 		let requests = payloads_chunked.map(|payload_chunk| {
-			Self::get_storage_data_dynamic_batch_size(
-				&client,
-				payload_chunk.to_vec(),
-				Self::INITIAL_BATCH_SIZE,
-				&bar,
-			)
+			Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar)
 		});
 		// Execute the requests and move the Result outside.
 		let storage_data_result: Result<Vec<_>, _> =
@@ -683,20 +703,14 @@ where
 			.collect::<Vec<_>>();
 
 		let bar = ProgressBar::new(payloads.len() as u64);
-		let storage_data = match Self::get_storage_data_dynamic_batch_size(
-			client,
-			payloads,
-			Self::INITIAL_BATCH_SIZE,
-			&bar,
-		)
-		.await
-		{
-			Ok(storage_data) => storage_data,
-			Err(e) => {
-				log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
-				return Err("batch processing failed")
-			},
-		};
+		let storage_data =
+			match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
+				Ok(storage_data) => storage_data,
+				Err(e) => {
+					log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
+					return Err("batch processing failed")
+				},
+			};
 		assert_eq!(child_keys_len, storage_data.len());
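
The loop added above tunes the batch size adaptively: while a batch request completes under REQUEST_DURATION_TARGET (15 s) the batch grows multiplicatively and by at least one payload, a slow request shrinks it by BATCH_SIZE_DECREASE_FACTOR, and two or more consecutive failures reset it to a single payload. The standalone Rust sketch below isolates just that sizing policy so it can be exercised without a node or an HttpClient. It is an illustrative approximation, not code from the crate: `BatchOutcome` and `next_batch_size` are hypothetical names, and only the constants are taken from the diff.

use std::cmp::{max, min};
use std::time::Duration;

// Constants mirroring the ones introduced in the diff above.
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);

/// Outcome of one batch request, reduced to what the sizing policy needs.
/// (Hypothetical type, for illustration only.)
enum BatchOutcome {
    /// The request succeeded and took this long.
    Ok(Duration),
    /// The request failed; `retries` counts consecutive failures so far.
    Err { retries: usize },
}

/// Compute the batch size to use for the next request.
/// `remaining` is the number of payloads still to fetch and caps any growth.
fn next_batch_size(current: usize, remaining: usize, outcome: BatchOutcome) -> usize {
    match outcome {
        // Two or more consecutive failures: something is badly wrong, drop to 1.
        BatchOutcome::Err { retries } if retries >= 2 => 1,
        // A single failure: back off multiplicatively, never below 1.
        BatchOutcome::Err { .. } =>
            max(1, (current as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize),
        // Success, but slower than the target: also back off.
        BatchOutcome::Ok(duration) if duration > REQUEST_DURATION_TARGET =>
            max(1, (current as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize),
        // Fast enough: grow by at least one payload, capped at the remaining work.
        BatchOutcome::Ok(_) => min(
            remaining,
            max(current + 1, (current as f32 * BATCH_SIZE_INCREASE_FACTOR) as usize),
        ),
    }
}

fn main() {
    // Starting from the new INITIAL_BATCH_SIZE of 10, a fast request grows the batch,
    assert_eq!(next_batch_size(10, 1_000, BatchOutcome::Ok(Duration::from_secs(3))), 11);
    // a request over the 15 s target halves it,
    assert_eq!(next_batch_size(100, 1_000, BatchOutcome::Ok(Duration::from_secs(20))), 50);
    // and repeated failures reset it to one payload per request.
    assert_eq!(next_batch_size(100, 1_000, BatchOutcome::Err { retries: 2 }), 1);
    println!("batch-size policy behaves as expected");
}

Keeping the policy in a pure function like this would also make the edge cases easy to unit-test (for example, that the batch size never drops below 1); the patch above instead inlines the same logic directly in `get_storage_data_dynamic_batch_size`.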