diff --git a/Cargo.lock b/Cargo.lock index ac64e65ee0ec..b2d655e6f41c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1585,16 +1585,15 @@ dependencies = [ [[package]] name = "blake3" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "199c42ab6972d92c9f8995f086273d25c42fc0f7b2a1fcefba465c1352d25ba5" +checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" dependencies = [ "arrayref", "arrayvec 0.7.4", "cc", "cfg-if", "constant_time_eq 0.3.0", - "digest 0.10.7", ] [[package]] @@ -12568,6 +12567,7 @@ version = "1.0.0" dependencies = [ "always-assert", "assert_matches", + "blake3", "cfg-if", "criterion 0.4.0", "futures", @@ -12646,6 +12646,7 @@ dependencies = [ "sp-externalities 0.19.0", "sp-io", "sp-tracing 10.0.0", + "substrate-build-script-utils", "tempfile", "thiserror", "tracing-gum", @@ -12670,6 +12671,7 @@ dependencies = [ name = "polkadot-node-core-pvf-prepare-worker" version = "1.0.0" dependencies = [ + "blake3", "cfg-if", "criterion 0.4.0", "libc", diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index a3d6f0473136..96ab07d3b824 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -25,7 +25,7 @@ use polkadot_node_core_pvf::{ InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PrepareError, - PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost, + PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, }; use polkadot_node_primitives::{ BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, @@ -794,7 +794,7 @@ trait ValidationBackend { validation_result } - async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result; + async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>; } #[async_trait] @@ -824,7 +824,7 @@ impl ValidationBackend for ValidationHost { })? } - async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result { + async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> { let (tx, rx) = oneshot::channel(); if let Err(err) = self.precheck_pvf(pvf, tx).await { // Return an IO error if there was an error communicating with the host. diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index cab823e1e637..801c581938a6 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend { result } - async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result { + async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<(), PrepareError> { unreachable!() } } @@ -1014,11 +1014,11 @@ fn pov_decompression_failure_is_invalid() { } struct MockPreCheckBackend { - result: Result, + result: Result<(), PrepareError>, } impl MockPreCheckBackend { - fn with_hardcoded_result(result: Result) -> Self { + fn with_hardcoded_result(result: Result<(), PrepareError>) -> Self { Self { result } } } @@ -1034,7 +1034,7 @@ impl ValidationBackend for MockPreCheckBackend { unreachable!() } - async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result { + async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<(), PrepareError> { self.result.clone() } } @@ -1051,7 +1051,7 @@ fn precheck_works() { let (check_fut, check_result) = precheck_pvf( ctx.sender(), - MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())), + MockPreCheckBackend::with_hardcoded_result(Ok(())), relay_parent, validation_code_hash, ) @@ -1113,7 +1113,7 @@ fn precheck_invalid_pvf_blob_compression() { let (check_fut, check_result) = precheck_pvf( ctx.sender(), - MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())), + MockPreCheckBackend::with_hardcoded_result(Ok(())), relay_parent, validation_code_hash, ) diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index 3e72ca9e5326..27da484fe4f1 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] always-assert = "0.1" +blake3 = "1.5" cfg-if = "1.0" futures = "0.3.21" futures-timer = "3.0.2" diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs index 378374a10b39..c02a0b595da3 100644 --- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs @@ -56,7 +56,7 @@ impl TestHost { &self, code: &[u8], executor_params: ExecutorParams, - ) -> Result { + ) -> Result<(), PrepareError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml index e3fda06963e3..bfe1be9156fc 100644 --- a/polkadot/node/core/pvf/common/Cargo.toml +++ b/polkadot/node/core/pvf/common/Cargo.toml @@ -36,6 +36,9 @@ seccompiler = "0.4.0" assert_matches = "1.4.0" tempfile = "3.3.0" +[build-dependencies] +substrate-build-script-utils = { path = "../../../../../substrate/utils/build-script-utils" } + [features] # This feature is used to export test code to other crates without putting it in the production build. test-utils = [] diff --git a/polkadot/node/core/pvf/common/build.rs b/polkadot/node/core/pvf/common/build.rs new file mode 100644 index 000000000000..5531ad411da8 --- /dev/null +++ b/polkadot/node/core/pvf/common/build.rs @@ -0,0 +1,19 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +fn main() { + substrate_build_script_utils::generate_wasmtime_version(); +} diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 34475c481f73..6bf05ece78ef 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -14,16 +14,24 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::prepare::PrepareStats; +use crate::prepare::{PrepareSuccess, PrepareWorkerSuccess}; use parity_scale_codec::{Decode, Encode}; use std::fmt; -/// Result of PVF preparation performed by the validation host. Contains stats about the preparation -/// if successful -pub type PrepareResult = Result; +/// Result of PVF preparation from a worker, with checksum of the compiled PVF and stats of the +/// preparation if successful. +pub type PrepareWorkerResult = Result; + +/// Result of PVF preparation propagated all the way back to the host, with path to the concluded +/// artifact and stats of the preparation if successful. +pub type PrepareResult = Result; + +/// Result of prechecking PVF performed by the validation host. Contains stats about the preparation +/// if successful. +pub type PrecheckResult = Result<(), PrepareError>; /// An error that occurred during the prepare part of the PVF pipeline. -// Codec indexes are intended to stabilize pre-encoded payloads (see `OOM_PAYLOAD` below) +// Codec indexes are intended to stabilize pre-encoded payloads (see `OOM_PAYLOAD`) #[derive(Debug, Clone, Encode, Decode)] pub enum PrepareError { /// During the prevalidation stage of preparation an issue was found with the PVF. diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs index e2211b97d87b..282d2f7c41d0 100644 --- a/polkadot/node/core/pvf/common/src/lib.rs +++ b/polkadot/node/core/pvf/common/src/lib.rs @@ -31,6 +31,8 @@ pub use sp_tracing; const LOG_TARGET: &str = "parachain::pvf-common"; +pub const RUNTIME_VERSION: &str = env!("SUBSTRATE_WASMTIME_VERSION"); + use std::{ io::{self, Read, Write}, mem, diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs index 4436ebe4861e..28ab682ec136 100644 --- a/polkadot/node/core/pvf/common/src/prepare.rs +++ b/polkadot/node/core/pvf/common/src/prepare.rs @@ -15,6 +15,25 @@ // along with Polkadot. If not, see . use parity_scale_codec::{Decode, Encode}; +use std::path::PathBuf; + +/// Result from prepare worker if successful. +#[derive(Debug, Clone, Default, Encode, Decode)] +pub struct PrepareWorkerSuccess { + /// Checksum of the compiled PVF. + pub checksum: String, + /// Stats of the current preparation run. + pub stats: PrepareStats, +} + +/// Result of PVF preparation if successful. +#[derive(Debug, Clone, Default)] +pub struct PrepareSuccess { + /// Canonical path to the compiled artifact. + pub path: PathBuf, + /// Stats of the current preparation run. + pub stats: PrepareStats, +} /// Preparation statistics, including the CPU time and memory taken. #[derive(Debug, Clone, Default, Encode, Decode)] diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs index 0cc86434c195..2d8f6430187b 100644 --- a/polkadot/node/core/pvf/common/src/pvf.rs +++ b/polkadot/node/core/pvf/common/src/pvf.rs @@ -115,7 +115,7 @@ impl fmt::Debug for PvfPrepData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Pvf {{ code, code_hash: {:?}, executor_params: {:?}, prep_timeout: {:?} }}", + "Pvf {{ code: [...], code_hash: {:?}, executor_params: {:?}, prep_timeout: {:?} }}", self.code_hash, self.executor_params, self.prep_timeout ) } diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index 1cd221533f48..005f2e935117 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] +blake3 = "1.5" cfg-if = "1.0" gum = { package = "tracing-gum", path = "../../../gum" } libc = "0.2.139" diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 151b54efc2d1..34e6a78c26ae 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -40,10 +40,10 @@ use nix::{ use os_pipe::{self, PipeReader, PipeWriter}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ - error::{PrepareError, PrepareResult}, + error::{PrepareError, PrepareWorkerResult}, executor_intf::create_runtime_from_artifact_bytes, framed_recv_blocking, framed_send_blocking, - prepare::{MemoryStats, PrepareJobKind, PrepareStats}, + prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess}, pvf::PvfPrepData, worker::{ cpu_time_monitor_loop, run_worker, stringify_panic_payload, @@ -106,7 +106,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result { } /// Send a worker response. -fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { +fn send_response(stream: &mut UnixStream, result: PrepareWorkerResult) -> io::Result<()> { framed_send_blocking(stream, &result.encode()) } @@ -186,8 +186,8 @@ fn end_memory_tracking() -> isize { /// /// 7. If compilation succeeded, write the compiled artifact into a temporary file. /// -/// 8. Send the result of preparation back to the host. If any error occurred in the above steps, we -/// send that in the `PrepareResult`. +/// 8. Send the result of preparation back to the host, including the checksum of the artifact. If +/// any error occurred in the above steps, we send that in the `PrepareWorkerResult`. pub fn worker_entrypoint( socket_path: PathBuf, worker_dir_path: PathBuf, @@ -439,11 +439,11 @@ fn handle_child_process( Err(err) => Err(err), Ok(ok) => { cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - let (artifact, max_rss) = ok; - } else { - let artifact = ok; - } + if #[cfg(target_os = "linux")] { + let (artifact, max_rss) = ok; + } else { + let artifact = ok; + } } // Stop the memory stats worker and get its observed memory stats. @@ -511,7 +511,7 @@ fn handle_parent_process( worker_pid: u32, usage_before: Usage, timeout: Duration, -) -> Result { +) -> Result { // Read from the child. Don't decode unless the process exited normally, which we check later. let mut received_data = Vec::new(); pipe_read @@ -554,7 +554,7 @@ fn handle_parent_process( match result { Err(err) => Err(err), - Ok(response) => { + Ok(JobResponse { artifact, memory_stats }) => { // The exit status should have been zero if no error occurred. if exit_status != 0 { return Err(PrepareError::JobError(format!( @@ -577,13 +577,14 @@ fn handle_parent_process( temp_artifact_dest.display(), ); // Write to the temp file created by the host. - if let Err(err) = fs::write(&temp_artifact_dest, &response.artifact) { + if let Err(err) = fs::write(&temp_artifact_dest, &artifact) { return Err(PrepareError::IoErr(err.to_string())) }; - Ok(PrepareStats { - memory_stats: response.memory_stats, - cpu_time_elapsed: cpu_tv, + let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string(); + Ok(PrepareWorkerSuccess { + checksum, + stats: PrepareStats { memory_stats, cpu_time_elapsed: cpu_tv }, }) }, } @@ -657,13 +658,13 @@ fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError { type JobResult = Result; -/// Pre-encoded length-prefixed `Result::Err(PrepareError::OutOfMemory)` +/// Pre-encoded length-prefixed `JobResult::Err(PrepareError::OutOfMemory)` const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08"; #[test] fn pre_encoded_payloads() { // NOTE: This must match the type of `response` in `send_child_response`. - let oom_unencoded: JobResult = Result::Err(PrepareError::OutOfMemory); + let oom_unencoded: JobResult = JobResult::Err(PrepareError::OutOfMemory); let oom_encoded = oom_unencoded.encode(); // The payload is prefixed with its length in `framed_send`. let mut oom_payload = oom_encoded.len().to_le_bytes().to_vec(); diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs index dd83f76494ed..53085eade3cb 100644 --- a/polkadot/node/core/pvf/src/artifacts.rs +++ b/polkadot/node/core/pvf/src/artifacts.rs @@ -16,10 +16,10 @@ //! PVF artifacts (final compiled code blobs). //! -//! # Lifecycle of an artifact +//! # Lifecycle of an artifact //! -//! 1. During node start-up, the artifacts cache is cleaned up. This means that all local artifacts -//! stored on-disk are cleared, and we start with an empty [`Artifacts`] table. +//! 1. During node start-up, we will check the cached artifacts, if any. The stale and corrupted +//! ones are pruned. The valid ones are registered in the [`Artifacts`] table. //! //! 2. In order to be executed, a PVF should be prepared first. This means that artifacts should //! have an [`ArtifactState::Prepared`] entry for that artifact in the table. If not, the @@ -55,18 +55,29 @@ //! older by a predefined parameter. This process is run very rarely (say, once a day). Once the //! artifact is expired it is removed from disk eagerly atomically. -use crate::host::PrepareResultSender; +use crate::{host::PrecheckResultSender, LOG_TARGET}; use always_assert::always; -use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData}; +use polkadot_core_primitives::Hash; +use polkadot_node_core_pvf_common::{ + error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData, RUNTIME_VERSION, +}; use polkadot_node_primitives::NODE_VERSION; use polkadot_parachain_primitives::primitives::ValidationCodeHash; use polkadot_primitives::ExecutorParamsHash; use std::{ collections::HashMap, path::{Path, PathBuf}, + str::FromStr as _, time::{Duration, SystemTime}, }; +const RUNTIME_PREFIX: &str = "wasmtime_v"; +const NODE_PREFIX: &str = "polkadot_v"; + +fn artifact_prefix() -> String { + format!("{}{}_{}{}", RUNTIME_PREFIX, RUNTIME_VERSION, NODE_PREFIX, NODE_VERSION) +} + /// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ArtifactId { @@ -75,9 +86,6 @@ pub struct ArtifactId { } impl ArtifactId { - const PREFIX: &'static str = "wasmtime_"; - const NODE_VERSION_PREFIX: &'static str = "polkadot_v"; - /// Creates a new artifact ID with the given hash. pub fn new(code_hash: ValidationCodeHash, executor_params_hash: ExecutorParamsHash) -> Self { Self { code_hash, executor_params_hash } @@ -88,38 +96,34 @@ impl ArtifactId { Self::new(pvf.code_hash(), pvf.executor_params().hash()) } - /// Tries to recover the artifact id from the given file name. - #[cfg(test)] - pub fn from_file_name(file_name: &str) -> Option { - use polkadot_core_primitives::Hash; - use std::str::FromStr as _; - - let file_name = - file_name.strip_prefix(Self::PREFIX)?.strip_prefix(Self::NODE_VERSION_PREFIX)?; - - // [ node version | code hash | param hash ] - let parts: Vec<&str> = file_name.split('_').collect(); - let (_node_ver, code_hash_str, executor_params_hash_str) = (parts[0], parts[1], parts[2]); - - let code_hash = Hash::from_str(code_hash_str).ok()?.into(); - let executor_params_hash = - ExecutorParamsHash::from_hash(Hash::from_str(executor_params_hash_str).ok()?); - - Some(Self { code_hash, executor_params_hash }) - } - - /// Returns the expected path to this artifact given the root of the cache. - pub fn path(&self, cache_path: &Path) -> PathBuf { + /// Returns the canonical path to the concluded artifact. + pub(crate) fn path(&self, cache_path: &Path, checksum: &str) -> PathBuf { let file_name = format!( - "{}{}{}_{:#x}_{:#x}", - Self::PREFIX, - Self::NODE_VERSION_PREFIX, - NODE_VERSION, + "{}_{:#x}_{:#x}_0x{}", + artifact_prefix(), self.code_hash, - self.executor_params_hash + self.executor_params_hash, + checksum ); cache_path.join(file_name) } + + /// Tries to recover the artifact id from the given file name. + /// Return `None` if the given file name is invalid. + /// VALID_NAME := _ _ _ + fn from_file_name(file_name: &str) -> Option { + let file_name = file_name.strip_prefix(&artifact_prefix())?.strip_prefix('_')?; + let parts: Vec<&str> = file_name.split('_').collect(); + + if let [code_hash, param_hash, _checksum] = parts[..] { + let code_hash = Hash::from_str(code_hash).ok()?.into(); + let executor_params_hash = + ExecutorParamsHash::from_hash(Hash::from_str(param_hash).ok()?); + return Some(Self { code_hash, executor_params_hash }) + } + + None + } } /// A bundle of the artifact ID and the path. @@ -136,8 +140,8 @@ pub struct ArtifactPathId { } impl ArtifactPathId { - pub(crate) fn new(artifact_id: ArtifactId, cache_path: &Path) -> Self { - Self { path: artifact_id.path(cache_path), id: artifact_id } + pub(crate) fn new(artifact_id: ArtifactId, path: &Path) -> Self { + Self { id: artifact_id, path: path.to_owned() } } } @@ -148,6 +152,8 @@ pub enum ArtifactState { /// That means that the artifact should be accessible through the path obtained by the artifact /// id (unless, it was removed externally). Prepared { + /// The path of the compiled artifact. + path: PathBuf, /// The time when the artifact was last needed. /// /// This is updated when we get the heads up for this artifact or when we just discover @@ -159,7 +165,7 @@ pub enum ArtifactState { /// A task to prepare this artifact is scheduled. Preparing { /// List of result senders that are waiting for a response. - waiting_for_response: Vec, + waiting_for_response: Vec, /// The number of times this artifact has failed to prepare. num_failures: u32, }, @@ -177,32 +183,148 @@ pub enum ArtifactState { /// A container of all known artifact ids and their states. pub struct Artifacts { - artifacts: HashMap, + inner: HashMap, } impl Artifacts { - /// Initialize a blank cache at the given path. This will clear everything present at the - /// given path, to be populated over time. - /// - /// The recognized artifacts will be filled in the table and unrecognized will be removed. - pub async fn new(cache_path: &Path) -> Self { - // First delete the entire cache. This includes artifacts and any leftover worker dirs (see - // [`WorkerDir`]). Nodes are long-running so this should populate shortly. - let _ = tokio::fs::remove_dir_all(cache_path).await; + #[cfg(test)] + pub(crate) fn empty() -> Self { + Self { inner: HashMap::new() } + } + + #[cfg(test)] + pub(crate) fn len(&self) -> usize { + self.inner.len() + } + + /// Create an empty table and populate it with valid artifacts as [`ArtifactState::Prepared`], + /// if any. The existing caches will be checked by their file name to determine whether they are + /// valid, e.g., matching the current node version. The ones deemed invalid will be pruned. + pub async fn new_and_prune(cache_path: &Path) -> Self { + let mut artifacts = Self { inner: HashMap::new() }; + artifacts.insert_and_prune(cache_path).await; + artifacts + } + + async fn insert_and_prune(&mut self, cache_path: &Path) { + async fn is_corrupted(path: &Path) -> bool { + let checksum = match tokio::fs::read(path).await { + Ok(bytes) => blake3::hash(&bytes), + Err(err) => { + // just remove the file if we cannot read it + gum::warn!( + target: LOG_TARGET, + ?err, + "unable to read artifact {:?} when checking integrity, removing...", + path, + ); + return true + }, + }; + + if let Some(file_name) = path.file_name() { + if let Some(file_name) = file_name.to_str() { + return !file_name.ends_with(checksum.to_hex().as_str()) + } + } + true + } + + // Insert the entry into the artifacts table if it is valid. + // Otherwise, prune it. + async fn insert_or_prune( + artifacts: &mut Artifacts, + entry: &tokio::fs::DirEntry, + cache_path: &Path, + ) { + let file_type = entry.file_type().await; + let file_name = entry.file_name(); + + match file_type { + Ok(file_type) => + if !file_type.is_file() { + return + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?err, + "unable to get file type for {:?}", + file_name, + ); + return + }, + } + + if let Some(file_name) = file_name.to_str() { + let id = ArtifactId::from_file_name(file_name); + let path = cache_path.join(file_name); + + if id.is_none() || is_corrupted(&path).await { + gum::warn!( + target: LOG_TARGET, + "discarding invalid artifact {:?}", + &path, + ); + let _ = tokio::fs::remove_file(&path).await; + return + } + + if let Some(id) = id { + gum::debug!( + target: LOG_TARGET, + "reusing existing {:?} for node version v{}", + &path, + NODE_VERSION, + ); + artifacts.insert_prepared(id, path, SystemTime::now(), Default::default()); + } + } else { + gum::warn!( + target: LOG_TARGET, + "non-Unicode file name {:?} found in {:?}", + file_name, + cache_path, + ); + } + } + // Make sure that the cache path directory and all its parents are created. let _ = tokio::fs::create_dir_all(cache_path).await; - Self { artifacts: HashMap::new() } - } + let mut dir = match tokio::fs::read_dir(cache_path).await { + Ok(dir) => dir, + Err(err) => { + gum::error!( + target: LOG_TARGET, + ?err, + "failed to read dir {:?}", + cache_path, + ); + return + }, + }; - #[cfg(test)] - pub(crate) fn empty() -> Self { - Self { artifacts: HashMap::new() } + loop { + match dir.next_entry().await { + Ok(Some(entry)) => insert_or_prune(self, &entry, cache_path).await, + Ok(None) => break, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?err, + "error processing artifacts in {:?}", + cache_path, + ); + break + }, + } + } } /// Returns the state of the given artifact by its ID. pub fn artifact_state_mut(&mut self, artifact_id: &ArtifactId) -> Option<&mut ArtifactState> { - self.artifacts.get_mut(artifact_id) + self.inner.get_mut(artifact_id) } /// Inform the table about the artifact with the given ID. The state will be set to "preparing". @@ -212,53 +334,52 @@ impl Artifacts { pub fn insert_preparing( &mut self, artifact_id: ArtifactId, - waiting_for_response: Vec, + waiting_for_response: Vec, ) { // See the precondition. always!(self - .artifacts + .inner .insert(artifact_id, ArtifactState::Preparing { waiting_for_response, num_failures: 0 }) .is_none()); } /// Insert an artifact with the given ID as "prepared". /// - /// This function must be used only for brand-new artifacts and should never be used for - /// replacing existing ones. - #[cfg(test)] - pub fn insert_prepared( + /// This function should only be used to build the artifact table at startup with valid + /// artifact caches. + pub(crate) fn insert_prepared( &mut self, artifact_id: ArtifactId, + path: PathBuf, last_time_needed: SystemTime, prepare_stats: PrepareStats, ) { // See the precondition. always!(self - .artifacts - .insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats }) + .inner + .insert(artifact_id, ArtifactState::Prepared { path, last_time_needed, prepare_stats }) .is_none()); } - /// Remove and retrieve the artifacts from the table that are older than the supplied - /// Time-To-Live. - pub fn prune(&mut self, artifact_ttl: Duration) -> Vec { + /// Remove artifacts older than the given TTL and return id and path of the removed ones. + pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> { let now = SystemTime::now(); let mut to_remove = vec![]; - for (k, v) in self.artifacts.iter() { - if let ArtifactState::Prepared { last_time_needed, .. } = *v { + for (k, v) in self.inner.iter() { + if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v { if now .duration_since(last_time_needed) .map(|age| age > artifact_ttl) .unwrap_or(false) { - to_remove.push(k.clone()); + to_remove.push((k.clone(), path.clone())); } } } for artifact in &to_remove { - self.artifacts.remove(artifact); + self.inner.remove(&artifact.0); } to_remove @@ -267,13 +388,72 @@ impl Artifacts { #[cfg(test)] mod tests { - use super::{ArtifactId, Artifacts, NODE_VERSION}; + use super::{artifact_prefix as prefix, ArtifactId, Artifacts, NODE_VERSION, RUNTIME_VERSION}; use polkadot_primitives::ExecutorParamsHash; + use rand::Rng; use sp_core::H256; - use std::{path::Path, str::FromStr}; + use std::{ + fs, + io::Write, + path::{Path, PathBuf}, + str::FromStr, + }; + + fn rand_hash(len: usize) -> String { + let mut rng = rand::thread_rng(); + let hex: Vec<_> = "0123456789abcdef".chars().collect(); + (0..len).map(|_| hex[rng.gen_range(0..hex.len())]).collect() + } + + fn file_name(code_hash: &str, param_hash: &str, checksum: &str) -> String { + format!("{}_0x{}_0x{}_0x{}", prefix(), code_hash, param_hash, checksum) + } - fn file_name(code_hash: &str, param_hash: &str) -> String { - format!("wasmtime_polkadot_v{}_0x{}_0x{}", NODE_VERSION, code_hash, param_hash) + fn create_artifact( + dir: impl AsRef, + prefix: &str, + code_hash: impl AsRef, + params_hash: impl AsRef, + ) -> (PathBuf, String) { + fn artifact_path_without_checksum( + dir: impl AsRef, + prefix: &str, + code_hash: impl AsRef, + params_hash: impl AsRef, + ) -> PathBuf { + let mut path = dir.as_ref().to_path_buf(); + let file_name = + format!("{}_0x{}_0x{}", prefix, code_hash.as_ref(), params_hash.as_ref(),); + path.push(file_name); + path + } + + let (code_hash, params_hash) = (code_hash.as_ref(), params_hash.as_ref()); + let path = artifact_path_without_checksum(dir, prefix, code_hash, params_hash); + let mut file = fs::File::create(&path).unwrap(); + + let content = format!("{}{}", code_hash, params_hash).into_bytes(); + file.write_all(&content).unwrap(); + let checksum = blake3::hash(&content).to_hex().to_string(); + + (path, checksum) + } + + fn create_rand_artifact(dir: impl AsRef, prefix: &str) -> (PathBuf, String) { + create_artifact(dir, prefix, rand_hash(64), rand_hash(64)) + } + + fn concluded_path(path: impl AsRef, checksum: &str) -> PathBuf { + let path = path.as_ref(); + let mut file_name = path.file_name().unwrap().to_os_string(); + file_name.push("_0x"); + file_name.push(checksum); + path.with_file_name(file_name) + } + + #[test] + fn artifact_prefix() { + assert_eq!(prefix(), format!("wasmtime_v{}_polkadot_v{}", RUNTIME_VERSION, NODE_VERSION)); } #[test] @@ -284,6 +464,7 @@ mod tests { let file_name = file_name( "0022800000000000000000000000000000000000000000000000000000000000", "0033900000000000000000000000000000000000000000000000000000000000", + "00000000000000000000000000000000", ); assert_eq!( @@ -305,40 +486,54 @@ mod tests { let dir = Path::new("/test"); let code_hash = "1234567890123456789012345678901234567890123456789012345678901234"; let params_hash = "4321098765432109876543210987654321098765432109876543210987654321"; - let file_name = file_name(code_hash, params_hash); + let checksum = "34567890123456789012345678901234"; + let file_name = file_name(code_hash, params_hash, checksum); let code_hash = H256::from_str(code_hash).unwrap(); let params_hash = H256::from_str(params_hash).unwrap(); + let path = ArtifactId::new(code_hash.into(), ExecutorParamsHash::from_hash(params_hash)) + .path(dir, checksum); - assert_eq!( - ArtifactId::new(code_hash.into(), ExecutorParamsHash::from_hash(params_hash)) - .path(dir) - .to_str(), - Some(format!("/test/{}", file_name).as_str()), - ); + assert_eq!(path.to_str().unwrap(), format!("/test/{}", file_name)); } #[tokio::test] - async fn artifacts_removes_cache_on_startup() { - let fake_cache_path = crate::worker_intf::tmppath("test-cache").await.unwrap(); - let fake_artifact_path = { - let mut p = fake_cache_path.clone(); - p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); - p - }; + async fn remove_stale_cache_on_startup() { + let cache_dir = crate::worker_intf::tmppath("test-cache").await.unwrap(); + fs::create_dir_all(&cache_dir).unwrap(); + + // invalid prefix + create_rand_artifact(&cache_dir, ""); + create_rand_artifact(&cache_dir, "wasmtime_polkadot_v"); + create_rand_artifact(&cache_dir, "wasmtime_v8.0.0_polkadot_v1.0.0"); + + let prefix = prefix(); + + // no checksum + create_rand_artifact(&cache_dir, &prefix); + + // invalid hashes + let (path, checksum) = create_artifact(&cache_dir, &prefix, "000", "000001"); + let new_path = concluded_path(&path, &checksum); + fs::rename(&path, &new_path).unwrap(); - // create a tmp cache with 1 artifact. + // checksum tampered + let (path, checksum) = create_rand_artifact(&cache_dir, &prefix); + let new_path = concluded_path(&path, checksum.chars().rev().collect::().as_str()); + fs::rename(&path, &new_path).unwrap(); - std::fs::create_dir_all(&fake_cache_path).unwrap(); - std::fs::File::create(fake_artifact_path).unwrap(); + // valid + let (path, checksum) = create_rand_artifact(&cache_dir, &prefix); + let new_path = concluded_path(&path, &checksum); + fs::rename(&path, &new_path).unwrap(); - // this should remove it and re-create. + assert_eq!(fs::read_dir(&cache_dir).unwrap().count(), 7); - let p = &fake_cache_path; - Artifacts::new(p).await; + let artifacts = Artifacts::new_and_prune(&cache_dir).await; - assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0); + assert_eq!(fs::read_dir(&cache_dir).unwrap().count(), 1); + assert_eq!(artifacts.len(), 1); - std::fs::remove_dir_all(fake_cache_path).unwrap(); + fs::remove_dir_all(cache_dir).unwrap(); } } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 5919b9ba32c9..f67934e4171c 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -32,14 +32,15 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use polkadot_node_core_pvf_common::{ - error::{PrepareError, PrepareResult}, + error::{PrecheckResult, PrepareError}, + prepare::PrepareSuccess, pvf::PvfPrepData, }; use polkadot_node_subsystem::SubsystemResult; use polkadot_parachain_primitives::primitives::ValidationResult; use std::{ collections::HashMap, - path::{Path, PathBuf}, + path::PathBuf, time::{Duration, SystemTime}, }; @@ -63,7 +64,7 @@ pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker"; pub(crate) type ResultSender = oneshot::Sender>; /// Transmission end used for sending the PVF preparation result. -pub(crate) type PrepareResultSender = oneshot::Sender; +pub(crate) type PrecheckResultSender = oneshot::Sender; /// A handle to the async process serving the validation host requests. #[derive(Clone)] @@ -83,7 +84,7 @@ impl ValidationHost { pub async fn precheck_pvf( &mut self, pvf: PvfPrepData, - result_tx: PrepareResultSender, + result_tx: PrecheckResultSender, ) -> Result<(), String> { self.to_host_tx .send(ToHost::PrecheckPvf { pvf, result_tx }) @@ -133,7 +134,7 @@ impl ValidationHost { } enum ToHost { - PrecheckPvf { pvf: PvfPrepData, result_tx: PrepareResultSender }, + PrecheckPvf { pvf: PvfPrepData, result_tx: PrecheckResultSender }, ExecutePvf(ExecutePvfInputs), HeadsUp { active_pvfs: Vec }, } @@ -249,10 +250,9 @@ pub async fn start( let run_sweeper = sweeper_task(to_sweeper_rx); let run_host = async move { - let artifacts = Artifacts::new(&config.cache_path).await; + let artifacts = Artifacts::new_and_prune(&config.cache_path).await; run(Inner { - cache_path: config.cache_path, cleanup_pulse_interval: Duration::from_secs(3600), artifact_ttl: Duration::from_secs(3600 * 24), artifacts, @@ -296,7 +296,6 @@ impl AwaitingPrepare { } struct Inner { - cache_path: PathBuf, cleanup_pulse_interval: Duration, artifact_ttl: Duration, artifacts: Artifacts, @@ -317,7 +316,6 @@ struct Fatal; async fn run( Inner { - cache_path, cleanup_pulse_interval, artifact_ttl, mut artifacts, @@ -361,7 +359,6 @@ async fn run( // will notice it. break_if_fatal!(handle_cleanup_pulse( - &cache_path, &mut to_sweeper_tx, &mut artifacts, artifact_ttl, @@ -380,7 +377,6 @@ async fn run( // If the artifact failed before, it could be re-scheduled for preparation here if // the preparation failure cooldown has elapsed. break_if_fatal!(handle_to_host( - &cache_path, &mut artifacts, &mut to_prepare_queue_tx, &mut to_execute_queue_tx, @@ -402,7 +398,6 @@ async fn run( // We could be eager in terms of reporting and plumb the result from the preparation // worker but we don't for the sake of simplicity. break_if_fatal!(handle_prepare_done( - &cache_path, &mut artifacts, &mut to_execute_queue_tx, &mut awaiting_prepare, @@ -414,7 +409,6 @@ async fn run( } async fn handle_to_host( - cache_path: &Path, artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, execute_queue: &mut mpsc::Sender, @@ -426,15 +420,8 @@ async fn handle_to_host( handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?; }, ToHost::ExecutePvf(inputs) => { - handle_execute_pvf( - cache_path, - artifacts, - prepare_queue, - execute_queue, - awaiting_prepare, - inputs, - ) - .await?; + handle_execute_pvf(artifacts, prepare_queue, execute_queue, awaiting_prepare, inputs) + .await?; }, ToHost::HeadsUp { active_pvfs } => handle_heads_up(artifacts, prepare_queue, active_pvfs).await?, @@ -454,21 +441,21 @@ async fn handle_precheck_pvf( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, pvf: PvfPrepData, - result_sender: PrepareResultSender, + result_sender: PrecheckResultSender, ) -> Result<(), Fatal> { let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { - ArtifactState::Prepared { last_time_needed, prepare_stats } => { + ArtifactState::Prepared { last_time_needed, .. } => { *last_time_needed = SystemTime::now(); - let _ = result_sender.send(Ok(prepare_stats.clone())); + let _ = result_sender.send(Ok(())); }, ArtifactState::Preparing { waiting_for_response, num_failures: _ } => waiting_for_response.push(result_sender), ArtifactState::FailedToProcess { error, .. } => { // Do not retry an artifact that previously failed preparation. - let _ = result_sender.send(PrepareResult::Err(error.clone())); + let _ = result_sender.send(PrecheckResult::Err(error.clone())); }, } } else { @@ -491,7 +478,6 @@ async fn handle_precheck_pvf( /// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`]) /// than when prechecking. async fn handle_execute_pvf( - cache_path: &Path, artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, execute_queue: &mut mpsc::Sender, @@ -504,8 +490,8 @@ async fn handle_execute_pvf( if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { - ArtifactState::Prepared { last_time_needed, .. } => { - let file_metadata = std::fs::metadata(artifact_id.path(cache_path)); + ArtifactState::Prepared { ref path, last_time_needed, .. } => { + let file_metadata = std::fs::metadata(path); if file_metadata.is_ok() { *last_time_needed = SystemTime::now(); @@ -514,7 +500,7 @@ async fn handle_execute_pvf( send_execute( execute_queue, execute::ToQueue::Enqueue { - artifact: ArtifactPathId::new(artifact_id, cache_path), + artifact: ArtifactPathId::new(artifact_id, path), pending_execution_request: PendingExecutionRequest { exec_timeout, params, @@ -677,7 +663,6 @@ async fn handle_heads_up( } async fn handle_prepare_done( - cache_path: &Path, artifacts: &mut Artifacts, execute_queue: &mut mpsc::Sender, awaiting_prepare: &mut AwaitingPrepare, @@ -718,7 +703,8 @@ async fn handle_prepare_done( state { for result_sender in waiting_for_response.drain(..) { - let _ = result_sender.send(result.clone()); + let result = result.clone().map(|_| ()); + let _ = result_sender.send(result); } num_failures } else { @@ -738,16 +724,18 @@ async fn handle_prepare_done( continue } - // Don't send failed artifacts to the execution's queue. - if let Err(ref error) = result { - let _ = result_tx.send(Err(ValidationError::from(error.clone()))); - continue - } + let path = match &result { + Ok(success) => success.path.clone(), + Err(error) => { + let _ = result_tx.send(Err(ValidationError::from(error.clone()))); + continue + }, + }; send_execute( execute_queue, execute::ToQueue::Enqueue { - artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), + artifact: ArtifactPathId::new(artifact_id.clone(), &path), pending_execution_request: PendingExecutionRequest { exec_timeout, params, @@ -760,8 +748,8 @@ async fn handle_prepare_done( } *state = match result { - Ok(prepare_stats) => - ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats }, + Ok(PrepareSuccess { path, stats: prepare_stats }) => + ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), prepare_stats }, Err(error) => { let last_time_failed = SystemTime::now(); let num_failures = *num_failures + 1; @@ -814,7 +802,6 @@ async fn enqueue_prepare_for_execute( } async fn handle_cleanup_pulse( - cache_path: &Path, sweeper_tx: &mut mpsc::Sender, artifacts: &mut Artifacts, artifact_ttl: Duration, @@ -825,14 +812,13 @@ async fn handle_cleanup_pulse( "PVF pruning: {} artifacts reached their end of life", to_remove.len(), ); - for artifact_id in to_remove { + for (artifact_id, path) in to_remove { gum::debug!( target: LOG_TARGET, validation_code_hash = ?artifact_id.code_hash, "pruning artifact", ); - let artifact_path = artifact_id.path(cache_path); - sweeper_tx.send(artifact_path).await.map_err(|_| Fatal)?; + sweeper_tx.send(path).await.map_err(|_| Fatal)?; } Ok(()) @@ -890,7 +876,11 @@ pub(crate) mod tests { use crate::InvalidCandidate; use assert_matches::assert_matches; use futures::future::BoxFuture; - use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats}; + use polkadot_node_core_pvf_common::{ + error::PrepareError, + prepare::{PrepareStats, PrepareSuccess}, + }; + use sp_core::hexdisplay::AsBytesRef; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -910,12 +900,16 @@ pub(crate) mod tests { } /// Creates a new PVF which artifact id can be uniquely identified by the given number. - fn artifact_id(descriminator: u32) -> ArtifactId { - ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(descriminator)) + fn artifact_id(discriminator: u32) -> ArtifactId { + ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator)) } - fn artifact_path(descriminator: u32) -> PathBuf { - artifact_id(descriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned() + fn artifact_path(discriminator: u32) -> PathBuf { + let pvf = PvfPrepData::from_discriminator(discriminator); + let checksum = blake3::hash(pvf.code().as_bytes_ref()); + artifact_id(discriminator) + .path(&PathBuf::from(std::env::temp_dir()), checksum.to_hex().as_str()) + .to_owned() } struct Builder { @@ -953,8 +947,6 @@ pub(crate) mod tests { impl Test { fn new(Builder { cleanup_pulse_interval, artifact_ttl, artifacts }: Builder) -> Self { - let cache_path = PathBuf::from(std::env::temp_dir()); - let (to_host_tx, to_host_rx) = mpsc::channel(10); let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10); let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded(); @@ -962,7 +954,6 @@ pub(crate) mod tests { let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10); let run = run(Inner { - cache_path, cleanup_pulse_interval, artifact_ttl, artifacts, @@ -1111,12 +1102,18 @@ pub(crate) mod tests { let mut builder = Builder::default(); builder.cleanup_pulse_interval = Duration::from_millis(100); builder.artifact_ttl = Duration::from_millis(500); - builder - .artifacts - .insert_prepared(artifact_id(1), mock_now, PrepareStats::default()); - builder - .artifacts - .insert_prepared(artifact_id(2), mock_now, PrepareStats::default()); + builder.artifacts.insert_prepared( + artifact_id(1), + artifact_path(1), + mock_now, + PrepareStats::default(), + ); + builder.artifacts.insert_prepared( + artifact_id(2), + artifact_path(2), + mock_now, + PrepareStats::default(), + ); let mut test = builder.build(); let mut host = test.host_handle(); @@ -1188,7 +1185,7 @@ pub(crate) mod tests { test.from_prepare_queue_tx .send(prepare::FromQueue { artifact_id: artifact_id(1), - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }) .await .unwrap(); @@ -1204,7 +1201,7 @@ pub(crate) mod tests { test.from_prepare_queue_tx .send(prepare::FromQueue { artifact_id: artifact_id(2), - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }) .await .unwrap(); @@ -1258,7 +1255,7 @@ pub(crate) mod tests { test.from_prepare_queue_tx .send(prepare::FromQueue { artifact_id: artifact_id(1), - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }) .await .unwrap(); @@ -1371,7 +1368,7 @@ pub(crate) mod tests { test.from_prepare_queue_tx .send(prepare::FromQueue { artifact_id: artifact_id(2), - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }) .await .unwrap(); @@ -1527,7 +1524,7 @@ pub(crate) mod tests { test.from_prepare_queue_tx .send(prepare::FromQueue { artifact_id: artifact_id(1), - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }) .await .unwrap(); @@ -1703,7 +1700,7 @@ pub(crate) mod tests { test.from_prepare_queue_tx .send(prepare::FromQueue { artifact_id: artifact_id(1), - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }) .await .unwrap(); diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 102a91dbdad7..7e7a13252548 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -84,7 +84,7 @@ //! A pruning task will run at a fixed interval of time. This task will remove all artifacts that //! weren't used or received a heads up signal for a while. //! -//! ## Execution +//! ## Execution //! //! The execute workers will be fed by the requests from the execution queue, which is basically a //! combination of a path to the compiled artifact and the diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 8e02f540d321..21af21e5b028 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -68,7 +68,7 @@ pub enum ToPool { /// /// In either case, the worker is considered busy and no further `StartWork` messages should be /// sent until either `Concluded` or `Rip` message is received. - StartWork { worker: Worker, pvf: PvfPrepData, artifact_path: PathBuf }, + StartWork { worker: Worker, pvf: PvfPrepData, cache_path: PathBuf }, } /// A message sent from pool to its client. @@ -232,7 +232,7 @@ fn handle_to_pool( .boxed(), ); }, - ToPool::StartWork { worker, pvf, artifact_path } => { + ToPool::StartWork { worker, pvf, cache_path } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -242,7 +242,7 @@ fn handle_to_pool( worker, idle, pvf, - artifact_path, + cache_path, preparation_timer, ) .boxed(), @@ -303,10 +303,10 @@ async fn start_work_task( worker: Worker, idle: IdleWorker, pvf: PvfPrepData, - artifact_path: PathBuf, + cache_path: PathBuf, _preparation_timer: Option, ) -> PoolEvent { - let outcome = worker_intf::start_work(&metrics, idle, pvf, artifact_path).await; + let outcome = worker_intf::start_work(&metrics, idle, pvf, cache_path).await; PoolEvent::StartWork(worker, outcome) } diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs index c38012d74548..c140a6cafda0 100644 --- a/polkadot/node/core/pvf/src/prepare/queue.rs +++ b/polkadot/node/core/pvf/src/prepare/queue.rs @@ -268,12 +268,12 @@ fn find_idle_worker(queue: &mut Queue) -> Option { } async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Result<(), Fatal> { - use pool::FromPool::*; + use pool::FromPool; match from_pool { - Spawned(worker) => handle_worker_spawned(queue, worker).await?, - Concluded { worker, rip, result } => + FromPool::Spawned(worker) => handle_worker_spawned(queue, worker).await?, + FromPool::Concluded { worker, rip, result } => handle_worker_concluded(queue, worker, rip, result).await?, - Rip(worker) => handle_worker_rip(queue, worker).await?, + FromPool::Rip(worker) => handle_worker_rip(queue, worker).await?, } Ok(()) } @@ -424,17 +424,17 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat /// Attaches the work to the given worker telling the poll about the job. async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> { let job_data = &mut queue.jobs[job]; - - let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf); - let artifact_path = artifact_id.path(&queue.cache_path); - job_data.worker = Some(worker); queue.workers[worker].job = Some(job); send_pool( &mut queue.to_pool_tx, - pool::ToPool::StartWork { worker, pvf: job_data.pvf.clone(), artifact_path }, + pool::ToPool::StartWork { + worker, + pvf: job_data.pvf.clone(), + cache_path: queue.cache_path.clone(), + }, ) .await?; @@ -491,7 +491,7 @@ mod tests { use crate::host::tests::TEST_PREPARATION_TIMEOUT; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; - use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats}; + use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareSuccess}; use slotmap::SlotMap; use std::task::Poll; @@ -612,7 +612,7 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }); assert_eq!( @@ -651,7 +651,7 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); @@ -697,7 +697,7 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } @@ -731,7 +731,7 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, - result: Ok(PrepareStats::default()), + result: Ok(PrepareSuccess::default()), }); // Since there is still work, the queue requested one extra worker to spawn to handle the diff --git a/polkadot/node/core/pvf/src/prepare/worker_intf.rs b/polkadot/node/core/pvf/src/prepare/worker_intf.rs index a22fa74b2fe1..e7f142a46bb8 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_intf.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_intf.rs @@ -17,6 +17,7 @@ //! Host interface to the prepare worker. use crate::{ + artifacts::ArtifactId, metrics::Metrics, security, worker_intf::{ @@ -27,8 +28,8 @@ use crate::{ }; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ - error::{PrepareError, PrepareResult}, - prepare::PrepareStats, + error::{PrepareError, PrepareResult, PrepareWorkerResult}, + prepare::{PrepareStats, PrepareSuccess, PrepareWorkerSuccess}, pvf::PvfPrepData, worker_dir, SecurityStatus, }; @@ -81,7 +82,7 @@ pub enum Outcome { /// final destination location. RenameTmpFile { worker: IdleWorker, - result: PrepareResult, + result: PrepareWorkerResult, err: String, // Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible // conversion to `Option`. @@ -115,7 +116,7 @@ pub async fn start_work( metrics: &Metrics, worker: IdleWorker, pvf: PvfPrepData, - artifact_path: PathBuf, + cache_path: PathBuf, ) -> Outcome { let IdleWorker { stream, pid, worker_dir } = worker; @@ -123,8 +124,8 @@ pub async fn start_work( target: LOG_TARGET, worker_pid = %pid, ?worker_dir, - "starting prepare for {}", - artifact_path.display(), + "starting prepare for {:?}", + pvf, ); with_worker_dir_setup( @@ -135,7 +136,7 @@ pub async fn start_work( let preparation_timeout = pvf.prep_timeout(); let audit_log_file = security::AuditLogFile::try_open_and_seek_to_end().await; - if let Err(err) = send_request(&mut stream, pvf.clone()).await { + if let Err(err) = send_request(&mut stream, &pvf).await { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -159,7 +160,7 @@ pub async fn start_work( match result { // Received bytes from worker within the time limit. - Ok(Ok(prepare_result)) => { + Ok(Ok(prepare_worker_result)) => { // Check if any syscall violations occurred during the job. For now this is only // informative, as we are not enforcing the seccomp policy yet. for syscall in security::check_seccomp_violations_for_worker(audit_log_file, pid).await { @@ -175,10 +176,11 @@ pub async fn start_work( handle_response( metrics, IdleWorker { stream, pid, worker_dir }, - prepare_result, + prepare_worker_result, pid, tmp_artifact_file, - artifact_path, + &pvf, + &cache_path, preparation_timeout, ) .await @@ -215,20 +217,22 @@ pub async fn start_work( async fn handle_response( metrics: &Metrics, worker: IdleWorker, - result: PrepareResult, + result: PrepareWorkerResult, worker_pid: u32, tmp_file: PathBuf, - artifact_path: PathBuf, + pvf: &PvfPrepData, + cache_path: &PathBuf, preparation_timeout: Duration, ) -> Outcome { - let PrepareStats { cpu_time_elapsed, memory_stats } = match result.clone() { - Ok(result) => result, - // Timed out on the child. This should already be logged by the child. - Err(PrepareError::TimedOut) => return Outcome::TimedOut, - Err(PrepareError::JobDied(err)) => return Outcome::JobDied(err), - Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory, - Err(_) => return Outcome::Concluded { worker, result }, - }; + let PrepareWorkerSuccess { checksum, stats: PrepareStats { cpu_time_elapsed, memory_stats } } = + match result.clone() { + Ok(result) => result, + // Timed out on the child. This should already be logged by the child. + Err(PrepareError::TimedOut) => return Outcome::TimedOut, + Err(PrepareError::JobDied(err)) => return Outcome::JobDied(err), + Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory, + Err(err) => return Outcome::Concluded { worker, result: Err(err) }, + }; if cpu_time_elapsed > preparation_timeout { // The job didn't complete within the timeout. @@ -243,6 +247,9 @@ async fn handle_response( return Outcome::TimedOut } + let artifact_id = ArtifactId::from_pvf_prep_data(pvf); + let artifact_path = artifact_id.path(cache_path, &checksum); + gum::debug!( target: LOG_TARGET, %worker_pid, @@ -252,7 +259,13 @@ async fn handle_response( ); let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await { - Ok(()) => Outcome::Concluded { worker, result }, + Ok(()) => Outcome::Concluded { + worker, + result: Ok(PrepareSuccess { + path: artifact_path, + stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() }, + }), + }, Err(err) => { gum::warn!( target: LOG_TARGET, @@ -329,14 +342,14 @@ where outcome } -async fn send_request(stream: &mut UnixStream, pvf: PvfPrepData) -> io::Result<()> { +async fn send_request(stream: &mut UnixStream, pvf: &PvfPrepData) -> io::Result<()> { framed_send(stream, &pvf.encode()).await?; Ok(()) } -async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result { +async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result { let result = framed_recv(stream).await?; - let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { + let result = PrepareWorkerResult::decode(&mut &result[..]).map_err(|e| { // We received invalid bytes from the worker. let bound_bytes = &result[..result.len().min(4)]; gum::warn!( diff --git a/polkadot/node/core/pvf/src/worker_intf.rs b/polkadot/node/core/pvf/src/worker_intf.rs index 8f9a7de354b8..5e589b9abcee 100644 --- a/polkadot/node/core/pvf/src/worker_intf.rs +++ b/polkadot/node/core/pvf/src/worker_intf.rs @@ -198,7 +198,7 @@ pub async fn tmppath_in(prefix: &str, dir: &Path) -> io::Result { /// The same as [`tmppath_in`], but uses [`std::env::temp_dir`] as the directory. pub async fn tmppath(prefix: &str) -> io::Result { - let temp_dir = PathBuf::from(std::env::temp_dir()); + let temp_dir = std::env::temp_dir(); tmppath_in(prefix, &temp_dir).await } @@ -453,7 +453,7 @@ impl Drop for WorkerDir { /// artifacts from previous jobs. pub fn clear_worker_dir_path(worker_dir_path: &Path) -> io::Result<()> { fn remove_dir_contents(path: &Path) -> io::Result<()> { - for entry in std::fs::read_dir(&path)? { + for entry in std::fs::read_dir(path)? { let entry = entry?; let path = entry.path(); diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index d2d842cf84a3..5bdf49cc719e 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -20,8 +20,7 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics, PrepareError, - PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost, - JOB_TIMEOUT_WALL_CLOCK_FACTOR, + PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_primitives::{ExecutorParam, ExecutorParams}; @@ -70,7 +69,7 @@ impl TestHost { &self, code: &[u8], executor_params: ExecutorParams, - ) -> Result { + ) -> Result<(), PrepareError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) diff --git a/substrate/utils/build-script-utils/src/version.rs b/substrate/utils/build-script-utils/src/version.rs index f6a9ff9554ab..549e499b1102 100644 --- a/substrate/utils/build-script-utils/src/version.rs +++ b/substrate/utils/build-script-utils/src/version.rs @@ -59,3 +59,34 @@ fn get_version(impl_commit: &str) -> String { impl_commit ) } + +/// Generate `SUBSTRATE_WASMTIME_VERSION` +pub fn generate_wasmtime_version() { + generate_dependency_version("wasmtime", "SUBSTRATE_WASMTIME_VERSION"); +} + +fn generate_dependency_version(dep: &str, env_var: &str) { + // we only care about the root + match std::process::Command::new("cargo") + .args(["tree", "--depth=0", "--locked", "--package", dep]) + .output() + { + Ok(output) if output.status.success() => { + let version = String::from_utf8_lossy(&output.stdout); + + // vX.X.X + if let Some(ver) = version.strip_prefix(&format!("{} v", dep)) { + println!("cargo:rustc-env={}={}", env_var, ver); + } else { + println!("cargo:warning=Unexpected result {}", version); + } + }, + + // command errors out when it could not find the given dependency + // or when having multiple versions of it + Ok(output) => + println!("cargo:warning=`cargo tree` {}", String::from_utf8_lossy(&output.stderr)), + + Err(err) => println!("cargo:warning=Could not run `cargo tree`: {}", err), + } +}