From 2563a91edbdebf0b8c5069b4a35ea9f438df5d9e Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 14 Sep 2022 17:30:57 +0300 Subject: [PATCH] TransactionInvalidationTracker (#1544) * TransactionInvalidationTracker * TransacitonInvalidationTracker -> TransactionTracker * change sign_transaction method * clippy and spelling * removed comment * more transactiontracker tests * stalls_when_transaction_tracker_returns_error * remove test code * remove "impl TransactionTracker for ()" * enum TrackedTransactionStatus * test TransactionTracker in on_transaction_status * do_wait --- .../src/cli/register_parachain.rs | 134 +++----- bridges/relays/client-substrate/src/client.rs | 36 +- bridges/relays/client-substrate/src/lib.rs | 2 + .../src/transaction_tracker.rs | 322 ++++++++++++++++++ bridges/relays/finality/src/finality_loop.rs | 79 +++-- .../finality/src/finality_loop_tests.rs | 60 +++- .../src/finality/target.rs | 9 +- bridges/relays/utils/src/lib.rs | 17 + 8 files changed, 514 insertions(+), 145 deletions(-) create mode 100644 bridges/relays/client-substrate/src/transaction_tracker.rs diff --git a/bridges/relays/bin-substrate/src/cli/register_parachain.rs b/bridges/relays/bin-substrate/src/cli/register_parachain.rs index 890436537795c..4febc4b04a8e1 100644 --- a/bridges/relays/bin-substrate/src/cli/register_parachain.rs +++ b/bridges/relays/bin-substrate/src/cli/register_parachain.rs @@ -26,10 +26,8 @@ use polkadot_runtime_common::{ paras_registrar::Call as ParaRegistrarCall, slots::Call as ParaSlotsCall, }; use polkadot_runtime_parachains::paras::ParaLifecycle; -use relay_substrate_client::{ - AccountIdOf, CallOf, Chain, Client, HashOf, SignParam, Subscription, TransactionStatusOf, - UnsignedTransaction, -}; +use relay_substrate_client::{AccountIdOf, CallOf, Chain, Client, SignParam, UnsignedTransaction}; +use relay_utils::{TrackedTransactionStatus, TransactionTracker}; use rialto_runtime::SudoCall; use sp_core::{ storage::{well_known_keys::CODE, StorageKey}, @@ -116,26 +114,30 @@ impl RegisterParachain { ParaRegistrarCall::reserve {}.into(); let reserve_parachain_signer = relay_sign.clone(); let (spec_version, transaction_version) = relay_client.simple_runtime_version().await?; - wait_until_transaction_is_finalized::( - relay_client - .submit_and_watch_signed_extrinsic( - relay_sudo_account.clone(), - SignParam:: { - spec_version, - transaction_version, - genesis_hash: relay_genesis_hash, - signer: reserve_parachain_signer, - }, - move |_, transaction_nonce| { - Ok(UnsignedTransaction::new( - reserve_parachain_id_call.into(), - transaction_nonce, - )) - }, - ) - .await?, - ) - .await?; + let reserve_result = relay_client + .submit_and_watch_signed_extrinsic( + relay_sudo_account.clone(), + SignParam:: { + spec_version, + transaction_version, + genesis_hash: relay_genesis_hash, + signer: reserve_parachain_signer, + }, + move |_, transaction_nonce| { + Ok(UnsignedTransaction::new( + reserve_parachain_id_call.into(), + transaction_nonce, + )) + }, + ) + .await? + .wait() + .await; + if reserve_result == TrackedTransactionStatus::Lost { + return Err(anyhow::format_err!( + "Failed to finalize `reserve-parachain-id` transaction" + )) + } log::info!(target: "bridge", "Reserved parachain id: {:?}", para_id); // step 2: register parathread @@ -161,26 +163,30 @@ impl RegisterParachain { } .into(); let register_parathread_signer = relay_sign.clone(); - wait_until_transaction_is_finalized::( - relay_client - .submit_and_watch_signed_extrinsic( - relay_sudo_account.clone(), - SignParam:: { - spec_version, - transaction_version, - genesis_hash: relay_genesis_hash, - signer: register_parathread_signer, - }, - move |_, transaction_nonce| { - Ok(UnsignedTransaction::new( - register_parathread_call.into(), - transaction_nonce, - )) - }, - ) - .await?, - ) - .await?; + let register_result = relay_client + .submit_and_watch_signed_extrinsic( + relay_sudo_account.clone(), + SignParam:: { + spec_version, + transaction_version, + genesis_hash: relay_genesis_hash, + signer: register_parathread_signer, + }, + move |_, transaction_nonce| { + Ok(UnsignedTransaction::new( + register_parathread_call.into(), + transaction_nonce, + )) + }, + ) + .await? + .wait() + .await; + if register_result == TrackedTransactionStatus::Lost { + return Err(anyhow::format_err!( + "Failed to finalize `register-parathread` transaction" + )) + } log::info!(target: "bridge", "Registered parachain: {:?}. Waiting for onboarding", para_id); // wait until parathread is onboarded @@ -256,46 +262,6 @@ impl RegisterParachain { } } -/// Wait until transaction is included into finalized block. -/// -/// Returns the hash of the finalized block with transaction. -pub(crate) async fn wait_until_transaction_is_finalized( - subscription: Subscription>, -) -> anyhow::Result> { - loop { - let transaction_status = subscription.next().await?; - match transaction_status { - Some(TransactionStatusOf::::FinalityTimeout(_)) | - Some(TransactionStatusOf::::Usurped(_)) | - Some(TransactionStatusOf::::Dropped) | - Some(TransactionStatusOf::::Invalid) | - None => - return Err(anyhow::format_err!( - "We've been waiting for finalization of {} transaction, but it now has the {:?} status", - C::NAME, - transaction_status, - )), - Some(TransactionStatusOf::::Finalized(block_hash)) => { - log::trace!( - target: "bridge", - "{} transaction has been finalized at block {}", - C::NAME, - block_hash, - ); - return Ok(block_hash) - }, - _ => { - log::trace!( - target: "bridge", - "Received intermediate status of {} transaction: {:?}", - C::NAME, - transaction_status, - ); - }, - } - } -} - /// Wait until parachain state is changed. async fn wait_para_state( relay_client: &Client, diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index b1f0eb85873da..ed327e167b064 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -23,8 +23,8 @@ use crate::{ SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient, SubstrateTransactionPaymentClient, }, - ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam, TransactionSignScheme, - TransactionStatusOf, UnsignedTransaction, + transaction_stall_timeout, ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam, + TransactionSignScheme, TransactionTracker, UnsignedTransaction, }; use async_std::sync::{Arc, Mutex}; @@ -40,7 +40,7 @@ use jsonrpsee::{ use num_traits::{Bounded, Zero}; use pallet_balances::AccountData; use pallet_transaction_payment::InclusionFee; -use relay_utils::relay_loop::RECONNECT_DELAY; +use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT}; use sp_core::{ storage::{StorageData, StorageKey}, Bytes, Hasher, @@ -58,7 +58,7 @@ const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_valida const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; /// Opaque justifications subscription type. -pub struct Subscription(Mutex>>); +pub struct Subscription(pub(crate) Mutex>>); /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec; @@ -467,14 +467,20 @@ impl Client { prepare_extrinsic: impl FnOnce(HeaderIdOf, C::Index) -> Result> + Send + 'static, - ) -> Result>> { + ) -> Result> { let _guard = self.submit_signed_extrinsic_lock.lock().await; let transaction_nonce = self.next_account_index(extrinsic_signer).await?; let best_header = self.best_header().await?; let best_header_id = best_header.id(); - let subscription = self + let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); + let (tracker, subscription) = self .jsonrpsee_execute(move |client| async move { let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?; + let stall_timeout = transaction_stall_timeout( + extrinsic.era.mortality_period(), + C::AVERAGE_BLOCK_INTERVAL, + STALL_TIMEOUT, + ); let signed_extrinsic = S::sign_transaction(signing_data, extrinsic)?.encode(); let tx_hash = C::Hasher::hash(&signed_extrinsic); let subscription = SubstrateAuthorClient::::submit_and_watch_extrinsic( @@ -487,17 +493,21 @@ impl Client { e })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); - Ok(subscription) + let tracker = TransactionTracker::new( + stall_timeout, + tx_hash, + Subscription(Mutex::new(receiver)), + ); + Ok((tracker, subscription)) }) .await?; - let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); self.tokio.spawn(Subscription::background_worker( C::NAME.into(), "extrinsic".into(), subscription, sender, )); - Ok(Subscription(Mutex::new(receiver))) + Ok(tracker) } /// Returns pending extrinsics from transaction pool. @@ -669,6 +679,14 @@ impl Client { } impl Subscription { + /// Consumes subscription and returns future statuses stream. + pub fn into_stream(self) -> impl futures::Stream { + futures::stream::unfold(self, |this| async { + let item = this.0.lock().await.next().await.unwrap_or(None); + item.map(|i| (i, this)) + }) + } + /// Return next item from the subscription. pub async fn next(&self) -> Result> { let mut receiver = self.0.lock().await; diff --git a/bridges/relays/client-substrate/src/lib.rs b/bridges/relays/client-substrate/src/lib.rs index 0234459f9d7d5..b9e489688d35d 100644 --- a/bridges/relays/client-substrate/src/lib.rs +++ b/bridges/relays/client-substrate/src/lib.rs @@ -23,6 +23,7 @@ mod client; mod error; mod rpc; mod sync_header; +mod transaction_tracker; pub mod guard; pub mod metrics; @@ -39,6 +40,7 @@ pub use crate::{ client::{ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription}, error::{Error, Result}, sync_header::SyncHeader, + transaction_tracker::TransactionTracker, }; pub use bp_runtime::{ AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf, diff --git a/bridges/relays/client-substrate/src/transaction_tracker.rs b/bridges/relays/client-substrate/src/transaction_tracker.rs new file mode 100644 index 0000000000000..b85e859017f7e --- /dev/null +++ b/bridges/relays/client-substrate/src/transaction_tracker.rs @@ -0,0 +1,322 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Helper for tracking transaction invalidation events. + +use crate::{Chain, HashOf, Subscription, TransactionStatusOf}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use relay_utils::TrackedTransactionStatus; +use std::time::Duration; + +/// Substrate transaction tracker implementation. +/// +/// Substrate node provides RPC API to submit and watch for transaction events. This way +/// we may know when transaction is included into block, finalized or rejected. There are +/// some edge cases, when we can't fully trust this mechanism - e.g. transaction may broadcasted +/// and then dropped out of node transaction pool (some other cases are also possible - node +/// restarts, connection lost, ...). Then we can't know for sure - what is currently happening +/// with our transaction. Is the transaction really lost? Is it still alive on the chain network? +/// +/// We have several options to handle such cases: +/// +/// 1) hope that the transaction is still alive and wait for its mining until it is spoiled; +/// +/// 2) assume that the transaction is lost and resubmit another transaction instantly; +/// +/// 3) wait for some time (if transaction is mortal - then until block where it dies; if it is +/// immortal - then for some time that we assume is long enough to mine it) and assume that +/// it is lost. +/// +/// This struct implements third option as it seems to be the most optimal. +pub struct TransactionTracker { + transaction_hash: HashOf, + stall_timeout: Duration, + subscription: Subscription>, +} + +impl TransactionTracker { + /// Create transaction tracker. + pub fn new( + stall_timeout: Duration, + transaction_hash: HashOf, + subscription: Subscription>, + ) -> Self { + Self { stall_timeout, transaction_hash, subscription } + } + + /// Wait for final transaction status and return it along with last known internal invalidation + /// status. + async fn do_wait(self) -> (TrackedTransactionStatus, InvalidationStatus) { + let invalidation_status = watch_transaction_status::( + self.transaction_hash, + self.subscription.into_stream(), + ) + .await; + match invalidation_status { + InvalidationStatus::Finalized => + (TrackedTransactionStatus::Finalized, invalidation_status), + InvalidationStatus::Invalid => (TrackedTransactionStatus::Lost, invalidation_status), + InvalidationStatus::Lost => { + async_std::task::sleep(self.stall_timeout).await; + // if someone is still watching for our transaction, then we're reporting + // an error here (which is treated as "transaction lost") + log::trace!( + target: "bridge", + "{} transaction {:?} is considered lost after timeout", + C::NAME, + self.transaction_hash, + ); + + (TrackedTransactionStatus::Lost, invalidation_status) + }, + } + } +} + +#[async_trait] +impl relay_utils::TransactionTracker for TransactionTracker { + async fn wait(self) -> TrackedTransactionStatus { + self.do_wait().await.0 + } +} + +/// Transaction invalidation status. +/// +/// Note that in places where the `TransactionTracker` is used, the finalization event will be +/// ignored - relay loops are detecting the mining/finalization using their own +/// techniques. That's why we're using `InvalidationStatus` here. +#[derive(Debug, PartialEq)] +enum InvalidationStatus { + /// Transaction has been included into block and finalized. + Finalized, + /// Transaction has been invalidated. + Invalid, + /// We have lost track of transaction status. + Lost, +} + +/// Watch for transaction status until transaction is finalized or we lose track of its status. +async fn watch_transaction_status>>( + transaction_hash: HashOf, + subscription: S, +) -> InvalidationStatus { + futures::pin_mut!(subscription); + + loop { + match subscription.next().await { + Some(TransactionStatusOf::::Finalized(block_hash)) => { + // the only "successful" outcome of this method is when the block with transaction + // has been finalized + log::trace!( + target: "bridge", + "{} transaction {:?} has been finalized at block: {:?}", + C::NAME, + transaction_hash, + block_hash, + ); + return InvalidationStatus::Finalized + }, + Some(TransactionStatusOf::::Invalid) => { + // if node says that the transaction is invalid, there are still chances that + // it is not actually invalid - e.g. if the block where transaction has been + // revalidated is retracted and transaction (at some other node pool) becomes + // valid again on other fork. But let's assume that the chances of this event + // are almost zero - there's a lot of things that must happen for this to be the + // case. + log::trace!( + target: "bridge", + "{} transaction {:?} has been invalidated", + C::NAME, + transaction_hash, + ); + return InvalidationStatus::Invalid + }, + Some(TransactionStatusOf::::Future) | + Some(TransactionStatusOf::::Ready) | + Some(TransactionStatusOf::::Broadcast(_)) => { + // nothing important (for us) has happened + }, + Some(TransactionStatusOf::::InBlock(block_hash)) => { + // TODO: read matching system event (ExtrinsicSuccess or ExtrinsicFailed), log it + // here and use it later (on finality) for reporting invalid transaction + // https://github.com/paritytech/parity-bridges-common/issues/1464 + log::trace!( + target: "bridge", + "{} transaction {:?} has been included in block: {:?}", + C::NAME, + transaction_hash, + block_hash, + ); + }, + Some(TransactionStatusOf::::Retracted(block_hash)) => { + log::trace!( + target: "bridge", + "{} transaction {:?} at block {:?} has been retracted", + C::NAME, + transaction_hash, + block_hash, + ); + }, + Some(TransactionStatusOf::::FinalityTimeout(block_hash)) => { + // finality is lagging? let's wait a bit more and report a stall + log::trace!( + target: "bridge", + "{} transaction {:?} block {:?} has not been finalized for too long", + C::NAME, + transaction_hash, + block_hash, + ); + return InvalidationStatus::Lost + }, + Some(TransactionStatusOf::::Usurped(new_transaction_hash)) => { + // this may be result of our transaction resubmitter work or some manual + // intervention. In both cases - let's start stall timeout, because the meaning + // of transaction may have changed + log::trace!( + target: "bridge", + "{} transaction {:?} has been usurped by new transaction: {:?}", + C::NAME, + transaction_hash, + new_transaction_hash, + ); + return InvalidationStatus::Lost + }, + Some(TransactionStatusOf::::Dropped) => { + // the transaction has been removed from the pool because of its limits. Let's wait + // a bit and report a stall + log::trace!( + target: "bridge", + "{} transaction {:?} has been dropped from the pool", + C::NAME, + transaction_hash, + ); + return InvalidationStatus::Lost + }, + None => { + // the status of transaction is unknown to us (the subscription has been closed?). + // Let's wait a bit and report a stall + return InvalidationStatus::Lost + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_chain::TestChain; + use futures::{FutureExt, SinkExt}; + use sc_transaction_pool_api::TransactionStatus; + + async fn on_transaction_status( + status: TransactionStatus, HashOf>, + ) -> Option<(TrackedTransactionStatus, InvalidationStatus)> { + let (mut sender, receiver) = futures::channel::mpsc::channel(1); + let tx_tracker = TransactionTracker::::new( + Duration::from_secs(0), + Default::default(), + Subscription(async_std::sync::Mutex::new(receiver)), + ); + + sender.send(Some(status)).await.unwrap(); + tx_tracker.do_wait().now_or_never() + } + + #[async_std::test] + async fn returns_finalized_on_finalized() { + assert_eq!( + on_transaction_status(TransactionStatus::Finalized(Default::default())).await, + Some((TrackedTransactionStatus::Finalized, InvalidationStatus::Finalized)), + ); + } + + #[async_std::test] + async fn returns_invalid_on_invalid() { + assert_eq!( + on_transaction_status(TransactionStatus::Invalid).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)), + ); + } + + #[async_std::test] + async fn waits_on_future() { + assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,); + } + + #[async_std::test] + async fn waits_on_ready() { + assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,); + } + + #[async_std::test] + async fn waits_on_broadcast() { + assert_eq!( + on_transaction_status(TransactionStatus::Broadcast(Default::default())).await, + None, + ); + } + + #[async_std::test] + async fn waits_on_in_block() { + assert_eq!( + on_transaction_status(TransactionStatus::InBlock(Default::default())).await, + None, + ); + } + + #[async_std::test] + async fn waits_on_retracted() { + assert_eq!( + on_transaction_status(TransactionStatus::Retracted(Default::default())).await, + None, + ); + } + + #[async_std::test] + async fn lost_on_finality_timeout() { + assert_eq!( + on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), + ); + } + + #[async_std::test] + async fn lost_on_usurped() { + assert_eq!( + on_transaction_status(TransactionStatus::Usurped(Default::default())).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), + ); + } + + #[async_std::test] + async fn lost_on_dropped() { + assert_eq!( + on_transaction_status(TransactionStatus::Dropped).await, + Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)), + ); + } + + #[async_std::test] + async fn lost_on_subscription_error() { + assert_eq!( + watch_transaction_status::(Default::default(), futures::stream::iter([])) + .now_or_never(), + Some(InvalidationStatus::Lost), + ); + } +} diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index c29a5d5fec21f..951edfdde9485 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -29,7 +29,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt}; use num_traits::{One, Saturating}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, - HeaderId, MaybeConnectionError, + HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, }; use std::{ pin::Pin, @@ -86,6 +86,9 @@ pub trait SourceClient: RelayClient { /// Target client used in finality synchronization loop. #[async_trait] pub trait TargetClient: RelayClient { + /// Transaction tracker to track submitted transactions. + type TransactionTracker: TransactionTracker; + /// Get best finalized source block number. async fn best_finalized_source_block_id( &self, @@ -96,7 +99,7 @@ pub trait TargetClient: RelayClient { &self, header: P::Header, proof: P::FinalityProof, - ) -> Result<(), Self::Error>; + ) -> Result; } /// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs @@ -153,8 +156,6 @@ pub(crate) enum Error { Target(TargetError), /// Finality proof for mandatory header is missing from the source node. MissingMandatoryFinalityProof(P::Number), - /// The synchronization has stalled. - Stalled, } impl Error @@ -167,7 +168,6 @@ where match *self { Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source), Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target), - Error::Stalled => Err(FailedClient::Both), _ => Ok(()), } } @@ -175,9 +175,9 @@ where /// Information about transaction that we have submitted. #[derive(Debug, Clone)] -pub(crate) struct Transaction { - /// Time when we have submitted this transaction. - pub time: Instant, +pub(crate) struct Transaction { + /// Submitted transaction tracker. + pub tracker: Tracker, /// The number of the header we have submitted. pub submitted_header_number: Number, } @@ -206,11 +206,12 @@ pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsS &'a mut RestartableFinalityProofsStream, /// Recent finality proofs that we have read from the stream. pub(crate) recent_finality_proofs: &'a mut FinalityProofs

, - /// Last transaction that we have submitted to the target node. - pub(crate) last_transaction: Option>, + /// Number of the last header, submitted to the target node. + pub(crate) submitted_header_number: Option, } -async fn run_until_connection_lost( +/// Run finality relay loop until connection to one of nodes is lost. +pub(crate) async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, @@ -230,8 +231,9 @@ async fn run_until_connection_lost( }) }; + let last_transaction_tracker = futures::future::Fuse::terminated(); let exit_signal = exit_signal.fuse(); - futures::pin_mut!(exit_signal); + futures::pin_mut!(last_transaction_tracker, exit_signal); let mut finality_proofs_stream = RestartableFinalityProofsStream { needs_restart: false, @@ -241,7 +243,7 @@ async fn run_until_connection_lost( let mut progress = (Instant::now(), None); let mut retry_backoff = retry_backoff(); - let mut last_transaction = None; + let mut last_submitted_header_number = None; loop { // run loop iteration @@ -252,7 +254,7 @@ async fn run_until_connection_lost( progress: &mut progress, finality_proofs_stream: &mut finality_proofs_stream, recent_finality_proofs: &mut recent_finality_proofs, - last_transaction: last_transaction.clone(), + submitted_header_number: last_submitted_header_number, }, &sync_params, &metrics_sync, @@ -261,8 +263,14 @@ async fn run_until_connection_lost( // deal with errors let next_tick = match iteration_result { - Ok(updated_last_transaction) => { - last_transaction = updated_last_transaction; + Ok(Some(updated_last_transaction)) => { + last_transaction_tracker.set(updated_last_transaction.tracker.wait().fuse()); + last_submitted_header_number = + Some(updated_last_transaction.submitted_header_number); + retry_backoff.reset(); + sync_params.tick + }, + Ok(None) => { retry_backoff.reset(); sync_params.tick }, @@ -281,6 +289,18 @@ async fn run_until_connection_lost( // wait till exit signal, or new source block select! { + transaction_status = last_transaction_tracker => { + if transaction_status == TrackedTransactionStatus::Lost { + log::error!( + target: "bridge", + "Finality synchronization from {} to {} has stalled. Going to restart", + P::SOURCE_NAME, + P::TARGET_NAME, + ); + + return Err(FailedClient::Both); + } + }, _ = async_std::task::sleep(next_tick).fuse() => {}, _ = exit_signal => return Ok(()), } @@ -293,7 +313,7 @@ pub(crate) async fn run_loop_iteration( state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, sync_params: &FinalitySyncParams, metrics_sync: &Option, -) -> Result>, Error> +) -> Result>, Error> where P: FinalitySyncPipeline, SC: SourceClient

, @@ -333,20 +353,11 @@ where // if we have already submitted header, then we just need to wait for it // if we're waiting too much, then we believe our transaction has been lost and restart sync - if let Some(last_transaction) = state.last_transaction { - if best_number_at_target >= last_transaction.submitted_header_number { + if let Some(submitted_header_number) = state.submitted_header_number { + if best_number_at_target >= submitted_header_number { // transaction has been mined && we can continue - } else if last_transaction.time.elapsed() > sync_params.stall_timeout { - log::error!( - target: "bridge", - "Finality synchronization from {} to {} has stalled. Going to restart", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - - return Err(Error::Stalled) } else { - return Ok(Some(last_transaction)) + return Ok(None) } } @@ -363,22 +374,20 @@ where .await? { Some((header, justification)) => { - let new_transaction = - Transaction { time: Instant::now(), submitted_header_number: header.number() }; - + let submitted_header_number = header.number(); log::debug!( target: "bridge", "Going to submit finality proof of {} header #{:?} to {}", P::SOURCE_NAME, - new_transaction.submitted_header_number, + submitted_header_number, P::TARGET_NAME, ); - target_client + let tracker = target_client .submit_finality_proof(header, justification) .await .map_err(Error::Target)?; - Ok(Some(new_transaction)) + Ok(Some(Transaction { tracker, submitted_header_number })) }, None => Ok(None), } diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index b7f7bc80029fa..7144ccb0c4810 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -20,10 +20,10 @@ use crate::{ finality_loop::{ - prune_recent_finality_proofs, read_finality_proofs_from_stream, run, run_loop_iteration, - select_better_recent_finality_proof, select_header_to_submit, FinalityLoopState, - FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, SourceClient, - TargetClient, + prune_recent_finality_proofs, read_finality_proofs_from_stream, run_loop_iteration, + run_until_connection_lost, select_better_recent_finality_proof, select_header_to_submit, + FinalityLoopState, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, + SourceClient, TargetClient, }, sync_loop_metrics::SyncLoopMetrics, FinalityProof, FinalitySyncPipeline, SourceHeader, @@ -33,7 +33,8 @@ use async_trait::async_trait; use futures::{FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError, + relay_loop::Client as RelayClient, FailedClient, HeaderId, MaybeConnectionError, + TrackedTransactionStatus, TransactionTracker, }; use std::{ collections::HashMap, @@ -46,6 +47,22 @@ type IsMandatory = bool; type TestNumber = u64; type TestHash = u64; +#[derive(Clone, Debug)] +struct TestTransactionTracker(TrackedTransactionStatus); + +impl Default for TestTransactionTracker { + fn default() -> TestTransactionTracker { + TestTransactionTracker(TrackedTransactionStatus::Finalized) + } +} + +#[async_trait] +impl TransactionTracker for TestTransactionTracker { + async fn wait(self) -> TrackedTransactionStatus { + self.0 + } +} + #[derive(Debug, Clone)] enum TestError { NonConnection, @@ -104,6 +121,7 @@ struct ClientsData { target_best_block_id: HeaderId, target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, + target_transaction_tracker: TestTransactionTracker, } #[derive(Clone)] @@ -164,6 +182,8 @@ impl RelayClient for TestTargetClient { #[async_trait] impl TargetClient for TestTargetClient { + type TransactionTracker = TestTransactionTracker; + async fn best_finalized_source_block_id( &self, ) -> Result, TestError> { @@ -176,12 +196,13 @@ impl TargetClient for TestTargetClient { &self, header: TestSourceHeader, proof: TestFinalityProof, - ) -> Result<(), TestError> { + ) -> Result { let mut data = self.data.lock(); (self.on_method_call)(&mut data); data.target_best_block_id = HeaderId(header.number(), header.hash()); data.target_headers.push((header, proof)); - Ok(()) + (self.on_method_call)(&mut data); + Ok(data.target_transaction_tracker.clone()) } } @@ -203,6 +224,7 @@ fn prepare_test_clients( target_best_block_id: HeaderId(5, 5), target_headers: vec![], + target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized), })); ( TestSourceClient { @@ -224,7 +246,7 @@ fn test_sync_params() -> FinalitySyncParams { fn run_sync_loop( state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, -) -> ClientsData { +) -> (ClientsData, Result<(), FailedClient>) { let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); let (source_client, target_client) = prepare_test_clients( exit_sender, @@ -243,21 +265,21 @@ fn run_sync_loop( let sync_params = test_sync_params(); let clients_data = source_client.data.clone(); - let _ = async_std::task::block_on(run( + let result = async_std::task::block_on(run_until_connection_lost( source_client, target_client, sync_params, - MetricsParams::disabled(), + None, exit_receiver.into_future().map(|(_, _)| ()), )); let clients_data = clients_data.lock().clone(); - clients_data + (clients_data, result) } #[test] fn finality_sync_loop_works() { - let client_data = run_sync_loop(|data| { + let (client_data, result) = run_sync_loop(|data| { // header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, // because header#8 has persistent finality proof && it is mandatory => it is submitted // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, @@ -286,6 +308,7 @@ fn finality_sync_loop_works() { data.target_best_block_id.0 == 16 }); + assert_eq!(result, Ok(())); assert_eq!( client_data.target_headers, vec![ @@ -538,7 +561,7 @@ fn different_forks_at_source_and_at_target_are_detected() { progress: &mut progress, finality_proofs_stream: &mut finality_proofs_stream, recent_finality_proofs: &mut recent_finality_proofs, - last_transaction: None, + submitted_header_number: None, }, &test_sync_params(), &Some(metrics_sync.clone()), @@ -547,3 +570,14 @@ fn different_forks_at_source_and_at_target_are_detected() { assert!(!metrics_sync.is_using_same_fork()); } + +#[test] +fn stalls_when_transaction_tracker_returns_error() { + let (_, result) = run_sync_loop(|data| { + data.target_transaction_tracker = TestTransactionTracker(TrackedTransactionStatus::Lost); + data.target_best_block_id = HeaderId(5, 5); + data.target_best_block_id.0 == 16 + }); + + assert_eq!(result, Err(FailedClient::Both)); +} diff --git a/bridges/relays/lib-substrate-relay/src/finality/target.rs b/bridges/relays/lib-substrate-relay/src/finality/target.rs index 351f21cec80a9..7bdb77d4ee05e 100644 --- a/bridges/relays/lib-substrate-relay/src/finality/target.rs +++ b/bridges/relays/lib-substrate-relay/src/finality/target.rs @@ -28,7 +28,7 @@ use async_trait::async_trait; use finality_relay::TargetClient; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, Chain, Client, Error, HeaderIdOf, HeaderOf, SignParam, - SyncHeader, TransactionEra, TransactionSignScheme, UnsignedTransaction, + SyncHeader, TransactionEra, TransactionSignScheme, TransactionTracker, UnsignedTransaction, }; use relay_utils::relay_loop::Client as RelayClient; use sp_core::Pair; @@ -89,6 +89,8 @@ where AccountIdOf: From< as Pair>::Public>, P::TransactionSignScheme: TransactionSignScheme, { + type TransactionTracker = TransactionTracker; + async fn best_finalized_source_block_id(&self) -> Result, Error> { // we can't continue to relay finality if target node is out of sync, because // it may have already received (some of) headers that we're going to relay @@ -109,14 +111,14 @@ where &self, header: SyncHeader>, proof: SubstrateFinalityProof

, - ) -> Result<(), Error> { + ) -> Result { let genesis_hash = *self.client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(header, proof); let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; self.client - .submit_signed_extrinsic( + .submit_and_watch_signed_extrinsic( self.transaction_params.signer.public().into(), SignParam:: { spec_version, @@ -130,6 +132,5 @@ where }, ) .await - .map(drop) } } diff --git a/bridges/relays/utils/src/lib.rs b/bridges/relays/utils/src/lib.rs index 8e8870ac188f6..dbc8e5df8218b 100644 --- a/bridges/relays/utils/src/lib.rs +++ b/bridges/relays/utils/src/lib.rs @@ -20,6 +20,7 @@ pub use bp_runtime::HeaderId; pub use error::Error; pub use relay_loop::{relay_loop, relay_metrics}; +use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::future::FutureExt; use std::time::Duration; @@ -119,6 +120,22 @@ pub trait MaybeConnectionError { fn is_connection_error(&self) -> bool; } +/// Final status of the tracked transaction. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TrackedTransactionStatus { + /// Transaction has been lost. + Lost, + /// Transaction has been mined and finalized. + Finalized, +} + +/// Transaction tracker. +#[async_trait] +pub trait TransactionTracker: Send { + /// Wait until transaction is either finalized or invalidated/lost. + async fn wait(self) -> TrackedTransactionStatus; +} + /// Stringified error that may be either connection-related or not. #[derive(Error, Debug)] pub enum StringifiedMaybeConnectionError {