diff --git a/relays/lib-substrate-relay/src/parachains/target.rs b/relays/lib-substrate-relay/src/parachains/target.rs index fa159cdeb7e5..34a6a31311dd 100644 --- a/relays/lib-substrate-relay/src/parachains/target.rs +++ b/relays/lib-substrate-relay/src/parachains/target.rs @@ -34,7 +34,7 @@ use parachains_relay::{ use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, HeaderIdOf, HeaderOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme, - UnsignedTransaction, + TransactionTracker, UnsignedTransaction, }; use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::{Bytes, Pair}; @@ -86,6 +86,8 @@ where P::TransactionSignScheme: TransactionSignScheme, AccountIdOf: From< as Pair>::Public>, { + type TransactionTracker = TransactionTracker; + async fn best_block(&self) -> Result, Self::Error> { let best_header = self.client.best_header().await?; let best_id = best_header.id(); @@ -172,7 +174,7 @@ where at_relay_block: HeaderIdOf, updated_parachains: Vec<(ParaId, ParaHash)>, proof: ParaHeadsProof, - ) -> Result<(), Self::Error> { + ) -> Result { let genesis_hash = *self.client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; @@ -182,7 +184,7 @@ where proof, ); self.client - .submit_signed_extrinsic( + .submit_and_watch_signed_extrinsic( self.transaction_params.signer.public().into(), SignParam:: { spec_version, @@ -196,6 +198,5 @@ where }, ) .await - .map(drop) } } diff --git a/relays/parachains/src/parachains_loop.rs b/relays/parachains/src/parachains_loop.rs index f44f986d6323..09e55740cef3 100644 --- a/relays/parachains/src/parachains_loop.rs +++ b/relays/parachains/src/parachains_loop.rs @@ -22,13 +22,21 @@ use bp_polkadot_core::{ parachains::{ParaHash, ParaHeadsProof, ParaId}, BlockNumber as RelayBlockNumber, }; -use futures::{future::FutureExt, select}; +use futures::{ + future::{FutureExt, Shared}, + poll, select, +}; use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf}; -use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient}; +use relay_utils::{ + metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, + TrackedTransactionStatus, TransactionTracker, +}; use std::{ collections::{BTreeMap, BTreeSet}, future::Future, - time::{Duration, Instant}, + pin::Pin, + task::Poll, + time::Duration, }; /// Parachain heads synchronization params. @@ -115,6 +123,9 @@ pub trait SourceClient: RelayClient { /// Target client used in parachain heads synchronization loop. #[async_trait] pub trait TargetClient: RelayClient { + /// Transaction tracker to track submitted transactions. + type TransactionTracker: TransactionTracker; + /// Get best block id. async fn best_block(&self) -> Result, Self::Error>; @@ -141,7 +152,7 @@ pub trait TargetClient: RelayClient { at_source_block: HeaderIdOf, updated_parachains: Vec<(ParaId, ParaHash)>, proof: ParaHeadsProof, - ) -> Result<(), Self::Error>; + ) -> Result; } /// Return prefix that will be used by default to expose Prometheus metrics of the parachains @@ -196,7 +207,7 @@ where P::TargetChain::AVERAGE_BLOCK_INTERVAL, ); - let mut tx_tracker: Option> = None; + let mut submitted_heads_tracker: Option> = None; futures::pin_mut!(exit_signal); @@ -246,9 +257,29 @@ where &sync_params.parachains, ) .await?; - tx_tracker = tx_tracker.take().and_then(|tx_tracker| tx_tracker.update(&heads_at_target)); - if tx_tracker.is_some() { - continue + + // check if our transaction has been mined + if let Some(tracker) = submitted_heads_tracker.take() { + match tracker.update(&heads_at_target).await { + SubmittedHeadsStatus::Waiting(tracker) => { + // no news about our transaction and we shall keep waiting + submitted_heads_tracker = Some(tracker); + continue + }, + SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized) => { + // all heads have been updated, we don't need this tracker anymore + }, + SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) => { + log::warn!( + target: "bridge", + "Parachains synchronization from {} to {} has stalled. Going to restart", + P::SourceChain::NAME, + P::TargetChain::NAME, + ); + + return Err(FailedClient::Both) + }, + } } // we have no active transaction and may need to update heads, but do we have something for @@ -317,7 +348,7 @@ where "Incorrect parachains SourceClient implementation" ); - target_client + let transaction_tracker = target_client .submit_parachain_heads_proof( best_finalized_relay_block, updated_ids.iter().cloned().zip(head_hashes).collect(), @@ -334,11 +365,10 @@ where ); FailedClient::Target })?; - - tx_tracker = Some(TransactionTracker::

::new( + submitted_heads_tracker = Some(SubmittedHeadsTracker::

::new( updated_ids, best_finalized_relay_block.0, - sync_params.stall_timeout, + transaction_tracker, )); } } @@ -494,19 +524,27 @@ async fn read_heads_at_target( Ok(para_best_head_hashes) } -/// Parachain heads transaction tracker. -struct TransactionTracker { +/// Submitted heads status. +enum SubmittedHeadsStatus { + /// Heads are not yet updated. + Waiting(SubmittedHeadsTracker

), + /// Heads transaction has either been finalized or lost (i.e. received its "final" status). + Final(TrackedTransactionStatus), +} + +/// Submitted parachain heads transaction. +struct SubmittedHeadsTracker { /// Ids of parachains which heads were updated in the tracked transaction. awaiting_update: BTreeSet, /// Number of relay chain block that has been used to craft parachain heads proof. relay_block_number: BlockNumberOf, - /// Transaction submit time. - submitted_at: Instant, - /// Transaction death time. - death_time: Instant, + /// Future that waits for submitted transaction finality or loss. + /// + /// It needs to be shared because of `poll` macro and our consuming `update` method. + transaction_tracker: Shared + Send>>>, } -impl TransactionTracker

+impl SubmittedHeadsTracker

where P::SourceChain: Chain, { @@ -514,22 +552,20 @@ where pub fn new( awaiting_update: impl IntoIterator, relay_block_number: BlockNumberOf, - stall_timeout: Duration, + transaction_tracker: impl TransactionTracker + 'static, ) -> Self { - let now = Instant::now(); - TransactionTracker { + SubmittedHeadsTracker { awaiting_update: awaiting_update.into_iter().collect(), relay_block_number, - submitted_at: now, - death_time: now + stall_timeout, + transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(), } } - /// Returns `None` if all parachain heads have been updated or we consider our transaction dead. - pub fn update( + /// Returns `None` if all submitted parachain heads have been updated. + pub async fn update( mut self, heads_at_target: &BTreeMap>, - ) -> Option { + ) -> SubmittedHeadsStatus

{ // remove all pending heads that were synced for (para, best_para_head) in heads_at_target { if best_para_head @@ -554,23 +590,17 @@ where // if we have synced all required heads, we are done if self.awaiting_update.is_empty() { - return None + return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized) } - // if our transaction is dead now, we may start over again - let now = Instant::now(); - if now >= self.death_time { - log::warn!( - target: "bridge", - "Parachain heads update transaction {} has been lost: no updates for {}s", - P::TargetChain::NAME, - (now - self.submitted_at).as_secs(), - ); - - return None + // if underlying transaction tracker has reported that the transaction is lost, we may + // then restart our sync + let transaction_tracker = self.transaction_tracker.clone(); + if let Poll::Ready(TrackedTransactionStatus::Lost) = poll!(transaction_tracker) { + return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) } - Some(self) + SubmittedHeadsStatus::Waiting(self) } } @@ -613,6 +643,16 @@ mod tests { data: Arc>, } + #[derive(Clone, Debug)] + struct TestTransactionTracker(TrackedTransactionStatus); + + #[async_trait] + impl TransactionTracker for TestTransactionTracker { + async fn wait(self) -> TrackedTransactionStatus { + self.0 + } + } + #[derive(Clone, Debug)] struct TestClientData { source_sync_status: Result, @@ -711,6 +751,8 @@ mod tests { #[async_trait] impl TargetClient for TestClient { + type TransactionTracker = TestTransactionTracker; + async fn best_block(&self) -> Result, TestError> { self.data.lock().await.target_best_block.clone() } @@ -736,13 +778,14 @@ mod tests { _at_source_block: HeaderIdOf, _updated_parachains: Vec<(ParaId, ParaHash)>, _proof: ParaHeadsProof, - ) -> Result<(), Self::Error> { - self.data.lock().await.target_submit_result.clone()?; + ) -> Result { + let mut data = self.data.lock().await; + data.target_submit_result.clone()?; - if let Some(mut exit_signal_sender) = self.data.lock().await.exit_signal_sender.take() { + if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() { exit_signal_sender.send(()).await.unwrap(); } - Ok(()) + Ok(TestTransactionTracker(TrackedTransactionStatus::Finalized)) } } @@ -891,27 +934,35 @@ mod tests { const PARA_1_ID: u32 = PARA_ID + 1; const SOURCE_BLOCK_NUMBER: u32 = 100; - fn test_tx_tracker() -> TransactionTracker { - TransactionTracker::new( + fn test_tx_tracker() -> SubmittedHeadsTracker { + SubmittedHeadsTracker::new( vec![ParaId(PARA_ID), ParaId(PARA_1_ID)], SOURCE_BLOCK_NUMBER, - Duration::from_secs(1), + TestTransactionTracker(TrackedTransactionStatus::Finalized), ) } - #[test] - fn tx_tracker_update_when_nothing_is_updated() { + impl From> for Option> { + fn from(status: SubmittedHeadsStatus) -> Option> { + match status { + SubmittedHeadsStatus::Waiting(tracker) => Some(tracker.awaiting_update), + _ => None, + } + } + } + + #[async_std::test] + async fn tx_tracker_update_when_nothing_is_updated() { assert_eq!( - test_tx_tracker() - .update(&vec![].into_iter().collect()) - .map(|t| t.awaiting_update), Some(test_tx_tracker().awaiting_update), + test_tx_tracker().update(&vec![].into_iter().collect()).await.into(), ); } - #[test] - fn tx_tracker_update_when_one_of_heads_is_updated_to_previous_value() { + #[async_std::test] + async fn tx_tracker_update_when_one_of_heads_is_updated_to_previous_value() { assert_eq!( + Some(test_tx_tracker().awaiting_update), test_tx_tracker() .update( &vec![( @@ -924,14 +975,15 @@ mod tests { .into_iter() .collect() ) - .map(|t| t.awaiting_update), - Some(test_tx_tracker().awaiting_update), + .await + .into(), ); } - #[test] - fn tx_tracker_update_when_one_of_heads_is_updated() { + #[async_std::test] + async fn tx_tracker_update_when_one_of_heads_is_updated() { assert_eq!( + Some(vec![ParaId(PARA_1_ID)].into_iter().collect::>()), test_tx_tracker() .update( &vec![( @@ -944,14 +996,15 @@ mod tests { .into_iter() .collect() ) - .map(|t| t.awaiting_update), - Some(vec![ParaId(PARA_1_ID)].into_iter().collect()), + .await + .into(), ); } - #[test] - fn tx_tracker_update_when_all_heads_are_updated() { + #[async_std::test] + async fn tx_tracker_update_when_all_heads_are_updated() { assert_eq!( + Option::>::None, test_tx_tracker() .update( &vec![ @@ -973,21 +1026,33 @@ mod tests { .into_iter() .collect() ) - .map(|t| t.awaiting_update), - None, + .await + .into(), ); } - #[test] - fn tx_tracker_update_when_tx_is_stalled() { + #[async_std::test] + async fn tx_tracker_update_when_tx_is_stalled() { let mut tx_tracker = test_tx_tracker(); - tx_tracker.death_time = Instant::now(); + tx_tracker.transaction_tracker = + futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared(); assert_eq!( - tx_tracker.update(&vec![].into_iter().collect()).map(|t| t.awaiting_update), - None, + Option::>::None, + tx_tracker.update(&vec![].into_iter().collect()).await.into(), ); } + #[async_std::test] + async fn tx_tracker_update_when_tx_is_finalized() { + let mut tx_tracker = test_tx_tracker(); + tx_tracker.transaction_tracker = + futures::future::ready(TrackedTransactionStatus::Finalized).boxed().shared(); + assert!(matches!( + tx_tracker.update(&vec![].into_iter().collect()).await, + SubmittedHeadsStatus::Waiting(_), + )); + } + #[test] fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { assert_eq!(