diff --git a/Cargo.lock b/Cargo.lock index 87a2dad75494..6dc3bed95ae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8988,9 +8988,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestra" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46d78e1deb2a8d54fc1f063a544130db4da31dfe4d5d3b493186424910222a76" +checksum = "5edee0c1917703f8a28cd229cf6a5c91a7ee34be139ced16509ac5b53b9d0c51" dependencies = [ "async-trait", "dyn-clonable", @@ -9005,9 +9005,9 @@ dependencies = [ [[package]] name = "orchestra-proc-macro" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d035b1f968d91a826f2e34a9d6d02cb2af5aa7ca39ebd27922d850ab4b2dd2c6" +checksum = "4f60e64a3808b5bb2786b9da09fc70714952aabcdd0eeba6f1718e3dbc34ad5b" dependencies = [ "expander 2.0.0", "indexmap 2.0.0", @@ -14028,9 +14028,9 @@ dependencies = [ [[package]] name = "prioritized-metered-channel" -version = "0.5.1" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e99f0c89bd88f393aab44a4ab949351f7bc7e7e1179d11ecbfe50cbe4c47e342" +checksum = "a172e6cc603231f2cf004232eabcecccc0da53ba576ab286ef7baa0cfc7927ad" dependencies = [ "coarsetime", "crossbeam-queue", diff --git a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml index 63f4c9154743..7c7edc502d4c 100644 --- a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml +++ b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml @@ -42,7 +42,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" } # Polkadot polkadot-primitives = { path = "../../../polkadot/primitives" } polkadot-test-client = { path = "../../../polkadot/node/test/client" } -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } # Cumulus cumulus-test-service = { path = "../../test/service" } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 18c279689158..bf6e09fd1b69 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -55,10 +55,11 @@ use polkadot_primitives::{ use parity_scale_codec::Encode; -use futures::{channel::oneshot, prelude::*}; +use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; use std::{ path::PathBuf, + pin::Pin, sync::Arc, time::{Duration, Instant}, }; @@ -81,6 +82,11 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3); #[cfg(test)] const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); +// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size +// to allow exhaustive validation messages to fall through in case the tasks are clogged with +// `ValidateFromChainState` messages awaiting data from the runtime +const TASK_LIMIT: usize = 30; + /// Configuration for the candidate validation subsystem #[derive(Clone)] pub struct Config { @@ -130,6 +136,83 @@ impl CandidateValidationSubsystem { } } +fn handle_validation_message( + mut sender: S, + validation_host: ValidationHost, + metrics: Metrics, + msg: CandidateValidationMessage, +) -> Pin + Send>> +where + S: SubsystemSender, +{ + match msg { + CandidateValidationMessage::ValidateFromChainState { + candidate_receipt, + pov, + executor_params, + exec_kind, + response_sender, + .. + } => async move { + let _timer = metrics.time_validate_from_chain_state(); + let res = validate_from_chain_state( + &mut sender, + validation_host, + candidate_receipt, + pov, + executor_params, + exec_kind, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + } + .boxed(), + CandidateValidationMessage::ValidateFromExhaustive { + validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + exec_kind, + response_sender, + .. + } => async move { + let _timer = metrics.time_validate_from_exhaustive(); + let res = validate_candidate_exhaustive( + validation_host, + validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + exec_kind, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + } + .boxed(), + CandidateValidationMessage::PreCheck { + relay_parent, + validation_code_hash, + response_sender, + .. + } => async move { + let precheck_result = + precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash) + .await; + + let _ = response_sender.send(precheck_result); + } + .boxed(), + } +} + #[overseer::contextbounds(CandidateValidation, prefix = self::overseer)] async fn run( mut ctx: Context, @@ -156,106 +239,48 @@ async fn run( .await?; ctx.spawn_blocking("pvf-validation-host", task.boxed())?; + let mut tasks = FuturesUnordered::new(); + loop { - match ctx.recv().await? { - FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {}, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, - FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), - FromOrchestra::Communication { msg } => match msg { - CandidateValidationMessage::ValidateFromChainState { - candidate_receipt, - pov, - executor_params, - exec_kind, - response_sender, - .. - } => { - let bg = { - let mut sender = ctx.sender().clone(); - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - async move { - let _timer = metrics.time_validate_from_chain_state(); - let res = validate_from_chain_state( - &mut sender, - validation_host, - candidate_receipt, - pov, - executor_params, - exec_kind, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - } - }; - - ctx.spawn("validate-from-chain-state", bg.boxed())?; - }, - CandidateValidationMessage::ValidateFromExhaustive { - validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - exec_kind, - response_sender, - .. - } => { - let bg = { - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - async move { - let _timer = metrics.time_validate_from_exhaustive(); - let res = validate_candidate_exhaustive( - validation_host, - validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - exec_kind, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - } - }; - - ctx.spawn("validate-from-exhaustive", bg.boxed())?; + loop { + futures::select! { + comm = ctx.recv().fuse() => { + match comm { + Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), + Ok(FromOrchestra::Communication { msg }) => { + let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg); + tasks.push(task); + if tasks.len() >= TASK_LIMIT { + break + } + }, + Err(e) => return Err(SubsystemError::from(e)), + } }, - CandidateValidationMessage::PreCheck { - relay_parent, - validation_code_hash, - response_sender, - .. - } => { - let bg = { - let mut sender = ctx.sender().clone(); - let validation_host = validation_host.clone(); - - async move { - let precheck_result = precheck_pvf( - &mut sender, - validation_host, - relay_parent, - validation_code_hash, - ) - .await; - - let _ = response_sender.send(precheck_result); - } - }; - - ctx.spawn("candidate-validation-pre-check", bg.boxed())?; + _ = tasks.select_next_some() => () + } + } + + gum::debug!(target: LOG_TARGET, "Validation task limit hit"); + + loop { + futures::select! { + signal = ctx.recv_signal().fuse() => { + match signal { + Ok(OverseerSignal::ActiveLeaves(_)) => {}, + Ok(OverseerSignal::BlockFinalized(..)) => {}, + Ok(OverseerSignal::Conclude) => return Ok(()), + Err(e) => return Err(SubsystemError::from(e)), + } }, - }, + _ = tasks.select_next_some() => { + if tasks.len() < TASK_LIMIT { + break + } + } + } } } } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 21e13453edf3..fd86e46998aa 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -60,6 +60,9 @@ pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker"; /// The name of binary spawned to execute a PVF pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker"; +/// The size of incoming message queue +pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10; + /// An alias to not spell the type for the oneshot sender for the PVF execution result. pub(crate) type ResultSender = oneshot::Sender>; @@ -227,7 +230,7 @@ pub async fn start( Err(err) => return Err(SubsystemError::Context(err)), }; - let (to_host_tx, to_host_rx) = mpsc::channel(10); + let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE); let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() }; diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 79391630b2d3..92263281eeab 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -104,7 +104,10 @@ mod worker_interface; pub mod testing; pub use error::{InvalidCandidate, PossiblyInvalidError, ValidationError}; -pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME}; +pub use host::{ + start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE, + PREPARE_BINARY_NAME, +}; pub use metrics::Metrics; pub use priority::Priority; pub use worker_interface::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR}; diff --git a/polkadot/node/malus/src/interceptor.rs b/polkadot/node/malus/src/interceptor.rs index e994319beb96..b44ffc8956b5 100644 --- a/polkadot/node/malus/src/interceptor.rs +++ b/polkadot/node/malus/src/interceptor.rs @@ -22,7 +22,7 @@ use polkadot_node_subsystem::*; pub use polkadot_node_subsystem::{messages::*, overseer, FromOrchestra}; -use std::{future::Future, pin::Pin}; +use std::{collections::VecDeque, future::Future, pin::Pin}; /// Filter incoming and outgoing messages. pub trait MessageInterceptor: Send + Sync + Clone + 'static @@ -170,6 +170,7 @@ where inner: Context, message_filter: Fil, sender: InterceptedSender<::Sender, Fil>, + message_buffer: VecDeque::Message>>, } impl InterceptedContext @@ -189,7 +190,7 @@ where inner: inner.sender().clone(), message_filter: message_filter.clone(), }; - Self { inner, message_filter, sender } + Self { inner, message_filter, sender, message_buffer: VecDeque::new() } } } @@ -233,6 +234,9 @@ where } async fn recv(&mut self) -> SubsystemResult> { + if let Some(msg) = self.message_buffer.pop_front() { + return Ok(msg) + } loop { let msg = self.inner.recv().await?; if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) { @@ -241,6 +245,19 @@ where } } + async fn recv_signal(&mut self) -> SubsystemResult { + loop { + let msg = self.inner.recv().await?; + if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) { + if let FromOrchestra::Signal(sig) = msg { + return Ok(sig) + } else { + self.message_buffer.push_back(msg) + } + } + } + } + fn spawn( &mut self, name: &'static str, diff --git a/polkadot/node/metrics/Cargo.toml b/polkadot/node/metrics/Cargo.toml index e9a4d463f4d9..90d95a6e50a2 100644 --- a/polkadot/node/metrics/Cargo.toml +++ b/polkadot/node/metrics/Cargo.toml @@ -14,7 +14,7 @@ futures = "0.3.21" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../gum" } -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } # Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`. sc-service = { path = "../../../substrate/client/service" } sc-cli = { path = "../../../substrate/client/cli" } diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index f168bdd08070..f39f0661aa17 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -20,14 +20,14 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-metrics = { path = "../metrics" } polkadot-primitives = { path = "../../primitives" } -orchestra = { version = "0.3.3", default-features = false, features = ["futures_channel"] } +orchestra = { version = "0.3.4", default-features = false, features = ["futures_channel"] } gum = { package = "tracing-gum", path = "../gum" } sp-core = { path = "../../../substrate/primitives/core" } async-trait = "0.1.74" tikv-jemalloc-ctl = { version = "0.5.0", optional = true } [dev-dependencies] -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } sp-core = { path = "../../../substrate/primitives/core" } futures = { version = "0.3.21", features = ["thread-pool"] } femme = "2.2.1" diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index f09f7fc48976..750f7a7e2f83 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -55,7 +55,7 @@ prometheus = { version = "0.13.0", default-features = false } serde = "1.0.195" serde_yaml = "0.9" paste = "1.0.14" -orchestra = { version = "0.3.3", default-features = false, features = ["futures_channel"] } +orchestra = { version = "0.3.4", default-features = false, features = ["futures_channel"] } pyroscope = "0.5.7" pyroscope_pprofrs = "0.2.7" diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index dfa78e04b8c9..6c1ac86c4507 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -32,6 +32,7 @@ use parking_lot::Mutex; use sp_core::testing::TaskExecutor; use std::{ + collections::VecDeque, convert::Infallible, future::Future, pin::Pin, @@ -190,6 +191,7 @@ pub struct TestSubsystemContext { tx: TestSubsystemSender, rx: mpsc::Receiver>, spawn: S, + message_buffer: VecDeque>, } #[async_trait::async_trait] @@ -207,6 +209,9 @@ where type Error = SubsystemError; async fn try_recv(&mut self) -> Result>, ()> { + if let Some(msg) = self.message_buffer.pop_front() { + return Ok(Some(msg)) + } match poll!(self.rx.next()) { Poll::Ready(Some(msg)) => Ok(Some(msg)), Poll::Ready(None) => Err(()), @@ -215,12 +220,30 @@ where } async fn recv(&mut self) -> SubsystemResult> { + if let Some(msg) = self.message_buffer.pop_front() { + return Ok(msg) + } self.rx .next() .await .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned())) } + async fn recv_signal(&mut self) -> SubsystemResult { + loop { + let msg = self + .rx + .next() + .await + .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))?; + if let FromOrchestra::Signal(sig) = msg { + return Ok(sig) + } else { + self.message_buffer.push_back(msg) + } + } + } + fn spawn( &mut self, name: &'static str, @@ -314,6 +337,7 @@ pub fn make_buffered_subsystem_context( tx: TestSubsystemSender { tx: all_messages_tx }, rx: overseer_rx, spawn: SpawnGlue(spawner), + message_buffer: VecDeque::new(), }, TestSubsystemContextHandle { tx: overseer_tx, rx: all_messages_rx }, ) diff --git a/polkadot/node/subsystem-types/Cargo.toml b/polkadot/node/subsystem-types/Cargo.toml index 6713e9031234..181ef54b4c6c 100644 --- a/polkadot/node/subsystem-types/Cargo.toml +++ b/polkadot/node/subsystem-types/Cargo.toml @@ -17,7 +17,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-statement-table = { path = "../../statement-table" } polkadot-node-jaeger = { path = "../jaeger" } -orchestra = { version = "0.3.3", default-features = false, features = ["futures_channel"] } +orchestra = { version = "0.3.4", default-features = false, features = ["futures_channel"] } sc-network = { path = "../../../substrate/client/network" } sp-api = { path = "../../../substrate/primitives/api" } sp-blockchain = { path = "../../../substrate/primitives/blockchain" } diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 3147a4f64f46..68a834d46e3e 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -32,7 +32,7 @@ polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-primitives = { path = "../../primitives" } polkadot-node-primitives = { path = "../primitives" } polkadot-overseer = { path = "../overseer" } -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } sp-core = { path = "../../../substrate/primitives/core" } sp-application-crypto = { path = "../../../substrate/primitives/application-crypto" } diff --git a/prdoc/pr_2125.prdoc b/prdoc/pr_2125.prdoc new file mode 100644 index 000000000000..ee81975d2d07 --- /dev/null +++ b/prdoc/pr_2125.prdoc @@ -0,0 +1,14 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Introduce bounds for the number of candidate validation subsystem simultaneously processed tasks + +doc: + - audience: Node Dev + description: | + Makes it possible for the candidate validation subsystem to create backpressure on subsystems + requesting to validate a candidate through limiting the number of simultaneously processed + validation tasks. + +crates: + - name: polkadot-node-core-candidate-validation