From df1be7e4249d6e3e0701c837bedbdc2ad6c9ff65 Mon Sep 17 00:00:00 2001 From: Philip Robinson Date: Fri, 26 Nov 2021 08:50:05 +0200 Subject: [PATCH] feat: only trigger UTXO scanning when a new block event is received (#3620) Description --- Previously the UTXO scanner task was triggered on an interval. This PR updates this process to rather be triggered only when an existing task is not running and new Block event is received from the wallet Base Node monitoring service. It is only when this event is received when we know there will be new block data to scan. A new event is added to the Base Node Monitoring Service that only fires when a new block is detected to be used in the above functionality The Utxo Scanner is also refactored into separate files. To help in the review I have highlighted the new code below An unrelated bug is also fixed with handling of the wallet public address in the Console Wallet that broke the wallet. How Has This Been Tested? --- manually and by CI --- applications/daily_tests/cron_jobs.js | 2 +- .../tari_console_wallet/src/init/mod.rs | 7 +- .../tari_console_wallet/src/recovery.rs | 2 +- .../src/ui/state/wallet_event_monitor.rs | 4 +- .../wallet/src/base_node_service/handle.rs | 1 + .../wallet/src/base_node_service/monitor.rs | 32 +- base_layer/wallet/src/config.rs | 3 - .../src/output_manager_service/service.rs | 1 + .../wallet/src/transaction_service/service.rs | 3 +- .../wallet/src/utxo_scanner_service/mod.rs | 47 +-- .../src/utxo_scanner_service/service.rs | 192 +++++++++++ ...{utxo_scanning.rs => utxo_scanner_task.rs} | 323 ++---------------- .../uxto_scanner_service_builder.rs | 143 ++++++++ base_layer/wallet/src/wallet.rs | 4 +- base_layer/wallet/tests/wallet/mod.rs | 2 - base_layer/wallet_ffi/src/lib.rs | 3 +- common/config/presets/console_wallet.toml | 3 - common/src/configuration/global.rs | 7 - common/src/configuration/utils.rs | 1 - integration_tests/helpers/config.js | 1 - 20 files changed, 411 insertions(+), 370 deletions(-) create mode 100644 base_layer/wallet/src/utxo_scanner_service/service.rs rename base_layer/wallet/src/utxo_scanner_service/{utxo_scanning.rs => utxo_scanner_task.rs} (67%) create mode 100644 base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs diff --git a/applications/daily_tests/cron_jobs.js b/applications/daily_tests/cron_jobs.js index 572163a552..d3041c6250 100644 --- a/applications/daily_tests/cron_jobs.js +++ b/applications/daily_tests/cron_jobs.js @@ -60,7 +60,7 @@ async function runWalletRecoveryTest(instances) { }); notify( - `🙌 Wallet (Pubkey: ${identity.public_key} ) recovered to a block height of ${numScanned}, completed in ${timeDiffMinutes} minutes (${scannedRate} blocks/min). ${recoveredAmount} µT recovered for ${instances} instance(s).` + `🙌 Wallet (Pubkey: ${identity.public_key} ) recovered scanned ${numScanned} UTXO's, completed in ${timeDiffMinutes} minutes (${scannedRate} UTXOs/min). ${recoveredAmount} µT recovered for ${instances} instance(s).` ); } catch (err) { console.error(err); diff --git a/applications/tari_console_wallet/src/init/mod.rs b/applications/tari_console_wallet/src/init/mod.rs index f974ed4b9f..74c9271c19 100644 --- a/applications/tari_console_wallet/src/init/mod.rs +++ b/applications/tari_console_wallet/src/init/mod.rs @@ -29,6 +29,7 @@ use rustyline::Editor; use tari_app_utilities::utilities::create_transport_type; use tari_common::{exit_codes::ExitCodes, ConfigBootstrap, GlobalConfig}; use tari_comms::{ + multiaddr::Multiaddr, peer_manager::{Peer, PeerFeatures}, types::CommsSecretKey, NodeIdentity, @@ -299,10 +300,7 @@ pub async fn init_wallet( ); let node_address = match wallet_db.get_node_address().await? { - None => config - .public_address - .clone() - .ok_or_else(|| ExitCodes::ConfigError("node public address error".to_string()))?, + None => config.public_address.clone().unwrap_or_else(Multiaddr::empty), Some(a) => a, }; @@ -403,7 +401,6 @@ pub async fn init_wallet( config.buffer_size_console_wallet, )), Some(config.buffer_rate_limit_console_wallet), - Some(config.scan_for_utxo_interval), Some(updater_config), config.autoupdate_check_interval, ); diff --git a/applications/tari_console_wallet/src/recovery.rs b/applications/tari_console_wallet/src/recovery.rs index 852a9b817f..ab49446bc7 100644 --- a/applications/tari_console_wallet/src/recovery.rs +++ b/applications/tari_console_wallet/src/recovery.rs @@ -30,7 +30,7 @@ use tari_key_manager::mnemonic::Mnemonic; use tari_shutdown::Shutdown; use tari_wallet::{ storage::sqlite_db::WalletSqliteDatabase, - utxo_scanner_service::{handle::UtxoScannerEvent, utxo_scanning::UtxoScannerService}, + utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService}, WalletSqlite, }; diff --git a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs index c1b51207d0..b47b306e9c 100644 --- a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -180,10 +180,8 @@ impl WalletEventMonitor { match result { Ok(msg) => { trace!(target: LOG_TARGET, "Wallet Event Monitor received base node event {:?}", msg); - match (*msg).clone() { - BaseNodeEvent::BaseNodeStateChanged(state) => { + if let BaseNodeEvent::BaseNodeStateChanged(state) = (*msg).clone() { self.trigger_base_node_state_refresh(state).await; - } } }, Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/base_layer/wallet/src/base_node_service/handle.rs b/base_layer/wallet/src/base_node_service/handle.rs index 7e318b4a8b..80b09a2936 100644 --- a/base_layer/wallet/src/base_node_service/handle.rs +++ b/base_layer/wallet/src/base_node_service/handle.rs @@ -44,6 +44,7 @@ pub enum BaseNodeServiceResponse { #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum BaseNodeEvent { BaseNodeStateChanged(BaseNodeState), + NewBlockDetected(u64), } /// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index b5ae89902b..a79d04428f 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -88,7 +88,7 @@ where }, Err(e @ BaseNodeMonitorError::RpcFailed(_)) => { warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e); - self.map_state(move |_| BaseNodeState { + self.update_state(BaseNodeState { chain_metadata: None, is_synced: None, updated: None, @@ -134,7 +134,7 @@ where let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await { Some(tip_info) => tip_info?, None => { - self.map_state(|_| Default::default()).await; + self.update_state(Default::default()).await; continue; }, }; @@ -165,7 +165,8 @@ where let is_synced = tip_info.is_synced; let height_of_longest_chain = chain_metadata.height_of_longest_chain(); - self.map_state(move |_| BaseNodeState { + + self.update_state(BaseNodeState { chain_metadata: Some(chain_metadata), is_synced: Some(is_synced), updated: Some(Utc::now().naive_utc()), @@ -184,7 +185,7 @@ where let delay = time::sleep(self.interval.saturating_sub(latency)); if interrupt(base_node_watch.changed(), delay).await.is_none() { - self.map_state(|_| Default::default()).await; + self.update_state(Default::default()).await; } } @@ -193,14 +194,23 @@ where Ok(()) } - async fn map_state(&self, transform: F) - where F: FnOnce(&BaseNodeState) -> BaseNodeState { - let new_state = { - let mut lock = self.state.write().await; - let new_state = transform(&*lock); - *lock = new_state.clone(); - new_state + async fn update_state(&self, new_state: BaseNodeState) { + let mut lock = self.state.write().await; + let (new_block_detected, height) = match (new_state.chain_metadata.clone(), (*lock).chain_metadata.clone()) { + (Some(new_metadata), Some(old_metadata)) => ( + new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(), + new_metadata.height_of_longest_chain(), + ), + (Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()), + (None, _) => (false, 0), }; + + if new_block_detected { + self.publish_event(BaseNodeEvent::NewBlockDetected(height)); + } + + *lock = new_state.clone(); + self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state)); } diff --git a/base_layer/wallet/src/config.rs b/base_layer/wallet/src/config.rs index e176836453..2dc56f7d1d 100644 --- a/base_layer/wallet/src/config.rs +++ b/base_layer/wallet/src/config.rs @@ -43,7 +43,6 @@ pub struct WalletConfig { pub rate_limit: usize, pub network: NetworkConsensus, pub base_node_service_config: BaseNodeServiceConfig, - pub scan_for_utxo_interval: Duration, pub updater_config: Option, pub autoupdate_check_interval: Option, } @@ -59,7 +58,6 @@ impl WalletConfig { base_node_service_config: Option, buffer_size: Option, rate_limit: Option, - scan_for_utxo_interval: Option, updater_config: Option, autoupdate_check_interval: Option, ) -> Self { @@ -72,7 +70,6 @@ impl WalletConfig { rate_limit: rate_limit.unwrap_or(50), network, base_node_service_config: base_node_service_config.unwrap_or_default(), - scan_for_utxo_interval: scan_for_utxo_interval.unwrap_or_else(|| Duration::from_secs(43200)), updater_config, autoupdate_check_interval, } diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 49ae3bf44b..d7b61cfbb2 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -383,6 +383,7 @@ where } self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain()); }, + BaseNodeEvent::NewBlockDetected(_) => {}, } } diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 136bd198a8..6b00d1e0fe 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -49,7 +49,7 @@ use crate::{ }, types::HashDigest, util::watch::Watch, - utxo_scanner_service::utxo_scanning::RECOVERY_KEY, + utxo_scanner_service::RECOVERY_KEY, }; use chrono::{NaiveDateTime, Utc}; use digest::Digest; @@ -708,6 +708,7 @@ where } self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain()); }, + BaseNodeEvent::NewBlockDetected(_) => {}, } } diff --git a/base_layer/wallet/src/utxo_scanner_service/mod.rs b/base_layer/wallet/src/utxo_scanner_service/mod.rs index e8b9db5aea..c787e31f64 100644 --- a/base_layer/wallet/src/utxo_scanner_service/mod.rs +++ b/base_layer/wallet/src/utxo_scanner_service/mod.rs @@ -1,38 +1,18 @@ -// Copyright 2021. The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - use crate::{ + base_node_service::handle::BaseNodeServiceHandle, connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface}, output_manager_service::handle::OutputManagerHandle, storage::database::{WalletBackend, WalletDatabase}, transaction_service::handle::TransactionServiceHandle, utxo_scanner_service::{ handle::UtxoScannerHandle, - utxo_scanning::{UtxoScannerMode, UtxoScannerService}, + service::UtxoScannerService, + uxto_scanner_service_builder::UtxoScannerMode, }, }; use futures::future; use log::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity}; use tari_core::transactions::CryptoFactories; use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext}; @@ -40,12 +20,15 @@ use tokio::sync::broadcast; pub mod error; pub mod handle; -pub mod utxo_scanning; +pub mod service; +mod utxo_scanner_task; +mod uxto_scanner_service_builder; + +pub use utxo_scanner_task::RECOVERY_KEY; const LOG_TARGET: &str = "wallet::utxo_scanner_service::initializer"; pub struct UtxoScannerServiceInitializer { - interval: Duration, backend: Option>, factories: CryptoFactories, node_identity: Arc, @@ -54,14 +37,8 @@ pub struct UtxoScannerServiceInitializer { impl UtxoScannerServiceInitializer where T: WalletBackend + 'static { - pub fn new( - interval: Duration, - backend: WalletDatabase, - factories: CryptoFactories, - node_identity: Arc, - ) -> Self { + pub fn new(backend: WalletDatabase, factories: CryptoFactories, node_identity: Arc) -> Self { Self { - interval, backend: Some(backend), factories, node_identity, @@ -87,7 +64,6 @@ where T: WalletBackend + 'static .take() .expect("Cannot start Utxo scanner service without setting a storage backend"); let factories = self.factories.clone(); - let interval = self.interval; let node_identity = self.node_identity.clone(); context.spawn_when_ready(move |handles| async move { @@ -95,11 +71,11 @@ where T: WalletBackend + 'static let output_manager_service = handles.expect_handle::(); let comms_connectivity = handles.expect_handle::(); let wallet_connectivity = handles.expect_handle::(); + let base_node_service_handle = handles.expect_handle::(); let scanning_service = UtxoScannerService::::builder() .with_peers(vec![]) .with_retry_limit(2) - .with_scanning_interval(interval) .with_mode(UtxoScannerMode::Scanning) .build_with_resources( backend, @@ -111,6 +87,7 @@ where T: WalletBackend + 'static factories, handles.get_shutdown_signal(), event_sender, + base_node_service_handle, ) .run(); diff --git a/base_layer/wallet/src/utxo_scanner_service/service.rs b/base_layer/wallet/src/utxo_scanner_service/service.rs new file mode 100644 index 0000000000..9be47f6546 --- /dev/null +++ b/base_layer/wallet/src/utxo_scanner_service/service.rs @@ -0,0 +1,192 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{ + base_node_service::handle::BaseNodeServiceHandle, + error::WalletError, + output_manager_service::handle::OutputManagerHandle, + storage::database::{WalletBackend, WalletDatabase}, + transaction_service::handle::TransactionServiceHandle, + utxo_scanner_service::{ + handle::UtxoScannerEvent, + utxo_scanner_task::UtxoScannerTask, + uxto_scanner_service_builder::{UtxoScannerMode, UtxoScannerServiceBuilder}, + }, +}; + +use crate::base_node_service::handle::BaseNodeEvent; +use futures::FutureExt; +use log::*; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tari_common_types::types::HashOutput; +use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey, NodeIdentity}; +use tari_core::transactions::{tari_amount::MicroTari, CryptoFactories}; +use tari_shutdown::{Shutdown, ShutdownSignal}; +use tokio::{ + sync::{broadcast, watch}, + task, +}; + +pub const LOG_TARGET: &str = "wallet::utxo_scanning"; + +pub struct UtxoScannerService +where TBackend: WalletBackend + 'static +{ + pub(crate) resources: UtxoScannerResources, + pub(crate) retry_limit: usize, + pub(crate) peer_seeds: Vec, + pub(crate) mode: UtxoScannerMode, + pub(crate) shutdown_signal: ShutdownSignal, + pub(crate) event_sender: broadcast::Sender, + pub(crate) base_node_service: BaseNodeServiceHandle, +} + +impl UtxoScannerService +where TBackend: WalletBackend + 'static +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + peer_seeds: Vec, + retry_limit: usize, + mode: UtxoScannerMode, + resources: UtxoScannerResources, + shutdown_signal: ShutdownSignal, + event_sender: broadcast::Sender, + base_node_service: BaseNodeServiceHandle, + ) -> Self { + Self { + resources, + peer_seeds, + retry_limit, + mode, + shutdown_signal, + event_sender, + base_node_service, + } + } + + fn create_task(&self, shutdown_signal: ShutdownSignal) -> UtxoScannerTask { + UtxoScannerTask { + resources: self.resources.clone(), + peer_seeds: self.peer_seeds.clone(), + event_sender: self.event_sender.clone(), + retry_limit: self.retry_limit, + peer_index: 0, + num_retries: 1, + mode: self.mode.clone(), + shutdown_signal, + } + } + + pub fn builder() -> UtxoScannerServiceBuilder { + UtxoScannerServiceBuilder::default() + } + + pub fn get_event_receiver(&mut self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + pub async fn run(mut self) -> Result<(), WalletError> { + info!(target: LOG_TARGET, "UTXO scanning service starting"); + + if self.mode == UtxoScannerMode::Recovery { + let task = self.create_task(self.shutdown_signal.clone()); + task::spawn(async move { + if let Err(err) = task.run().await { + error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err); + } + }); + return Ok(()); + } + + let mut main_shutdown = self.shutdown_signal.clone(); + let mut base_node_service_event_stream = self.base_node_service.get_event_stream(); + + loop { + let mut local_shutdown = Shutdown::new(); + let task = self.create_task(local_shutdown.to_signal()); + let mut task_join_handle = task::spawn(async move { + if let Err(err) = task.run().await { + error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err); + } + }) + .fuse(); + + loop { + tokio::select! { + event = base_node_service_event_stream.recv() => { + match event { + Ok(e) => { + if let BaseNodeEvent::NewBlockDetected(h) = (*e).clone() { + debug!(target: LOG_TARGET, "New block event received: {}", h); + if local_shutdown.is_triggered() { + debug!(target: LOG_TARGET, "Starting new round of UTXO scanning"); + break; + } + } + }, + Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e), + }; + }, + _ = &mut task_join_handle => { + debug!(target: LOG_TARGET, "UTXO scanning round completed"); + local_shutdown.trigger(); + } + _ = self.resources.current_base_node_watcher.changed() => { + debug!(target: LOG_TARGET, "Base node change detected."); + let peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); + if let Some(peer) = peer { + self.peer_seeds = vec![peer.public_key]; + } + local_shutdown.trigger(); + }, + _ = main_shutdown.wait() => { + // this will stop the task if its running, and let that thread exit gracefully + local_shutdown.trigger(); + info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal"); + return Ok(()); + } + } + } + } + } +} + +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct ScanningMetadata { + pub total_amount: MicroTari, + pub number_of_utxos: u64, + pub utxo_index: u64, + pub height_hash: HashOutput, +} + +#[derive(Clone)] +pub struct UtxoScannerResources { + pub db: WalletDatabase, + pub comms_connectivity: ConnectivityRequester, + pub current_base_node_watcher: watch::Receiver>, + pub output_manager_service: OutputManagerHandle, + pub transaction_service: TransactionServiceHandle, + pub node_identity: Arc, + pub factories: CryptoFactories, +} diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs similarity index 67% rename from base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs rename to base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index b00ebc839d..e6c1283641 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -20,33 +20,19 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{ - convert::TryFrom, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::{Duration, Instant}, -}; - use chrono::Utc; use futures::StreamExt; use log::*; -use serde::{Deserialize, Serialize}; -use tokio::{ - sync::{broadcast, watch}, - task, - time, - time::MissedTickBehavior, -}; -use tari_common_types::{transaction::TxId, types::HashOutput}; +use std::{ + convert::TryFrom, + time::{Duration, Instant}, +}; +use tari_common_types::transaction::TxId; use tari_comms::{ - connectivity::ConnectivityRequester, - peer_manager::{NodeId, Peer}, + peer_manager::NodeId, protocol::rpc::{RpcError, RpcStatus}, types::CommsPublicKey, - NodeIdentity, PeerConnection, }; use tari_core::{ @@ -59,22 +45,20 @@ use tari_core::{ transactions::{ tari_amount::MicroTari, transaction_entities::{TransactionOutput, UnblindedOutput}, - CryptoFactories, }, }; use tari_shutdown::ShutdownSignal; +use tokio::sync::broadcast; use crate::{ - connectivity_service::WalletConnectivityInterface, error::WalletError, - output_manager_service::handle::OutputManagerHandle, - storage::{ - database::{WalletBackend, WalletDatabase}, - sqlite_db::WalletSqliteDatabase, + storage::database::WalletBackend, + utxo_scanner_service::{ + error::UtxoScannerError, + handle::UtxoScannerEvent, + service::{ScanningMetadata, UtxoScannerResources}, + uxto_scanner_service_builder::UtxoScannerMode, }, - transaction_service::handle::TransactionServiceHandle, - utxo_scanner_service::{error::UtxoScannerError, handle::UtxoScannerEvent}, - WalletSqlite, }; pub const LOG_TARGET: &str = "wallet::utxo_scanning"; @@ -82,139 +66,17 @@ pub const LOG_TARGET: &str = "wallet::utxo_scanning"; pub const RECOVERY_KEY: &str = "recovery_data"; const SCANNING_KEY: &str = "scanning_data"; -#[derive(Debug, Clone, PartialEq)] -pub enum UtxoScannerMode { - Recovery, - Scanning, -} - -impl Default for UtxoScannerMode { - fn default() -> UtxoScannerMode { - UtxoScannerMode::Recovery - } -} - -#[derive(Debug, Default, Clone)] -pub struct UtxoScannerServiceBuilder { - retry_limit: usize, - peers: Vec, - mode: Option, - scanning_interval: Option, -} - -#[derive(Clone)] -struct UtxoScannerResources { - pub db: WalletDatabase, - pub comms_connectivity: ConnectivityRequester, - pub current_base_node_watcher: watch::Receiver>, - pub output_manager_service: OutputManagerHandle, - pub transaction_service: TransactionServiceHandle, - pub node_identity: Arc, - pub factories: CryptoFactories, -} - -impl UtxoScannerServiceBuilder { - /// Set the maximum number of times we retry recovery. A failed recovery is counted as _all_ peers have failed. - /// i.e. worst-case number of recovery attempts = number of sync peers * retry limit - pub fn with_retry_limit(&mut self, limit: usize) -> &mut Self { - self.retry_limit = limit; - self - } - - pub fn with_scanning_interval(&mut self, interval: Duration) -> &mut Self { - self.scanning_interval = Some(interval); - self - } - - pub fn with_peers(&mut self, peer_public_keys: Vec) -> &mut Self { - self.peers = peer_public_keys; - self - } - - pub fn with_mode(&mut self, mode: UtxoScannerMode) -> &mut Self { - self.mode = Some(mode); - self - } - - pub fn build_with_wallet( - &mut self, - wallet: &WalletSqlite, - shutdown_signal: ShutdownSignal, - ) -> UtxoScannerService { - let resources = UtxoScannerResources { - db: wallet.db.clone(), - comms_connectivity: wallet.comms.connectivity(), - current_base_node_watcher: wallet.wallet_connectivity.get_current_base_node_watcher(), - output_manager_service: wallet.output_manager_service.clone(), - transaction_service: wallet.transaction_service.clone(), - node_identity: wallet.comms.node_identity(), - factories: wallet.factories.clone(), - }; - - let (event_sender, _) = broadcast::channel(200); - - let interval = self - .scanning_interval - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 12)); - UtxoScannerService::new( - self.peers.drain(..).collect(), - self.retry_limit, - self.mode.clone().unwrap_or_default(), - resources, - interval, - shutdown_signal, - event_sender, - ) - } - - #[allow(clippy::too_many_arguments)] - pub fn build_with_resources( - &mut self, - db: WalletDatabase, - comms_connectivity: ConnectivityRequester, - base_node_watcher: watch::Receiver>, - output_manager_service: OutputManagerHandle, - transaction_service: TransactionServiceHandle, - node_identity: Arc, - factories: CryptoFactories, - shutdown_signal: ShutdownSignal, - event_sender: broadcast::Sender, - ) -> UtxoScannerService { - let resources = UtxoScannerResources { - db, - comms_connectivity, - current_base_node_watcher: base_node_watcher, - output_manager_service, - transaction_service, - node_identity, - factories, - }; - let interval = self - .scanning_interval - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 12)); - UtxoScannerService::new( - self.peers.drain(..).collect(), - self.retry_limit, - self.mode.clone().unwrap_or_default(), - resources, - interval, - shutdown_signal, - event_sender, - ) - } -} - -struct UtxoScannerTask +pub struct UtxoScannerTask where TBackend: WalletBackend + 'static { - resources: UtxoScannerResources, - event_sender: broadcast::Sender, - retry_limit: usize, - num_retries: usize, - peer_seeds: Vec, - peer_index: usize, - mode: UtxoScannerMode, - run_flag: Arc, + pub(crate) resources: UtxoScannerResources, + pub(crate) event_sender: broadcast::Sender, + pub(crate) retry_limit: usize, + pub(crate) num_retries: usize, + pub(crate) peer_seeds: Vec, + pub(crate) peer_index: usize, + pub(crate) mode: UtxoScannerMode, + pub(crate) shutdown_signal: ShutdownSignal, } impl UtxoScannerTask where TBackend: WalletBackend + 'static @@ -284,7 +146,7 @@ where TBackend: WalletBackend + 'static let start_index = self.get_start_utxo_mmr_pos(&mut client).await?; let tip_header = self.get_chain_tip_header(&mut client).await?; let output_mmr_size = tip_header.output_mmr_size; - if !self.run_flag.load(Ordering::Relaxed) { + if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop return Ok((total_scanned, start_index, timer.elapsed())); } @@ -404,7 +266,7 @@ where TBackend: WalletBackend + 'static let mut last_utxo_index = 0u64; let mut iteration_count = 0u64; while let Some(response) = utxo_stream.next().await { - if !self.run_flag.load(Ordering::Relaxed) { + if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop return Ok(total_scanned as u64); } @@ -581,10 +443,9 @@ where TBackend: WalletBackend + 'static Ok(tx_id) } - async fn run(mut self) -> Result<(), UtxoScannerError> { - self.run_flag.store(true, Ordering::Relaxed); + pub async fn run(mut self) -> Result<(), UtxoScannerError> { loop { - if !self.run_flag.load(Ordering::Relaxed) { + if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop return Ok(()); } @@ -672,118 +533,6 @@ where TBackend: WalletBackend + 'static } } -pub struct UtxoScannerService -where TBackend: WalletBackend + 'static -{ - resources: UtxoScannerResources, - retry_limit: usize, - peer_seeds: Vec, - mode: UtxoScannerMode, - is_running: Arc, - scan_for_utxo_interval: Duration, - shutdown_signal: ShutdownSignal, - event_sender: broadcast::Sender, -} - -impl UtxoScannerService -where TBackend: WalletBackend + 'static -{ - #[allow(clippy::too_many_arguments)] - fn new( - peer_seeds: Vec, - retry_limit: usize, - mode: UtxoScannerMode, - resources: UtxoScannerResources, - scan_for_utxo_interval: Duration, - shutdown_signal: ShutdownSignal, - event_sender: broadcast::Sender, - ) -> Self { - Self { - resources, - peer_seeds, - retry_limit, - mode, - is_running: Arc::new(AtomicBool::new(false)), - scan_for_utxo_interval, - shutdown_signal, - event_sender, - } - } - - fn create_task(&self) -> UtxoScannerTask { - UtxoScannerTask { - resources: self.resources.clone(), - peer_seeds: self.peer_seeds.clone(), - event_sender: self.event_sender.clone(), - retry_limit: self.retry_limit, - peer_index: 0, - num_retries: 1, - mode: self.mode.clone(), - run_flag: self.is_running.clone(), - } - } - - pub fn builder() -> UtxoScannerServiceBuilder { - UtxoScannerServiceBuilder::default() - } - - pub fn get_event_receiver(&mut self) -> broadcast::Receiver { - self.event_sender.subscribe() - } - - pub async fn run(mut self) -> Result<(), WalletError> { - info!( - target: LOG_TARGET, - "UTXO scanning service starting (interval = {:.2?})", self.scan_for_utxo_interval - ); - - let mut shutdown = self.shutdown_signal.clone(); - let start_at = Instant::now() + Duration::from_secs(1); - let mut work_interval = time::interval_at(start_at.into(), self.scan_for_utxo_interval); - work_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - tokio::select! { - _ = work_interval.tick() => { - let running_flag = self.is_running.clone(); - if !running_flag.load(Ordering::SeqCst) { - let task = self.create_task(); - debug!(target: LOG_TARGET, "UTXO scanning service starting scan for utxos"); - task::spawn(async move { - if let Err(err) = task.run().await { - error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err); - } - //we make sure the flag is set to false here - running_flag.store(false, Ordering::Relaxed); - }); - if self.mode == UtxoScannerMode::Recovery { - return Ok(()); - } - } - }, - _ = self.resources.current_base_node_watcher.changed() => { - debug!(target: LOG_TARGET, "Base node change detected."); - let peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); - - // If we are recovering we will stick to the initially provided seeds - if self.mode != UtxoScannerMode::Recovery { - if let Some(peer) = peer { - self.peer_seeds = vec![peer.public_key]; - } - } - - self.is_running.store(false, Ordering::Relaxed); - }, - _ = shutdown.wait() => { - // this will stop the task if its running, and let that thread exit gracefully - self.is_running.store(false, Ordering::Relaxed); - info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal"); - return Ok(()); - } - } - } - } -} - fn convert_response_to_transaction_outputs( response: Vec>, last_utxo_index: u64, @@ -794,12 +543,12 @@ fn convert_response_to_transaction_outputs( .collect::, _>>()?; let current_utxo_index = response - // Assumes correct ordering which is otherwise not required for this protocol - .last() - .ok_or_else(|| { - UtxoScannerError::BaseNodeResponseError("Invalid response from base node: response was empty".to_string()) - })? - .mmr_index; + // Assumes correct ordering which is otherwise not required for this protocol + .last() + .ok_or_else(|| { + UtxoScannerError::BaseNodeResponseError("Invalid response from base node: response was empty".to_string()) + })? + .mmr_index; if current_utxo_index < last_utxo_index { return Err(UtxoScannerError::BaseNodeResponseError( "Invalid response from base node: mmr index must be non-decreasing".to_string(), @@ -817,11 +566,3 @@ fn convert_response_to_transaction_outputs( .collect::, _>>()?; Ok((outputs, current_utxo_index)) } - -#[derive(Clone, Default, Serialize, Deserialize)] -struct ScanningMetadata { - pub total_amount: MicroTari, - pub number_of_utxos: u64, - pub utxo_index: u64, - pub height_hash: HashOutput, -} diff --git a/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs b/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs new file mode 100644 index 0000000000..e57c461b8b --- /dev/null +++ b/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs @@ -0,0 +1,143 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{ + base_node_service::handle::BaseNodeServiceHandle, + connectivity_service::WalletConnectivityInterface, + output_manager_service::handle::OutputManagerHandle, + storage::{ + database::{WalletBackend, WalletDatabase}, + sqlite_db::WalletSqliteDatabase, + }, + transaction_service::handle::TransactionServiceHandle, + utxo_scanner_service::{ + handle::UtxoScannerEvent, + service::{UtxoScannerResources, UtxoScannerService}, + }, + WalletSqlite, +}; +use std::sync::Arc; +use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey, NodeIdentity}; +use tari_core::transactions::CryptoFactories; +use tari_shutdown::ShutdownSignal; +use tokio::sync::{broadcast, watch}; + +#[derive(Debug, Clone, PartialEq)] +pub enum UtxoScannerMode { + Recovery, + Scanning, +} + +impl Default for UtxoScannerMode { + fn default() -> UtxoScannerMode { + UtxoScannerMode::Recovery + } +} + +#[derive(Debug, Default, Clone)] +pub struct UtxoScannerServiceBuilder { + retry_limit: usize, + peers: Vec, + mode: Option, +} + +impl UtxoScannerServiceBuilder { + /// Set the maximum number of times we retry recovery. A failed recovery is counted as _all_ peers have failed. + /// i.e. worst-case number of recovery attempts = number of sync peers * retry limit + pub fn with_retry_limit(&mut self, limit: usize) -> &mut Self { + self.retry_limit = limit; + self + } + + pub fn with_peers(&mut self, peer_public_keys: Vec) -> &mut Self { + self.peers = peer_public_keys; + self + } + + pub fn with_mode(&mut self, mode: UtxoScannerMode) -> &mut Self { + self.mode = Some(mode); + self + } + + pub fn build_with_wallet( + &mut self, + wallet: &WalletSqlite, + shutdown_signal: ShutdownSignal, + ) -> UtxoScannerService { + let resources = UtxoScannerResources { + db: wallet.db.clone(), + comms_connectivity: wallet.comms.connectivity(), + current_base_node_watcher: wallet.wallet_connectivity.get_current_base_node_watcher(), + output_manager_service: wallet.output_manager_service.clone(), + transaction_service: wallet.transaction_service.clone(), + node_identity: wallet.comms.node_identity(), + factories: wallet.factories.clone(), + }; + + let (event_sender, _) = broadcast::channel(200); + + UtxoScannerService::new( + self.peers.drain(..).collect(), + self.retry_limit, + self.mode.clone().unwrap_or_default(), + resources, + shutdown_signal, + event_sender, + wallet.base_node_service.clone(), + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn build_with_resources( + &mut self, + db: WalletDatabase, + comms_connectivity: ConnectivityRequester, + base_node_watcher: watch::Receiver>, + output_manager_service: OutputManagerHandle, + transaction_service: TransactionServiceHandle, + node_identity: Arc, + factories: CryptoFactories, + shutdown_signal: ShutdownSignal, + event_sender: broadcast::Sender, + base_node_service: BaseNodeServiceHandle, + ) -> UtxoScannerService { + let resources = UtxoScannerResources { + db, + comms_connectivity, + current_base_node_watcher: base_node_watcher, + output_manager_service, + transaction_service, + node_identity, + factories, + }; + + UtxoScannerService::new( + self.peers.drain(..).collect(), + self.retry_limit, + self.mode.clone().unwrap_or_default(), + resources, + shutdown_signal, + event_sender, + base_node_service, + ) + } +} diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 169dfc2703..35d926a3c9 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -80,7 +80,7 @@ use crate::{ TransactionServiceInitializer, }, types::KeyDigest, - utxo_scanner_service::{handle::UtxoScannerHandle, UtxoScannerServiceInitializer}, + utxo_scanner_service::{handle::UtxoScannerHandle, UtxoScannerServiceInitializer, RECOVERY_KEY}, }; use tari_common_types::transaction::TxId; use tari_key_manager::cipher_seed::CipherSeed; @@ -188,7 +188,6 @@ where )) .add_initializer(WalletConnectivityInitializer::new(config.base_node_service_config)) .add_initializer(UtxoScannerServiceInitializer::new( - config.scan_for_utxo_interval, wallet_database.clone(), factories.clone(), node_identity.clone(), @@ -498,7 +497,6 @@ where /// Utility function to find out if there is data in the database indicating that there is an incomplete recovery /// process in progress pub async fn is_recovery_in_progress(&self) -> Result { - use crate::utxo_scanner_service::utxo_scanning::RECOVERY_KEY; Ok(self.db.get_client_key_value(RECOVERY_KEY.to_string()).await?.is_some()) } } diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet/mod.rs index c33481f3c2..7a16a844ca 100644 --- a/base_layer/wallet/tests/wallet/mod.rs +++ b/base_layer/wallet/tests/wallet/mod.rs @@ -154,7 +154,6 @@ async fn create_wallet( None, None, None, - None, ); let metadata = ChainMetadata::new(std::i64::MAX as u64, Vec::new(), 0, 0, 0); @@ -704,7 +703,6 @@ async fn test_import_utxo() { None, None, None, - None, ); let mut alice_wallet = Wallet::start( config, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 566ec880d4..ad0153f422 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -153,7 +153,7 @@ use tari_wallet::{ models::{CompletedTransaction, InboundTransaction, OutboundTransaction}, }, }, - utxo_scanner_service::utxo_scanning::{UtxoScannerService, RECOVERY_KEY}, + utxo_scanner_service::{service::UtxoScannerService, RECOVERY_KEY}, Wallet, WalletConfig, WalletSqlite, @@ -3217,7 +3217,6 @@ pub unsafe extern "C" fn wallet_create( None, None, None, - None, ); let mut recovery_lookup = match runtime.block_on(wallet_database.get_client_key_value(RECOVERY_KEY.to_owned())) { diff --git a/common/config/presets/console_wallet.toml b/common/config/presets/console_wallet.toml index 957fceece1..ea7a7a4636 100644 --- a/common/config/presets/console_wallet.toml +++ b/common/config/presets/console_wallet.toml @@ -69,9 +69,6 @@ base_node_update_publisher_channel_size = 500 # (options: "DirectOnly", "StoreAndForwardOnly", DirectAndStoreAndForward". default: "DirectAndStoreAndForward"). #transaction_routing_mechanism = "DirectAndStoreAndForward" -# UTXO scanning service interval (default = 12 hours, i.e. 60 * 60 * 12 seconds) -scan_for_utxo_interval = 180 - # When running the console wallet in command mode, use these values to determine what "stage" and timeout to wait # for sent transactions. # The stages are: diff --git a/common/src/configuration/global.rs b/common/src/configuration/global.rs index 04086be1f0..34ba629e92 100644 --- a/common/src/configuration/global.rs +++ b/common/src/configuration/global.rs @@ -101,7 +101,6 @@ pub struct GlobalConfig { pub fetch_utxos_timeout: Duration, pub service_request_timeout: Duration, pub base_node_query_timeout: Duration, - pub scan_for_utxo_interval: Duration, pub saf_expiry_duration: Duration, pub transaction_broadcast_monitoring_timeout: Duration, pub transaction_chain_monitoring_timeout: Duration, @@ -445,11 +444,6 @@ fn convert_node_config( cfg.get_int(key) .map_err(|e| ConfigurationError::new(key, &e.to_string()))? as u64, ); - let key = "wallet.scan_for_utxo_interval"; - let scan_for_utxo_interval = Duration::from_secs( - cfg.get_int(key) - .map_err(|e| ConfigurationError::new(key, &e.to_string()))? as u64, - ); let key = "wallet.saf_expiry_duration"; let saf_expiry_duration = Duration::from_secs(optional(cfg.get_int(key))?.unwrap_or(10800) as u64); @@ -755,7 +749,6 @@ fn convert_node_config( fetch_utxos_timeout, service_request_timeout, base_node_query_timeout, - scan_for_utxo_interval, saf_expiry_duration, transaction_broadcast_monitoring_timeout, transaction_chain_monitoring_timeout, diff --git a/common/src/configuration/utils.rs b/common/src/configuration/utils.rs index 7b0f2c4cf0..78aaa5c73e 100644 --- a/common/src/configuration/utils.rs +++ b/common/src/configuration/utils.rs @@ -118,7 +118,6 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config { cfg.set_default("wallet.base_node_service_refresh_interval", 5).unwrap(); cfg.set_default("wallet.base_node_service_request_max_age", 60).unwrap(); cfg.set_default("wallet.balance_enquiry_cooldown_period", 1).unwrap(); - cfg.set_default("wallet.scan_for_utxo_interval", 60 * 60 * 12).unwrap(); cfg.set_default("wallet.transaction_broadcast_monitoring_timeout", 60) .unwrap(); cfg.set_default("wallet.transaction_chain_monitoring_timeout", 60) diff --git a/integration_tests/helpers/config.js b/integration_tests/helpers/config.js index a022190d3f..23653699b8 100644 --- a/integration_tests/helpers/config.js +++ b/integration_tests/helpers/config.js @@ -102,7 +102,6 @@ function baseEnvs(peerSeeds = [], forceSyncPeers = []) { TARI_MINING_NODE__NUM_MINING_THREADS: "1", TARI_MINING_NODE__MINE_ON_TIP_ONLY: true, TARI_MINING_NODE__VALIDATE_TIP_TIMEOUT_SEC: 1, - TARI_WALLET__SCAN_FOR_UTXO_INTERVAL: 5, }; if (forceSyncPeers.length > 0) { envs.TARI_BASE_NODE__LOCALNET__FORCE_SYNC_PEERS = forceSyncPeers.join(",");