Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add bulletproof rewind profiling #3618

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ pub async fn init_wallet(
);

let node_address = match wallet_db.get_node_address().await? {
None => config.public_address.clone().unwrap_or_else(Multiaddr::empty),
None => match config.public_address.clone() {
Some(val) => val,
None => Multiaddr::empty(),
},
Some(a) => a,
};

Expand Down Expand Up @@ -464,12 +467,12 @@ pub async fn init_wallet(
},
};
}
if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.output_manager_service.get_seed_words().await?.join(" ");
let _ = fs::write(file_name, seed_words)
.map_err(|e| ExitCodes::WalletError(format!("Problem writing seed words to file: {}", e)));
};
}
if let Some(file_name) = seed_words_file_name {
let seed_words = wallet.output_manager_service.get_seed_words().await?.join(" ");
let _ = fs::write(file_name, seed_words)
.map_err(|e| ExitCodes::WalletError(format!("Problem writing seed words to file: {}", e)));
};

Ok(wallet)
}
Expand Down
13 changes: 9 additions & 4 deletions applications/tari_console_wallet/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pub fn get_seed_from_seed_words(seed_words: Vec<String>) -> Result<CipherSeed, E
/// Recovers wallet funds by connecting to a given base node peer, downloading the transaction outputs stored in the
/// blockchain, and attempting to rewind them. Any outputs that are successfully rewound are then imported into the
/// wallet.
pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfig) -> Result<(), ExitCodes> {
pub async fn wallet_recovery(
wallet: &WalletSqlite,
base_node_config: &PeerConfig,
retry_limit: usize,
) -> Result<(), ExitCodes> {
println!("\nPress Ctrl-C to stop the recovery process\n");
// We dont care about the shutdown signal here, so we just create one
let shutdown = Shutdown::new();
Expand All @@ -105,7 +109,8 @@ pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfi

let mut recovery_task = UtxoScannerService::<WalletSqliteDatabase>::builder()
.with_peers(peer_public_keys)
.with_retry_limit(3)
// Do not make this a small number as wallet recovery needs to be resilient
.with_retry_limit(retry_limit)
.build_with_wallet(wallet, shutdown_signal);

let mut event_stream = recovery_task.get_event_receiver();
Expand All @@ -122,8 +127,8 @@ pub async fn wallet_recovery(wallet: &WalletSqlite, base_node_config: &PeerConfi
println!("OK (latency = {:.2?})", latency);
},
Ok(UtxoScannerEvent::Progress {
current_block: current,
current_chain_height: total,
current_index: current,
total_index: total,
}) => {
let percentage_progress = ((current as f32) * 100f32 / (total as f32)).round() as u32;
debug!(
Expand Down
6 changes: 5 additions & 1 deletion applications/tari_console_wallet/src/wallet_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ pub fn recovery_mode(config: WalletModeConfig, wallet: WalletSqlite) -> Result<(
println!("{}", CUCUMBER_TEST_MARKER_A);

println!("Starting recovery...");
match handle.block_on(wallet_recovery(&wallet, &base_node_config)) {
match handle.block_on(wallet_recovery(
&wallet,
&base_node_config,
config.global_config.wallet_recovery_retry_limit,
)) {
Ok(_) => println!("Wallet recovered!"),
Err(e) => {
error!(target: LOG_TARGET, "Recovery failed: {}", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use log::*;
use rand::rngs::OsRng;
Expand Down Expand Up @@ -73,6 +73,8 @@ where TBackend: OutputManagerBackend + 'static
&mut self,
outputs: Vec<TransactionOutput>,
) -> Result<Vec<UnblindedOutput>, OutputManagerError> {
let start = Instant::now();
let outputs_length = outputs.len();
let mut rewound_outputs: Vec<UnblindedOutput> = outputs
.into_iter()
.filter_map(|output| {
Expand Down Expand Up @@ -114,6 +116,13 @@ where TBackend: OutputManagerBackend + 'static
},
)
.collect();
let rewind_time = start.elapsed();
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - rewound {} outputs in {} ms",
outputs_length,
rewind_time.as_millis(),
);

for output in rewound_outputs.iter_mut() {
self.update_outputs_script_private_key_and_update_key_manager_index(output)
Expand All @@ -136,10 +145,7 @@ where TBackend: OutputManagerBackend + 'static
trace!(
target: LOG_TARGET,
"Output {} with value {} with {} recovered",
output
.as_transaction_input(&self.factories.commitment)?
.commitment
.to_hex(),
output_hex,
output.value,
output.features,
);
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/utxo_scanner_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub enum UtxoScannerEvent {
},
/// Progress of the recovery process (current_block, current_chain_height)
Progress {
current_block: u64,
current_chain_height: u64,
current_index: u64,
total_index: u64,
},
/// Completed Recovery (Number scanned, Num of Recovered outputs, Value of recovered outputs, Time taken)
Completed {
Expand Down
72 changes: 55 additions & 17 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tari_core::{
},
};
use tari_shutdown::ShutdownSignal;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, time};

use crate::{
error::WalletError,
Expand Down Expand Up @@ -89,8 +89,8 @@ where TBackend: WalletBackend + 'static
) -> Result<(), UtxoScannerError> {
let metadata = self.get_metadata().await?.unwrap_or_default();
self.publish_event(UtxoScannerEvent::Progress {
current_block: final_utxo_pos,
current_chain_height: final_utxo_pos,
current_index: final_utxo_pos,
total_index: final_utxo_pos,
});
self.publish_event(UtxoScannerEvent::Completed {
number_scanned: total_scanned,
Expand All @@ -116,11 +116,20 @@ where TBackend: WalletBackend + 'static
Ok(conn) => Ok(conn),
Err(e) => {
self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode {
peer,
peer: peer.clone(),
num_retries: self.num_retries,
retry_limit: self.retry_limit,
error: e.to_string(),
});
// No use re-dialing a peer that is not responsive for recovery mode
if self.mode == UtxoScannerMode::Recovery {
if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await {
if connection.clone().disconnect().await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer);
}
};
let _ = time::sleep(Duration::from_secs(30));
}

Err(e.into())
},
Expand Down Expand Up @@ -243,14 +252,14 @@ where TBackend: WalletBackend + 'static
);

let end_header_hash = end_header.hash();
let end_header_size = end_header.output_mmr_size;
let output_mmr_size = end_header.output_mmr_size;
let mut num_recovered = 0u64;
let mut total_amount = MicroTari::from(0);
let mut total_scanned = 0;

self.publish_event(UtxoScannerEvent::Progress {
current_block: start_mmr_leaf_index,
current_chain_height: (end_header_size - 1),
current_index: start_mmr_leaf_index,
total_index: (output_mmr_size - 1),
});
let request = SyncUtxosRequest {
start: start_mmr_leaf_index,
Expand All @@ -259,13 +268,28 @@ where TBackend: WalletBackend + 'static
include_deleted_bitmaps: false,
};

let start = Instant::now();
let utxo_stream = client.sync_utxos(request).await?;
// We download in chunks just because rewind_outputs works with multiple outputs (and could parallelized
// rewinding)
let mut utxo_stream = utxo_stream.chunks(10);
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - UTXO stream request time {} ms",
start.elapsed().as_millis(),
);

// We download in chunks for improved streaming efficiency
const CHUNK_SIZE: usize = 125;
let mut utxo_stream = utxo_stream.chunks(CHUNK_SIZE);
const COMMIT_EVERY_N: u64 = (1000_i64 / CHUNK_SIZE as i64) as u64;
let mut last_utxo_index = 0u64;
let mut iteration_count = 0u64;
while let Some(response) = utxo_stream.next().await {
let mut utxo_next_await_profiling = Vec::new();
let mut scan_for_outputs_profiling = Vec::new();
while let Some(response) = {
let start = Instant::now();
let utxo_stream_next = utxo_stream.next().await;
utxo_next_await_profiling.push(start.elapsed());
utxo_stream_next
} {
if self.shutdown_signal.is_triggered() {
// if running is set to false, we know its been canceled upstream so lets exit the loop
return Ok(total_scanned as u64);
Expand All @@ -274,14 +298,16 @@ where TBackend: WalletBackend + 'static
last_utxo_index = utxo_index;
total_scanned += outputs.len();
iteration_count += 1;

let start = Instant::now();
let found_outputs = self.scan_for_outputs(outputs).await?;
scan_for_outputs_profiling.push(start.elapsed());

// Reduce the number of db hits by only persisting progress every N iterations
const COMMIT_EVERY_N: u64 = 100;
if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= end_header_size - 1 {
if iteration_count % COMMIT_EVERY_N == 0 || last_utxo_index >= output_mmr_size - 1 {
self.publish_event(UtxoScannerEvent::Progress {
current_block: last_utxo_index,
current_chain_height: (end_header_size - 1),
current_index: last_utxo_index,
total_index: (output_mmr_size - 1),
});
self.update_scanning_progress_in_db(
last_utxo_index,
Expand All @@ -295,11 +321,23 @@ where TBackend: WalletBackend + 'static
num_recovered = num_recovered.saturating_add(count);
total_amount += amount;
}
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - streamed {} outputs in {} ms",
total_scanned,
utxo_next_await_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()),
);
trace!(
target: LOG_TARGET,
"bulletproof rewind profile - scanned {} outputs in {} ms",
total_scanned,
scan_for_outputs_profiling.iter().fold(0, |acc, &x| acc + x.as_millis()),
);
self.update_scanning_progress_in_db(last_utxo_index, total_amount, num_recovered, end_header_hash)
.await?;
self.publish_event(UtxoScannerEvent::Progress {
current_block: (end_header_size - 1),
current_chain_height: (end_header_size - 1),
current_index: (output_mmr_size - 1),
total_index: (output_mmr_size - 1),
});
Ok(total_scanned as u64)
}
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet_ffi/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ pub async fn recovery_event_monitoring(
);
},
Ok(UtxoScannerEvent::Progress {
current_block: current,
current_chain_height: total,
current_index: current,
total_index: total,
}) => {
unsafe {
(recovery_progress_callback)(RecoveryEvent::Progress as u8, current, total);
Expand Down
5 changes: 5 additions & 0 deletions common/src/configuration/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub struct GlobalConfig {
pub base_node_event_channel_size: usize,
pub output_manager_event_channel_size: usize,
pub wallet_connection_manager_pool_size: usize,
pub wallet_recovery_retry_limit: usize,
pub console_wallet_password: Option<String>,
pub wallet_command_send_wait_stage: String,
pub wallet_command_send_wait_timeout: u64,
Expand Down Expand Up @@ -484,6 +485,9 @@ fn convert_node_config(
let key = "wallet.connection_manager_pool_size";
let wallet_connection_manager_pool_size = optional(cfg.get_int(key))?.unwrap_or(16) as usize;

let key = "wallet.wallet_recovery_retry_limit";
let wallet_recovery_retry_limit = optional(cfg.get_int(key))?.unwrap_or(3) as usize;

let key = "wallet.output_manager_event_channel_size";
let output_manager_event_channel_size = optional(cfg.get_int(key))?.unwrap_or(250) as usize;

Expand Down Expand Up @@ -759,6 +763,7 @@ fn convert_node_config(
transaction_event_channel_size,
base_node_event_channel_size,
wallet_connection_manager_pool_size,
wallet_recovery_retry_limit,
output_manager_event_channel_size,
console_wallet_password,
wallet_command_send_wait_stage,
Expand Down
Loading