Skip to content

Commit

Permalink
lp-gateway-queue: Ensure messages are processed in order
Browse files Browse the repository at this point in the history
  • Loading branch information
cdamian committed Sep 20, 2024
1 parent ef843b2 commit a619b37
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
50 changes: 40 additions & 10 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec;
use scale_info::TypeInfo;
use sp_arithmetic::traits::BaseArithmetic;
use sp_runtime::traits::{EnsureAddAssign, One};
use sp_std::vec::Vec;

#[cfg(test)]
mod mock;
Expand Down Expand Up @@ -61,6 +60,12 @@ pub mod pallet {
#[pallet::getter(fn message_nonce_store)]
pub type MessageNonceStore<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage that is used for keeping track of the last nonce that was
/// processed.
#[pallet::storage]
#[pallet::getter(fn last_processed_nonce)]
pub type LastProcessedNonce<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage for messages that will be processed during the `on_idle` hook.
#[pallet::storage]
#[pallet::getter(fn message_queue)]
Expand Down Expand Up @@ -93,6 +98,9 @@ pub mod pallet {
message: T::Message,
error: DispatchError,
},

/// Maximum number of messages was reached.
MaxNumberOfMessagesWasReached,
}

#[pallet::error]
Expand Down Expand Up @@ -200,17 +208,34 @@ pub mod pallet {
}

fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();
let mut last_processed_nonce = LastProcessedNonce::<T>::get();

// 1 read for the last processed nonce
let mut weight_used = T::DbWeight::get().reads(1);

loop {
if let Err(_) = last_processed_nonce.ensure_add_assign(One::one()) {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesWasReached);

break;
}

let mut nonces = MessageQueue::<T>::iter_keys().collect::<Vec<_>>();
nonces.sort();
// 1 read for the nonce
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

for nonce in nonces {
let message =
MessageQueue::<T>::get(nonce).expect("valid nonce ensured by `iter_keys`");
if last_processed_nonce > MessageNonceStore::<T>::get() {
break;
}

// 1 read for the message
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

let message = match MessageQueue::<T>::get(last_processed_nonce) {
Some(msg) => msg,
// No message found, we can stop.
None => break,
};

let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);

Expand All @@ -219,10 +244,13 @@ pub mod pallet {
break;
}

let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
let weight = match Self::process_message_and_deposit_event(
last_processed_nonce,
message.clone(),
) {
(Ok(()), weight) => weight,
(Err(e), weight) => {
FailedMessageQueue::<T>::insert(nonce, (message, e));
FailedMessageQueue::<T>::insert(last_processed_nonce, (message, e));

// 1 write for the failed message
weight.saturating_add(T::DbWeight::get().writes(1))
Expand All @@ -231,10 +259,12 @@ pub mod pallet {

weight_used.saturating_accrue(weight);

MessageQueue::<T>::remove(nonce);
MessageQueue::<T>::remove(last_processed_nonce);

// 1 write for removing the message
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

LastProcessedNonce::<T>::set(last_processed_nonce);
}

weight_used
Expand Down
6 changes: 3 additions & 3 deletions pallets/liquidity-pools-gateway-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod on_idle {
#[test]
fn success_all() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -226,7 +226,7 @@ mod on_idle {
#[test]
fn not_all_messages_fit_in_the_block() {
new_test_ext().execute_with(|| {
(1..=5).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=5).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -251,7 +251,7 @@ mod on_idle {
#[test]
fn with_failed_messages() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|msg| match msg {
Expand Down

0 comments on commit a619b37

Please sign in to comment.