Skip to content

Commit

Permalink
Merge branch 'development' into core-prune-inputs
Browse files Browse the repository at this point in the history
* development:
  feat: only trigger UTXO scanning when a new block event is received (tari-project#3620)
  • Loading branch information
sdbondi committed Nov 26, 2021
2 parents 4b1e913 + df1be7e commit 3a48a44
Show file tree
Hide file tree
Showing 20 changed files with 411 additions and 370 deletions.
2 changes: 1 addition & 1 deletion applications/daily_tests/cron_jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async function runWalletRecoveryTest(instances) {
});

notify(
`🙌 Wallet (Pubkey: ${identity.public_key} ) recovered to a block height of ${numScanned}, completed in ${timeDiffMinutes} minutes (${scannedRate} blocks/min). ${recoveredAmount} µT recovered for ${instances} instance(s).`
`🙌 Wallet (Pubkey: ${identity.public_key} ) recovered scanned ${numScanned} UTXO's, completed in ${timeDiffMinutes} minutes (${scannedRate} UTXOs/min). ${recoveredAmount} µT recovered for ${instances} instance(s).`
);
} catch (err) {
console.error(err);
Expand Down
7 changes: 2 additions & 5 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use rustyline::Editor;
use tari_app_utilities::utilities::create_transport_type;
use tari_common::{exit_codes::ExitCodes, ConfigBootstrap, GlobalConfig};
use tari_comms::{
multiaddr::Multiaddr,
peer_manager::{Peer, PeerFeatures},
types::CommsSecretKey,
NodeIdentity,
Expand Down Expand Up @@ -299,10 +300,7 @@ pub async fn init_wallet(
);

let node_address = match wallet_db.get_node_address().await? {
None => config
.public_address
.clone()
.ok_or_else(|| ExitCodes::ConfigError("node public address error".to_string()))?,
None => config.public_address.clone().unwrap_or_else(Multiaddr::empty),
Some(a) => a,
};

Expand Down Expand Up @@ -403,7 +401,6 @@ pub async fn init_wallet(
config.buffer_size_console_wallet,
)),
Some(config.buffer_rate_limit_console_wallet),
Some(config.scan_for_utxo_interval),
Some(updater_config),
config.autoupdate_check_interval,
);
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_key_manager::mnemonic::Mnemonic;
use tari_shutdown::Shutdown;
use tari_wallet::{
storage::sqlite_db::WalletSqliteDatabase,
utxo_scanner_service::{handle::UtxoScannerEvent, utxo_scanning::UtxoScannerService},
utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService},
WalletSqlite,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,8 @@ impl WalletEventMonitor {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received base node event {:?}", msg);
match (*msg).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
if let BaseNodeEvent::BaseNodeStateChanged(state) = (*msg).clone() {
self.trigger_base_node_state_refresh(state).await;
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum BaseNodeServiceResponse {
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
NewBlockDetected(u64),
}

/// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running
Expand Down
32 changes: 21 additions & 11 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
},
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
self.map_state(move |_| BaseNodeState {
self.update_state(BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: None,
Expand Down Expand Up @@ -134,7 +134,7 @@ where
let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await {
Some(tip_info) => tip_info?,
None => {
self.map_state(|_| Default::default()).await;
self.update_state(Default::default()).await;
continue;
},
};
Expand Down Expand Up @@ -165,7 +165,8 @@ where

let is_synced = tip_info.is_synced;
let height_of_longest_chain = chain_metadata.height_of_longest_chain();
self.map_state(move |_| BaseNodeState {

self.update_state(BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
Expand All @@ -184,7 +185,7 @@ where

let delay = time::sleep(self.interval.saturating_sub(latency));
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.map_state(|_| Default::default()).await;
self.update_state(Default::default()).await;
}
}

Expand All @@ -193,14 +194,23 @@ where
Ok(())
}

async fn map_state<F>(&self, transform: F)
where F: FnOnce(&BaseNodeState) -> BaseNodeState {
let new_state = {
let mut lock = self.state.write().await;
let new_state = transform(&*lock);
*lock = new_state.clone();
new_state
async fn update_state(&self, new_state: BaseNodeState) {
let mut lock = self.state.write().await;
let (new_block_detected, height) = match (new_state.chain_metadata.clone(), (*lock).chain_metadata.clone()) {
(Some(new_metadata), Some(old_metadata)) => (
new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(),
new_metadata.height_of_longest_chain(),
),
(Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()),
(None, _) => (false, 0),
};

if new_block_detected {
self.publish_event(BaseNodeEvent::NewBlockDetected(height));
}

*lock = new_state.clone();

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
}

Expand Down
3 changes: 0 additions & 3 deletions base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub struct WalletConfig {
pub rate_limit: usize,
pub network: NetworkConsensus,
pub base_node_service_config: BaseNodeServiceConfig,
pub scan_for_utxo_interval: Duration,
pub updater_config: Option<AutoUpdateConfig>,
pub autoupdate_check_interval: Option<Duration>,
}
Expand All @@ -59,7 +58,6 @@ impl WalletConfig {
base_node_service_config: Option<BaseNodeServiceConfig>,
buffer_size: Option<usize>,
rate_limit: Option<usize>,
scan_for_utxo_interval: Option<Duration>,
updater_config: Option<AutoUpdateConfig>,
autoupdate_check_interval: Option<Duration>,
) -> Self {
Expand All @@ -72,7 +70,6 @@ impl WalletConfig {
rate_limit: rate_limit.unwrap_or(50),
network,
base_node_service_config: base_node_service_config.unwrap_or_default(),
scan_for_utxo_interval: scan_for_utxo_interval.unwrap_or_else(|| Duration::from_secs(43200)),
updater_config,
autoupdate_check_interval,
}
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ where
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
},
types::HashDigest,
util::watch::Watch,
utxo_scanner_service::utxo_scanning::RECOVERY_KEY,
utxo_scanner_service::RECOVERY_KEY,
};
use chrono::{NaiveDateTime, Utc};
use digest::Digest;
Expand Down Expand Up @@ -708,6 +708,7 @@ where
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
47 changes: 12 additions & 35 deletions base_layer/wallet/src/utxo_scanner_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,34 @@
// Copyright 2021. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
base_node_service::handle::BaseNodeServiceHandle,
connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface},
output_manager_service::handle::OutputManagerHandle,
storage::database::{WalletBackend, WalletDatabase},
transaction_service::handle::TransactionServiceHandle,
utxo_scanner_service::{
handle::UtxoScannerHandle,
utxo_scanning::{UtxoScannerMode, UtxoScannerService},
service::UtxoScannerService,
uxto_scanner_service_builder::UtxoScannerMode,
},
};
use futures::future;
use log::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity};
use tari_core::transactions::CryptoFactories;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::broadcast;

pub mod error;
pub mod handle;
pub mod utxo_scanning;
pub mod service;
mod utxo_scanner_task;
mod uxto_scanner_service_builder;

pub use utxo_scanner_task::RECOVERY_KEY;

const LOG_TARGET: &str = "wallet::utxo_scanner_service::initializer";

pub struct UtxoScannerServiceInitializer<T> {
interval: Duration,
backend: Option<WalletDatabase<T>>,
factories: CryptoFactories,
node_identity: Arc<NodeIdentity>,
Expand All @@ -54,14 +37,8 @@ pub struct UtxoScannerServiceInitializer<T> {
impl<T> UtxoScannerServiceInitializer<T>
where T: WalletBackend + 'static
{
pub fn new(
interval: Duration,
backend: WalletDatabase<T>,
factories: CryptoFactories,
node_identity: Arc<NodeIdentity>,
) -> Self {
pub fn new(backend: WalletDatabase<T>, factories: CryptoFactories, node_identity: Arc<NodeIdentity>) -> Self {
Self {
interval,
backend: Some(backend),
factories,
node_identity,
Expand All @@ -87,19 +64,18 @@ where T: WalletBackend + 'static
.take()
.expect("Cannot start Utxo scanner service without setting a storage backend");
let factories = self.factories.clone();
let interval = self.interval;
let node_identity = self.node_identity.clone();

context.spawn_when_ready(move |handles| async move {
let transaction_service = handles.expect_handle::<TransactionServiceHandle>();
let output_manager_service = handles.expect_handle::<OutputManagerHandle>();
let comms_connectivity = handles.expect_handle::<ConnectivityRequester>();
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();

let scanning_service = UtxoScannerService::<T>::builder()
.with_peers(vec![])
.with_retry_limit(2)
.with_scanning_interval(interval)
.with_mode(UtxoScannerMode::Scanning)
.build_with_resources(
backend,
Expand All @@ -111,6 +87,7 @@ where T: WalletBackend + 'static
factories,
handles.get_shutdown_signal(),
event_sender,
base_node_service_handle,
)
.run();

Expand Down
Loading

0 comments on commit 3a48a44

Please sign in to comment.