diff --git a/Cargo.lock b/Cargo.lock index 2a08136e0382..ee2cbd230412 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6791,6 +6791,7 @@ dependencies = [ "kvdb", "kvdb-rocksdb", "log", + "lru", "pallet-babe", "pallet-im-online", "pallet-mmr-primitives", diff --git a/doc/testing.md b/doc/testing.md index 8230ea352c0f..6709d1b9bdeb 100644 --- a/doc/testing.md +++ b/doc/testing.md @@ -185,6 +185,7 @@ struct BehaveMaleficient; impl OverseerGen for BehaveMaleficient { fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandler), Error> where @@ -213,7 +214,7 @@ impl OverseerGen for BehaveMaleficient { ), ); - Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) + Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector) .map_err(|e| e.into()) // A builder pattern will simplify this further diff --git a/node/malus/src/tests.rs b/node/malus/src/tests.rs index 00c39dd7b2b5..57072336852b 100644 --- a/node/malus/src/tests.rs +++ b/node/malus/src/tests.rs @@ -20,8 +20,8 @@ use polkadot_node_subsystem_test_helpers::*; use polkadot_node_subsystem::{ messages::{AllMessages, AvailabilityStoreMessage}, - overseer::{gen::TimeoutExt, Subsystem}, - DummySubsystem, + overseer::{dummy::DummySubsystem, gen::TimeoutExt, Subsystem}, + SubsystemError, }; #[derive(Clone, Debug)] @@ -48,34 +48,38 @@ where } } +#[derive(Clone, Debug)] +struct PassInterceptor; + +impl MessageInterceptor for PassInterceptor +where + Sender: overseer::SubsystemSender + + overseer::SubsystemSender + + Clone + + 'static, +{ + type Message = AvailabilityStoreMessage; +} + async fn overseer_send>(overseer: &mut TestSubsystemContextHandle, msg: T) { overseer.send(FromOverseer::Communication { msg }).await; } -#[test] -fn integrity_test() { +fn launch_harness(test_gen: G) +where + F: Future> + Send, + M: Into + std::fmt::Debug + Send + 'static, + AllMessages: From, + Sub: Subsystem, SubsystemError>, + G: Fn(TestSubsystemContextHandle) -> (F, Sub), +{ let pool = sp_core::testing::TaskExecutor::new(); - let (context, mut overseer) = make_subsystem_context(pool); - - let sub = DummySubsystem; - - let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor); + let (context, overseer) = make_subsystem_context(pool); - // Try to send a message we know is going to be filtered. - let test_fut = async move { - let (tx, rx) = futures::channel::oneshot::channel(); - overseer_send( - &mut overseer, - AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx), - ) - .await; - let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap(); - overseer - }; + let (test_fut, subsystem) = test_gen(overseer); let subsystem = async move { - sub_intercepted.start(context).future.await.unwrap(); + subsystem.start(context).future.await.unwrap(); }; - futures::pin_mut!(test_fut); futures::pin_mut!(subsystem); @@ -88,3 +92,49 @@ fn integrity_test() { )) .1; } + +#[test] +fn integrity_test_intercept() { + launch_harness(|mut overseer| { + let sub = DummySubsystem; + + let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor); + + ( + async move { + let (tx, rx) = futures::channel::oneshot::channel(); + overseer_send( + &mut overseer, + AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx), + ) + .await; + let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap(); + overseer + }, + sub_intercepted, + ) + }) +} + +#[test] +fn integrity_test_pass() { + launch_harness(|mut overseer| { + let sub = DummySubsystem; + + let sub_intercepted = InterceptedSubsystem::new(sub, PassInterceptor); + + ( + async move { + let (tx, rx) = futures::channel::oneshot::channel(); + overseer_send( + &mut overseer, + AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx), + ) + .await; + let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap(); + overseer + }, + sub_intercepted, + ) + }) +} diff --git a/node/malus/src/variant-a.rs b/node/malus/src/variant-a.rs index a545bd360faf..8abdf75198b3 100644 --- a/node/malus/src/variant-a.rs +++ b/node/malus/src/variant-a.rs @@ -37,7 +37,7 @@ use polkadot_cli::{ use polkadot_node_core_candidate_validation::CandidateValidationSubsystem; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateValidationMessage}, - overseer::{self, OverseerHandle}, + overseer::{self, OverseerConnector, OverseerHandle}, FromOverseer, }; @@ -86,6 +86,7 @@ struct BehaveMaleficient; impl OverseerGen for BehaveMaleficient { fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandle), Error> where @@ -113,7 +114,7 @@ impl OverseerGen for BehaveMaleficient { }, ); - Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) + Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector) .map_err(|e| e.into()) } } diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 6970054a3013..0c4a3bdc6ae6 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -29,7 +29,8 @@ use polkadot_node_subsystem_types::messages::{ use polkadot_overseer::{ self as overseer, gen::{FromOverseer, SpawnedSubsystem}, - AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError, + AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector, + OverseerSignal, SubsystemError, }; use polkadot_primitives::v1::Hash; @@ -173,8 +174,15 @@ fn main() { .replace_candidate_validation(|_| Subsystem2) .replace_candidate_backing(|orig| orig); - let (overseer, _handle) = - Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap(); + let (overseer, _handle) = Overseer::new( + vec![], + all_subsystems, + None, + AlwaysSupportsParachains, + spawner, + OverseerConnector::default(), + ) + .unwrap(); let overseer_fut = overseer.run().fuse(); let timer_stream = timer_stream; diff --git a/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs b/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs index 832e193fd4d1..4714a1f3d495 100644 --- a/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs +++ b/node/overseer/overseer-gen/proc-macro/src/impl_builder.rs @@ -130,9 +130,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { &mut self.handle } /// Obtain access to the overseer handle. - pub fn as_handle(&mut self) -> &#handle { + pub fn as_handle(&self) -> &#handle { &self.handle } + /// Obtain a clone of the handle. + pub fn handle(&self) -> #handle { + self.handle.clone() + } } impl ::std::default::Default for #connector { diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs new file mode 100644 index 000000000000..c3d3d4fcf3c8 --- /dev/null +++ b/node/overseer/src/dummy.rs @@ -0,0 +1,54 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::{AllMessages, OverseerSignal}; +use polkadot_node_subsystem_types::errors::SubsystemError; +use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext}; + +/// A dummy subsystem that implements [`Subsystem`] for all +/// types of messages. Used for tests or as a placeholder. +#[derive(Clone, Copy, Debug)] +pub struct DummySubsystem; + +impl Subsystem for DummySubsystem +where + Context: SubsystemContext< + Signal = OverseerSignal, + Error = SubsystemError, + AllMessages = AllMessages, + >, +{ + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + let future = Box::pin(async move { + loop { + match ctx.recv().await { + Err(_) => return Ok(()), + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()), + Ok(overseer_msg) => { + tracing::debug!( + target: "dummy-subsystem", + "Discarding a message sent from overseer {:?}", + overseer_msg + ); + continue + }, + } + } + }); + + SpawnedSubsystem { name: "dummy-subsystem", future } + } +} diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 2fb9ffd359b2..4c69d2868412 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -70,7 +70,6 @@ use std::{ use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt}; use lru::LruCache; -use parking_lot::RwLock; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost}; @@ -91,15 +90,18 @@ pub use polkadot_node_subsystem_types::{ jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal, }; +/// Test helper supplements. +pub mod dummy; +pub use self::dummy::DummySubsystem; + // TODO legacy, to be deleted, left for easier integration // TODO https://github.com/paritytech/polkadot/issues/3427 mod subsystems; -pub use self::subsystems::{AllSubsystems, DummySubsystem}; +pub use self::subsystems::AllSubsystems; -mod metrics; -use self::metrics::Metrics; +pub mod metrics; -use polkadot_node_metrics::{ +pub use polkadot_node_metrics::{ metrics::{prometheus, Metrics as MetricsTrait}, Metronome, }; @@ -115,7 +117,7 @@ pub use polkadot_overseer_gen::{ /// Store 2 days worth of blocks, not accounting for forks, /// in the LRU cache. Assumes a 6-second block time. -const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6; +pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6; #[cfg(test)] mod tests; @@ -141,18 +143,12 @@ where /// /// [`Overseer`]: struct.Overseer.html #[derive(Clone)] -pub enum Handle { - /// Used only at initialization to break the cyclic dependency. - // TODO: refactor in https://github.com/paritytech/polkadot/issues/3427 - Disconnected(Arc>>), - /// A handle to the overseer. - Connected(OverseerHandle), -} +pub struct Handle(OverseerHandle); impl Handle { - /// Create a new disconnected [`Handle`]. - pub fn new_disconnected() -> Self { - Self::Disconnected(Arc::new(RwLock::new(None))) + /// Create a new [`Handle`]. + pub fn new(raw: OverseerHandle) -> Self { + Self(raw) } /// Inform the `Overseer` that that some block was imported. @@ -201,58 +197,8 @@ impl Handle { /// Most basic operation, to stop a server. async fn send_and_log_error(&mut self, event: Event) { - self.try_connect(); - if let Self::Connected(ref mut handle) = self { - if handle.send(event).await.is_err() { - tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); - } - } else { - tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer"); - } - } - - /// Whether the handle is disconnected. - pub fn is_disconnected(&self) -> bool { - match self { - Self::Disconnected(ref x) => x.read().is_none(), - _ => false, - } - } - - /// Connect this handle and all disconnected clones of it to the overseer. - pub fn connect_to_overseer(&mut self, handle: OverseerHandle) { - match self { - Self::Disconnected(ref mut x) => { - let mut maybe_handle = x.write(); - if maybe_handle.is_none() { - tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer"); - *maybe_handle = Some(handle); - } else { - tracing::warn!( - target: LOG_TARGET, - "Attempting to connect a clone of a connected Handle", - ); - } - }, - _ => { - tracing::warn!( - target: LOG_TARGET, - "Attempting to connect an already connected Handle", - ); - }, - } - } - - /// Try upgrading from `Self::Disconnected` to `Self::Connected` state - /// after calling `connect_to_overseer` on `self` or a clone of `self`. - fn try_connect(&mut self) { - if let Self::Disconnected(ref mut x) = self { - let guard = x.write(); - if let Some(ref h) = *guard { - let handle = h.clone(); - drop(guard); - *self = Self::Connected(handle); - } + if self.0.send(event).await.is_err() { + tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); } } } @@ -439,7 +385,7 @@ pub struct Overseer { pub known_leaves: LruCache, /// Various Prometheus metrics. - pub metrics: Metrics, + pub metrics: crate::metrics::Metrics, } impl Overseer @@ -490,12 +436,13 @@ where /// # use polkadot_primitives::v1::Hash; /// # use polkadot_overseer::{ /// # self as overseer, + /// # Overseer, /// # OverseerSignal, + /// # OverseerConnector, /// # SubsystemSender as _, /// # AllMessages, /// # AllSubsystems, /// # HeadSupportsParachains, - /// # Overseer, /// # SubsystemError, /// # gen::{ /// # SubsystemContext, @@ -549,6 +496,7 @@ where /// None, /// AlwaysSupportsParachains, /// spawner, + /// OverseerConnector::default(), /// ).unwrap(); /// /// let timer = Delay::new(Duration::from_millis(50)).fuse(); @@ -615,6 +563,7 @@ where prometheus_registry: Option<&prometheus::Registry>, supports_parachains: SupportsParachains, s: S, + connector: OverseerConnector, ) -> SubsystemResult<(Self, OverseerHandle)> where CV: Subsystem, SubsystemError> + Send, @@ -643,7 +592,7 @@ where CS: Subsystem, SubsystemError> + Send, S: SpawnNamed, { - let metrics: Metrics = ::register(prometheus_registry)?; + let metrics = ::register(prometheus_registry)?; let (mut overseer, handle) = Self::builder() .candidate_validation(all_subsystems.candidate_validation) @@ -679,7 +628,7 @@ where .supports_parachains(supports_parachains) .metrics(metrics.clone()) .spawner(s) - .build()?; + .build_with_connector(connector)?; // spawn the metrics metronome task { diff --git a/node/overseer/src/metrics.rs b/node/overseer/src/metrics.rs index ab3b2a3bc14b..fa883857e9be 100644 --- a/node/overseer/src/metrics.rs +++ b/node/overseer/src/metrics.rs @@ -17,7 +17,7 @@ //! Prometheus metrics related to the overseer and its channels. use super::*; -use polkadot_node_metrics::metrics::{self, prometheus}; +pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait}; use parity_util_mem::MemoryAllocationSnapshot; @@ -110,7 +110,7 @@ impl Metrics { } } -impl metrics::Metrics for Metrics { +impl MetricsTrait for Metrics { fn try_register(registry: &prometheus::Registry) -> Result { let metrics = MetricsInner { activated_heads_total: prometheus::register( diff --git a/node/overseer/src/subsystems.rs b/node/overseer/src/subsystems.rs index 648528730d67..695d368dc16c 100644 --- a/node/overseer/src/subsystems.rs +++ b/node/overseer/src/subsystems.rs @@ -19,47 +19,9 @@ //! In the future, everything should be set up using the generated //! overseer builder pattern instead. -use crate::{AllMessages, OverseerSignal}; -use polkadot_node_subsystem_types::errors::SubsystemError; +use crate::dummy::DummySubsystem; use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; -use polkadot_overseer_gen::{ - FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext, -}; - -/// A dummy subsystem that implements [`Subsystem`] for all -/// types of messages. Used for tests or as a placeholder. -#[derive(Clone, Copy, Debug)] -pub struct DummySubsystem; - -impl Subsystem for DummySubsystem -where - Context: SubsystemContext< - Signal = OverseerSignal, - Error = SubsystemError, - AllMessages = AllMessages, - >, -{ - fn start(self, mut ctx: Context) -> SpawnedSubsystem { - let future = Box::pin(async move { - loop { - match ctx.recv().await { - Err(_) => return Ok(()), - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()), - Ok(overseer_msg) => { - tracing::debug!( - target: "dummy-subsystem", - "Discarding a message sent from overseer {:?}", - overseer_msg - ); - continue - }, - } - } - }); - - SpawnedSubsystem { name: "dummy-subsystem", future } - } -} +use polkadot_overseer_gen::MapSubsystem; /// This struct is passed as an argument to create a new instance of an [`Overseer`]. /// diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 7564116e7141..e8edab17679e 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -32,7 +32,7 @@ use polkadot_primitives::v1::{ ValidatorIndex, }; -use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer}; +use crate::{self as overseer, gen::Delay, HeadSupportsParachains, Overseer, OverseerConnector}; use metered_channel as metered; use assert_matches::assert_matches; @@ -164,9 +164,16 @@ fn overseer_works() { .replace_candidate_validation(move |_| TestSubsystem1(s1_tx)) .replace_candidate_backing(move |_| TestSubsystem2(s2_tx)); - let (overseer, handle) = - Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap(); - let mut handle = Handle::Connected(handle); + let (overseer, handle) = Overseer::new( + vec![], + all_subsystems, + None, + MockSupportsParachains, + spawner, + OverseerConnector::default(), + ) + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -227,9 +234,10 @@ fn overseer_metrics_work() { Some(®istry), MockSupportsParachains, spawner, + OverseerConnector::default(), ) .unwrap(); - let mut handle = Handle::Connected(handle); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -280,8 +288,15 @@ fn overseer_ends_on_subsystem_exit() { executor::block_on(async move { let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_backing(|_| ReturnOnStart); - let (overseer, _handle) = - Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap(); + let (overseer, _handle) = Overseer::new( + vec![], + all_subsystems, + None, + MockSupportsParachains, + spawner, + OverseerConnector::default(), + ) + .unwrap(); overseer.run().await.unwrap(); }) @@ -382,10 +397,16 @@ fn overseer_start_stop_works() { let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(move |_| TestSubsystem5(tx_5)) .replace_candidate_backing(move |_| TestSubsystem6(tx_6)); - let (overseer, handle) = - Overseer::new(vec![first_block], all_subsystems, None, MockSupportsParachains, spawner) - .unwrap(); - let mut handle = Handle::Connected(handle); + let (overseer, handle) = Overseer::new( + vec![first_block], + all_subsystems, + None, + MockSupportsParachains, + spawner, + OverseerConnector::default(), + ) + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -486,9 +507,10 @@ fn overseer_finalize_works() { None, MockSupportsParachains, spawner, + OverseerConnector::default(), ) .unwrap(); - let mut handle = Handle::Connected(handle); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -573,10 +595,16 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_backing(move |_| TestSubsystem6(tx_5)); - let (overseer, handle) = - Overseer::new(Vec::new(), all_subsystems, None, MockSupportsParachains, spawner) - .unwrap(); - let mut handle = Handle::Connected(handle); + let (overseer, handle) = Overseer::new( + Vec::new(), + all_subsystems, + None, + MockSupportsParachains, + spawner, + OverseerConnector::default(), + ) + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -849,9 +877,17 @@ fn overseer_all_subsystems_receive_signals_and_messages() { dispute_distribution: subsystem.clone(), chain_selection: subsystem.clone(), }; - let (overseer, handle) = - Overseer::new(vec![], all_subsystems, None, MockSupportsParachains, spawner).unwrap(); - let mut handle = Handle::Connected(handle); + + let (overseer, handle) = Overseer::new( + vec![], + all_subsystems, + None, + MockSupportsParachains, + spawner, + OverseerConnector::default(), + ) + .unwrap(); + let mut handle = Handle::new(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index affd8b25b33b..f457b8f5742c 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -68,6 +68,7 @@ thiserror = "1.0.26" kvdb = "0.10.0" kvdb-rocksdb = { version = "0.14.0", optional = true } async-trait = "0.1.51" +lru = "0.6" # Polkadot polkadot-node-core-parachains-inherent = { path = "../core/parachains-inherent" } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index bcb181fa04b8..d4c8cdc16a7c 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -54,7 +54,7 @@ use { pub use sp_core::traits::SpawnNamed; #[cfg(feature = "full-node")] pub use { - polkadot_overseer::{Handle, Overseer, OverseerHandle}, + polkadot_overseer::{Handle, Overseer, OverseerConnector, OverseerHandle}, polkadot_primitives::v1::ParachainHost, sc_client_api::AuxStore, sp_authority_discovery::AuthorityDiscoveryApi, @@ -68,6 +68,8 @@ use polkadot_subsystem::jaeger; use std::{sync::Arc, time::Duration}; use prometheus_endpoint::Registry; +#[cfg(feature = "full-node")] +use service::KeystoreContainer; use service::RpcHandlers; use telemetry::TelemetryWorker; #[cfg(feature = "full-node")] @@ -302,14 +304,15 @@ fn jaeger_launch_collector_with_agent( } #[cfg(feature = "full-node")] -type FullSelectChain = relay_chain_selection::SelectRelayChainWithFallback; +type FullSelectChain = relay_chain_selection::SelectRelayChain; #[cfg(feature = "full-node")] -type FullGrandpaBlockImport = grandpa::GrandpaBlockImport< - FullBackend, - Block, - FullClient, - FullSelectChain, ->; +type FullGrandpaBlockImport = + grandpa::GrandpaBlockImport< + FullBackend, + Block, + FullClient, + ChainSelection, + >; #[cfg(feature = "light-node")] type LightBackend = service::TLightBackendWithHash; @@ -319,36 +322,29 @@ type LightClient = service::TLightClientWithBackend; #[cfg(feature = "full-node")] -fn new_partial( +struct Basics +where + RuntimeApi: ConstructRuntimeApi> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: + RuntimeApiCollection>, + ExecutorDispatch: NativeExecutionDispatch + 'static, +{ + task_manager: TaskManager, + client: Arc>, + backend: Arc, + keystore_container: KeystoreContainer, + telemetry: Option, +} + +#[cfg(feature = "full-node")] +fn new_partial_basics( config: &mut Configuration, jaeger_agent: Option, telemetry_worker_handle: Option, -) -> Result< - service::PartialComponents< - FullClient, - FullBackend, - FullSelectChain, - sc_consensus::DefaultImportQueue>, - sc_transaction_pool::FullPool>, - ( - impl service::RpcExtensionBuilder, - ( - babe::BabeBlockImport< - Block, - FullClient, - FullGrandpaBlockImport, - >, - grandpa::LinkHalf, FullSelectChain>, - babe::BabeLink, - beefy_gadget::notification::BeefySignedCommitmentSender, - ), - grandpa::SharedVoterState, - std::time::Duration, // slot-duration - Option, - ), - >, - Error, -> +) -> Result, Error> where RuntimeApi: ConstructRuntimeApi> + Send @@ -400,12 +396,53 @@ where jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?; - let select_chain = relay_chain_selection::SelectRelayChainWithFallback::new( - backend.clone(), - Handle::new_disconnected(), - polkadot_node_subsystem_util::metrics::Metrics::register(config.prometheus_registry())?, - ); + Ok(Basics { task_manager, client, backend, keystore_container, telemetry }) +} +#[cfg(feature = "full-node")] +fn new_partial( + config: &mut Configuration, + Basics { task_manager, backend, client, keystore_container, telemetry }: Basics< + RuntimeApi, + ExecutorDispatch, + >, + select_chain: ChainSelection, +) -> Result< + service::PartialComponents< + FullClient, + FullBackend, + ChainSelection, + sc_consensus::DefaultImportQueue>, + sc_transaction_pool::FullPool>, + ( + impl service::RpcExtensionBuilder, + ( + babe::BabeBlockImport< + Block, + FullClient, + FullGrandpaBlockImport, + >, + grandpa::LinkHalf, ChainSelection>, + babe::BabeLink, + beefy_gadget::notification::BeefySignedCommitmentSender, + ), + grandpa::SharedVoterState, + std::time::Duration, // slot-duration + Option, + ), + >, + Error, +> +where + RuntimeApi: ConstructRuntimeApi> + + Send + + Sync + + 'static, + RuntimeApi::RuntimeApi: + RuntimeApiCollection>, + ExecutorDispatch: NativeExecutionDispatch + 'static, + ChainSelection: 'static + SelectChain, +{ let transaction_pool = sc_transaction_pool::BasicPool::new_full( config.transaction_pool.clone(), config.role.is_authority().into(), @@ -674,23 +711,54 @@ where let disable_grandpa = config.disable_grandpa; let name = config.network.node_name.clone(); - let service::PartialComponents { + let basics = new_partial_basics::( + &mut config, + jaeger_agent, + telemetry_worker_handle, + )?; + + let prometheus_registry = config.prometheus_registry().cloned(); + + let overseer_connector = OverseerConnector::default(); + let overseer_handle = Handle::new(overseer_connector.handle()); + + let chain_spec = config.chain_spec.cloned_box(); + + // we should remove this check before we deploy parachains on polkadot + // TODO: https://github.com/paritytech/polkadot/issues/3326 + let is_relay_chain = chain_spec.is_kusama() || + chain_spec.is_westend() || + chain_spec.is_rococo() || + chain_spec.is_wococo(); + + let local_keystore = basics.keystore_container.local_keystore(); + let requires_overseer_for_chain_sel = local_keystore.is_some() && + is_relay_chain && + (role.is_authority() || is_collator.is_collator()); + + use relay_chain_selection::SelectRelayChain; + + let select_chain = SelectRelayChain::new( + basics.backend.clone(), + overseer_handle.clone(), + requires_overseer_for_chain_sel, + polkadot_node_subsystem_util::metrics::Metrics::register(prometheus_registry.as_ref())?, + ); + let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> { client, backend, mut task_manager, keystore_container, - mut select_chain, + select_chain, import_queue, transaction_pool, other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry), - } = new_partial::( + } = new_partial::>( &mut config, - jaeger_agent, - telemetry_worker_handle, + basics, + select_chain, )?; - let prometheus_registry = config.prometheus_registry().cloned(); - let shared_voter_state = rpc_setup; let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht; @@ -699,7 +767,7 @@ where // Substrate nodes. config.network.extra_sets.push(grandpa::grandpa_peers_set_config()); - if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() { + if chain_spec.is_rococo() || chain_spec.is_wococo() { config.network.extra_sets.push(beefy_gadget::beefy_peers_set_config()); } @@ -784,7 +852,6 @@ where col_data: crate::parachains_db::REAL_COLUMNS.col_dispute_coordinator_data, }; - let chain_spec = config.chain_spec.cloned_box(); let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams { config, backend: backend.clone(), @@ -804,7 +871,10 @@ where let overseer_client = client.clone(); let spawner = task_manager.spawn_handle(); - let active_leaves = futures::executor::block_on(active_leaves(&select_chain, &*client))?; + // Cannot use the `RelayChainSelection`, since that'd require a setup _and running_ overseer + // which we are about to setup. + let active_leaves = + futures::executor::block_on(active_leaves(select_chain.as_longest_chain(), &*client))?; let authority_discovery_service = if role.is_authority() || is_collator.is_collator() { use futures::StreamExt; @@ -841,7 +911,6 @@ where None }; - let local_keystore = keystore_container.local_keystore(); if local_keystore.is_none() { tracing::info!("Cannot run as validator without local keystore."); } @@ -852,6 +921,7 @@ where let overseer_handle = if let Some((authority_discovery_service, keystore)) = maybe_params { let (overseer, overseer_handle) = overseer_gen .generate::>( + overseer_connector, OverseerGenArgs { leaves: active_leaves, keystore, @@ -875,43 +945,37 @@ where dispute_coordinator_config, }, )?; - let handle = Handle::Connected(overseer_handle.clone()); - let handle_clone = handle.clone(); - - task_manager.spawn_essential_handle().spawn_blocking( - "overseer", - Box::pin(async move { - use futures::{pin_mut, select, FutureExt}; - - let forward = polkadot_overseer::forward_events(overseer_client, handle_clone); - - let forward = forward.fuse(); - let overseer_fut = overseer.run().fuse(); - - pin_mut!(overseer_fut); - pin_mut!(forward); - - select! { - _ = forward => (), - _ = overseer_fut => (), - complete => (), - } - }), - ); - // we should remove this check before we deploy parachains on polkadot - // TODO: https://github.com/paritytech/polkadot/issues/3326 - let should_connect_overseer = chain_spec.is_kusama() || - chain_spec.is_westend() || - chain_spec.is_rococo() || - chain_spec.is_wococo(); - - if should_connect_overseer { - select_chain.connect_to_overseer(overseer_handle.clone()); - } else { - tracing::info!("Overseer is running in the disconnected state"); + let handle = Handle::new(overseer_handle.clone()); + + { + let handle = handle.clone(); + task_manager.spawn_essential_handle().spawn_blocking( + "overseer", + Box::pin(async move { + use futures::{pin_mut, select, FutureExt}; + + let forward = polkadot_overseer::forward_events(overseer_client, handle); + + let forward = forward.fuse(); + let overseer_fut = overseer.run().fuse(); + + pin_mut!(overseer_fut); + pin_mut!(forward); + + select! { + _ = forward => (), + _ = overseer_fut => (), + complete => (), + } + }), + ); } Some(handle) } else { + assert!( + !requires_overseer_for_chain_sel, + "Precondition congruence (false) is guaranteed by manual checking. qed" + ); None }; @@ -1228,6 +1292,31 @@ where Ok((task_manager, rpc_handlers)) } +macro_rules! chain_ops { + ($config:expr, $jaeger_agent:expr, $telemetry_worker_handle:expr; $scope:ident, $executor:ident, $variant:ident) => {{ + let telemetry_worker_handle = $telemetry_worker_handle; + let jaeger_agent = $jaeger_agent; + let mut config = $config; + let basics = new_partial_basics::<$scope::RuntimeApi, $executor>( + config, + jaeger_agent, + telemetry_worker_handle, + )?; + + use ::sc_consensus::LongestChain; + // use the longest chain selection, since there is no overseer available + let chain_selection = LongestChain::new(basics.backend.clone()); + + let service::PartialComponents { client, backend, import_queue, task_manager, .. } = + new_partial::<$scope::RuntimeApi, $executor, LongestChain<_, Block>>( + &mut config, + basics, + chain_selection, + )?; + Ok((Arc::new(Client::$variant(client)), backend, import_queue, task_manager)) + }}; +} + /// Builds a new object suitable for chain operations. #[cfg(feature = "full-node")] pub fn new_chain_ops( @@ -1244,48 +1333,26 @@ pub fn new_chain_ops( > { config.keystore = service::config::KeystoreConfig::InMemory; + let telemetry_worker_handle = None; + #[cfg(feature = "rococo-native")] if config.chain_spec.is_rococo() || config.chain_spec.is_wococo() { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Rococo(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; rococo_runtime, RococoExecutorDispatch, Rococo) } #[cfg(feature = "kusama-native")] if config.chain_spec.is_kusama() { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Kusama(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; kusama_runtime, KusamaExecutorDispatch, Kusama) } #[cfg(feature = "westend-native")] if config.chain_spec.is_westend() { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Westend(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; westend_runtime, WestendExecutorDispatch, Westend) } #[cfg(feature = "polkadot-native")] { - let service::PartialComponents { client, backend, import_queue, task_manager, .. } = - new_partial::( - config, - jaeger_agent, - None, - )?; - return Ok((Arc::new(Client::Polkadot(client)), backend, import_queue, task_manager)) + return chain_ops!(config, jaeger_agent, telemetry_worker_handle; polkadot_runtime, PolkadotExecutorDispatch, Polkadot) } #[cfg(not(feature = "polkadot-native"))] diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 66156fd18298..222c45d4da16 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . use super::{AuthorityDiscoveryApi, Block, Error, Hash, IsCollator, Registry, SpawnNamed}; +use lru::LruCache; use polkadot_availability_distribution::IncomingRequestReceivers; use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; use polkadot_node_core_av_store::Config as AvailabilityConfig; @@ -22,7 +23,13 @@ use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver}; -use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandle}; +#[cfg(any(feature = "malus", test))] +pub use polkadot_overseer::dummy::DummySubsystem; +pub use polkadot_overseer::{ + metrics::Metrics, AllSubsystems, BlockInfo, HeadSupportsParachains, MetricsTrait, Overseer, + OverseerBuilder, OverseerConnector, OverseerHandle, +}; + use polkadot_primitives::v1::ParachainHost; use sc_authority_discovery::Service as AuthorityDiscoveryService; use sc_client_api::AuxStore; @@ -255,6 +262,176 @@ where Ok(all_subsystems) } +/// Obtain a prepared `OverseerBuilder`, that is initialized +/// with all default values. +pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( + OverseerGenArgs { + leaves, + keystore, + runtime_client, + parachains_db, + network_service, + authority_discovery_service, + pov_req_receiver, + chunk_req_receiver, + collation_req_receiver, + available_data_req_receiver, + statement_req_receiver, + dispute_req_receiver, + registry, + spawner, + is_collator, + approval_voting_config, + availability_config, + candidate_validation_config, + chain_selection_config, + dispute_coordinator_config, + }: OverseerGenArgs<'a, Spawner, RuntimeClient>, +) -> Result< + OverseerBuilder< + Spawner, + Arc, + CandidateValidationSubsystem, + CandidateBackingSubsystem, + StatementDistributionSubsystem, + AvailabilityDistributionSubsystem, + AvailabilityRecoverySubsystem, + BitfieldSigningSubsystem, + BitfieldDistributionSubsystem, + ProvisionerSubsystem, + RuntimeApiSubsystem, + AvailabilityStoreSubsystem, + NetworkBridgeSubsystem< + Arc>, + AuthorityDiscoveryService, + >, + ChainApiSubsystem, + CollationGenerationSubsystem, + CollatorProtocolSubsystem, + ApprovalDistributionSubsystem, + ApprovalVotingSubsystem, + GossipSupportSubsystem, + DisputeCoordinatorSubsystem, + DisputeParticipationSubsystem, + DisputeDistributionSubsystem, + ChainSelectionSubsystem, + >, + Error, +> +where + RuntimeClient: 'static + ProvideRuntimeApi + HeaderBackend + AuxStore, + RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, + Spawner: 'static + SpawnNamed + Clone + Unpin, +{ + use polkadot_node_subsystem_util::metrics::Metrics; + use std::iter::FromIterator; + + let metrics = ::register(registry)?; + + let builder = Overseer::builder() + .availability_distribution(AvailabilityDistributionSubsystem::new( + keystore.clone(), + IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, + Metrics::register(registry)?, + )) + .availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only( + available_data_req_receiver, + Metrics::register(registry)?, + )) + .availability_store(AvailabilityStoreSubsystem::new( + parachains_db.clone(), + availability_config, + Metrics::register(registry)?, + )) + .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?)) + .bitfield_signing(BitfieldSigningSubsystem::new( + spawner.clone(), + keystore.clone(), + Metrics::register(registry)?, + )) + .candidate_backing(CandidateBackingSubsystem::new( + spawner.clone(), + keystore.clone(), + Metrics::register(registry)?, + )) + .candidate_validation(CandidateValidationSubsystem::with_config( + candidate_validation_config, + Metrics::register(registry)?, // candidate-validation metrics + Metrics::register(registry)?, // validation host metrics + )) + .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?)) + .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?)) + .collator_protocol({ + let side = match is_collator { + IsCollator::Yes(collator_pair) => ProtocolSide::Collator( + network_service.local_peer_id().clone(), + collator_pair, + collation_req_receiver, + Metrics::register(registry)?, + ), + IsCollator::No => ProtocolSide::Validator { + keystore: keystore.clone(), + eviction_policy: Default::default(), + metrics: Metrics::register(registry)?, + }, + }; + CollatorProtocolSubsystem::new(side) + }) + .network_bridge(NetworkBridgeSubsystem::new( + network_service.clone(), + authority_discovery_service.clone(), + Box::new(network_service.clone()), + Metrics::register(registry)?, + )) + .provisioner(ProvisionerSubsystem::new(spawner.clone(), (), Metrics::register(registry)?)) + .runtime_api(RuntimeApiSubsystem::new( + runtime_client.clone(), + Metrics::register(registry)?, + spawner.clone(), + )) + .statement_distribution(StatementDistributionSubsystem::new( + keystore.clone(), + statement_req_receiver, + Metrics::register(registry)?, + )) + .approval_distribution(ApprovalDistributionSubsystem::new(Metrics::register(registry)?)) + .approval_voting(ApprovalVotingSubsystem::with_config( + approval_voting_config, + parachains_db.clone(), + keystore.clone(), + Box::new(network_service.clone()), + Metrics::register(registry)?, + )) + .gossip_support(GossipSupportSubsystem::new(keystore.clone())) + .dispute_coordinator(DisputeCoordinatorSubsystem::new( + parachains_db.clone(), + dispute_coordinator_config, + keystore.clone(), + Metrics::register(registry)?, + )) + .dispute_participation(DisputeParticipationSubsystem::new()) + .dispute_distribution(DisputeDistributionSubsystem::new( + keystore.clone(), + dispute_req_receiver, + authority_discovery_service.clone(), + Metrics::register(registry)?, + )) + .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db)) + .leaves(Vec::from_iter( + leaves + .into_iter() + .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)), + )) + .activation_external_listeners(Default::default()) + .span_per_active_leaf(Default::default()) + .active_leaves(Default::default()) + .supports_parachains(runtime_client) + .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) + .metrics(metrics) + .spawner(spawner); + Ok(builder) +} + /// Trait for the `fn` generating the overseer. /// /// Default behavior is to create an unmodified overseer, as `RealOverseerGen` @@ -263,6 +440,7 @@ pub trait OverseerGen { /// Overwrite the full generation of the overseer, including the subsystems. fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandle), Error> where @@ -271,19 +449,22 @@ pub trait OverseerGen { Spawner: 'static + SpawnNamed + Clone + Unpin, { let gen = RealOverseerGen; - RealOverseerGen::generate::(&gen, args) + RealOverseerGen::generate::(&gen, connector, args) } // It would be nice to make `create_subsystems` part of this trait, // but the amount of generic arguments that would be required as // as consequence make this rather annoying to implement and use. } +use polkadot_overseer::KNOWN_LEAVES_CACHE_SIZE; + /// The regular set of subsystems. pub struct RealOverseerGen; impl OverseerGen for RealOverseerGen { fn generate<'a, Spawner, RuntimeClient>( &self, + connector: OverseerConnector, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result<(Overseer>, OverseerHandle), Error> where @@ -291,14 +472,8 @@ impl OverseerGen for RealOverseerGen { RuntimeClient::Api: ParachainHost + BabeApi + AuthorityDiscoveryApi, Spawner: 'static + SpawnNamed + Clone + Unpin, { - let spawner = args.spawner.clone(); - let leaves = args.leaves.clone(); - let runtime_client = args.runtime_client.clone(); - let registry = args.registry.clone(); - - let all_subsystems = create_default_subsystems::(args)?; - - Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) + prepared_overseer_builder(args)? + .build_with_connector(connector) .map_err(|e| e.into()) } } diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 184d526eac47..2033a06a5f22 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -39,7 +39,7 @@ use super::{HeaderProvider, HeaderProviderProvider}; use consensus_common::{Error as ConsensusError, SelectChain}; use futures::channel::oneshot; use polkadot_node_subsystem_util::metrics::{self, prometheus}; -use polkadot_overseer::{AllMessages, Handle, OverseerHandle}; +use polkadot_overseer::{AllMessages, Handle}; use polkadot_primitives::v1::{ Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader, }; @@ -109,66 +109,62 @@ impl Metrics { } /// A chain-selection implementation which provides safety for relay chains. -pub struct SelectRelayChainWithFallback> { - // A fallback to use in case the overseer is disconnected. - // - // This is used on relay chains which have not yet enabled - // parachains as well as situations where the node is offline. - fallback: sc_consensus::LongestChain, - selection: SelectRelayChain, +pub struct SelectRelayChain> { + is_relay_chain: bool, + longest_chain: sc_consensus::LongestChain, + selection: SelectRelayChainInner, } -impl Clone for SelectRelayChainWithFallback +impl Clone for SelectRelayChain where B: sc_client_api::Backend, - SelectRelayChain: Clone, + SelectRelayChainInner: Clone, { fn clone(&self) -> Self { - Self { fallback: self.fallback.clone(), selection: self.selection.clone() } + Self { + longest_chain: self.longest_chain.clone(), + is_relay_chain: self.is_relay_chain, + selection: self.selection.clone(), + } } } -impl SelectRelayChainWithFallback +impl SelectRelayChain where B: sc_client_api::Backend + 'static, { - /// Create a new [`SelectRelayChainWithFallback`] wrapping the given chain backend + /// Create a new [`SelectRelayChain`] wrapping the given chain backend /// and a handle to the overseer. - pub fn new(backend: Arc, overseer: Handle, metrics: Metrics) -> Self { - SelectRelayChainWithFallback { - fallback: sc_consensus::LongestChain::new(backend.clone()), - selection: SelectRelayChain::new(backend, overseer, metrics), + pub fn new(backend: Arc, overseer: Handle, is_relay_chain: bool, metrics: Metrics) -> Self { + SelectRelayChain { + longest_chain: sc_consensus::LongestChain::new(backend.clone()), + selection: SelectRelayChainInner::new(backend, overseer, metrics), + is_relay_chain, } } -} -impl SelectRelayChainWithFallback -where - B: sc_client_api::Backend + 'static, -{ - /// Given an overseer handle, this connects the [`SelectRelayChainWithFallback`]'s - /// internal handle and its clones to the same overseer. - pub fn connect_to_overseer(&mut self, handle: OverseerHandle) { - self.selection.overseer.connect_to_overseer(handle); + /// Allow access to the inner chain, for usage during the node setup. + pub fn as_longest_chain(&self) -> &sc_consensus::LongestChain { + &self.longest_chain } } #[async_trait::async_trait] -impl SelectChain for SelectRelayChainWithFallback +impl SelectChain for SelectRelayChain where B: sc_client_api::Backend + 'static, { async fn leaves(&self) -> Result, ConsensusError> { - if self.selection.overseer.is_disconnected() { - return self.fallback.leaves().await + if !self.is_relay_chain { + return self.longest_chain.leaves().await } self.selection.leaves().await } async fn best_chain(&self) -> Result { - if self.selection.overseer.is_disconnected() { - return self.fallback.best_chain().await + if !self.is_relay_chain { + return self.longest_chain.best_chain().await } self.selection.best_chain().await } @@ -179,34 +175,34 @@ where maybe_max_number: Option, ) -> Result, ConsensusError> { let longest_chain_best = - self.fallback.finality_target(target_hash, maybe_max_number).await?; + self.longest_chain.finality_target(target_hash, maybe_max_number).await?; - if self.selection.overseer.is_disconnected() { + if !self.is_relay_chain { return Ok(longest_chain_best) } self.selection - .finality_target_with_fallback(target_hash, longest_chain_best, maybe_max_number) + .finality_target_with_longest_chain(target_hash, longest_chain_best, maybe_max_number) .await } } /// A chain-selection implementation which provides safety for relay chains /// but does not handle situations where the overseer is not yet connected. -pub struct SelectRelayChain { +pub struct SelectRelayChainInner { backend: Arc, overseer: OH, metrics: Metrics, } -impl SelectRelayChain +impl SelectRelayChainInner where B: HeaderProviderProvider, OH: OverseerHandleT, { - /// Create a new [`SelectRelayChain`] wrapping the given chain backend + /// Create a new [`SelectRelayChainInner`] wrapping the given chain backend /// and a handle to the overseer. pub fn new(backend: Arc, overseer: OH, metrics: Metrics) -> Self { - SelectRelayChain { backend, overseer, metrics } + SelectRelayChainInner { backend, overseer, metrics } } fn block_header(&self, hash: Hash) -> Result { @@ -234,13 +230,13 @@ where } } -impl Clone for SelectRelayChain +impl Clone for SelectRelayChainInner where B: HeaderProviderProvider + Send + Sync, OH: OverseerHandleT, { fn clone(&self) -> Self { - SelectRelayChain { + SelectRelayChainInner { backend: self.backend.clone(), overseer: self.overseer.clone(), metrics: self.metrics.clone(), @@ -273,7 +269,7 @@ impl OverseerHandleT for Handle { } } -impl SelectRelayChain +impl SelectRelayChainInner where B: HeaderProviderProvider, OH: OverseerHandleT, @@ -317,7 +313,7 @@ where /// /// It will also constrain the chain to only chains which are fully /// approved, and chains which contain no disputes. - pub(crate) async fn finality_target_with_fallback( + pub(crate) async fn finality_target_with_longest_chain( &self, target_hash: Hash, best_leaf: Option, diff --git a/node/service/src/tests.rs b/node/service/src/tests.rs index 34ac69d78cc7..7dc5fe19ecbd 100644 --- a/node/service/src/tests.rs +++ b/node/service/src/tests.rs @@ -79,7 +79,7 @@ fn test_harness>( let (finality_target_tx, finality_target_rx) = oneshot::channel::>(); - let select_relay_chain = SelectRelayChain::::new( + let select_relay_chain = SelectRelayChainInner::::new( Arc::new(case_vars.chain.clone()), context.sender().clone(), Default::default(), diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index 80dae08825b0..0ac52ee82346 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ b/node/subsystem-test-helpers/src/lib.rs @@ -372,7 +372,9 @@ mod tests { use super::*; use futures::executor::block_on; use polkadot_node_subsystem::messages::CollatorProtocolMessage; - use polkadot_overseer::{AllSubsystems, Handle, HeadSupportsParachains, Overseer}; + use polkadot_overseer::{ + AllSubsystems, Handle, HeadSupportsParachains, Overseer, OverseerConnector, + }; use polkadot_primitives::v1::Hash; struct AlwaysSupportsParachains; @@ -394,9 +396,10 @@ mod tests { None, AlwaysSupportsParachains, spawner.clone(), + OverseerConnector::default(), ) .unwrap(); - let mut handle = Handle::Connected(handle); + let mut handle = Handle::new(handle); spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index f0918ab1dc02..054f0d5997bb 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -24,7 +24,9 @@ pub use jaeger::*; pub use polkadot_node_jaeger as jaeger; -pub use polkadot_overseer::{self as overseer, ActiveLeavesUpdate, DummySubsystem, OverseerSignal}; +pub use polkadot_overseer::{ + self as overseer, ActiveLeavesUpdate, OverseerConnector, OverseerSignal, +}; pub use polkadot_node_subsystem_types::{ errors::{self, *},