From 576718502a17d3bbc55d934c5302b5bebee4897a Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Wed, 30 Aug 2023 13:15:54 +1000 Subject: [PATCH 1/7] target a request duration instead of increasing to failure --- .../frame/remote-externalities/src/lib.rs | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 072ea6ef5e59..3080e12af40a 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -44,7 +44,7 @@ use sp_runtime::{ use sp_state_machine::TestExternalities; use spinners::{Spinner, Spinners}; use std::{ - cmp::max, + cmp::{max, min}, fs, ops::{Deref, DerefMut}, path::{Path, PathBuf}, @@ -353,7 +353,8 @@ where const PARALLEL_REQUESTS: usize = 4; const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10; const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50; - const INITIAL_BATCH_SIZE: usize = 5000; + const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2); + const INITIAL_BATCH_SIZE: usize = 10; // nodes by default will not return more than 1000 keys per request const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000; const KEYS_PAGE_MAX_RETRIES: usize = 12; @@ -514,6 +515,7 @@ where .insert(method, params.clone()) .map_err(|_| "Invalid batch method and/or params")? } + let request_started = Instant::now(); let batch_response = match client.batch_request::>(batch).await { Ok(batch_response) => batch_response, Err(e) => { @@ -523,20 +525,39 @@ where log::debug!( target: LOG_TARGET, - "Batch request failed, trying again with smaller batch size. {}", + "Batch request failed, resetting batch size to 1. Error: {}", e.to_string() ); - return Self::get_storage_data_dynamic_batch_size( - client, - payloads, - max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize), - bar, - ) - .await + // Request timed out or server errored. This is very bad. Try to get things moving + // again by starting again with just 1 item. + return Self::get_storage_data_dynamic_batch_size(client, payloads, 1, bar).await }, }; + // Request succeeded. Decide whether to increase or decrease the batch size for the next + // request, depending on if the elapsed time was greater than or less than the target. 
+ let request_duration = request_started.elapsed(); + let next_batch_size = if request_duration > Self::REQUEST_DURATION_TARGET { + max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize) + } else { + // Increase the batch size by *at most* the number of remaining payloads + min( + payloads.len(), + // Increase the batch size by *at least* 1 + max( + batch_size + 1, + (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize, + ), + ) + }; + + log::debug!( + target: LOG_TARGET, + "Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}", + request_duration, Self::REQUEST_DURATION_TARGET, batch_size, next_batch_size + ); + // Collect the data from this batch let mut data: Vec> = vec![]; let batch_response_len = batch_response.len(); @@ -553,7 +574,7 @@ where let mut rest = Self::get_storage_data_dynamic_batch_size( client, remaining_payloads, - max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize), + next_batch_size, bar, ) .await?; From 62238504370e773f6467982eb005c96cb575bdbf Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Thu, 21 Sep 2023 15:42:04 +0100 Subject: [PATCH 2/7] use loop instead of recursion --- Cargo.lock | 12 -- .../frame/remote-externalities/Cargo.toml | 1 - .../frame/remote-externalities/src/lib.rs | 194 ++++++++---------- 3 files changed, 89 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ddb19c24e1f6..fe908a1234b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,17 +1130,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "async-recursion" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.37", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -5286,7 +5275,6 @@ dependencies = [ name = "frame-remote-externalities" version = "0.10.0-dev" dependencies = [ - "async-recursion", "futures", "indicatif", "jsonrpsee", diff --git a/substrate/utils/frame/remote-externalities/Cargo.toml b/substrate/utils/frame/remote-externalities/Cargo.toml index ad6ab006da1d..7067aed238ac 100644 --- a/substrate/utils/frame/remote-externalities/Cargo.toml +++ b/substrate/utils/frame/remote-externalities/Cargo.toml @@ -23,7 +23,6 @@ sp-runtime = { path = "../../../primitives/runtime" } tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] } substrate-rpc-client = { path = "../rpc/client" } futures = "0.3" -async-recursion = "1.0.4" indicatif = "0.17.3" spinners = "4.1.0" tokio-retry = "0.3.0" diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 3080e12af40a..d2095d1c382d 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -20,7 +20,6 @@ //! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate //! based chain, or a local state snapshot file. 
-use async_recursion::async_recursion; use codec::{Compact, Decode, Encode}; use indicatif::{ProgressBar, ProgressStyle}; use jsonrpsee::{ @@ -357,7 +356,7 @@ where const INITIAL_BATCH_SIZE: usize = 10; // nodes by default will not return more than 1000 keys per request const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000; - const KEYS_PAGE_MAX_RETRIES: usize = 12; + const MAX_RETRIES: usize = 12; const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5); async fn rpc_get_storage( @@ -412,8 +411,8 @@ where let keys = loop { // This loop can hit the node with very rapid requests, occasionally causing it to // error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry. - let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL) - .take(Self::KEYS_PAGE_MAX_RETRIES); + let retry_strategy = + FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); let get_page_closure = || self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at); let page = Retry::spawn(retry_strategy, get_page_closure).await?; @@ -449,8 +448,6 @@ where /// /// * `client` - An `Arc` wrapped `HttpClient` used for making the requests. /// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams` - /// * `batch_size` - The initial batch size to use for the request. The batch size will be - /// adjusted dynamically in case of failure. /// /// # Returns /// @@ -486,100 +483,98 @@ where /// } /// } /// ``` - #[async_recursion] async fn get_storage_data_dynamic_batch_size( client: &HttpClient, payloads: Vec<(String, ArrayParams)>, - batch_size: usize, bar: &ProgressBar, ) -> Result>, String> { - // All payloads have been processed - if payloads.is_empty() { - return Ok(vec![]) - }; - - log::debug!( - target: LOG_TARGET, - "Remaining payloads: {} Batch request size: {}", - payloads.len(), - batch_size, - ); + let mut all_data: Vec> = vec![]; + let mut start_index = 0; + let mut retries = 0usize; + let mut batch_size = Self::INITIAL_BATCH_SIZE; + let total_payloads = payloads.len(); + + while start_index <= total_payloads { + log::debug!( + target: LOG_TARGET, + "Remaining payloads: {} Batch request size: {}", + total_payloads - start_index, + batch_size, + ); - // Payloads to attempt to process this batch - let page = payloads.iter().take(batch_size).cloned().collect::>(); + let end_index = usize::min(start_index + batch_size, total_payloads); + let page = &payloads[start_index..end_index]; - // Build the batch request - let mut batch = BatchRequestBuilder::new(); - for (method, params) in page.iter() { - batch - .insert(method, params.clone()) - .map_err(|_| "Invalid batch method and/or params")? - } - let request_started = Instant::now(); - let batch_response = match client.batch_request::>(batch).await { - Ok(batch_response) => batch_response, - Err(e) => { - if batch_size < 2 { - return Err(e.to_string()) - } + // Build the batch request + let mut batch = BatchRequestBuilder::new(); + for (method, params) in page.iter() { + batch + .insert(method, params.clone()) + .map_err(|_| "Invalid batch method and/or params")?; + } - log::debug!( - target: LOG_TARGET, - "Batch request failed, resetting batch size to 1. Error: {}", - e.to_string() - ); + let request_started = Instant::now(); + let batch_response = match client.batch_request::>(batch).await { + Ok(batch_response) => { + retries = 0; + batch_response + }, + Err(e) => { + if retries > Self::MAX_RETRIES { + return Err(e.to_string()) + } - // Request timed out or server errored. 
This is very bad. Try to get things moving - // again by starting again with just 1 item. - return Self::get_storage_data_dynamic_batch_size(client, payloads, 1, bar).await - }, - }; + batch_size = 1; + retries += 1; + log::warn!( + target: LOG_TARGET, + "Batch request failed ({}/{} retries). Setting batch size to 1 and trying again. Error: {}", + retries, + Self::MAX_RETRIES, + e.to_string() + ); + continue + }, + }; - // Request succeeded. Decide whether to increase or decrease the batch size for the next - // request, depending on if the elapsed time was greater than or less than the target. - let request_duration = request_started.elapsed(); - let next_batch_size = if request_duration > Self::REQUEST_DURATION_TARGET { - max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize) - } else { - // Increase the batch size by *at most* the number of remaining payloads - min( - payloads.len(), - // Increase the batch size by *at least* 1 - max( - batch_size + 1, - (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize, - ), - ) - }; + let request_duration = request_started.elapsed(); + batch_size = if request_duration > Self::REQUEST_DURATION_TARGET { + // Decrease batch size + max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize) + } else { + // Increase batch size, but not more than the remaining total payloads to process + min( + total_payloads - start_index, + max( + batch_size + 1, + (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize, + ), + ) + }; - log::debug!( - target: LOG_TARGET, - "Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}", - request_duration, Self::REQUEST_DURATION_TARGET, batch_size, next_batch_size - ); + log::debug!( + target: LOG_TARGET, + "Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}", + request_duration, + Self::REQUEST_DURATION_TARGET, + end_index - start_index, + batch_size + ); - // Collect the data from this batch - let mut data: Vec> = vec![]; - let batch_response_len = batch_response.len(); - for item in batch_response.into_iter() { - match item { - Ok(x) => data.push(x), - Err(e) => return Err(e.message().to_string()), + let batch_response_len = batch_response.len(); + for item in batch_response.into_iter() { + match item { + Ok(x) => all_data.push(x), + Err(e) => return Err(e.message().to_string()), + } } + bar.inc(batch_response_len as u64); + + // Update the start index for the next iteration + start_index = end_index; } - bar.inc(batch_response_len as u64); - // Return this data joined with the remaining keys - let remaining_payloads = payloads.iter().skip(batch_size).cloned().collect::>(); - let mut rest = Self::get_storage_data_dynamic_batch_size( - client, - remaining_payloads, - next_batch_size, - bar, - ) - .await?; - data.append(&mut rest); - Ok(data) + Ok(all_data) } /// Synonym of `getPairs` that uses paged queries to first get the keys, and then @@ -626,12 +621,7 @@ where ); let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1)); let requests = payloads_chunked.map(|payload_chunk| { - Self::get_storage_data_dynamic_batch_size( - &client, - payload_chunk.to_vec(), - Self::INITIAL_BATCH_SIZE, - &bar, - ) + Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar) }); // Execute the requests and move the Result outside. 
let storage_data_result: Result, _> = @@ -704,20 +694,14 @@ where .collect::>(); let bar = ProgressBar::new(payloads.len() as u64); - let storage_data = match Self::get_storage_data_dynamic_batch_size( - client, - payloads, - Self::INITIAL_BATCH_SIZE, - &bar, - ) - .await - { - Ok(storage_data) => storage_data, - Err(e) => { - log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e); - return Err("batch processing failed") - }, - }; + let storage_data = + match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await { + Ok(storage_data) => storage_data, + Err(e) => { + log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e); + return Err("batch processing failed") + }, + }; assert_eq!(child_keys_len, storage_data.len()); From a474aeded2a87290fd2b0935a6ddb750ee5ce9fc Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Thu, 21 Sep 2023 15:43:05 +0100 Subject: [PATCH 3/7] fix conditional check --- substrate/utils/frame/remote-externalities/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index d2095d1c382d..fc9c858a52fb 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -494,7 +494,7 @@ where let mut batch_size = Self::INITIAL_BATCH_SIZE; let total_payloads = payloads.len(); - while start_index <= total_payloads { + while all_data.len() < total_payloads { log::debug!( target: LOG_TARGET, "Remaining payloads: {} Batch request size: {}", From b01349242874e763e382e3264f15cb8e2e98728e Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Thu, 21 Sep 2023 15:58:32 +0100 Subject: [PATCH 4/7] improve conditional --- substrate/utils/frame/remote-externalities/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index fc9c858a52fb..dd4a26325f6b 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -494,7 +494,7 @@ where let mut batch_size = Self::INITIAL_BATCH_SIZE; let total_payloads = payloads.len(); - while all_data.len() < total_payloads { + while start_index < total_payloads { log::debug!( target: LOG_TARGET, "Remaining payloads: {} Batch request size: {}", From 73db54983e193b550170ed0b118fd99eb31532b7 Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Fri, 22 Sep 2023 16:38:59 +0100 Subject: [PATCH 5/7] more granular retry handling --- .../utils/frame/remote-externalities/src/lib.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index dd4a26325f6b..49c442dc73ca 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -524,15 +524,24 @@ where return Err(e.to_string()) } - batch_size = 1; retries += 1; - log::warn!( - target: LOG_TARGET, - "Batch request failed ({}/{} retries). Setting batch size to 1 and trying again. Error: {}", + let failure_log = format!( + "Batch request failed ({}/{} retries). Error: {}", retries, Self::MAX_RETRIES, e.to_string() ); + // after 3 subsequent failures something very wrong is happening. log a warning + // and reset the batch size down to 1. 
+ if retries >= 3 { + log::warn!("{}", failure_log); + batch_size = 1; + } else { + log::debug!("{}", failure_log); + // Decrease batch size by DECREASE_FACTOR + batch_size = + (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize; + } continue }, }; From 910b039bae7b70758f36ed4de61b7008812acdc5 Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Mon, 25 Sep 2023 15:38:39 +0100 Subject: [PATCH 6/7] tweak config --- substrate/utils/frame/remote-externalities/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 49c442dc73ca..9872ac810dd9 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -349,10 +349,10 @@ where B::Hash: DeserializeOwned, B::Header: DeserializeOwned, { - const PARALLEL_REQUESTS: usize = 4; + const PARALLEL_REQUESTS: usize = 2; const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10; const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50; - const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2); + const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15); const INITIAL_BATCH_SIZE: usize = 10; // nodes by default will not return more than 1000 keys per request const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000; From ad5e763c16b90355e5d74c6e20344c8399f4ba78 Mon Sep 17 00:00:00 2001 From: Liam Aharon Date: Mon, 25 Sep 2023 16:01:08 +0100 Subject: [PATCH 7/7] tweak config --- substrate/utils/frame/remote-externalities/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 9872ac810dd9..71e9320ebeeb 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -349,7 +349,7 @@ where B::Hash: DeserializeOwned, B::Header: DeserializeOwned, { - const PARALLEL_REQUESTS: usize = 2; + const PARALLEL_REQUESTS: usize = 4; const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10; const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50; const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15); @@ -531,9 +531,9 @@ where Self::MAX_RETRIES, e.to_string() ); - // after 3 subsequent failures something very wrong is happening. log a warning + // after 2 subsequent failures something very wrong is happening. log a warning // and reset the batch size down to 1. - if retries >= 3 { + if retries >= 2 { log::warn!("{}", failure_log); batch_size = 1; } else {
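
For reference, below is a minimal, self-contained sketch of the duration-targeted batch sizing this series converges on. It is an illustration only, not code from the patches: `fetch_all`, `send_batch`, and the `u32` payloads are hypothetical stand-ins for the real async jsonrpsee batch requests, the sketch is synchronous for brevity, and the constants mirror the final values in patch 7/7 (15s target duration, 1.10 increase factor, 0.50 decrease factor, initial batch size 10, 12 retries, reset to a batch size of 1 after 2 consecutive failures).

```
use std::cmp::{max, min};
use std::time::{Duration, Instant};

// Tuning constants mirroring the final state of the series (patch 7/7).
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
const INITIAL_BATCH_SIZE: usize = 10;
const MAX_RETRIES: usize = 12;

/// Hypothetical stand-in for one batched RPC call: one response per payload on success,
/// an error string on a timeout or server error. The real code uses the async
/// `HttpClient::batch_request` from jsonrpsee; here we simply echo the input.
fn send_batch(payloads: &[u32]) -> Result<Vec<u32>, String> {
    Ok(payloads.to_vec())
}

/// Fetch all payloads in batches, growing the batch while requests stay under the target
/// duration and shrinking (or resetting) it when they are slow or fail.
fn fetch_all(payloads: &[u32]) -> Result<Vec<u32>, String> {
    let mut all_data = Vec::with_capacity(payloads.len());
    let mut start_index = 0;
    let mut retries = 0usize;
    let mut batch_size = INITIAL_BATCH_SIZE;
    let total = payloads.len();

    while start_index < total {
        let end_index = min(start_index + batch_size, total);
        let page = &payloads[start_index..end_index];

        let started = Instant::now();
        let response = match send_batch(page) {
            Ok(response) => {
                retries = 0;
                response
            },
            Err(e) => {
                if retries > MAX_RETRIES {
                    return Err(e)
                }
                retries += 1;
                // Back off gently at first; after repeated failures fall back to single
                // requests to get things moving again.
                batch_size = if retries >= 2 {
                    1
                } else {
                    max(1, (batch_size as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize)
                };
                continue
            },
        };

        // Adapt the next batch size to the observed request duration instead of growing
        // it until something breaks.
        let elapsed = started.elapsed();
        batch_size = if elapsed > REQUEST_DURATION_TARGET {
            max(1, (batch_size as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize)
        } else {
            // Grow by at least 1, but never past the number of remaining payloads.
            min(
                total - start_index,
                max(batch_size + 1, (batch_size as f32 * BATCH_SIZE_INCREASE_FACTOR) as usize),
            )
        };

        all_data.extend(response);
        start_index = end_index;
    }

    Ok(all_data)
}

fn main() {
    let payloads: Vec<u32> = (0..10_000).collect();
    let data = fetch_all(&payloads).expect("all payloads fetched");
    assert_eq!(data.len(), payloads.len());
}
```

Targeting a request duration lets the batch size ramp up quickly on fast connections without ever growing until the node times out, and shrinking gradually on the first failure while only resetting to single requests after repeated failures avoids over-reacting to a single transient error.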