diff --git a/applications/daily_tests/cron_jobs.js b/applications/daily_tests/cron_jobs.js index 572163a552..d3041c6250 100644 --- a/applications/daily_tests/cron_jobs.js +++ b/applications/daily_tests/cron_jobs.js @@ -60,7 +60,7 @@ async function runWalletRecoveryTest(instances) { }); notify( - `🙌 Wallet (Pubkey: ${identity.public_key} ) recovered to a block height of ${numScanned}, completed in ${timeDiffMinutes} minutes (${scannedRate} blocks/min). ${recoveredAmount} µT recovered for ${instances} instance(s).` + `🙌 Wallet (Pubkey: ${identity.public_key} ) recovered scanned ${numScanned} UTXO's, completed in ${timeDiffMinutes} minutes (${scannedRate} UTXOs/min). ${recoveredAmount} µT recovered for ${instances} instance(s).` ); } catch (err) { console.error(err); diff --git a/applications/tari_console_wallet/src/init/mod.rs b/applications/tari_console_wallet/src/init/mod.rs index f974ed4b9f..74c9271c19 100644 --- a/applications/tari_console_wallet/src/init/mod.rs +++ b/applications/tari_console_wallet/src/init/mod.rs @@ -29,6 +29,7 @@ use rustyline::Editor; use tari_app_utilities::utilities::create_transport_type; use tari_common::{exit_codes::ExitCodes, ConfigBootstrap, GlobalConfig}; use tari_comms::{ + multiaddr::Multiaddr, peer_manager::{Peer, PeerFeatures}, types::CommsSecretKey, NodeIdentity, @@ -299,10 +300,7 @@ pub async fn init_wallet( ); let node_address = match wallet_db.get_node_address().await? { - None => config - .public_address - .clone() - .ok_or_else(|| ExitCodes::ConfigError("node public address error".to_string()))?, + None => config.public_address.clone().unwrap_or_else(Multiaddr::empty), Some(a) => a, }; @@ -403,7 +401,6 @@ pub async fn init_wallet( config.buffer_size_console_wallet, )), Some(config.buffer_rate_limit_console_wallet), - Some(config.scan_for_utxo_interval), Some(updater_config), config.autoupdate_check_interval, ); diff --git a/applications/tari_console_wallet/src/recovery.rs b/applications/tari_console_wallet/src/recovery.rs index 852a9b817f..ab49446bc7 100644 --- a/applications/tari_console_wallet/src/recovery.rs +++ b/applications/tari_console_wallet/src/recovery.rs @@ -30,7 +30,7 @@ use tari_key_manager::mnemonic::Mnemonic; use tari_shutdown::Shutdown; use tari_wallet::{ storage::sqlite_db::WalletSqliteDatabase, - utxo_scanner_service::{handle::UtxoScannerEvent, utxo_scanning::UtxoScannerService}, + utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService}, WalletSqlite, }; diff --git a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs index c1b51207d0..b47b306e9c 100644 --- a/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/tari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -180,10 +180,8 @@ impl WalletEventMonitor { match result { Ok(msg) => { trace!(target: LOG_TARGET, "Wallet Event Monitor received base node event {:?}", msg); - match (*msg).clone() { - BaseNodeEvent::BaseNodeStateChanged(state) => { + if let BaseNodeEvent::BaseNodeStateChanged(state) = (*msg).clone() { self.trigger_base_node_state_refresh(state).await; - } } }, Err(broadcast::error::RecvError::Lagged(n)) => { diff --git a/base_layer/wallet/src/base_node_service/handle.rs b/base_layer/wallet/src/base_node_service/handle.rs index 7e318b4a8b..80b09a2936 100644 --- a/base_layer/wallet/src/base_node_service/handle.rs +++ b/base_layer/wallet/src/base_node_service/handle.rs @@ -44,6 +44,7 @@ pub enum BaseNodeServiceResponse { #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum BaseNodeEvent { BaseNodeStateChanged(BaseNodeState), + NewBlockDetected(u64), } /// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index b5ae89902b..a79d04428f 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -88,7 +88,7 @@ where }, Err(e @ BaseNodeMonitorError::RpcFailed(_)) => { warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e); - self.map_state(move |_| BaseNodeState { + self.update_state(BaseNodeState { chain_metadata: None, is_synced: None, updated: None, @@ -134,7 +134,7 @@ where let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await { Some(tip_info) => tip_info?, None => { - self.map_state(|_| Default::default()).await; + self.update_state(Default::default()).await; continue; }, }; @@ -165,7 +165,8 @@ where let is_synced = tip_info.is_synced; let height_of_longest_chain = chain_metadata.height_of_longest_chain(); - self.map_state(move |_| BaseNodeState { + + self.update_state(BaseNodeState { chain_metadata: Some(chain_metadata), is_synced: Some(is_synced), updated: Some(Utc::now().naive_utc()), @@ -184,7 +185,7 @@ where let delay = time::sleep(self.interval.saturating_sub(latency)); if interrupt(base_node_watch.changed(), delay).await.is_none() { - self.map_state(|_| Default::default()).await; + self.update_state(Default::default()).await; } } @@ -193,14 +194,23 @@ where Ok(()) } - async fn map_state(&self, transform: F) - where F: FnOnce(&BaseNodeState) -> BaseNodeState { - let new_state = { - let mut lock = self.state.write().await; - let new_state = transform(&*lock); - *lock = new_state.clone(); - new_state + async fn update_state(&self, new_state: BaseNodeState) { + let mut lock = self.state.write().await; + let (new_block_detected, height) = match (new_state.chain_metadata.clone(), (*lock).chain_metadata.clone()) { + (Some(new_metadata), Some(old_metadata)) => ( + new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(), + new_metadata.height_of_longest_chain(), + ), + (Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()), + (None, _) => (false, 0), }; + + if new_block_detected { + self.publish_event(BaseNodeEvent::NewBlockDetected(height)); + } + + *lock = new_state.clone(); + self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state)); } diff --git a/base_layer/wallet/src/config.rs b/base_layer/wallet/src/config.rs index e176836453..2dc56f7d1d 100644 --- a/base_layer/wallet/src/config.rs +++ b/base_layer/wallet/src/config.rs @@ -43,7 +43,6 @@ pub struct WalletConfig { pub rate_limit: usize, pub network: NetworkConsensus, pub base_node_service_config: BaseNodeServiceConfig, - pub scan_for_utxo_interval: Duration, pub updater_config: Option, pub autoupdate_check_interval: Option, } @@ -59,7 +58,6 @@ impl WalletConfig { base_node_service_config: Option, buffer_size: Option, rate_limit: Option, - scan_for_utxo_interval: Option, updater_config: Option, autoupdate_check_interval: Option, ) -> Self { @@ -72,7 +70,6 @@ impl WalletConfig { rate_limit: rate_limit.unwrap_or(50), network, base_node_service_config: base_node_service_config.unwrap_or_default(), - scan_for_utxo_interval: scan_for_utxo_interval.unwrap_or_else(|| Duration::from_secs(43200)), updater_config, autoupdate_check_interval, } diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 49ae3bf44b..d7b61cfbb2 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -383,6 +383,7 @@ where } self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain()); }, + BaseNodeEvent::NewBlockDetected(_) => {}, } } diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 136bd198a8..6b00d1e0fe 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -49,7 +49,7 @@ use crate::{ }, types::HashDigest, util::watch::Watch, - utxo_scanner_service::utxo_scanning::RECOVERY_KEY, + utxo_scanner_service::RECOVERY_KEY, }; use chrono::{NaiveDateTime, Utc}; use digest::Digest; @@ -708,6 +708,7 @@ where } self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain()); }, + BaseNodeEvent::NewBlockDetected(_) => {}, } } diff --git a/base_layer/wallet/src/utxo_scanner_service/mod.rs b/base_layer/wallet/src/utxo_scanner_service/mod.rs index e8b9db5aea..c787e31f64 100644 --- a/base_layer/wallet/src/utxo_scanner_service/mod.rs +++ b/base_layer/wallet/src/utxo_scanner_service/mod.rs @@ -1,38 +1,18 @@ -// Copyright 2021. The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - use crate::{ + base_node_service::handle::BaseNodeServiceHandle, connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface}, output_manager_service::handle::OutputManagerHandle, storage::database::{WalletBackend, WalletDatabase}, transaction_service::handle::TransactionServiceHandle, utxo_scanner_service::{ handle::UtxoScannerHandle, - utxo_scanning::{UtxoScannerMode, UtxoScannerService}, + service::UtxoScannerService, + uxto_scanner_service_builder::UtxoScannerMode, }, }; use futures::future; use log::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity}; use tari_core::transactions::CryptoFactories; use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext}; @@ -40,12 +20,15 @@ use tokio::sync::broadcast; pub mod error; pub mod handle; -pub mod utxo_scanning; +pub mod service; +mod utxo_scanner_task; +mod uxto_scanner_service_builder; + +pub use utxo_scanner_task::RECOVERY_KEY; const LOG_TARGET: &str = "wallet::utxo_scanner_service::initializer"; pub struct UtxoScannerServiceInitializer { - interval: Duration, backend: Option>, factories: CryptoFactories, node_identity: Arc, @@ -54,14 +37,8 @@ pub struct UtxoScannerServiceInitializer { impl UtxoScannerServiceInitializer where T: WalletBackend + 'static { - pub fn new( - interval: Duration, - backend: WalletDatabase, - factories: CryptoFactories, - node_identity: Arc, - ) -> Self { + pub fn new(backend: WalletDatabase, factories: CryptoFactories, node_identity: Arc) -> Self { Self { - interval, backend: Some(backend), factories, node_identity, @@ -87,7 +64,6 @@ where T: WalletBackend + 'static .take() .expect("Cannot start Utxo scanner service without setting a storage backend"); let factories = self.factories.clone(); - let interval = self.interval; let node_identity = self.node_identity.clone(); context.spawn_when_ready(move |handles| async move { @@ -95,11 +71,11 @@ where T: WalletBackend + 'static let output_manager_service = handles.expect_handle::(); let comms_connectivity = handles.expect_handle::(); let wallet_connectivity = handles.expect_handle::(); + let base_node_service_handle = handles.expect_handle::(); let scanning_service = UtxoScannerService::::builder() .with_peers(vec![]) .with_retry_limit(2) - .with_scanning_interval(interval) .with_mode(UtxoScannerMode::Scanning) .build_with_resources( backend, @@ -111,6 +87,7 @@ where T: WalletBackend + 'static factories, handles.get_shutdown_signal(), event_sender, + base_node_service_handle, ) .run(); diff --git a/base_layer/wallet/src/utxo_scanner_service/service.rs b/base_layer/wallet/src/utxo_scanner_service/service.rs new file mode 100644 index 0000000000..9be47f6546 --- /dev/null +++ b/base_layer/wallet/src/utxo_scanner_service/service.rs @@ -0,0 +1,192 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{ + base_node_service::handle::BaseNodeServiceHandle, + error::WalletError, + output_manager_service::handle::OutputManagerHandle, + storage::database::{WalletBackend, WalletDatabase}, + transaction_service::handle::TransactionServiceHandle, + utxo_scanner_service::{ + handle::UtxoScannerEvent, + utxo_scanner_task::UtxoScannerTask, + uxto_scanner_service_builder::{UtxoScannerMode, UtxoScannerServiceBuilder}, + }, +}; + +use crate::base_node_service::handle::BaseNodeEvent; +use futures::FutureExt; +use log::*; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tari_common_types::types::HashOutput; +use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey, NodeIdentity}; +use tari_core::transactions::{tari_amount::MicroTari, CryptoFactories}; +use tari_shutdown::{Shutdown, ShutdownSignal}; +use tokio::{ + sync::{broadcast, watch}, + task, +}; + +pub const LOG_TARGET: &str = "wallet::utxo_scanning"; + +pub struct UtxoScannerService +where TBackend: WalletBackend + 'static +{ + pub(crate) resources: UtxoScannerResources, + pub(crate) retry_limit: usize, + pub(crate) peer_seeds: Vec, + pub(crate) mode: UtxoScannerMode, + pub(crate) shutdown_signal: ShutdownSignal, + pub(crate) event_sender: broadcast::Sender, + pub(crate) base_node_service: BaseNodeServiceHandle, +} + +impl UtxoScannerService +where TBackend: WalletBackend + 'static +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + peer_seeds: Vec, + retry_limit: usize, + mode: UtxoScannerMode, + resources: UtxoScannerResources, + shutdown_signal: ShutdownSignal, + event_sender: broadcast::Sender, + base_node_service: BaseNodeServiceHandle, + ) -> Self { + Self { + resources, + peer_seeds, + retry_limit, + mode, + shutdown_signal, + event_sender, + base_node_service, + } + } + + fn create_task(&self, shutdown_signal: ShutdownSignal) -> UtxoScannerTask { + UtxoScannerTask { + resources: self.resources.clone(), + peer_seeds: self.peer_seeds.clone(), + event_sender: self.event_sender.clone(), + retry_limit: self.retry_limit, + peer_index: 0, + num_retries: 1, + mode: self.mode.clone(), + shutdown_signal, + } + } + + pub fn builder() -> UtxoScannerServiceBuilder { + UtxoScannerServiceBuilder::default() + } + + pub fn get_event_receiver(&mut self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + pub async fn run(mut self) -> Result<(), WalletError> { + info!(target: LOG_TARGET, "UTXO scanning service starting"); + + if self.mode == UtxoScannerMode::Recovery { + let task = self.create_task(self.shutdown_signal.clone()); + task::spawn(async move { + if let Err(err) = task.run().await { + error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err); + } + }); + return Ok(()); + } + + let mut main_shutdown = self.shutdown_signal.clone(); + let mut base_node_service_event_stream = self.base_node_service.get_event_stream(); + + loop { + let mut local_shutdown = Shutdown::new(); + let task = self.create_task(local_shutdown.to_signal()); + let mut task_join_handle = task::spawn(async move { + if let Err(err) = task.run().await { + error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err); + } + }) + .fuse(); + + loop { + tokio::select! { + event = base_node_service_event_stream.recv() => { + match event { + Ok(e) => { + if let BaseNodeEvent::NewBlockDetected(h) = (*e).clone() { + debug!(target: LOG_TARGET, "New block event received: {}", h); + if local_shutdown.is_triggered() { + debug!(target: LOG_TARGET, "Starting new round of UTXO scanning"); + break; + } + } + }, + Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e), + }; + }, + _ = &mut task_join_handle => { + debug!(target: LOG_TARGET, "UTXO scanning round completed"); + local_shutdown.trigger(); + } + _ = self.resources.current_base_node_watcher.changed() => { + debug!(target: LOG_TARGET, "Base node change detected."); + let peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); + if let Some(peer) = peer { + self.peer_seeds = vec![peer.public_key]; + } + local_shutdown.trigger(); + }, + _ = main_shutdown.wait() => { + // this will stop the task if its running, and let that thread exit gracefully + local_shutdown.trigger(); + info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal"); + return Ok(()); + } + } + } + } + } +} + +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct ScanningMetadata { + pub total_amount: MicroTari, + pub number_of_utxos: u64, + pub utxo_index: u64, + pub height_hash: HashOutput, +} + +#[derive(Clone)] +pub struct UtxoScannerResources { + pub db: WalletDatabase, + pub comms_connectivity: ConnectivityRequester, + pub current_base_node_watcher: watch::Receiver>, + pub output_manager_service: OutputManagerHandle, + pub transaction_service: TransactionServiceHandle, + pub node_identity: Arc, + pub factories: CryptoFactories, +} diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs similarity index 67% rename from base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs rename to base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index b00ebc839d..e6c1283641 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -20,33 +20,19 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{ - convert::TryFrom, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::{Duration, Instant}, -}; - use chrono::Utc; use futures::StreamExt; use log::*; -use serde::{Deserialize, Serialize}; -use tokio::{ - sync::{broadcast, watch}, - task, - time, - time::MissedTickBehavior, -}; -use tari_common_types::{transaction::TxId, types::HashOutput}; +use std::{ + convert::TryFrom, + time::{Duration, Instant}, +}; +use tari_common_types::transaction::TxId; use tari_comms::{ - connectivity::ConnectivityRequester, - peer_manager::{NodeId, Peer}, + peer_manager::NodeId, protocol::rpc::{RpcError, RpcStatus}, types::CommsPublicKey, - NodeIdentity, PeerConnection, }; use tari_core::{ @@ -59,22 +45,20 @@ use tari_core::{ transactions::{ tari_amount::MicroTari, transaction_entities::{TransactionOutput, UnblindedOutput}, - CryptoFactories, }, }; use tari_shutdown::ShutdownSignal; +use tokio::sync::broadcast; use crate::{ - connectivity_service::WalletConnectivityInterface, error::WalletError, - output_manager_service::handle::OutputManagerHandle, - storage::{ - database::{WalletBackend, WalletDatabase}, - sqlite_db::WalletSqliteDatabase, + storage::database::WalletBackend, + utxo_scanner_service::{ + error::UtxoScannerError, + handle::UtxoScannerEvent, + service::{ScanningMetadata, UtxoScannerResources}, + uxto_scanner_service_builder::UtxoScannerMode, }, - transaction_service::handle::TransactionServiceHandle, - utxo_scanner_service::{error::UtxoScannerError, handle::UtxoScannerEvent}, - WalletSqlite, }; pub const LOG_TARGET: &str = "wallet::utxo_scanning"; @@ -82,139 +66,17 @@ pub const LOG_TARGET: &str = "wallet::utxo_scanning"; pub const RECOVERY_KEY: &str = "recovery_data"; const SCANNING_KEY: &str = "scanning_data"; -#[derive(Debug, Clone, PartialEq)] -pub enum UtxoScannerMode { - Recovery, - Scanning, -} - -impl Default for UtxoScannerMode { - fn default() -> UtxoScannerMode { - UtxoScannerMode::Recovery - } -} - -#[derive(Debug, Default, Clone)] -pub struct UtxoScannerServiceBuilder { - retry_limit: usize, - peers: Vec, - mode: Option, - scanning_interval: Option, -} - -#[derive(Clone)] -struct UtxoScannerResources { - pub db: WalletDatabase, - pub comms_connectivity: ConnectivityRequester, - pub current_base_node_watcher: watch::Receiver>, - pub output_manager_service: OutputManagerHandle, - pub transaction_service: TransactionServiceHandle, - pub node_identity: Arc, - pub factories: CryptoFactories, -} - -impl UtxoScannerServiceBuilder { - /// Set the maximum number of times we retry recovery. A failed recovery is counted as _all_ peers have failed. - /// i.e. worst-case number of recovery attempts = number of sync peers * retry limit - pub fn with_retry_limit(&mut self, limit: usize) -> &mut Self { - self.retry_limit = limit; - self - } - - pub fn with_scanning_interval(&mut self, interval: Duration) -> &mut Self { - self.scanning_interval = Some(interval); - self - } - - pub fn with_peers(&mut self, peer_public_keys: Vec) -> &mut Self { - self.peers = peer_public_keys; - self - } - - pub fn with_mode(&mut self, mode: UtxoScannerMode) -> &mut Self { - self.mode = Some(mode); - self - } - - pub fn build_with_wallet( - &mut self, - wallet: &WalletSqlite, - shutdown_signal: ShutdownSignal, - ) -> UtxoScannerService { - let resources = UtxoScannerResources { - db: wallet.db.clone(), - comms_connectivity: wallet.comms.connectivity(), - current_base_node_watcher: wallet.wallet_connectivity.get_current_base_node_watcher(), - output_manager_service: wallet.output_manager_service.clone(), - transaction_service: wallet.transaction_service.clone(), - node_identity: wallet.comms.node_identity(), - factories: wallet.factories.clone(), - }; - - let (event_sender, _) = broadcast::channel(200); - - let interval = self - .scanning_interval - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 12)); - UtxoScannerService::new( - self.peers.drain(..).collect(), - self.retry_limit, - self.mode.clone().unwrap_or_default(), - resources, - interval, - shutdown_signal, - event_sender, - ) - } - - #[allow(clippy::too_many_arguments)] - pub fn build_with_resources( - &mut self, - db: WalletDatabase, - comms_connectivity: ConnectivityRequester, - base_node_watcher: watch::Receiver>, - output_manager_service: OutputManagerHandle, - transaction_service: TransactionServiceHandle, - node_identity: Arc, - factories: CryptoFactories, - shutdown_signal: ShutdownSignal, - event_sender: broadcast::Sender, - ) -> UtxoScannerService { - let resources = UtxoScannerResources { - db, - comms_connectivity, - current_base_node_watcher: base_node_watcher, - output_manager_service, - transaction_service, - node_identity, - factories, - }; - let interval = self - .scanning_interval - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 12)); - UtxoScannerService::new( - self.peers.drain(..).collect(), - self.retry_limit, - self.mode.clone().unwrap_or_default(), - resources, - interval, - shutdown_signal, - event_sender, - ) - } -} - -struct UtxoScannerTask +pub struct UtxoScannerTask where TBackend: WalletBackend + 'static { - resources: UtxoScannerResources, - event_sender: broadcast::Sender, - retry_limit: usize, - num_retries: usize, - peer_seeds: Vec, - peer_index: usize, - mode: UtxoScannerMode, - run_flag: Arc, + pub(crate) resources: UtxoScannerResources, + pub(crate) event_sender: broadcast::Sender, + pub(crate) retry_limit: usize, + pub(crate) num_retries: usize, + pub(crate) peer_seeds: Vec, + pub(crate) peer_index: usize, + pub(crate) mode: UtxoScannerMode, + pub(crate) shutdown_signal: ShutdownSignal, } impl UtxoScannerTask where TBackend: WalletBackend + 'static @@ -284,7 +146,7 @@ where TBackend: WalletBackend + 'static let start_index = self.get_start_utxo_mmr_pos(&mut client).await?; let tip_header = self.get_chain_tip_header(&mut client).await?; let output_mmr_size = tip_header.output_mmr_size; - if !self.run_flag.load(Ordering::Relaxed) { + if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop return Ok((total_scanned, start_index, timer.elapsed())); } @@ -404,7 +266,7 @@ where TBackend: WalletBackend + 'static let mut last_utxo_index = 0u64; let mut iteration_count = 0u64; while let Some(response) = utxo_stream.next().await { - if !self.run_flag.load(Ordering::Relaxed) { + if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop return Ok(total_scanned as u64); } @@ -581,10 +443,9 @@ where TBackend: WalletBackend + 'static Ok(tx_id) } - async fn run(mut self) -> Result<(), UtxoScannerError> { - self.run_flag.store(true, Ordering::Relaxed); + pub async fn run(mut self) -> Result<(), UtxoScannerError> { loop { - if !self.run_flag.load(Ordering::Relaxed) { + if self.shutdown_signal.is_triggered() { // if running is set to false, we know its been canceled upstream so lets exit the loop return Ok(()); } @@ -672,118 +533,6 @@ where TBackend: WalletBackend + 'static } } -pub struct UtxoScannerService -where TBackend: WalletBackend + 'static -{ - resources: UtxoScannerResources, - retry_limit: usize, - peer_seeds: Vec, - mode: UtxoScannerMode, - is_running: Arc, - scan_for_utxo_interval: Duration, - shutdown_signal: ShutdownSignal, - event_sender: broadcast::Sender, -} - -impl UtxoScannerService -where TBackend: WalletBackend + 'static -{ - #[allow(clippy::too_many_arguments)] - fn new( - peer_seeds: Vec, - retry_limit: usize, - mode: UtxoScannerMode, - resources: UtxoScannerResources, - scan_for_utxo_interval: Duration, - shutdown_signal: ShutdownSignal, - event_sender: broadcast::Sender, - ) -> Self { - Self { - resources, - peer_seeds, - retry_limit, - mode, - is_running: Arc::new(AtomicBool::new(false)), - scan_for_utxo_interval, - shutdown_signal, - event_sender, - } - } - - fn create_task(&self) -> UtxoScannerTask { - UtxoScannerTask { - resources: self.resources.clone(), - peer_seeds: self.peer_seeds.clone(), - event_sender: self.event_sender.clone(), - retry_limit: self.retry_limit, - peer_index: 0, - num_retries: 1, - mode: self.mode.clone(), - run_flag: self.is_running.clone(), - } - } - - pub fn builder() -> UtxoScannerServiceBuilder { - UtxoScannerServiceBuilder::default() - } - - pub fn get_event_receiver(&mut self) -> broadcast::Receiver { - self.event_sender.subscribe() - } - - pub async fn run(mut self) -> Result<(), WalletError> { - info!( - target: LOG_TARGET, - "UTXO scanning service starting (interval = {:.2?})", self.scan_for_utxo_interval - ); - - let mut shutdown = self.shutdown_signal.clone(); - let start_at = Instant::now() + Duration::from_secs(1); - let mut work_interval = time::interval_at(start_at.into(), self.scan_for_utxo_interval); - work_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - tokio::select! { - _ = work_interval.tick() => { - let running_flag = self.is_running.clone(); - if !running_flag.load(Ordering::SeqCst) { - let task = self.create_task(); - debug!(target: LOG_TARGET, "UTXO scanning service starting scan for utxos"); - task::spawn(async move { - if let Err(err) = task.run().await { - error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err); - } - //we make sure the flag is set to false here - running_flag.store(false, Ordering::Relaxed); - }); - if self.mode == UtxoScannerMode::Recovery { - return Ok(()); - } - } - }, - _ = self.resources.current_base_node_watcher.changed() => { - debug!(target: LOG_TARGET, "Base node change detected."); - let peer = self.resources.current_base_node_watcher.borrow().as_ref().cloned(); - - // If we are recovering we will stick to the initially provided seeds - if self.mode != UtxoScannerMode::Recovery { - if let Some(peer) = peer { - self.peer_seeds = vec![peer.public_key]; - } - } - - self.is_running.store(false, Ordering::Relaxed); - }, - _ = shutdown.wait() => { - // this will stop the task if its running, and let that thread exit gracefully - self.is_running.store(false, Ordering::Relaxed); - info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal"); - return Ok(()); - } - } - } - } -} - fn convert_response_to_transaction_outputs( response: Vec>, last_utxo_index: u64, @@ -794,12 +543,12 @@ fn convert_response_to_transaction_outputs( .collect::, _>>()?; let current_utxo_index = response - // Assumes correct ordering which is otherwise not required for this protocol - .last() - .ok_or_else(|| { - UtxoScannerError::BaseNodeResponseError("Invalid response from base node: response was empty".to_string()) - })? - .mmr_index; + // Assumes correct ordering which is otherwise not required for this protocol + .last() + .ok_or_else(|| { + UtxoScannerError::BaseNodeResponseError("Invalid response from base node: response was empty".to_string()) + })? + .mmr_index; if current_utxo_index < last_utxo_index { return Err(UtxoScannerError::BaseNodeResponseError( "Invalid response from base node: mmr index must be non-decreasing".to_string(), @@ -817,11 +566,3 @@ fn convert_response_to_transaction_outputs( .collect::, _>>()?; Ok((outputs, current_utxo_index)) } - -#[derive(Clone, Default, Serialize, Deserialize)] -struct ScanningMetadata { - pub total_amount: MicroTari, - pub number_of_utxos: u64, - pub utxo_index: u64, - pub height_hash: HashOutput, -} diff --git a/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs b/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs new file mode 100644 index 0000000000..e57c461b8b --- /dev/null +++ b/base_layer/wallet/src/utxo_scanner_service/uxto_scanner_service_builder.rs @@ -0,0 +1,143 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{ + base_node_service::handle::BaseNodeServiceHandle, + connectivity_service::WalletConnectivityInterface, + output_manager_service::handle::OutputManagerHandle, + storage::{ + database::{WalletBackend, WalletDatabase}, + sqlite_db::WalletSqliteDatabase, + }, + transaction_service::handle::TransactionServiceHandle, + utxo_scanner_service::{ + handle::UtxoScannerEvent, + service::{UtxoScannerResources, UtxoScannerService}, + }, + WalletSqlite, +}; +use std::sync::Arc; +use tari_comms::{connectivity::ConnectivityRequester, peer_manager::Peer, types::CommsPublicKey, NodeIdentity}; +use tari_core::transactions::CryptoFactories; +use tari_shutdown::ShutdownSignal; +use tokio::sync::{broadcast, watch}; + +#[derive(Debug, Clone, PartialEq)] +pub enum UtxoScannerMode { + Recovery, + Scanning, +} + +impl Default for UtxoScannerMode { + fn default() -> UtxoScannerMode { + UtxoScannerMode::Recovery + } +} + +#[derive(Debug, Default, Clone)] +pub struct UtxoScannerServiceBuilder { + retry_limit: usize, + peers: Vec, + mode: Option, +} + +impl UtxoScannerServiceBuilder { + /// Set the maximum number of times we retry recovery. A failed recovery is counted as _all_ peers have failed. + /// i.e. worst-case number of recovery attempts = number of sync peers * retry limit + pub fn with_retry_limit(&mut self, limit: usize) -> &mut Self { + self.retry_limit = limit; + self + } + + pub fn with_peers(&mut self, peer_public_keys: Vec) -> &mut Self { + self.peers = peer_public_keys; + self + } + + pub fn with_mode(&mut self, mode: UtxoScannerMode) -> &mut Self { + self.mode = Some(mode); + self + } + + pub fn build_with_wallet( + &mut self, + wallet: &WalletSqlite, + shutdown_signal: ShutdownSignal, + ) -> UtxoScannerService { + let resources = UtxoScannerResources { + db: wallet.db.clone(), + comms_connectivity: wallet.comms.connectivity(), + current_base_node_watcher: wallet.wallet_connectivity.get_current_base_node_watcher(), + output_manager_service: wallet.output_manager_service.clone(), + transaction_service: wallet.transaction_service.clone(), + node_identity: wallet.comms.node_identity(), + factories: wallet.factories.clone(), + }; + + let (event_sender, _) = broadcast::channel(200); + + UtxoScannerService::new( + self.peers.drain(..).collect(), + self.retry_limit, + self.mode.clone().unwrap_or_default(), + resources, + shutdown_signal, + event_sender, + wallet.base_node_service.clone(), + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn build_with_resources( + &mut self, + db: WalletDatabase, + comms_connectivity: ConnectivityRequester, + base_node_watcher: watch::Receiver>, + output_manager_service: OutputManagerHandle, + transaction_service: TransactionServiceHandle, + node_identity: Arc, + factories: CryptoFactories, + shutdown_signal: ShutdownSignal, + event_sender: broadcast::Sender, + base_node_service: BaseNodeServiceHandle, + ) -> UtxoScannerService { + let resources = UtxoScannerResources { + db, + comms_connectivity, + current_base_node_watcher: base_node_watcher, + output_manager_service, + transaction_service, + node_identity, + factories, + }; + + UtxoScannerService::new( + self.peers.drain(..).collect(), + self.retry_limit, + self.mode.clone().unwrap_or_default(), + resources, + shutdown_signal, + event_sender, + base_node_service, + ) + } +} diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 169dfc2703..35d926a3c9 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -80,7 +80,7 @@ use crate::{ TransactionServiceInitializer, }, types::KeyDigest, - utxo_scanner_service::{handle::UtxoScannerHandle, UtxoScannerServiceInitializer}, + utxo_scanner_service::{handle::UtxoScannerHandle, UtxoScannerServiceInitializer, RECOVERY_KEY}, }; use tari_common_types::transaction::TxId; use tari_key_manager::cipher_seed::CipherSeed; @@ -188,7 +188,6 @@ where )) .add_initializer(WalletConnectivityInitializer::new(config.base_node_service_config)) .add_initializer(UtxoScannerServiceInitializer::new( - config.scan_for_utxo_interval, wallet_database.clone(), factories.clone(), node_identity.clone(), @@ -498,7 +497,6 @@ where /// Utility function to find out if there is data in the database indicating that there is an incomplete recovery /// process in progress pub async fn is_recovery_in_progress(&self) -> Result { - use crate::utxo_scanner_service::utxo_scanning::RECOVERY_KEY; Ok(self.db.get_client_key_value(RECOVERY_KEY.to_string()).await?.is_some()) } } diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet/mod.rs index c33481f3c2..7a16a844ca 100644 --- a/base_layer/wallet/tests/wallet/mod.rs +++ b/base_layer/wallet/tests/wallet/mod.rs @@ -154,7 +154,6 @@ async fn create_wallet( None, None, None, - None, ); let metadata = ChainMetadata::new(std::i64::MAX as u64, Vec::new(), 0, 0, 0); @@ -704,7 +703,6 @@ async fn test_import_utxo() { None, None, None, - None, ); let mut alice_wallet = Wallet::start( config, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 566ec880d4..ad0153f422 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -153,7 +153,7 @@ use tari_wallet::{ models::{CompletedTransaction, InboundTransaction, OutboundTransaction}, }, }, - utxo_scanner_service::utxo_scanning::{UtxoScannerService, RECOVERY_KEY}, + utxo_scanner_service::{service::UtxoScannerService, RECOVERY_KEY}, Wallet, WalletConfig, WalletSqlite, @@ -3217,7 +3217,6 @@ pub unsafe extern "C" fn wallet_create( None, None, None, - None, ); let mut recovery_lookup = match runtime.block_on(wallet_database.get_client_key_value(RECOVERY_KEY.to_owned())) { diff --git a/common/config/presets/console_wallet.toml b/common/config/presets/console_wallet.toml index 957fceece1..ea7a7a4636 100644 --- a/common/config/presets/console_wallet.toml +++ b/common/config/presets/console_wallet.toml @@ -69,9 +69,6 @@ base_node_update_publisher_channel_size = 500 # (options: "DirectOnly", "StoreAndForwardOnly", DirectAndStoreAndForward". default: "DirectAndStoreAndForward"). #transaction_routing_mechanism = "DirectAndStoreAndForward" -# UTXO scanning service interval (default = 12 hours, i.e. 60 * 60 * 12 seconds) -scan_for_utxo_interval = 180 - # When running the console wallet in command mode, use these values to determine what "stage" and timeout to wait # for sent transactions. # The stages are: diff --git a/common/src/configuration/global.rs b/common/src/configuration/global.rs index 04086be1f0..34ba629e92 100644 --- a/common/src/configuration/global.rs +++ b/common/src/configuration/global.rs @@ -101,7 +101,6 @@ pub struct GlobalConfig { pub fetch_utxos_timeout: Duration, pub service_request_timeout: Duration, pub base_node_query_timeout: Duration, - pub scan_for_utxo_interval: Duration, pub saf_expiry_duration: Duration, pub transaction_broadcast_monitoring_timeout: Duration, pub transaction_chain_monitoring_timeout: Duration, @@ -445,11 +444,6 @@ fn convert_node_config( cfg.get_int(key) .map_err(|e| ConfigurationError::new(key, &e.to_string()))? as u64, ); - let key = "wallet.scan_for_utxo_interval"; - let scan_for_utxo_interval = Duration::from_secs( - cfg.get_int(key) - .map_err(|e| ConfigurationError::new(key, &e.to_string()))? as u64, - ); let key = "wallet.saf_expiry_duration"; let saf_expiry_duration = Duration::from_secs(optional(cfg.get_int(key))?.unwrap_or(10800) as u64); @@ -755,7 +749,6 @@ fn convert_node_config( fetch_utxos_timeout, service_request_timeout, base_node_query_timeout, - scan_for_utxo_interval, saf_expiry_duration, transaction_broadcast_monitoring_timeout, transaction_chain_monitoring_timeout, diff --git a/common/src/configuration/utils.rs b/common/src/configuration/utils.rs index 7b0f2c4cf0..78aaa5c73e 100644 --- a/common/src/configuration/utils.rs +++ b/common/src/configuration/utils.rs @@ -118,7 +118,6 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config { cfg.set_default("wallet.base_node_service_refresh_interval", 5).unwrap(); cfg.set_default("wallet.base_node_service_request_max_age", 60).unwrap(); cfg.set_default("wallet.balance_enquiry_cooldown_period", 1).unwrap(); - cfg.set_default("wallet.scan_for_utxo_interval", 60 * 60 * 12).unwrap(); cfg.set_default("wallet.transaction_broadcast_monitoring_timeout", 60) .unwrap(); cfg.set_default("wallet.transaction_chain_monitoring_timeout", 60) diff --git a/integration_tests/helpers/config.js b/integration_tests/helpers/config.js index a022190d3f..23653699b8 100644 --- a/integration_tests/helpers/config.js +++ b/integration_tests/helpers/config.js @@ -102,7 +102,6 @@ function baseEnvs(peerSeeds = [], forceSyncPeers = []) { TARI_MINING_NODE__NUM_MINING_THREADS: "1", TARI_MINING_NODE__MINE_ON_TIP_ONLY: true, TARI_MINING_NODE__VALIDATE_TIP_TIMEOUT_SEC: 1, - TARI_WALLET__SCAN_FOR_UTXO_INTERVAL: 5, }; if (forceSyncPeers.length > 0) { envs.TARI_BASE_NODE__LOCALNET__FORCE_SYNC_PEERS = forceSyncPeers.join(",");