Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bridge: add subcommand to relay messages range #4383

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion bridges/relays/lib-substrate-relay/src/cli/relay_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ use async_trait::async_trait;
use sp_core::Pair;
use structopt::StructOpt;

use bp_messages::MessageNonce;
use bp_runtime::HeaderIdProvider;
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, ChainWithRuntimeVersion, ChainWithTransactions,
AccountIdOf, AccountKeyPairOf, BalanceOf, Chain, ChainWithRuntimeVersion, ChainWithTransactions,
};
use relay_utils::UniqueSaturatedInto;

/// Messages relaying params.
#[derive(StructOpt)]
Expand All @@ -48,6 +51,35 @@ pub struct RelayMessagesParams {
prometheus_params: PrometheusParams,
}

/// Messages range relaying params.
#[derive(StructOpt)]
pub struct RelayMessagesRangeParams {
/// Number of the source chain header that we will use to prepare a messages proof.
/// This header must be previously proved to the target chain.
#[structopt(long)]
at_source_block: u128,
/// Hex-encoded lane id that should be served by the relay. Defaults to `00000000`.
#[structopt(long, default_value = "00000000")]
lane: HexLaneId,
/// Nonce (inclusive) of the first message to relay.
#[structopt(long)]
messages_start: MessageNonce,
/// Nonce (inclusive) of the last message to relay.
#[structopt(long)]
messages_end: MessageNonce,
/// Whether the outbound lane state proof should be included into transaction.
#[structopt(long)]
outbound_state_proof_required: bool,
#[structopt(flatten)]
source: SourceConnectionParams,
#[structopt(flatten)]
source_sign: SourceSigningParams,
#[structopt(flatten)]
target: TargetConnectionParams,
#[structopt(flatten)]
target_sign: TargetSigningParams,
}

/// Trait used for relaying messages between 2 chains.
#[async_trait]
pub trait MessagesRelayer: MessagesCliBridge
Expand Down Expand Up @@ -86,4 +118,40 @@ where
.await
.map_err(|e| anyhow::format_err!("{}", e))
}

/// Relay a consequitive range of messages.
async fn relay_messages_range(data: RelayMessagesRangeParams) -> anyhow::Result<()> {
let source_client = data.source.into_client::<Self::Source>().await?;
let target_client = data.target.into_client::<Self::Target>().await?;
let source_sign = data.source_sign.to_keypair::<Self::Source>()?;
let source_transactions_mortality = data.source_sign.transactions_mortality()?;
let target_sign = data.target_sign.to_keypair::<Self::Target>()?;
let target_transactions_mortality = data.target_sign.transactions_mortality()?;

let at_source_block = source_client
.header_by_number(data.at_source_block.unique_saturated_into())
.await
.map_err(|e| {
log::trace!(
target: "bridge",
"Failed to read {} header with number {}: {e:?}",
Self::Source::NAME,
data.at_source_block,
);
anyhow::format_err!("The command has failed")
})?
.id();

crate::messages_lane::relay_messages_range::<Self::MessagesLane>(
source_client,
target_client,
TransactionParams { signer: source_sign, mortality: source_transactions_mortality },
TransactionParams { signer: target_sign, mortality: target_transactions_mortality },
at_source_block,
data.lane.into(),
data.messages_start..=data.messages_end,
data.outbound_state_proof_required,
)
.await
}
}
45 changes: 44 additions & 1 deletion bridges/relays/lib-substrate-relay/src/messages_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use relay_utils::{
};
use sp_core::Pair;
use sp_runtime::traits::Zero;
use std::{fmt::Debug, marker::PhantomData};
use std::{fmt::Debug, marker::PhantomData, ops::RangeInclusive};

/// Substrate -> Substrate messages synchronization pipeline.
pub trait SubstrateMessageLane: 'static + Clone + Debug + Send + Sync {
Expand Down Expand Up @@ -275,6 +275,49 @@ where
.map_err(Into::into)
}

/// Deliver range of Substrate-to-Substrate messages. No checks are made to ensure that transaction
/// will succeed.
pub async fn relay_messages_range<P: SubstrateMessageLane>(
source_client: Client<P::SourceChain>,
target_client: Client<P::TargetChain>,
source_transaction_params: TransactionParams<AccountKeyPairOf<P::SourceChain>>,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
at_source_block: HeaderIdOf<P::SourceChain>,
lane_id: LaneId,
range: RangeInclusive<MessageNonce>,
outbound_state_proof_required: bool,
) -> anyhow::Result<()>
where
AccountIdOf<P::SourceChain>: From<<AccountKeyPairOf<P::SourceChain> as Pair>::Public>,
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as Pair>::Public>,
BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>,
{
let relayer_id_at_source: AccountIdOf<P::SourceChain> =
source_transaction_params.signer.public().into();
messages_relay::relay_messages_range(
SubstrateMessagesSource::<P>::new(
source_client.clone(),
target_client.clone(),
lane_id,
source_transaction_params,
None,
),
SubstrateMessagesTarget::<P>::new(
target_client,
source_client,
lane_id,
relayer_id_at_source,
target_transaction_params,
None,
),
at_source_block,
range,
outbound_state_proof_required,
)
.await
.map_err(|_| anyhow::format_err!("The command has failed"))
}

/// Different ways of building `receive_messages_proof` calls.
pub trait ReceiveMessagesProofCallBuilder<P: SubstrateMessageLane> {
/// Given messages proof, build call of `receive_messages_proof` function of bridge
Expand Down
2 changes: 2 additions & 0 deletions bridges/relays/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ mod message_race_limits;
mod message_race_loop;
mod message_race_receiving;
mod message_race_strategy;

pub use message_race_delivery::relay_messages_range;
65 changes: 64 additions & 1 deletion bridges/relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use async_trait::async_trait;
use futures::stream::FusedStream;

use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
use relay_utils::FailedClient;
use relay_utils::{FailedClient, TrackedTransactionStatus, TransactionTracker};

use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
Expand Down Expand Up @@ -77,6 +77,69 @@ pub async fn run<P: MessageLane>(
.await
}

/// Relay range of messages.
pub async fn relay_messages_range<P: MessageLane>(
source_client: impl MessageLaneSourceClient<P>,
target_client: impl MessageLaneTargetClient<P>,
at: SourceHeaderIdOf<P>,
range: RangeInclusive<MessageNonce>,
outbound_state_proof_required: bool,
) -> Result<(), ()> {
// compute cumulative dispatch weight of all messages in given range
let dispatch_weight = source_client
.generated_message_details(at.clone(), range.clone())
.await
.map_err(|e| {
log::error!(
target: "bridge",
"Failed to get generated message details at {:?} for messages {:?}: {:?}",
at,
range,
e,
);
})?
.values()
.fold(Weight::zero(), |total, details| total.saturating_add(details.dispatch_weight));
// prepare messages proof
let (at, range, proof) = source_client
.prove_messages(
at.clone(),
range.clone(),
MessageProofParameters { outbound_state_proof_required, dispatch_weight },
)
.await
.map_err(|e| {
log::error!(
target: "bridge",
"Failed to generate messages proof at {:?} for messages {:?}: {:?}",
at,
range,
e,
);
})?;
// submit messages proof to the target node
let tx_tracker = target_client
.submit_messages_proof(None, at, range.clone(), proof)
.await
.map_err(|e| {
log::error!(
target: "bridge",
"Failed to submit messages proof for messages {:?}: {:?}",
range,
e,
);
})?
.tx_tracker;

match tx_tracker.wait().await {
TrackedTransactionStatus::Finalized(_) => Ok(()),
TrackedTransactionStatus::Lost => {
log::error!("Transaction with messages {:?} is considered lost", range,);
Err(())
},
}
}

/// Message delivery race.
struct MessageDeliveryRace<P>(std::marker::PhantomData<P>);

Expand Down
Loading