Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

remove connected disconnected state, 3rd attempt #3898

Merged
merged 20 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion doc/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
where
Expand Down Expand Up @@ -213,7 +214,7 @@ impl OverseerGen for BehaveMaleficient {
),
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into())

// A builder pattern will simplify this further
Expand Down
2 changes: 1 addition & 1 deletion node/malus/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use super::*;
use polkadot_node_subsystem_test_helpers::*;

use polkadot_node_subsystem::{
dummy::DummySubsystem,
messages::{AllMessages, AvailabilityStoreMessage},
overseer::{gen::TimeoutExt, Subsystem},
DummySubsystem,
};

#[derive(Clone, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions node/malus/src/variant-a.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerHandle},
overseer::{self, OverseerConnector, OverseerHandle},
FromOverseer,
};

Expand Down Expand Up @@ -86,6 +86,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
Expand Down Expand Up @@ -113,7 +114,7 @@ impl OverseerGen for BehaveMaleficient {
},
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into())
}
}
Expand Down
14 changes: 11 additions & 3 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use polkadot_node_subsystem_types::messages::{
use polkadot_overseer::{
self as overseer,
gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError,
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
};
use polkadot_primitives::v1::Hash;

Expand Down Expand Up @@ -173,8 +174,15 @@ fn main() {
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig);

let (overseer, _handle) =
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap();
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

Expand Down
6 changes: 5 additions & 1 deletion node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
&mut self.handle
}
/// Obtain access to the overseer handle.
pub fn as_handle(&mut self) -> &#handle {
pub fn as_handle(&self) -> &#handle {
&self.handle
}
/// Obtain a clone of the handle.
pub fn handle(&self) -> #handle {
self.handle.clone()
}
}

impl ::std::default::Default for #connector {
Expand Down
54 changes: 54 additions & 0 deletions node/overseer/src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};

/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;

impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});

SpawnedSubsystem { name: "dummy-subsystem", future }
}
}
93 changes: 21 additions & 72 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use std::{

use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::RwLock;

use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
Expand All @@ -91,15 +90,18 @@ pub use polkadot_node_subsystem_types::{
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
};

/// Test helper supplements.
pub mod dummy;
pub use self::dummy::DummySubsystem;

// TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems;
pub use self::subsystems::{AllSubsystems, DummySubsystem};
pub use self::subsystems::AllSubsystems;

mod metrics;
use self::metrics::Metrics;
pub mod metrics;

use polkadot_node_metrics::{
pub use polkadot_node_metrics::{
metrics::{prometheus, Metrics as MetricsTrait},
Metronome,
};
Expand All @@ -115,7 +117,7 @@ pub use polkadot_overseer_gen::{

/// Store 2 days worth of blocks, not accounting for forks,
/// in the LRU cache. Assumes a 6-second block time.
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;

#[cfg(test)]
mod tests;
Expand All @@ -141,18 +143,12 @@ where
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub enum Handle {
/// Used only at initialization to break the cyclic dependency.
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
/// A handle to the overseer.
Connected(OverseerHandle),
}
pub struct Handle(OverseerHandle);

impl Handle {
/// Create a new disconnected [`Handle`].
pub fn new_disconnected() -> Self {
Self::Disconnected(Arc::new(RwLock::new(None)))
/// Create a new [`Handle`].
pub fn new(raw: OverseerHandle) -> Self {
Self(raw)
}

/// Inform the `Overseer` that that some block was imported.
Expand Down Expand Up @@ -201,58 +197,8 @@ impl Handle {

/// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) {
self.try_connect();
if let Self::Connected(ref mut handle) = self {
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}

/// Whether the handle is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Self::Disconnected(ref x) => x.read().is_none(),
_ => false,
}
}

/// Connect this handle and all disconnected clones of it to the overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
match self {
Self::Disconnected(ref mut x) => {
let mut maybe_handle = x.write();
if maybe_handle.is_none() {
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
*maybe_handle = Some(handle);
} else {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect a clone of a connected Handle",
);
}
},
_ => {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect an already connected Handle",
);
},
}
}

/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
fn try_connect(&mut self) {
if let Self::Disconnected(ref mut x) = self {
let guard = x.write();
if let Some(ref h) = *guard {
let handle = h.clone();
drop(guard);
*self = Self::Connected(handle);
}
if self.0.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
}
}
Expand Down Expand Up @@ -439,7 +385,7 @@ pub struct Overseer<SupportsParachains> {
pub known_leaves: LruCache<Hash, ()>,

/// Various Prometheus metrics.
pub metrics: Metrics,
pub metrics: crate::metrics::Metrics,
}

impl<S, SupportsParachains> Overseer<S, SupportsParachains>
Expand Down Expand Up @@ -490,12 +436,13 @@ where
/// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{
/// # self as overseer,
/// # Overseer,
/// # OverseerSignal,
/// # OverseerConnector,
/// # SubsystemSender as _,
/// # AllMessages,
/// # AllSubsystems,
/// # HeadSupportsParachains,
/// # Overseer,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
Expand Down Expand Up @@ -549,6 +496,7 @@ where
/// None,
/// AlwaysSupportsParachains,
/// spawner,
/// OverseerConnector::default(),
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
Expand Down Expand Up @@ -615,6 +563,7 @@ where
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
connector: OverseerConnector,
) -> SubsystemResult<(Self, OverseerHandle)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
Expand Down Expand Up @@ -643,7 +592,7 @@ where
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
S: SpawnNamed,
{
let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?;
let metrics = <crate::metrics::Metrics as MetricsTrait>::register(prometheus_registry)?;

let (mut overseer, handle) = Self::builder()
.candidate_validation(all_subsystems.candidate_validation)
Expand Down Expand Up @@ -679,7 +628,7 @@ where
.supports_parachains(supports_parachains)
.metrics(metrics.clone())
.spawner(s)
.build()?;
.build_with_connector(connector)?;

// spawn the metrics metronome task
{
Expand Down
4 changes: 2 additions & 2 deletions node/overseer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Prometheus metrics related to the overseer and its channels.

use super::*;
use polkadot_node_metrics::metrics::{self, prometheus};
pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};

use parity_util_mem::MemoryAllocationSnapshot;

Expand Down Expand Up @@ -110,7 +110,7 @@ impl Metrics {
}
}

impl metrics::Metrics for Metrics {
impl MetricsTrait for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
activated_heads_total: prometheus::register(
Expand Down
Loading