Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

implement collation generation subsystem #1557

Merged
merged 25 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d098d0d
start sketching out a collation generation subsystem
coriolinus Aug 10, 2020
8056a3d
invent a basic strategy for double initialization
coriolinus Aug 10, 2020
1fbf5d3
clean up warnings
coriolinus Aug 10, 2020
280805e
impl util requests from runtime assuming a context instead of a FromJ…
coriolinus Aug 10, 2020
c8a05ea
implement collation generation algorithm from guide
coriolinus Aug 11, 2020
a3b0849
update AllMessages in tests
coriolinus Aug 12, 2020
a60162e
fix trivial review comments
coriolinus Aug 12, 2020
2268267
Merge remote-tracking branch 'origin/master' into prgn-collation-gene…
coriolinus Aug 12, 2020
c999987
remove another redundant declaration from merge
coriolinus Aug 12, 2020
8570982
filter availability cores by para_id
coriolinus Aug 13, 2020
e5d25aa
handle new activations each in their own async task
coriolinus Aug 13, 2020
cc54f99
update guide according to the actual current implementation
coriolinus Aug 13, 2020
cba3a37
add initialization to guide
coriolinus Aug 13, 2020
8cbf8ff
add general-purpose subsystem_test_harness helper
coriolinus Aug 13, 2020
430df65
write first handle_new_activations test
coriolinus Aug 13, 2020
cce8c39
Merge remote-tracking branch 'origin/master' into prgn-collation-gene…
coriolinus Aug 14, 2020
9bedae7
add test that handle_new_activations filters local_validation_data re…
coriolinus Aug 14, 2020
ccaeee5
add (failing) test of collation distribution message sending
coriolinus Aug 14, 2020
b306de3
rustfmt
coriolinus Aug 14, 2020
b40f639
broken: work on fixing sender test
coriolinus Aug 14, 2020
b87c4d6
fix broken test
coriolinus Aug 14, 2020
ddbf460
collation function returns commitments hash
coriolinus Aug 14, 2020
e5baa5d
add missing overseer impls
coriolinus Aug 14, 2020
fd90cd4
calculating erasure coding is polkadot's responsibility, not cumulus
coriolinus Aug 17, 2020
38024e2
concurrentize per-relay_parent requests
coriolinus Aug 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ members = [
"service",
"validation",

"node/collation-generation",
"node/core/av-store",
"node/core/backing",
"node/core/bitfield-signing",
Expand Down
15 changes: 15 additions & 0 deletions node/collation-generation/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "polkadot-node-collation-generation"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
derive_more = "0.99.9"
futures = "0.3.5"
log = "0.4.8"
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
222 changes: 222 additions & 0 deletions node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! The collation generation subsystem is the interface between polkadot and the collators.

#![deny(missing_docs)]

use futures::channel::oneshot;
use polkadot_node_primitives::CollationGenerationConfig;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{
self as util, request_availability_cores_ctx, request_global_validation_data_ctx,
request_local_validation_data_ctx,
};
use polkadot_primitives::v1::{
collator_signature_payload, validation_data_hash, CandidateCommitments, CandidateDescriptor,
CommittedCandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
};
use sp_core::crypto::Pair;

/// Collation Generation Subsystem
pub struct CollationGenerationSubsystem {
config: Option<CollationGenerationConfig>,
}

impl CollationGenerationSubsystem {
/// Run this subsystem
///
/// Conceptually, this is very simple: it just loops forever.
///
/// - On incoming overseer messages, it starts or stops jobs as appropriate.
/// - On other incoming messages, if they can be converted into Job::ToJob and
/// include a hash, then they're forwarded to the appropriate individual job.
/// - On outgoing messages from the jobs, it forwards them to the overseer.
///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
async fn run<Context>(mut self, mut ctx: Context)
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
loop {
let incoming = ctx.recv().await;
if self.handle_incoming::<Context>(incoming, &mut ctx).await {
break;
}
}
}

// handle an incoming message. return true if we should break afterwards.
// note: this doesn't strictly need to be a separate function; it's more an administrative function
// so that we don't clutter the run loop. It could in principle be inlined directly into there.
// it should hopefully therefore be ok that it's an async function mutably borrowing self.
async fn handle_incoming<Context>(
&mut self,
incoming: SubsystemResult<FromOverseer<Context::Message>>,
ctx: &mut Context,
) -> bool
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
use polkadot_node_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};

match incoming {
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
// follow the procedure from the guide
if let Some(ref config) = self.config {
if let Err(err) = handle_new_activations(config, &activated, ctx).await {
log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
return true;
};
}
false
}
Ok(Signal(Conclude)) => true,
Ok(Communication {
msg: CollationGenerationMessage::Initialize(config),
}) => {
if self.config.is_some() {
log::warn!(target: "collation_generation", "double initialization");
true
} else {
self.config = Some(config);
false
}
}
Ok(Signal(BlockFinalized(_))) => false,
Err(err) => {
log::error!(target: "collation_generation", "error receiving message from subsystem context: {:?}", err);
true
}
}
}
}

impl<Context> Subsystem<Context> for CollationGenerationSubsystem
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let subsystem = CollationGenerationSubsystem { config: None };

let future = Box::pin(subsystem.run(ctx));

SpawnedSubsystem {
name: "CollationGenerationSubsystem",
future,
}
}
}

#[derive(Debug, derive_more::From)]
enum Error {
#[from]
Subsystem(SubsystemError),
#[from]
OneshotRecv(oneshot::Canceled),
#[from]
Runtime(RuntimeApiError),
#[from]
Util(util::Error),
}

type Result<T> = std::result::Result<T, Error>;

async fn handle_new_activations<Context: SubsystemContext>(
config: &CollationGenerationConfig,
activated: &[Hash],
ctx: &mut Context,
) -> Result<()> {
// follow the procedure from the guide:
// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html

for relay_parent in activated.iter().copied() {
let global_validation_data = request_global_validation_data_ctx(relay_parent, ctx)
.await?
.await??;

let availability_cores = request_availability_cores_ctx(relay_parent, ctx)
.await?
.await??;

for core in availability_cores {
coriolinus marked this conversation as resolved.
Show resolved Hide resolved
let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) => {
(scheduled_core, OccupiedCoreAssumption::Free)
}
CoreState::Occupied(_occupied_core) => {
// TODO: https://github.com/paritytech/polkadot/issues/1573
continue
}
_ => continue,
};

let local_validation_data = match request_local_validation_data_ctx(
relay_parent,
scheduled_core.para_id,
assumption,
ctx,
)
.await?
.await??
{
Some(local_validation_data) => local_validation_data,
None => continue,
};

let validation_data_hash =
validation_data_hash(&global_validation_data, &local_validation_data);

let collation = (config.collator)(&global_validation_data, &local_validation_data).await;
coriolinus marked this conversation as resolved.
Show resolved Hide resolved

let pov_hash = collation.proof_of_validity.hash();

let ccr = CommittedCandidateReceipt {
commitments: CandidateCommitments {
upward_messages: collation.upward_messages,
head_data: collation.head_data,
..Default::default()
},
descriptor: CandidateDescriptor {
signature: config.key.sign(&collator_signature_payload(
&relay_parent,
&scheduled_core.para_id,
&validation_data_hash,
&pov_hash,
)),
para_id: scheduled_core.para_id,
relay_parent,
collator: config.key.public(),
validation_data_hash,
pov_hash,
},
};

ctx.send_message(AllMessages::CollatorProtocol(
CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity),
)).await?;
}
}

Ok(())
}
2 changes: 2 additions & 0 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ fn main() {
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
let (overseer, _handler) = Overseer::new(
vec![],
Expand Down
Loading