diff --git a/node/network/dispute-distribution/src/metrics.rs b/node/network/dispute-distribution/src/metrics.rs index 9b891b47b79c..54a1bb9e6085 100644 --- a/node/network/dispute-distribution/src/metrics.rs +++ b/node/network/dispute-distribution/src/metrics.rs @@ -46,6 +46,9 @@ struct MetricsInner { /// /// We both have successful imports and failed imports here. imported_requests: CounterVec, + + /// The duration of issued dispute request to response. + time_dispute_request: prometheus::Histogram, } impl Metrics { @@ -61,7 +64,7 @@ impl Metrics { } } - /// Increment counter on served chunks. + /// Increment counter on served disputes. pub fn on_received_request(&self) { if let Some(metrics) = &self.0 { metrics.received_requests.inc() @@ -74,6 +77,11 @@ impl Metrics { metrics.imported_requests.with_label_values(&[label]).inc() } } + + /// Get a timer to time request/response duration. + pub fn time_dispute_request(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.time_dispute_request.start_timer()) + } } impl metrics::Metrics for Metrics { @@ -106,6 +114,13 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + time_dispute_request: prometheus::register( + prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( + "parachain_dispute_distribution_time_dispute_request", + "Time needed for dispute votes to get confirmed/fail getting transmitted.", + ))?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/node/network/dispute-distribution/src/sender/mod.rs b/node/network/dispute-distribution/src/sender/mod.rs index c640d5374245..b1243ec0ef32 100644 --- a/node/network/dispute-distribution/src/sender/mod.rs +++ b/node/network/dispute-distribution/src/sender/mod.rs @@ -97,9 +97,15 @@ impl DisputeSender { return Ok(()) }, Entry::Vacant(vacant) => { - let send_task = - SendTask::new(ctx, runtime, &self.active_sessions, self.tx.clone(), req) - .await?; + let send_task = SendTask::new( + ctx, + runtime, + &self.active_sessions, + self.tx.clone(), + req, + &self.metrics, + ) + .await?; vacant.insert(send_task); }, } @@ -140,7 +146,9 @@ impl DisputeSender { for dispute in self.disputes.values_mut() { if have_new_sessions || dispute.has_failed_sends() { - dispute.refresh_sends(ctx, runtime, &self.active_sessions).await?; + dispute + .refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics) + .await?; } } diff --git a/node/network/dispute-distribution/src/sender/send_task.rs b/node/network/dispute-distribution/src/sender/send_task.rs index 0c3681a30a99..6b7958b06281 100644 --- a/node/network/dispute-distribution/src/sender/send_task.rs +++ b/node/network/dispute-distribution/src/sender/send_task.rs @@ -20,12 +20,13 @@ use futures::{channel::mpsc, future::RemoteHandle, Future, FutureExt, SinkExt}; use polkadot_node_network_protocol::{ request_response::{ + outgoing::RequestError, v1::{DisputeRequest, DisputeResponse}, OutgoingRequest, OutgoingResult, Recipient, Requests, }, IfDisconnected, }; -use polkadot_node_subsystem_util::runtime::RuntimeInfo; +use polkadot_node_subsystem_util::{metrics, runtime::RuntimeInfo}; use polkadot_primitives::v1::{ AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex, }; @@ -38,7 +39,7 @@ use super::error::{Fatal, Result}; use crate::{ metrics::{FAILED, SUCCEEDED}, - LOG_TARGET, + Metrics, LOG_TARGET, }; /// Delivery status for a particular dispute. @@ -57,6 +58,16 @@ pub struct SendTask { /// Whether we have any tasks failed since the last refresh. has_failed_sends: bool, + /// Total count of failed transmissions. + /// + /// Used for issuing a warning, if that number gets above a certain threshold. + failed_count: usize, + + /// Total number of initiated requests. + /// + /// Used together with `failed_count` for issuing a warning on too many failed attempts. + send_count: usize, + /// Sender to be cloned for tasks. tx: mpsc::Sender, } @@ -87,14 +98,14 @@ pub enum TaskResult { /// Task was not able to get the request out to its peer. /// /// It should be retried in that case. - Failed, + Failed(RequestError), } impl TaskResult { pub fn as_metrics_label(&self) -> &'static str { match self { Self::Succeeded => SUCCEEDED, - Self::Failed => FAILED, + Self::Failed(_) => FAILED, } } } @@ -107,10 +118,17 @@ impl SendTask { active_sessions: &HashMap, tx: mpsc::Sender, request: DisputeRequest, + metrics: &Metrics, ) -> Result { - let mut send_task = - Self { request, deliveries: HashMap::new(), has_failed_sends: false, tx }; - send_task.refresh_sends(ctx, runtime, active_sessions).await?; + let mut send_task = Self { + request, + deliveries: HashMap::new(), + has_failed_sends: false, + tx, + failed_count: 0, + send_count: 0, + }; + send_task.refresh_sends(ctx, runtime, active_sessions, metrics).await?; Ok(send_task) } @@ -123,6 +141,7 @@ impl SendTask { ctx: &mut Context, runtime: &mut RuntimeInfo, active_sessions: &HashMap, + metrics: &Metrics, ) -> Result<()> { let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?; @@ -137,10 +156,12 @@ impl SendTask { // Start any new tasks that are needed: let new_statuses = - send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone()).await?; + send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics) + .await?; - self.deliveries.extend(new_statuses.into_iter()); self.has_failed_sends = false; + self.send_count += new_statuses.len(); + self.deliveries.extend(new_statuses.into_iter()); Ok(()) } @@ -150,15 +171,38 @@ impl SendTask { } /// Handle a finished response waiting task. + /// + /// Called by `DisputeSender` upon reception of the corresponding message from our spawned `wait_response_task`. pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) { match result { - TaskResult::Failed => { - tracing::warn!( + TaskResult::Failed(err) => { + tracing::debug!( target: LOG_TARGET, - candidate = ?self.request.0.candidate_receipt.hash(), ?authority, - "Could not get our message out! If this keeps happening, then check chain whether the dispute made it there." + candidate_hash = %self.request.0.candidate_receipt.hash(), + %err, + "Error sending dispute statements to node." ); + + self.failed_count += 1; + let error_rate = (100 * self.failed_count).checked_div(self.send_count).expect( + "We cannot receive a failed request, without having sent one first. qed.", + ); + // 10% seems to be a sensible threshold to become alert - note that + // self.send_count gets increased in batches of the full validator set, so we don't + // need to account for a low send_count. + if error_rate > 10 { + tracing::warn!( + target: LOG_TARGET, + candidate_hash = %self.request.0.candidate_receipt.hash(), + last_authority = ?authority, + last_error = %err, + failed_count = ?self.failed_count, + total_attempts = ?self.send_count, + "Sending our dispute vote failed for more than 10% of total attempts!" + ); + } + self.has_failed_sends = true; // Remove state, so we know what to try again: self.deliveries.remove(authority); @@ -236,6 +280,7 @@ async fn send_requests( tx: mpsc::Sender, receivers: Vec, req: DisputeRequest, + metrics: &Metrics, ) -> Result> { let mut statuses = HashMap::with_capacity(receivers.len()); let mut reqs = Vec::with_capacity(receivers.len()); @@ -251,6 +296,7 @@ async fn send_requests( req.0.candidate_receipt.hash(), receiver.clone(), tx.clone(), + metrics.time_dispute_request(), ); let (remote, remote_handle) = fut.remote_handle(); @@ -273,28 +319,13 @@ async fn wait_response_task( candidate_hash: CandidateHash, receiver: AuthorityDiscoveryId, mut tx: mpsc::Sender, + _timer: Option, ) { let result = pending_response.await; let msg = match result { - Err(err) => { - tracing::warn!( - target: LOG_TARGET, - %candidate_hash, - %receiver, - %err, - "Error sending dispute statements to node." - ); - TaskFinish { candidate_hash, receiver, result: TaskResult::Failed } - }, - Ok(DisputeResponse::Confirmed) => { - tracing::trace!( - target: LOG_TARGET, - %candidate_hash, - %receiver, - "Sending dispute message succeeded" - ); - TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded } - }, + Err(err) => TaskFinish { candidate_hash, receiver, result: TaskResult::Failed(err) }, + Ok(DisputeResponse::Confirmed) => + TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded }, }; if let Err(err) = tx.feed(msg).await { tracing::debug!(