
Merge branch 'master' into ao-job-manager-dont-bring-down-subsystem-on-race-condition

* master:
  initial prometheus metrics (#1536)
  Companion for Substrate 6868 (WeightInfo for System, Utility, and Timestamp) (#1606)
  Promote `HrmpChannelId` and supply more docs (#1595)
  move AssignmentKind and CoreAssigment to scheduler (#1571)
  overseer: fix build (#1596)
ordian committed Aug 18, 2020
2 parents ccd8f78 + 804958a commit 473587a
Showing 43 changed files with 1,375 additions and 365 deletions.
279 changes: 140 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

62 changes: 55 additions & 7 deletions node/collation-generation/src/lib.rs
@@ -31,6 +31,7 @@ use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::{
self as util, request_availability_cores_ctx, request_global_validation_data_ctx,
@@ -47,9 +48,18 @@ use std::sync::Arc;
/// Collation Generation Subsystem
pub struct CollationGenerationSubsystem {
config: Option<Arc<CollationGenerationConfig>>,
metrics: Metrics,
}

impl CollationGenerationSubsystem {
/// Create a new instance of the `CollationGenerationSubsystem`.
pub fn new(metrics: Metrics) -> Self {
Self {
config: None,
metrics,
}
}

/// Run this subsystem
///
/// Conceptually, this is very simple: it just loops forever.
@@ -112,8 +122,9 @@ impl CollationGenerationSubsystem {
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
// follow the procedure from the guide
if let Some(config) = &self.config {
let metrics = self.metrics.clone();
if let Err(err) =
handle_new_activations(config.clone(), &activated, ctx, sender).await
handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
{
log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
return true;
@@ -146,13 +157,13 @@ impl<Context> Subsystem<Context> for CollationGenerationSubsystem
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let subsystem = CollationGenerationSubsystem { config: None };
type Metrics = Metrics;

let future = Box::pin(subsystem.run(ctx));
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(self.run(ctx));

SpawnedSubsystem {
name: "CollationGenerationSubsystem",
name: "collation-generation-subsystem",
future,
}
}
@@ -178,6 +189,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
config: Arc<CollationGenerationConfig>,
activated: &[Hash],
ctx: &mut Context,
metrics: Metrics,
sender: &mpsc::Sender<AllMessages>,
) -> Result<()> {
// follow the procedure from the guide:
@@ -230,6 +242,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
let task_global_validation_data = global_validation_data.clone();
let task_config = config.clone();
let mut task_sender = sender.clone();
let metrics = metrics.clone();
ctx.spawn("collation generation collation builder", Box::pin(async move {
let validation_data_hash =
validation_data_hash(&task_global_validation_data, &local_validation_data);
@@ -273,6 +286,8 @@ async fn handle_new_activations<Context: SubsystemContext>(
},
};

metrics.on_collation_generated();

if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
)).await {
@@ -305,6 +320,38 @@ fn erasure_root(
Ok(polkadot_erasure_coding::branches(&chunks).root())
}

#[derive(Clone)]
struct MetricsInner {
collations_generated_total: prometheus::Counter<prometheus::U64>,
}

/// CollationGenerationSubsystem metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
fn on_collation_generated(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_generated_total.inc();
}
}
}

impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
collations_generated_total: prometheus::register(
prometheus::Counter::new(
"parachain_collations_generated_total",
"Number of collations generated."
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}

#[cfg(test)]
mod tests {
mod handle_new_activations {
@@ -411,6 +458,7 @@ mod tests {
test_config(123),
&subsystem_activated_hashes,
&mut ctx,
Metrics(None),
&tx,
)
.await
@@ -498,7 +546,7 @@ mod tests {
let (tx, _rx) = mpsc::channel(0);

subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(test_config(16), &activated_hashes, &mut ctx, &tx)
handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
});
@@ -581,7 +629,7 @@ mod tests {
let sent_messages = Arc::new(Mutex::new(Vec::new()));
let subsystem_sent_messages = sent_messages.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, &tx)
handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();

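For context, a minimal sketch of how a caller might wire up the new collation-generation metrics — not part of this commit; the function name and registry handling are assumptions for illustration, and it presumes the items defined in node/collation-generation/src/lib.rs are in scope:

// Hypothetical wiring, assuming a shared Prometheus registry from the node's service setup.
fn spawn_collation_generation(
	registry: &prometheus::Registry,
) -> Result<CollationGenerationSubsystem, prometheus::PrometheusError> {
	// Register the `parachain_collations_generated_total` counter against the registry...
	let metrics = <Metrics as metrics::Metrics>::try_register(registry)?;
	// ...and hand the handle to the subsystem. `Metrics::default()` is the no-op
	// variant (equivalent to the `Metrics(None)` used in the tests), so an
	// unregistered handle simply skips counter updates.
	Ok(CollationGenerationSubsystem::new(metrics))
}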
66 changes: 55 additions & 11 deletions node/core/av-store/src/lib.rs
@@ -34,6 +34,7 @@ use polkadot_primitives::v1::{
};
use polkadot_subsystem::{
FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
metrics::{self, prometheus},
};
use polkadot_subsystem::messages::AvailabilityStoreMessage;

@@ -59,6 +60,7 @@ enum Error {
/// An implementation of the Availability Store subsystem.
pub struct AvailabilityStoreSubsystem {
inner: Arc<dyn KeyValueDB>,
metrics: Metrics,
}

fn available_data_key(candidate_hash: &Hash) -> Vec<u8> {
@@ -85,7 +87,7 @@ pub struct Config {

impl AvailabilityStoreSubsystem {
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
pub fn new_on_disk(config: Config) -> io::Result<Self> {
pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);

if let Some(cache_size) = config.cache_size {
@@ -106,13 +108,15 @@ impl AvailabilityStoreSubsystem {

Ok(Self {
inner: Arc::new(db),
metrics,
})
}

#[cfg(test)]
fn new_in_memory(inner: Arc<dyn KeyValueDB>) -> Self {
Self {
inner,
metrics: Metrics(None),
}
}
}
@@ -130,7 +134,7 @@ where
Ok(FromOverseer::Signal(Conclude)) => break,
Ok(FromOverseer::Signal(_)) => (),
Ok(FromOverseer::Communication { msg }) => {
process_message(&subsystem.inner, msg)?;
process_message(&subsystem.inner, &subsystem.metrics, msg)?;
}
Err(_) => break,
}
@@ -142,7 +146,7 @@ where
Ok(())
}

fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> Result<(), Error> {
fn process_message(db: &Arc<dyn KeyValueDB>, metrics: &Metrics, msg: AvailabilityStoreMessage) -> Result<(), Error> {
use AvailabilityStoreMessage::*;
match msg {
QueryAvailableData(hash, tx) => {
@@ -152,10 +156,10 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> R
tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?;
}
QueryChunk(hash, id, tx) => {
tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?;
tx.send(get_chunk(db, &hash, id, metrics)?).map_err(|_| oneshot::Canceled)?;
}
QueryChunkAvailability(hash, id, tx) => {
tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?;
tx.send(get_chunk(db, &hash, id, metrics)?.is_some()).map_err(|_| oneshot::Canceled)?;
}
StoreChunk(hash, id, chunk, tx) => {
match store_chunk(db, &hash, id, chunk) {
@@ -169,7 +173,7 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> R
}
}
StoreAvailableData(hash, id, n_validators, av_data, tx) => {
match store_available_data(db, &hash, id, n_validators, av_data) {
match store_available_data(db, &hash, id, n_validators, av_data, metrics) {
Err(e) => {
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e);
@@ -194,11 +198,12 @@ fn store_available_data(
id: Option<ValidatorIndex>,
n_validators: u32,
available_data: AvailableData,
metrics: &Metrics,
) -> Result<(), Error> {
let mut tx = DBTransaction::new();

if let Some(index) = id {
let chunks = get_chunks(&available_data, n_validators as usize)?;
let chunks = get_chunks(&available_data, n_validators as usize, metrics)?;
store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?;
}

@@ -231,7 +236,7 @@ fn store_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, _n_validators: u
Ok(())
}

fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32)
fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32, metrics: &Metrics)
-> Result<Option<ErasureChunk>, Error>
{
if let Some(chunk) = query_inner(
Expand All @@ -242,7 +247,7 @@ fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32)
}

if let Some(data) = available_data(db, candidate_hash) {
let mut chunks = get_chunks(&data.data, data.n_validators as usize)?;
let mut chunks = get_chunks(&data.data, data.n_validators as usize, metrics)?;
let desired_chunk = chunks.get(index as usize).cloned();
for chunk in chunks.drain(..) {
store_chunk(db, candidate_hash, data.n_validators, chunk)?;
@@ -271,6 +276,8 @@ impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
type Metrics = Metrics;

fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move {
if let Err(e) = run(self, ctx).await {
@@ -285,8 +292,9 @@ impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
}
}

fn get_chunks(data: &AvailableData, n_validators: usize) -> Result<Vec<ErasureChunk>, Error> {
fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result<Vec<ErasureChunk>, Error> {
let chunks = erasure::obtain_chunks_v1(n_validators, data)?;
metrics.on_chunks_received(chunks.len());
let branches = erasure::branches(chunks.as_ref());

Ok(chunks
@@ -302,6 +310,41 @@ fn get_chunks(data: &AvailableData, n_validators: usize) -> Result<Vec<ErasureCh
)
}

#[derive(Clone)]
struct MetricsInner {
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
}

/// Availability metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
fn on_chunks_received(&self, count: usize) {
if let Some(metrics) = &self.0 {
use core::convert::TryFrom as _;
// assume usize fits into u64
let by = u64::try_from(count).unwrap_or_default();
metrics.received_availability_chunks_total.inc_by(by);
}
}
}

impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
received_availability_chunks_total: prometheus::register(
prometheus::Counter::new(
"parachain_received_availability_chunks_total",
"Number of availability chunks received.",
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -501,7 +544,8 @@ mod tests {
omitted_validation,
};

let chunks_expected = get_chunks(&available_data, n_validators as usize).unwrap();
let no_metrics = Metrics(None);
let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();

let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
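Similarly, a hedged sketch of constructing the availability-store subsystem with its new metrics — not part of this commit; the function name and the fallback to the no-op `Metrics::default()` are illustrative assumptions, and it presumes the items defined in node/core/av-store/src/lib.rs are in scope:

// Hypothetical construction, assuming `config` and `registry` come from the node's service setup.
fn new_av_store(
	config: Config,
	registry: &prometheus::Registry,
) -> std::io::Result<AvailabilityStoreSubsystem> {
	// Fall back to the no-op metrics handle if registration fails; a caller could
	// instead propagate the `PrometheusError`.
	let metrics = <Metrics as metrics::Metrics>::try_register(registry).unwrap_or_default();
	AvailabilityStoreSubsystem::new_on_disk(config, metrics)
}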