Skip to content

Commit

Permalink
feat: add shutdown clone
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Dec 20, 2022
1 parent 70a9a3f commit ac956c9
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 116 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions applications/tari_base_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ pub use crate::{

const LOG_TARGET: &str = "tari::base_node::app";

pub async fn run_base_node(node_identity: Arc<NodeIdentity>, config: Arc<ApplicationConfig>) -> Result<(), ExitError> {
let shutdown = Shutdown::new();

pub async fn run_base_node(
shutdown: Shutdown,
node_identity: Arc<NodeIdentity>,
config: Arc<ApplicationConfig>,
) -> Result<(), ExitError> {
let data_dir = config.base_node.data_dir.clone();
let data_dir_str = data_dir.clone().into_os_string().into_string().unwrap();

Expand Down
12 changes: 8 additions & 4 deletions applications/tari_console_wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::init::{boot_with_password, confirm_seed_words, wallet_mode};

pub const LOG_TARGET: &str = "wallet::console_wallet::main";

pub fn run_wallet(runtime: Runtime, config: &mut ApplicationConfig) -> Result<(), ExitError> {
pub fn run_wallet(shutdown: &mut Shutdown, runtime: Runtime, config: &mut ApplicationConfig) -> Result<(), ExitError> {
let data_dir = config.wallet.data_dir.clone();
let data_dir_str = data_dir.clone().into_os_string().into_string().unwrap();

Expand Down Expand Up @@ -84,10 +84,15 @@ pub fn run_wallet(runtime: Runtime, config: &mut ApplicationConfig) -> Result<()
command2: None,
};

run_wallet_with_cli(runtime, config, cli)
run_wallet_with_cli(shutdown, runtime, config, cli)
}

pub fn run_wallet_with_cli(runtime: Runtime, config: &mut ApplicationConfig, cli: Cli) -> Result<(), ExitError> {
pub fn run_wallet_with_cli(
shutdown: &mut Shutdown,
runtime: Runtime,
config: &mut ApplicationConfig,
cli: Cli,
) -> Result<(), ExitError> {
info!(
target: LOG_TARGET,
"== {} ({}) ==",
Expand All @@ -109,7 +114,6 @@ pub fn run_wallet_with_cli(runtime: Runtime, config: &mut ApplicationConfig, cli
// get command line password if provided
let seed_words_file_name = cli.seed_words_file_name.clone();

let mut shutdown = Shutdown::new();
let shutdown_signal = shutdown.to_signal();

if cli.change_password {
Expand Down
1 change: 1 addition & 0 deletions infrastructure/shutdown/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::oneshot_trigger::OneshotSignal;
///
/// _Note_: This will trigger when dropped, so the `Shutdown` instance should be held as
/// long as required by the application.
#[derive(Clone, Debug)]
pub struct Shutdown(oneshot_trigger::OneshotTrigger<()>);
impl Shutdown {
pub fn new() -> Self {
Expand Down
11 changes: 7 additions & 4 deletions infrastructure/shutdown/src/oneshot_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};

Expand All @@ -36,16 +37,17 @@ pub fn channel<T: Clone>() -> OneshotTrigger<T> {
OneshotTrigger::new()
}

#[derive(Clone, Debug)]
pub struct OneshotTrigger<T> {
sender: Option<oneshot::Sender<T>>,
sender: Arc<Mutex<Option<oneshot::Sender<T>>>>,
signal: OneshotSignal<T>,
}

impl<T: Clone> OneshotTrigger<T> {
pub fn new() -> Self {
let (tx, rx) = oneshot::channel();
Self {
sender: Some(tx),
sender: Arc::new(Mutex::new(Some(tx))),
signal: rx.shared().into(),
}
}
Expand All @@ -55,13 +57,14 @@ impl<T: Clone> OneshotTrigger<T> {
}

pub fn broadcast(&mut self, item: T) {
if let Some(tx) = self.sender.take() {
let mut x = self.sender.lock().unwrap();
if let Some(tx) = (*x).take() {
let _result = tx.send(item);
}
}

pub fn is_used(&self) -> bool {
self.sender.is_none()
self.sender.lock().unwrap().is_none()
}
}

Expand Down
1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tari_base_node = { path = "../applications/tari_base_node" }
tari_console_wallet = { path = "../applications/tari_console_wallet" }
tari_wallet = { path = "../base_layer/wallet" }
tari_common_types = { path = "../base_layer/common_types" }
tari_shutdown = { path = "../infrastructure/shutdown" }

anyhow = "1.0.53"
async-trait = "0.1.50"
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/tests/cucumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ async fn main() {
.summarized()
.assert_normalized(),
)
.after(|feature,rule,scenario,_ev,maybe_world| {
.after(|_feature,_rule,scenario,_ev,maybe_world| {
Box::pin(async move {
maybe_world.unwrap().after(scenario).await;
})
Expand Down
104 changes: 48 additions & 56 deletions integration_tests/tests/utils/base_node_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ use tari_common::configuration::CommonConfig;
use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity};
use tari_comms_dht::DhtConfig;
use tari_p2p::{auto_update::AutoUpdateConfig, Network, PeerSeedsConfig, TransportType};
use tari_shutdown::Shutdown;
use tempfile::tempdir;
use tokio::{
task,
time::{sleep_until, Instant},
};
use tokio::task;
use tonic::transport::Channel;

use crate::TariWorld;
Expand All @@ -50,7 +48,7 @@ pub struct BaseNodeProcess {
pub identity: NodeIdentity,
pub temp_dir_path: String,
pub is_seed_node: bool,
pub kill_signal: Option<tokio::sync::oneshot::Sender<()>>,
pub kill_signal: Shutdown,
}

impl Drop for BaseNodeProcess {
Expand Down Expand Up @@ -86,15 +84,15 @@ pub async fn spawn_base_node(world: &mut TariWorld, is_seed_node: bool, bn_name:
let temp_dir = tempdir().unwrap();
let temp_dir_path = temp_dir.path().display().to_string();

let (kill_signal_sender, kill_signal_receiver) = tokio::sync::oneshot::channel::<()>();
let shutdown = Shutdown::new();
let process = BaseNodeProcess {
name: bn_name.clone(),
port,
grpc_port,
identity,
temp_dir_path,
is_seed_node,
kill_signal: Some(kill_signal_sender),
kill_signal: shutdown.clone(),
};

let name_cloned = bn_name.clone();
Expand All @@ -111,51 +109,47 @@ pub async fn spawn_base_node(world: &mut TariWorld, is_seed_node: bool, bn_name:

let mut common_config = CommonConfig::default();
common_config.base_path = temp_dir.as_ref().to_path_buf();
task::spawn(futures::future::select(
kill_signal_receiver,
Box::pin(async move {
let mut base_node_config = tari_base_node::ApplicationConfig {
common: common_config,
auto_update: AutoUpdateConfig::default(),
base_node: BaseNodeConfig::default(),
metrics: MetricsConfig::default(),
peer_seeds: PeerSeedsConfig {
peer_seeds: peer_addresses.into(),
..Default::default()
},
};

println!("Using base_node temp_dir: {}", temp_dir.path().display());
base_node_config.base_node.network = Network::LocalNet;
base_node_config.base_node.grpc_enabled = true;
base_node_config.base_node.grpc_address =
Some(format!("/ip4/127.0.0.1/tcp/{}", grpc_port).parse().unwrap());
base_node_config.base_node.report_grpc_error = true;

base_node_config.base_node.data_dir = temp_dir.path().to_path_buf();
base_node_config.base_node.identity_file = temp_dir.path().join("base_node_id.json");
base_node_config.base_node.tor_identity_file = temp_dir.path().join("base_node_tor_id.json");

base_node_config.base_node.lmdb_path = temp_dir.path().to_path_buf();
base_node_config.base_node.p2p.transport.transport_type = TransportType::Tcp;
base_node_config.base_node.p2p.transport.tcp.listener_address =
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap();
base_node_config.base_node.p2p.public_address =
Some(base_node_config.base_node.p2p.transport.tcp.listener_address.clone());
base_node_config.base_node.p2p.datastore_path = temp_dir.path().to_path_buf();
base_node_config.base_node.p2p.dht = DhtConfig::default_local_test();
base_node_config.base_node.p2p.allow_test_addresses = true;

println!(
"Initializing base node: name={}; port={}; grpc_port={}; is_seed_node={}",
name_cloned, port, grpc_port, is_seed_node
);
let result = run_base_node(Arc::new(base_node_identity), Arc::new(base_node_config)).await;
if let Err(e) = result {
panic!("{:?}", e);
}
}),
));
task::spawn(async move {
let mut base_node_config = tari_base_node::ApplicationConfig {
common: common_config,
auto_update: AutoUpdateConfig::default(),
base_node: BaseNodeConfig::default(),
metrics: MetricsConfig::default(),
peer_seeds: PeerSeedsConfig {
peer_seeds: peer_addresses.into(),
..Default::default()
},
};

println!("Using base_node temp_dir: {}", temp_dir.path().display());
base_node_config.base_node.network = Network::LocalNet;
base_node_config.base_node.grpc_enabled = true;
base_node_config.base_node.grpc_address = Some(format!("/ip4/127.0.0.1/tcp/{}", grpc_port).parse().unwrap());
base_node_config.base_node.report_grpc_error = true;

base_node_config.base_node.data_dir = temp_dir.path().to_path_buf();
base_node_config.base_node.identity_file = temp_dir.path().join("base_node_id.json");
base_node_config.base_node.tor_identity_file = temp_dir.path().join("base_node_tor_id.json");

base_node_config.base_node.lmdb_path = temp_dir.path().to_path_buf();
base_node_config.base_node.p2p.transport.transport_type = TransportType::Tcp;
base_node_config.base_node.p2p.transport.tcp.listener_address =
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap();
base_node_config.base_node.p2p.public_address =
Some(base_node_config.base_node.p2p.transport.tcp.listener_address.clone());
base_node_config.base_node.p2p.datastore_path = temp_dir.path().to_path_buf();
base_node_config.base_node.p2p.dht = DhtConfig::default_local_test();
base_node_config.base_node.p2p.allow_test_addresses = true;

println!(
"Initializing base node: name={}; port={}; grpc_port={}; is_seed_node={}",
name_cloned, port, grpc_port, is_seed_node
);
let result = run_base_node(shutdown, Arc::new(base_node_identity), Arc::new(base_node_config)).await;
if let Err(e) = result {
panic!("{:?}", e);
}
});

// make the new base node able to be referenced by other processes
world.base_nodes.insert(bn_name.clone(), process);
Expand All @@ -179,9 +173,7 @@ impl BaseNodeProcess {
Ok(BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port)).await?)
}

pub async fn kill(&mut self) {
self.kill_signal.take().unwrap().send(());
// This value is arbitrary. If there is no sleep the file might still be locked.
sleep_until(Instant::now() + Duration::from_secs(5)).await;
pub fn kill(&mut self) {
self.kill_signal.trigger();
}
}
Loading

0 comments on commit ac956c9

Please sign in to comment.