diff --git a/Cargo.lock b/Cargo.lock
index 227fd8968080..6a6c45c26dd1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4646,6 +4646,22 @@ dependencies = [
  "sp-runtime",
 ]
 
+[[package]]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+dependencies = [
+ "derive_more 0.99.9",
+ "futures 0.3.5",
+ "log 0.4.11",
+ "polkadot-erasure-coding",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-node-subsystem-test-helpers",
+ "polkadot-node-subsystem-util",
+ "polkadot-primitives",
+ "sp-core",
+]
+
 [[package]]
 name = "polkadot-node-core-av-store"
 version = "0.1.0"
@@ -4809,6 +4825,7 @@ dependencies = [
 name = "polkadot-node-primitives"
 version = "0.1.0"
 dependencies = [
+ "futures 0.3.5",
 "parity-scale-codec",
 "polkadot-primitives",
 "polkadot-statement-table",
diff --git a/Cargo.toml b/Cargo.toml
index c36511f813cf..1c98ca8d0b85 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,7 @@ members = [
 	"service",
 	"validation",
 
+	"node/collation-generation",
 	"node/core/av-store",
 	"node/core/backing",
 	"node/core/bitfield-signing",
diff --git a/node/collation-generation/Cargo.toml b/node/collation-generation/Cargo.toml
new file mode 100644
index 000000000000..f7d5e7f162ef
--- /dev/null
+++ b/node/collation-generation/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+derive_more = "0.99.9"
+futures = "0.3.5"
+log = "0.4.8"
+polkadot-erasure-coding = { path = "../../erasure-coding" }
+polkadot-node-primitives = { path = "../primitives" }
+polkadot-node-subsystem = { path = "../subsystem" }
+polkadot-node-subsystem-util = { path = "../subsystem-util" }
+polkadot-primitives = { path = "../../primitives" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[dev-dependencies]
+polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
diff --git a/node/collation-generation/src/lib.rs b/node/collation-generation/src/lib.rs
new file mode 100644
index 000000000000..3ad76ff7f76d
--- /dev/null
+++ b/node/collation-generation/src/lib.rs
@@ -0,0 +1,652 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! The collation generation subsystem is the interface between polkadot and the collators.
+
+#![deny(missing_docs)]
+
+use futures::{
+	channel::{mpsc, oneshot},
+	future::FutureExt,
+	join,
+	select,
+	sink::SinkExt,
+	stream::StreamExt,
+};
+use polkadot_node_primitives::CollationGenerationConfig;
+use polkadot_node_subsystem::{
+	errors::RuntimeApiError,
+	messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
+	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
+};
+use polkadot_node_subsystem_util::{
+	self as util, request_availability_cores_ctx, request_global_validation_data_ctx,
+	request_local_validation_data_ctx, request_validators_ctx,
+};
+use polkadot_primitives::v1::{
+	collator_signature_payload, validation_data_hash, AvailableData, CandidateCommitments,
+	CandidateDescriptor, CandidateReceipt, CoreState, GlobalValidationData, Hash,
+	LocalValidationData, OccupiedCoreAssumption, PoV,
+};
+use sp_core::crypto::Pair;
+use std::sync::Arc;
+
+/// Collation Generation Subsystem
+pub struct CollationGenerationSubsystem {
+	config: Option<Arc<CollationGenerationConfig>>,
+}
+
+impl CollationGenerationSubsystem {
+	/// Run this subsystem
+	///
+	/// Conceptually, this is very simple: it just loops forever.
+	///
+	/// - On incoming overseer messages, it starts or stops jobs as appropriate.
+	/// - On other incoming messages, if they can be converted into Job::ToJob and
+	///   include a hash, then they're forwarded to the appropriate individual job.
+	/// - On outgoing messages from the jobs, it forwards them to the overseer.
+	///
+	/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
+	/// Otherwise, most are logged and then discarded.
+	async fn run<Context>(mut self, mut ctx: Context)
+	where
+		Context: SubsystemContext<Message = CollationGenerationMessage>,
+	{
+		// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
+		// expected to generate precisely one message. We don't want to block the main loop
+		// at any point waiting for them all, so instead, we create a channel on which they can
+		// send those messages. We can then just monitor the channel and forward messages on it
+		// to the overseer here, via the context.
+		let (sender, mut receiver) = mpsc::channel(0);
+
+		loop {
+			select! {
+				incoming = ctx.recv().fuse() => {
+					if self.handle_incoming::<Context>(incoming, &mut ctx, &sender).await {
+						break;
+					}
+				},
+				msg = receiver.next().fuse() => {
+					if let Some(msg) = msg {
+						if let Err(err) = ctx.send_message(msg).await {
+							log::warn!(target: "collation_generation", "failed to forward message to overseer: {:?}", err);
+							break;
+						}
+					}
+				},
+			}
+		}
+	}
+
+	// handle an incoming message. return true if we should break afterwards.
+	// note: this doesn't strictly need to be a separate function; it's more an administrative function
+	// so that we don't clutter the run loop. It could in principle be inlined directly into there.
+	// it should hopefully therefore be ok that it's an async function mutably borrowing self.
+	async fn handle_incoming<Context>(
+		&mut self,
+		incoming: SubsystemResult<FromOverseer<Context::Message>>,
+		ctx: &mut Context,
+		sender: &mpsc::Sender<AllMessages>,
+	) -> bool
+	where
+		Context: SubsystemContext<Message = CollationGenerationMessage>,
+	{
+		use polkadot_node_subsystem::ActiveLeavesUpdate;
+		use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
+		use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
+
+		match incoming {
+			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
+				// follow the procedure from the guide
+				if let Some(config) = &self.config {
+					if let Err(err) =
+						handle_new_activations(config.clone(), &activated, ctx, sender).await
+					{
+						log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
+						return true;
+					};
+				}
+				false
+			}
+			Ok(Signal(Conclude)) => true,
+			Ok(Communication {
+				msg: CollationGenerationMessage::Initialize(config),
+			}) => {
+				if self.config.is_some() {
+					log::warn!(target: "collation_generation", "double initialization");
+					true
+				} else {
+					self.config = Some(Arc::new(config));
+					false
+				}
+			}
+			Ok(Signal(BlockFinalized(_))) => false,
+			Err(err) => {
+				log::error!(target: "collation_generation", "error receiving message from subsystem context: {:?}", err);
+				true
+			}
+		}
+	}
+}
+
+impl<Context> Subsystem<Context> for CollationGenerationSubsystem
+where
+	Context: SubsystemContext<Message = CollationGenerationMessage>,
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let subsystem = CollationGenerationSubsystem { config: None };
+
+		let future = Box::pin(subsystem.run(ctx));
+
+		SpawnedSubsystem {
+			name: "CollationGenerationSubsystem",
+			future,
+		}
+	}
+}
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+	#[from]
+	Subsystem(SubsystemError),
+	#[from]
+	OneshotRecv(oneshot::Canceled),
+	#[from]
+	Runtime(RuntimeApiError),
+	#[from]
+	Util(util::Error),
+	#[from]
+	Erasure(polkadot_erasure_coding::Error),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+async fn handle_new_activations<Context: SubsystemContext>(
+	config: Arc<CollationGenerationConfig>,
+	activated: &[Hash],
+	ctx: &mut Context,
+	sender: &mpsc::Sender<AllMessages>,
+) -> Result<()> {
+	// follow the procedure from the guide:
+	// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
+
+	for relay_parent in activated.iter().copied() {
+		// double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
+		// returns a receiver. The second layer of requests actually polls those receivers to completion.
+		let (global_validation_data, availability_cores, validators) = join!(
+			request_global_validation_data_ctx(relay_parent, ctx).await?,
+			request_availability_cores_ctx(relay_parent, ctx).await?,
+			request_validators_ctx(relay_parent, ctx).await?,
+		);
+
+		let global_validation_data = global_validation_data??;
+		let availability_cores = availability_cores??;
+		let n_validators = validators??.len();
+
+		for core in availability_cores {
+			let (scheduled_core, assumption) = match core {
+				CoreState::Scheduled(scheduled_core) => {
+					(scheduled_core, OccupiedCoreAssumption::Free)
+				}
+				CoreState::Occupied(_occupied_core) => {
+					// TODO: https://github.com/paritytech/polkadot/issues/1573
+					continue;
+				}
+				_ => continue,
+			};
+
+			if scheduled_core.para_id != config.para_id {
+				continue;
+			}
+
+			// we get local validation data synchronously for each core instead of within the subtask loop,
+			// because we have only a single mutable handle to the context, so the work can't really be distributed
+			let local_validation_data = match request_local_validation_data_ctx(
+				relay_parent,
+				scheduled_core.para_id,
+				assumption,
+				ctx,
+			)
+			.await?
+			.await??
+			{
+				Some(local_validation_data) => local_validation_data,
+				None => continue,
+			};
+
+			let task_global_validation_data = global_validation_data.clone();
+			let task_config = config.clone();
+			let mut task_sender = sender.clone();
+			ctx.spawn("collation generation collation builder", Box::pin(async move {
+				let validation_data_hash =
+					validation_data_hash(&task_global_validation_data, &local_validation_data);
+
+				let collation = (task_config.collator)(&task_global_validation_data, &local_validation_data).await;
+
+				let pov_hash = collation.proof_of_validity.hash();
+
+				let signature_payload = collator_signature_payload(
+					&relay_parent,
+					&scheduled_core.para_id,
+					&validation_data_hash,
+					&pov_hash,
+				);
+
+				let erasure_root = match erasure_root(n_validators, local_validation_data, task_global_validation_data, collation.proof_of_validity.clone()) {
+					Ok(erasure_root) => erasure_root,
+					Err(err) => {
+						log::error!(target: "collation_generation", "failed to calculate erasure root for para_id {}: {:?}", scheduled_core.para_id, err);
+						return
+					}
+				};
+
+				let commitments = CandidateCommitments {
+					fees: collation.fees,
+					upward_messages: collation.upward_messages,
+					new_validation_code: collation.new_validation_code,
+					head_data: collation.head_data,
+					erasure_root,
+				};
+
+				let ccr = CandidateReceipt {
+					commitments_hash: commitments.hash(),
+					descriptor: CandidateDescriptor {
+						signature: task_config.key.sign(&signature_payload),
+						para_id: scheduled_core.para_id,
+						relay_parent,
+						collator: task_config.key.public(),
+						validation_data_hash,
+						pov_hash,
+					},
+				};
+
+				if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
+					CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
+				)).await {
+					log::warn!(target: "collation_generation", "failed to send collation result for para_id {}: {:?}", scheduled_core.para_id, err);
+				}
+			})).await?;
+		}
+	}
+
+	Ok(())
+}
+
+fn erasure_root(
+	n_validators: usize,
+	local_validation_data: LocalValidationData,
+	global_validation_data: GlobalValidationData,
+	pov: PoV,
+) -> Result<Hash> {
+	let omitted_validation = polkadot_primitives::v1::OmittedValidationData {
+		global_validation: global_validation_data,
+		local_validation: local_validation_data,
+	};
+
+	let available_data = AvailableData {
+		omitted_validation,
+		pov,
+	};
+
+	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
+	Ok(polkadot_erasure_coding::branches(&chunks).root())
+}
+
+#[cfg(test)]
+mod tests {
+	mod handle_new_activations {
+		use super::super::*;
+		use futures::{
+			lock::Mutex,
+			task::{Context as FuturesContext, Poll},
+			Future,
+		};
+		use polkadot_node_primitives::Collation;
+		use polkadot_node_subsystem::messages::{
+			AllMessages, RuntimeApiMessage, RuntimeApiRequest,
+		};
+		use polkadot_node_subsystem_test_helpers::{
+			subsystem_test_harness, TestSubsystemContextHandle,
+		};
+		use polkadot_primitives::v1::{
+			BlockData, BlockNumber, CollatorPair, GlobalValidationData, Id as ParaId,
+			LocalValidationData, PoV, ScheduledCore,
+		};
+		use std::pin::Pin;
+
+		fn test_collation() -> Collation {
+			Collation {
+				fees: Default::default(),
+				upward_messages: Default::default(),
+				new_validation_code: Default::default(),
+				head_data: Default::default(),
+				proof_of_validity: PoV {
+					block_data: BlockData(Vec::new()),
+				},
+			}
+		}
+
+		// Box<dyn Future<Output = Collation> + Unpin + Send>
+		struct TestCollator;
+
+		impl Future for TestCollator {
+			type Output = Collation;
+
+			fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
+				Poll::Ready(test_collation())
+			}
+		}
+
+		impl Unpin for TestCollator {}
+
+		fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
+			Arc::new(CollationGenerationConfig {
+				key: CollatorPair::generate().0,
+				collator: Box::new(|_gvd: &GlobalValidationData, _lvd: &LocalValidationData| {
+					Box::new(TestCollator)
+				}),
+				para_id: para_id.into(),
+			})
+		}
+
+		fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
+			ScheduledCore {
+				para_id: para_id.into(),
+				collator: None,
+			}
+		}
+
+		#[test]
+		fn requests_validation_and_availability_per_relay_parent() {
+			let activated_hashes: Vec<Hash> = vec![
+				[1; 32].into(),
+				[4; 32].into(),
+				[9; 32].into(),
+				[16; 32].into(),
+			];
+
+			let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
+			let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
+
+			let overseer_requested_validation_data = requested_validation_data.clone();
+			let overseer_requested_availability_cores = requested_availability_cores.clone();
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::GlobalValidationData(tx)))) => {
+							overseer_requested_validation_data.lock().await.push(hash);
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
+							overseer_requested_availability_cores.lock().await.push(hash);
+							tx.send(Ok(vec![])).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
+					}
+				}
+			};
+
+			let (tx, _rx) = mpsc::channel(0);
+
+			let subsystem_activated_hashes = activated_hashes.clone();
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(
+					test_config(123),
+					&subsystem_activated_hashes,
+					&mut ctx,
+					&tx,
+				)
+				.await
+				.unwrap();
+			});
+
+			let mut requested_validation_data = Arc::try_unwrap(requested_validation_data)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+			requested_validation_data.sort();
+			let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+			requested_availability_cores.sort();
+
+			assert_eq!(requested_validation_data, activated_hashes);
+			assert_eq!(requested_availability_cores, activated_hashes);
+		}
+
+		#[test]
+		fn requests_local_validation_for_scheduled_matches() {
+			let activated_hashes: Vec<Hash> = vec![
+				Hash::repeat_byte(1),
+				Hash::repeat_byte(4),
+				Hash::repeat_byte(9),
+				Hash::repeat_byte(16),
+			];
+
+			let requested_local_validation_data = Arc::new(Mutex::new(Vec::new()));
+
+			let overseer_requested_local_validation_data = requested_local_validation_data.clone();
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::GlobalValidationData(tx),
+						))) => {
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::AvailabilityCores(tx),
+						))) => {
+							tx.send(Ok(vec![
+								CoreState::Free,
+								// this is weird, see explanation below
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 4) as u32,
+								)),
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 5) as u32,
+								)),
+							]))
+							.unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::LocalValidationData(
+								_para_id,
+								_occupied_core_assumption,
+								tx,
+							),
+						))) => {
+							overseer_requested_local_validation_data
+								.lock()
+								.await
+								.push(hash);
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::Validators(tx),
+						))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => {
+							panic!("didn't expect any other overseer requests; got {:?}", msg)
+						}
+					}
+				}
+			};
+
+			let (tx, _rx) = mpsc::channel(0);
+
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(test_config(16), &activated_hashes, &mut ctx, &tx)
+					.await
+					.unwrap();
+			});
+
+			let requested_local_validation_data = Arc::try_unwrap(requested_local_validation_data)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+
+			// the only activated hash should be from the 4 hash:
+			// each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
+			// given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
+			// hash.
+			assert_eq!(requested_local_validation_data, vec![[4; 32].into()]);
+		}
+
+		#[test]
+		fn sends_distribute_collation_message() {
+			let activated_hashes: Vec<Hash> = vec![
+				Hash::repeat_byte(1),
+				Hash::repeat_byte(4),
+				Hash::repeat_byte(9),
+				Hash::repeat_byte(16),
+			];
+
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::GlobalValidationData(tx),
+						))) => {
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::AvailabilityCores(tx),
+						))) => {
+							tx.send(Ok(vec![
+								CoreState::Free,
+								// this is weird, see explanation below
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 4) as u32,
+								)),
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 5) as u32,
+								)),
+							]))
+							.unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::LocalValidationData(
+								_para_id,
+								_occupied_core_assumption,
+								tx,
+							),
+						))) => {
+							tx.send(Ok(Some(Default::default()))).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::Validators(tx),
+						))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => {
+							panic!("didn't expect any other overseer requests; got {:?}", msg)
+						}
+					}
+				}
+			};
+
+			let config = test_config(16);
+			let subsystem_config = config.clone();
+
+			let (tx, rx) = mpsc::channel(0);
+
+			// empty vec doesn't allocate on the heap, so it's ok we throw it away
+			let sent_messages = Arc::new(Mutex::new(Vec::new()));
+			let subsystem_sent_messages = sent_messages.clone();
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, &tx)
+					.await
+					.unwrap();
+
+				std::mem::drop(tx);
+
+				// collect all sent messages
+				*subsystem_sent_messages.lock().await = rx.collect().await;
+			});
+
+			let sent_messages = Arc::try_unwrap(sent_messages)
+				.expect("subsystem should have shut down by now")
+				.into_inner();
+
+			// we expect a single message to be sent, containing a candidate receipt.
+			// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
+			// correct descriptor
+			let expect_pov_hash = test_collation().proof_of_validity.hash();
+			let expect_validation_data_hash =
+				validation_data_hash::<BlockNumber>(&Default::default(), &Default::default());
+			let expect_relay_parent = Hash::repeat_byte(4);
+			let expect_payload = collator_signature_payload(
+				&expect_relay_parent,
+				&config.para_id,
+				&expect_validation_data_hash,
+				&expect_pov_hash,
+			);
+			let expect_descriptor = CandidateDescriptor {
+				signature: config.key.sign(&expect_payload),
+				para_id: config.para_id,
+				relay_parent: expect_relay_parent,
+				collator: config.key.public(),
+				validation_data_hash: expect_validation_data_hash,
+				pov_hash: expect_pov_hash,
+			};
+
+			assert_eq!(sent_messages.len(), 1);
+			match &sent_messages[0] {
+				AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
+					CandidateReceipt { descriptor, .. },
+					_pov,
+				)) => {
+					// signature generation is non-deterministic, so we can't just assert that the
+					// expected descriptor is correct. What we can do is validate that the produced
+					// descriptor has a valid signature, then just copy in the generated signature
+					// and check the rest of the fields for equality.
+					assert!(CollatorPair::verify(
+						&descriptor.signature,
+						&collator_signature_payload(
+							&descriptor.relay_parent,
+							&descriptor.para_id,
+							&descriptor.validation_data_hash,
+							&descriptor.pov_hash,
+						)
+						.as_ref(),
+						&descriptor.collator,
+					));
+					let expect_descriptor = {
+						let mut expect_descriptor = expect_descriptor;
+						expect_descriptor.signature = descriptor.signature.clone();
+						expect_descriptor
+					};
+					assert_eq!(descriptor, &expect_descriptor);
+				}
+				_ => panic!("received wrong message type"),
+			}
+		}
+	}
+}
diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs
index 3b12da323ba9..3ebcb3482558 100644
--- a/node/overseer/examples/minimal-example.rs
+++ b/node/overseer/examples/minimal-example.rs
@@ -145,7 +145,6 @@ fn main() {
 		candidate_validation: Subsystem2,
 		candidate_backing: Subsystem1,
 		candidate_selection: DummySubsystem,
-		collator_protocol: DummySubsystem,
 		statement_distribution: DummySubsystem,
 		availability_distribution: DummySubsystem,
 		bitfield_signing: DummySubsystem,
@@ -156,6 +155,8 @@ fn main() {
 		availability_store: DummySubsystem,
 		network_bridge: DummySubsystem,
 		chain_api: DummySubsystem,
+		collation_generation: DummySubsystem,
+		collator_protocol: DummySubsystem,
 	};
 	let (overseer, _handler) = Overseer::new(
 		vec![],
diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs
index d4931baa3306..71f5d17e6dbb 100644
--- a/node/overseer/src/lib.rs
+++ b/node/overseer/src/lib.rs
@@ -78,8 +78,8 @@ use polkadot_subsystem::messages::{
 	CandidateValidationMessage, CandidateBackingMessage, CandidateSelectionMessage, ChainApiMessage,
 	StatementDistributionMessage, AvailabilityDistributionMessage, BitfieldSigningMessage,
 	BitfieldDistributionMessage,
-	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, CollatorProtocolMessage,
-	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
+	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
+	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
 };
 pub use polkadot_subsystem::{
 	Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError,
 	SubsystemResult,
@@ -333,9 +333,6 @@ pub struct Overseer<S> {
 	/// A candidate selection subsystem.
 	candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
 
-	/// A collator protocol subsystem
-	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
-
 	/// A statement distribution subsystem.
 	statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
 
@@ -363,9 +360,15 @@ pub struct Overseer<S> {
 	/// A network bridge subsystem.
 	network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
 
-	/// A Chain API subsystem
+	/// A Chain API subsystem.
 	chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
 
+	/// A Collation Generation subsystem.
+	collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,
+
+	/// A Collator Protocol subsystem.
+	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
+
 	/// Spawner to spawn tasks to.
 	s: S,
 
@@ -398,15 +401,13 @@ pub struct Overseer<S> {
 ///
 /// [`Subsystem`]: trait.Subsystem.html
 /// [`DummySubsystem`]: struct.DummySubsystem.html
-pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> {
+pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
 	/// A candidate validation subsystem.
 	pub candidate_validation: CV,
 	/// A candidate backing subsystem.
 	pub candidate_backing: CB,
 	/// A candidate selection subsystem.
 	pub candidate_selection: CS,
-	/// A collator protocol subsystem.
-	pub collator_protocol: CP,
 	/// A statement distribution subsystem.
 	pub statement_distribution: SD,
 	/// An availability distribution subsystem.
@@ -427,6 +428,10 @@ pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
 	/// A Chain API subsystem.
 	pub chain_api: CA,
+	/// A Collation Generation subsystem.
+	pub collation_generation: CG,
+	/// A Collator Protocol subsystem.
+	pub collator_protocol: CP,
 }
 
 impl<S> Overseer<S>
@@ -499,7 +504,6 @@ where
 	///     candidate_validation: ValidationSubsystem,
 	///     candidate_backing: DummySubsystem,
 	///     candidate_selection: DummySubsystem,
-	///     collator_protocol: DummySubsystem,
 	///     statement_distribution: DummySubsystem,
 	///     availability_distribution: DummySubsystem,
 	///     bitfield_signing: DummySubsystem,
@@ -510,6 +514,8 @@ where
 	///     availability_store: DummySubsystem,
 	///     network_bridge: DummySubsystem,
 	///     chain_api: DummySubsystem,
+	///     collation_generation: DummySubsystem,
+	///     collator_protocol: DummySubsystem,
 	/// };
 	/// let (overseer, _handler) = Overseer::new(
 	///     vec![],
@@ -530,16 +536,15 @@ where
 	/// #
 	/// # }); }
 	/// ```
-	pub fn new<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>(
+	pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
 		leaves: impl IntoIterator<Item = BlockInfo>,
-		all_subsystems: AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
+		all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
 		mut s: S,
 	) -> SubsystemResult<(Self, OverseerHandler)>
 	where
 		CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
 		CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
 		CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
-		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
 		SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
 		AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
 		BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
@@ -550,6 +555,8 @@ where
 		AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
 		NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
 		CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
+		CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>> + Send,
+		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
 	{
 		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
 
@@ -581,13 +588,6 @@ where
 			all_subsystems.candidate_selection,
 		)?;
 
-		let collator_protocol_subsystem = spawn(
-			&mut s,
-			&mut running_subsystems,
-			&mut running_subsystems_rx,
-			all_subsystems.collator_protocol,
-		)?;
-
 		let statement_distribution_subsystem = spawn(
 			&mut s,
 			&mut running_subsystems,
@@ -658,6 +658,21 @@ where
 			all_subsystems.chain_api,
 		)?;
 
+		let collation_generation_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			all_subsystems.collation_generation,
+		)?;
+
+
+		let collator_protocol_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			all_subsystems.collator_protocol,
+		)?;
+
 		let active_leaves = HashSet::new();
 
 		let leaves = leaves
@@ -669,7 +684,6 @@ where
 			candidate_validation_subsystem,
 			candidate_backing_subsystem,
 			candidate_selection_subsystem,
-			collator_protocol_subsystem,
 			statement_distribution_subsystem,
 			availability_distribution_subsystem,
 			bitfield_signing_subsystem,
@@ -680,6 +694,8 @@ where
 			availability_store_subsystem,
 			network_bridge_subsystem,
 			chain_api_subsystem,
+			collation_generation_subsystem,
+			collator_protocol_subsystem,
 			s,
 			running_subsystems,
 			running_subsystems_rx,
@@ -705,10 +721,6 @@ where
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
 
-		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
-		}
-
 		if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
@@ -745,6 +757,14 @@ where
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
 
+		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
+		if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
 		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
 
 		loop {
@@ -863,10 +883,6 @@ where
 			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
 
-		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
-		}
-
 		if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
@@ -900,7 +916,15 @@ where
 		}
 
 		if let Some(ref mut s) = self.chain_api_subsystem.instance {
-			s.tx.send(FromOverseer::Signal(signal)).await?;
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
 
 		Ok(())
@@ -923,11 +947,6 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
-			AllMessages::CollatorProtocol(msg) => {
-				if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
-				}
-			}
 			AllMessages::StatementDistribution(msg) => {
 				if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
@@ -978,6 +997,16 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
+			AllMessages::CollationGeneration(msg) => {
+				if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
+			AllMessages::CollatorProtocol(msg) => {
+				if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
 		}
 	}
 
@@ -1130,7 +1159,6 @@ mod tests {
 			candidate_validation: TestSubsystem1(s1_tx),
 			candidate_backing: TestSubsystem2(s2_tx),
 			candidate_selection: DummySubsystem,
-			collator_protocol: DummySubsystem,
 			statement_distribution: DummySubsystem,
 			availability_distribution: DummySubsystem,
 			bitfield_signing: DummySubsystem,
@@ -1141,6 +1169,8 @@ mod tests {
 			availability_store: DummySubsystem,
 			network_bridge: DummySubsystem,
 			chain_api: DummySubsystem,
+			collation_generation: DummySubsystem,
+			collator_protocol: DummySubsystem,
 		};
 		let (overseer, mut handler) = Overseer::new(
 			vec![],
@@ -1195,7 +1225,6 @@ mod tests {
 			candidate_validation: TestSubsystem1(s1_tx),
 			candidate_backing: TestSubsystem4,
 			candidate_selection: DummySubsystem,
-			collator_protocol: DummySubsystem,
 			statement_distribution: DummySubsystem,
 			availability_distribution: DummySubsystem,
 			bitfield_signing: DummySubsystem,
@@ -1206,6 +1235,8 @@ mod tests {
 			availability_store: DummySubsystem,
 			network_bridge: DummySubsystem,
 			chain_api: DummySubsystem,
+			collation_generation: DummySubsystem,
+			collator_protocol: DummySubsystem,
 		};
 		let (overseer, _handle) = Overseer::new(
 			vec![],
@@ -1313,7 +1344,6 @@ mod tests {
 			candidate_validation: TestSubsystem5(tx_5),
 			candidate_backing: TestSubsystem6(tx_6),
 			candidate_selection: DummySubsystem,
-			collator_protocol: DummySubsystem,
 			statement_distribution: DummySubsystem,
 			availability_distribution: DummySubsystem,
 			bitfield_signing: DummySubsystem,
@@ -1324,6 +1354,8 @@ mod tests {
 			availability_store: DummySubsystem,
 			network_bridge: DummySubsystem,
 			chain_api: DummySubsystem,
+			collation_generation: DummySubsystem,
+			collator_protocol: DummySubsystem,
 		};
 		let (overseer, mut handler) = Overseer::new(
 			vec![first_block],
@@ -1416,7 +1448,6 @@ mod tests {
 			candidate_validation: TestSubsystem5(tx_5),
 			candidate_backing: TestSubsystem6(tx_6),
 			candidate_selection: DummySubsystem,
-			collator_protocol: DummySubsystem,
 			statement_distribution: DummySubsystem,
 			availability_distribution: DummySubsystem,
 			bitfield_signing: DummySubsystem,
@@ -1427,6 +1458,8 @@ mod tests {
 			availability_store: DummySubsystem,
 			network_bridge: DummySubsystem,
 			chain_api: DummySubsystem,
+			collation_generation: DummySubsystem,
+			collator_protocol: DummySubsystem,
 		};
 		// start with two forks of different height.
 		let (overseer, mut handler) = Overseer::new(
diff --git a/node/primitives/Cargo.toml b/node/primitives/Cargo.toml
index b38e7e542ed2..81e2467b374f 100644
--- a/node/primitives/Cargo.toml
+++ b/node/primitives/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2018"
 description = "Primitives types for the Node-side"
 
 [dependencies]
+futures = "0.3.5"
 polkadot-primitives = { path = "../../primitives" }
 polkadot-statement-table = { path = "../../statement-table" }
 parity-scale-codec = { version = "1.3.4", default-features = false, features = ["derive"] }
diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs
index 2bcc7a392647..2ff704c2dd16 100644
--- a/node/primitives/src/lib.rs
+++ b/node/primitives/src/lib.rs
@@ -20,12 +20,13 @@
 //! not shared between the node and the runtime. This crate builds on top of the primitives defined
 //! there.
 
+use futures::Future;
 use parity_scale_codec::{Decode, Encode};
 use polkadot_primitives::v1::{
 	Hash, CommittedCandidateReceipt, CandidateReceipt, CompactStatement, EncodeAs, Signed,
 	SigningContext, ValidatorIndex, ValidatorId, UpwardMessage, Balance, ValidationCode,
 	GlobalValidationData, LocalValidationData,
-	HeadData,
+	HeadData, PoV, CollatorPair, Id as ParaId,
 };
 use polkadot_statement_table::{
 	generic::{
@@ -258,3 +259,39 @@ impl std::convert::TryFrom<SignedFullStatement> for MisbehaviorReport {
 		}
 	}
 }
+
+/// The output of a collator.
+///
+/// This differs from `CandidateCommitments` in two ways:
+///
+/// - does not contain the erasure root; that's computed at the Polkadot level, not at Cumulus
+/// - contains a proof of validity.
+#[derive(Clone, Encode, Decode)]
+pub struct Collation {
+	/// Fees paid from the chain to the relay chain validators.
+	pub fees: Balance,
+	/// Messages destined to be interpreted by the Relay chain itself.
+	pub upward_messages: Vec<UpwardMessage>,
+	/// New validation code.
+	pub new_validation_code: Option<ValidationCode>,
+	/// The head-data produced as a result of execution.
+	pub head_data: HeadData,
+	/// Proof that this block is valid.
+	pub proof_of_validity: PoV,
+}
+
+/// Configuration for the collation generator
+pub struct CollationGenerationConfig {
+	/// Collator's authentication key, so it can sign things.
+	pub key: CollatorPair,
+	/// Collation function.
+	pub collator: Box<dyn Fn(&GlobalValidationData, &LocalValidationData) -> Box<dyn Future<Output = Collation> + Unpin + Send> + Send + Sync>,
+	/// The parachain that this collator collates for
+	pub para_id: ParaId,
+}
+
+impl std::fmt::Debug for CollationGenerationConfig {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		write!(f, "CollationGenerationConfig {{ ... }}")
+	}
+}
diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs
index ca350454f025..01b1a240906b 100644
--- a/node/service/src/lib.rs
+++ b/node/service/src/lib.rs
@@ -281,7 +281,6 @@ fn real_overseer(
 		candidate_validation: DummySubsystem,
 		candidate_backing: DummySubsystem,
 		candidate_selection: DummySubsystem,
-		collator_protocol: DummySubsystem,
 		statement_distribution: DummySubsystem,
 		availability_distribution: DummySubsystem,
 		bitfield_signing: DummySubsystem,
@@ -292,6 +291,8 @@ fn real_overseer(
 		availability_store: DummySubsystem,
 		network_bridge: DummySubsystem,
 		chain_api: DummySubsystem,
+		collation_generation: DummySubsystem,
+		collator_protocol: DummySubsystem,
 	};
 	Overseer::new(
 		leaves,
diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs
index 4d9b70d4f8e3..1a9bae84b149 100644
--- a/node/subsystem-test-helpers/src/lib.rs
+++ b/node/subsystem-test-helpers/src/lib.rs
@@ -16,19 +16,21 @@
 
 //! Utilities for testing subsystems.
 
-use polkadot_node_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
 use polkadot_node_subsystem::messages::AllMessages;
+use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, SubsystemResult};
 
-use futures::prelude::*;
 use futures::channel::mpsc;
 use futures::poll;
+use futures::prelude::*;
+use futures_timer::Delay;
 use parking_lot::Mutex;
-use sp_core::traits::SpawnNamed;
+use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
 
 use std::convert::Infallible;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll, Waker};
+use std::time::Duration;
 
 enum SinkState<T> {
 	Empty {
@@ -50,24 +52,21 @@ pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
 
 impl<T> Sink<T> for SingleItemSink<T> {
 	type Error = Infallible;
 
-	fn poll_ready(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		let mut state = self.0.lock();
 		match *state {
 			SinkState::Empty { .. } => Poll::Ready(Ok(())),
-			SinkState::Item { ref mut ready_waker, .. } => {
+			SinkState::Item {
+				ref mut ready_waker,
+				..
+			} => {
 				*ready_waker = Some(cx.waker().clone());
 				Poll::Pending
 			}
 		}
 	}
 
-	fn start_send(
-		self: Pin<&mut Self>,
-		item: T,
-	) -> Result<(), Infallible> {
+	fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
 		let mut state = self.0.lock();
 
 		match *state {
@@ -88,24 +87,21 @@ impl<T> Sink<T> for SingleItemSink<T> {
 		Ok(())
 	}
 
-	fn poll_flush(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		let mut state = self.0.lock();
 		match *state {
 			SinkState::Empty { .. } => Poll::Ready(Ok(())),
-			SinkState::Item { ref mut flush_waker, .. } => {
+			SinkState::Item {
+				ref mut flush_waker,
+				..
+			} => {
 				*flush_waker = Some(cx.waker().clone());
 				Poll::Pending
 			}
 		}
 	}
 
-	fn poll_close(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		self.poll_flush(cx)
 	}
 }
@@ -120,7 +116,11 @@ impl<T> Stream for SingleItemStream<T> {
 
 		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
 			SinkState::Empty { .. } => Poll::Pending,
-			SinkState::Item { item, ready_waker, flush_waker } => {
+			SinkState::Item {
+				item,
+				ready_waker,
+				flush_waker,
+			} => {
 				if let Some(waker) = ready_waker {
 					waker.wake();
 				}
@@ -141,10 +141,7 @@ impl<T> Stream for SingleItemStream<T> {
 /// not when the item is buffered.
 pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
 	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
-	(
-		SingleItemSink(inner.clone()),
-		SingleItemStream(inner),
-	)
+	(SingleItemSink(inner.clone()), SingleItemStream(inner))
 }
 
 /// A test subsystem context.
@@ -155,7 +152,9 @@ pub struct TestSubsystemContext<M, S> {
 }
 
 #[async_trait::async_trait]
-impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
+impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
+	for TestSubsystemContext<M, S>
+{
 	type Message = M;
 
 	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
@@ -170,23 +169,33 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
 		self.rx.next().await.ok_or(SubsystemError)
 	}
 
-	async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-		-> SubsystemResult<()>
-	{
+	async fn spawn(
+		&mut self,
+		name: &'static str,
+		s: Pin<Box<dyn Future<Output = ()> + Send>>,
+	) -> SubsystemResult<()> {
 		self.spawn.spawn(name, s);
 		Ok(())
 	}
 
	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
-		self.tx.send(msg).await.expect("test overseer no longer live");
+		self.tx
+			.send(msg)
+			.await
+			.expect("test overseer no longer live");
 		Ok(())
 	}
 
 	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
-		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
+	where
+		T: IntoIterator<Item = AllMessages> + Send,
+		T::IntoIter: Send,
 	{
 		let mut iter = stream::iter(msgs.into_iter().map(Ok));
-		self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
+		self.tx
+			.send_all(&mut iter)
+			.await
+			.expect("test overseer no longer live");
 		Ok(())
 	}
 
@@ -202,19 +211,27 @@ impl<M> TestSubsystemContextHandle<M> {
 	/// Send a message or signal to the subsystem. This resolves at the point in time where the
 	/// subsystem has _read_ the message.
 	pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
-		self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
+		self.tx
+			.send(from_overseer)
+			.await
+			.expect("Test subsystem no longer live");
 	}
 
 	/// Receive the next message from the subsystem.
 	pub async fn recv(&mut self) -> AllMessages {
-		self.rx.next().await.expect("Test subsystem no longer live")
+		self.try_recv().await.expect("Test subsystem no longer live")
+	}
+
+	/// Receive the next message from the subsystem, or `None` if the channel has been closed.
+	pub async fn try_recv(&mut self) -> Option<AllMessages> {
+		self.rx.next().await
 	}
 }
 
 /// Make a test subsystem context.
-pub fn make_subsystem_context<M, S>(spawn: S)
-	-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
-{
+pub fn make_subsystem_context<M, S>(
+	spawn: S,
+) -> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>) {
 	let (overseer_tx, overseer_rx) = single_item_sink();
 	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
 
@@ -226,7 +243,39 @@ pub fn make_subsystem_context<M, S>(spawn: S)
 		},
 		TestSubsystemContextHandle {
 			tx: overseer_tx,
-			rx: all_messages_rx
+			rx: all_messages_rx,
 		},
 	)
-}
\ No newline at end of file
+}
+
+/// Test a subsystem, mocking the overseer
+///
+/// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective of a subsystem.
+///
+/// Times out in two seconds.
+pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
+	overseer_factory: OverseerFactory,
+	test_factory: TestFactory,
+) where
+	OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
+	Overseer: Future<Output = ()>,
+	TestFactory: FnOnce(TestSubsystemContext<M, TaskExecutor>) -> Test,
+	Test: Future<Output = ()>,
+{
+	let pool = TaskExecutor::new();
+	let (context, handle) = make_subsystem_context(pool);
+	let overseer = overseer_factory(handle);
+	let test = test_factory(context);
+
+	let timeout = Delay::new(Duration::from_secs(2));
+
+	futures::pin_mut!(overseer, test, timeout);
+
+	futures::executor::block_on(async move {
+		futures::select! {
+			_ = overseer.fuse() => (),
+			_ = test.fuse() => (),
+			_ = timeout.fuse() => panic!("test timed out instead of completing"),
+		}
+	});
+}
diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs
index cc8cbd239dd9..d6998b267fea 100644
--- a/node/subsystem-util/src/lib.rs
+++ b/node/subsystem-util/src/lib.rs
@@ -189,6 +189,89 @@ specialize_requests! {
 	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
 }
 
+/// Request some data from the `RuntimeApi` via a SubsystemContext.
+async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(
+	parent: Hash,
+	ctx: &mut Context,
+	request_builder: RequestBuilder,
+) -> Result<RuntimeApiReceiver<Response>, Error>
+where
+	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
+	Context: SubsystemContext,
+{
+	let (tx, rx) = oneshot::channel();
+
+	ctx
+		.send_message(
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
+				.try_into()
+				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
+		)
+		.await?;
+
+	Ok(rx)
+}
+
+
+/// Construct specialized request functions for the runtime.
+///
+/// These would otherwise get pretty repetitive.
+macro_rules! specialize_requests_ctx {
+	// expand return type name for documentation purposes
+	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
+		specialize_requests_ctx!{
+			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
+		}
+	};
+
+	// create a single specialized request function
+	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
+		#[doc = "Request `"]
+		#[doc = $doc_name]
+		#[doc = "` from the runtime via a `SubsystemContext`"]
+		pub async fn $func_name<Context: SubsystemContext>(
+			parent: Hash,
+			$(
+				$param_name: $param_ty,
+			)*
+			ctx: &mut Context,
+		) -> Result<RuntimeApiReceiver<$return_ty>, Error> {
+			request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant(
+				$( $param_name, )* tx
+			)).await
+		}
+	};
+
+	// recursive decompose
+	(
+		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
+		$(
+			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
+		)+
+	) => {
+		specialize_requests_ctx!{
+			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
+		}
+		specialize_requests_ctx!{
+			$(
+				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
+			)+
+		}
+	};
+}
+
+specialize_requests_ctx! {
+	fn request_validators_ctx() -> Vec<ValidatorId>; Validators;
+	fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
+	fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores;
+	fn request_global_validation_data_ctx() -> GlobalValidationData; GlobalValidationData;
+	fn request_local_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<LocalValidationData>; LocalValidationData;
+	fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild;
+	fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
+	fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
+	fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
+}
+
 /// From the given set of validators, find the first key we can sign with, if any.
 pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
 	let keystore = keystore.read();
diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs
index 60813124015a..ddea2639e1eb 100644
--- a/node/subsystem/src/messages.rs
+++ b/node/subsystem/src/messages.rs
@@ -24,20 +24,19 @@
 
 use futures::channel::{mpsc, oneshot};
 
-use polkadot_primitives::v1::{
-	Hash, CommittedCandidateReceipt, CollatorId,
-	CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId,
-	SignedAvailabilityBitfield, ValidatorId, ValidationCode, ValidatorIndex,
-	CoreAssignment, CoreOccupied, CandidateDescriptor,
-	ValidatorSignature, OmittedValidationData, AvailableData, GroupRotationInfo,
-	CoreState, LocalValidationData, GlobalValidationData, OccupiedCoreAssumption,
-	CandidateEvent, SessionIndex, BlockNumber,
+use polkadot_node_network_protocol::{
+	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet,
 };
 use polkadot_node_primitives::{
-	MisbehaviorReport, SignedFullStatement, ValidationResult,
+	CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
 };
-use polkadot_node_network_protocol::{
-	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet,
+use polkadot_primitives::v1::{
+	AvailableData, BackedCandidate, BlockNumber, CandidateDescriptor, CandidateEvent,
+	CandidateReceipt, CollatorId, CommittedCandidateReceipt,
+	CoreAssignment, CoreOccupied, CoreState, ErasureChunk, GlobalValidationData, GroupRotationInfo,
+	Hash, Id as ParaId, LocalValidationData, OccupiedCoreAssumption, OmittedValidationData, PoV,
+	SessionIndex, SignedAvailabilityBitfield, ValidationCode, ValidatorId, ValidatorIndex,
+	ValidatorSignature,
 };
 
 use std::sync::Arc;
@@ -82,7 +81,6 @@ pub enum CandidateBackingMessage {
 	Statement(Hash, SignedFullStatement),
 }
 
-
 impl CandidateBackingMessage {
 	/// If the current variant contains the relay parent hash, return it.
 	pub fn relay_parent(&self) -> Option<Hash> {
@@ -505,6 +503,20 @@ impl PoVDistributionMessage {
 	}
 }
 
+/// Message to the Collation Generation Subsystem.
+#[derive(Debug)]
+pub enum CollationGenerationMessage {
+	/// Initialize the collation generation subsystem
+	Initialize(CollationGenerationConfig),
+}
+
+impl CollationGenerationMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		None
+	}
+}
+
 /// A message type tying together all message types that are used across Subsystems.
 #[derive(Debug)]
 pub enum AllMessages {
@@ -536,4 +548,6 @@ pub enum AllMessages {
 	AvailabilityStore(AvailabilityStoreMessage),
 	/// Message for the network bridge subsystem.
 	NetworkBridge(NetworkBridgeMessage),
+	/// Message for the Collation Generation subsystem
+	CollationGeneration(CollationGenerationMessage),
 }
diff --git a/roadmap/implementers-guide/src/node/collators/collation-generation.md b/roadmap/implementers-guide/src/node/collators/collation-generation.md
index d27ba7408508..ab3f80273d60 100644
--- a/roadmap/implementers-guide/src/node/collators/collation-generation.md
+++ b/roadmap/implementers-guide/src/node/collators/collation-generation.md
@@ -4,33 +4,48 @@ The collation generation subsystem is executed on collator nodes and produces c
 
 ## Protocol
 
-Input: None
+Input: `CollationGenerationMessage`
 
-Output: CollationDistributionMessage
+```rust
+enum CollationGenerationMessage {
+  Initialize(CollationGenerationConfig),
+}
+```
+
+No more than one initialization message should ever be sent to the collation generation subsystem.
+
+Output: `CollationDistributionMessage`
 
 ## Functionality
 
 The process of generating a collation for a parachain is very parachain-specific. As such, the details of how to do so are left beyond the scope of this description. The subsystem should be implemented as an abstract wrapper, which is aware of this configuration:
 
 ```rust
+pub struct Collation {
+  /// Hash of `CandidateCommitments` as understood by the collator.
+  pub commitments_hash: Hash,
+  pub proof_of_validity: PoV,
+}
+
 struct CollationGenerationConfig {
-  key: CollatorPair,
-  collation_producer: Fn(params) -> async (HeadData, Vec<UpwardMessage>, PoV),
+  key: CollatorPair,
+  collator: Box<Fn(&GlobalValidationData, &LocalValidationData) -> Box<Future<Output = Collation>>>
+  para_id: ParaId,
 }
 ```
 
 The configuration should be optional, to allow for the case where the node is not run with the capability to collate.
 
 On `ActiveLeavesUpdate`:
-  * If there is no collation generation config, ignore.
-  * Otherwise, for each `activated` head in the update:
-    * Determine if the para is scheduled or is next up on any occupied core by fetching the `availability_cores` Runtime API.
-    * Determine an occupied core assumption to make about the para. The simplest thing to do is to always assume that if the para occupies a core, that the candidate will become available. Further on, this might be determined based on bitfields seen or validator requests.
-    * Use the Runtime API subsystem to fetch the full validation data.
-    * Construct validation function params based on validation data.
-    * Invoke the `collation_producer`.
-    * Construct a `CommittedCandidateReceipt` using the outputs of the `collation_producer` and signing with the `key`.
-    * Dispatch a [`CollatorProtocolMessage`][CPM]`::DistributeCollation(receipt, pov)`.
+
+* If there is no collation generation config, ignore.
+* Otherwise, for each `activated` head in the update:
+  * Determine if the para is scheduled on any core by fetching the `availability_cores` Runtime API.
+    > TODO: figure out what to do in the case of occupied cores; see [this issue](https://github.com/paritytech/polkadot/issues/1573).
+  * Determine an occupied core assumption to make about the para. Scheduled cores can make `OccupiedCoreAssumption::Free`.
+  * Use the Runtime API subsystem to fetch the full validation data.
+  * Invoke the `collator`, and use its outputs to produce a `CandidateReceipt`, signed with the configuration's `key`.
+  * Dispatch a [`CollatorProtocolMessage`][CPM]`::DistributeCollation(receipt, pov)`.
 
 [CP]: collator-protocol.md
 [CPM]: ../../types/overseer-protocol.md#collatorprotocolmessage
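
Reviewer note: a minimal sketch of the collator-side wiring, for anyone trying this out. It is written against the types this PR adds (`Collation`, `CollationGenerationConfig`, `CollationGenerationMessage`); `build_collation` is a hypothetical stand-in for parachain-specific block production, and the key would normally come from a keystore rather than being freshly generated.

```rust
use polkadot_node_primitives::{Collation, CollationGenerationConfig};
use polkadot_primitives::v1::{
	CollatorPair, GlobalValidationData, Id as ParaId, LocalValidationData,
};
use sp_core::crypto::Pair;

// Hypothetical parachain-specific block production; anything that yields a
// `Collation` for the given validation data works here.
fn build_collation(
	_global: &GlobalValidationData,
	_local: &LocalValidationData,
) -> Collation {
	unimplemented!("parachain-specific")
}

fn collation_generation_config(para_id: ParaId) -> CollationGenerationConfig {
	CollationGenerationConfig {
		// illustrative only: a real collator loads its key from the keystore
		key: CollatorPair::generate().0,
		collator: Box::new(|global: &GlobalValidationData, local: &LocalValidationData| {
			// `future::ready` is `Unpin + Send`, so the boxed value satisfies
			// the `Box<dyn Future<Output = Collation> + Unpin + Send>` return
			// type required by the config (same pattern as the unit tests).
			Box::new(futures::future::ready(build_collation(global, local)))
		}),
		para_id,
	}
}
```

Once constructed, the config reaches the subsystem as `AllMessages::CollationGeneration(CollationGenerationMessage::Initialize(config))` through whatever channel the embedding node uses to inject messages into the overseer; per the protocol section above, that initialization message must be sent at most once.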