From 9b3b7ca7188aa6d4062a81334139e60ed4c7cad0 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sun, 17 Sep 2023 21:46:03 +0100 Subject: [PATCH] Revert #1409 partially --- polkadot/node/network/bridge/src/rx/mod.rs | 62 ++++------------------ 1 file changed, 9 insertions(+), 53 deletions(-) diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs index e1125ebc904d..abf93f85348d 100644 --- a/polkadot/node/network/bridge/src/rx/mod.rs +++ b/polkadot/node/network/bridge/src/rx/mod.rs @@ -21,8 +21,7 @@ use super::*; use always_assert::never; use bytes::Bytes; use futures::{ - future::BoxFuture, - stream::{BoxStream, FuturesUnordered, StreamExt}, + stream::{BoxStream, StreamExt}, }; use parity_scale_codec::{Decode, DecodeAll}; @@ -748,6 +747,7 @@ where futures::pin_mut!(orchestra_signal_handler); + // TODO: use `select_with_strategy` here to prefer signals processing futures::future::select(orchestra_signal_handler, network_event_handler) .await .factor_first() @@ -1044,65 +1044,21 @@ fn dispatch_collation_event_to_all_unbounded( } } -fn send_or_queue_validation_event( - event: E, - sender: &mut Sender, - delayed_queue: &FuturesUnordered>, -) where - E: Send + 'static, - Sender: overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender, -{ - match sender.try_send_message(event) { - Ok(()) => {}, - Err(overseer::TrySendError::Full(event)) => { - let mut sender = sender.clone(); - delayed_queue.push(Box::pin(async move { - sender.send_message(event).await; - })); - }, - Err(overseer::TrySendError::Closed(_)) => { - panic!( - "NetworkBridgeRxSender is closed when trying to send event of type: {}", - std::any::type_name::() - ); - }, - } -} - async fn dispatch_validation_events_to_all( events: I, sender: &mut impl overseer::NetworkBridgeRxSenderTrait, - metrics: &Metrics, + _metrics: &Metrics, ) where I: IntoIterator>, I::IntoIter: Send, { - let delayed_messages: FuturesUnordered> = FuturesUnordered::new(); - - // Fast path for sending events to subsystems, if any subsystem's queue is full, we hold - // the slow path future in the `delayed_messages` queue. for event in events { - if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) { - send_or_queue_validation_event(msg, sender, &delayed_messages); - } - if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) { - send_or_queue_validation_event(msg, sender, &delayed_messages); - } - if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) { - send_or_queue_validation_event(msg, sender, &delayed_messages); - } - if let Ok(msg) = event.focus().map(GossipSupportMessage::from) { - send_or_queue_validation_event(msg, sender, &delayed_messages); - } - } - - let delayed_messages_count = delayed_messages.len(); - metrics.on_delayed_rx_queue(delayed_messages_count); - - if delayed_messages_count > 0 { - // Here we wait for all the delayed messages to be sent. - let _timer = metrics.time_delayed_rx_events(); // Dropped after `await` is completed - let _: Vec<()> = delayed_messages.collect().await; + sender + .send_messages(event.focus().map(StatementDistributionMessage::from)) + .await; + sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await; + sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; + sender.send_messages(event.focus().map(GossipSupportMessage::from)).await; } }