Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 19, 2023
1 parent 6cec238 commit b799536
Show file tree
Hide file tree
Showing 51 changed files with 1,268 additions and 357 deletions.
41 changes: 30 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"clients/base_node_client",
"clients/validator_node_client",
"clients/wallet_daemon_client",
"dan_layer/comms_rpc_state_sync",
"dan_layer/consensus",
"dan_layer/consensus_tests",
"dan_layer/epoch_manager",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,8 @@ impl BaseLayerScanner {
created_justify: *genesis.justify().id(),
created_block: *genesis.id(),
created_height: NodeHeight::zero(),
destroyed_by_transaction: None,
destroyed_justify: None,
destroyed_by_block: None,
created_at_epoch: Epoch(0),
destroyed_at_epoch: None,
destroyed: None,
}
.create(tx)
})
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ minotari_wallet_grpc_client = { git = "https://github.com/tari-project/tari.git"
tari_base_node_client = { path = "../../clients/base_node_client" }
tari_epoch_manager = { path = "../../dan_layer/epoch_manager", features = ["base_layer"] }
tari_indexer_lib = { path = "../../dan_layer/indexer_lib" }
tari_comms_rpc_state_sync = { path = "../../dan_layer/comms_rpc_state_sync" }

tari_consensus = { path = "../../dan_layer/consensus" }
tari_state_store_sqlite = { path = "../../dan_layer/state_store_sqlite" }
Expand Down
11 changes: 3 additions & 8 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ pub async fn spawn_services(
rx_consensus_message,
outbound_messaging,
mempool.clone(),
validator_node_client_factory.clone(),
shutdown.clone(),
)
.await;
Expand Down Expand Up @@ -383,11 +384,8 @@ where
created_justify: *genesis_block.justify().id(),
created_block: *genesis_block.id(),
created_height: NodeHeight(0),
destroyed_by_transaction: None,
destroyed_justify: None,
destroyed_by_block: None,
created_at_epoch: Epoch(0),
destroyed_at_epoch: None,
destroyed: None,
}
.create(tx)?;
}
Expand All @@ -404,11 +402,8 @@ where
created_justify: *genesis_block.justify().id(),
created_block: *genesis_block.id(),
created_height: NodeHeight(0),
destroyed_by_transaction: None,
destroyed_justify: None,
destroyed_by_block: None,
created_at_epoch: Epoch(0),
destroyed_at_epoch: None,
destroyed: None,
}
.create(tx)?;
}
Expand Down
8 changes: 6 additions & 2 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::{
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffEvent, HotstuffWorker},
messages::HotstuffMessage,
Expand All @@ -16,6 +17,7 @@ use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::client::TariCommsValidatorNodeClientFactory;
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
Expand Down Expand Up @@ -45,6 +47,7 @@ pub async fn spawn(
rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<PublicKey>)>,
outbound_messaging: OutboundMessaging,
mempool: MempoolHandle,
client_factory: TariCommsValidatorNodeClientFactory,
shutdown_signal: ShutdownSignal,
) -> (JoinHandle<Result<(), anyhow::Error>>, EventSubscription<HotstuffEvent>) {
let (tx_broadcast, rx_broadcast) = mpsc::channel(10);
Expand All @@ -64,7 +67,7 @@ pub async fn spawn(
validator_addr,
rx_new_transactions,
rx_hs_message,
store,
store.clone(),
epoch_manager.clone(),
leader_strategy,
signing_service,
Expand All @@ -78,9 +81,10 @@ pub async fn spawn(
);

let context = ConsensusWorkerContext {
epoch_manager,
epoch_manager: epoch_manager.clone(),
epoch_events,
hotstuff: hotstuff_worker,
state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory),
};

let handle = ConsensusWorker::new(shutdown_signal).spawn(context);
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/consensus/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_comms::types::CommsPublicKey;
use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::traits::ConsensusSpec;
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_state_store_sqlite::SqliteStateStore;
Expand All @@ -20,5 +21,6 @@ impl ConsensusSpec for TariConsensusSpec {
type LeaderStrategy = RoundRobinLeaderStrategy;
type StateManager = TariStateManager;
type StateStore = SqliteStateStore<Self::Addr>;
type SyncManager = CommsRpcStateSyncManager<Self::EpochManager, Self::StateStore>;
type VoteSignatureService = TariSignatureService;
}
10 changes: 8 additions & 2 deletions applications/tari_validator_node/src/consensus/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use tari_dan_common_types::ShardId;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction, SubstateRecord},
StateStore,
StateStoreWriteTransaction,
StorageError,
};

Expand Down Expand Up @@ -35,7 +34,14 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
let down_shards = diff
.down_iter()
.map(|(addr, version)| ShardId::from_address(addr, *version));
tx.substate_down_many(down_shards, block.epoch(), block.id(), transaction.id())?;
SubstateRecord::destroy_many(
tx,
down_shards,
block.epoch(),
block.id(),
block.justify().id(),
transaction.id(),
)?;

let to_up = diff.up_iter().map(|(addr, substate)| {
SubstateRecord::new(
Expand Down
43 changes: 3 additions & 40 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ use log::*;
use tari_comms::{connection_manager::LivenessStatus, connectivity::ConnectivityEvent, peer_manager::NodeId};
use tari_consensus::hotstuff::HotstuffEvent;
use tari_dan_storage::{consensus_models::Block, StateStore};
use tari_epoch_manager::{EpochManagerError, EpochManagerEvent, EpochManagerReader};
use tari_epoch_manager::{EpochManagerError, EpochManagerReader};
use tari_shutdown::ShutdownSignal;
use tokio::{task, time, time::MissedTickBehavior};
use tokio::{time, time::MissedTickBehavior};

use crate::{
p2p::services::{committee_state_sync::CommitteeStateSync, networking::NetworkingService},
Services,
};
use crate::{p2p::services::networking::NetworkingService, Services};

const LOG_TARGET: &str = "tari::validator_node::dan_node";

Expand Down Expand Up @@ -64,8 +61,6 @@ impl DanNode {
let mut tick = time::interval(Duration::from_secs(10));
tick.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut epoch_manager_events = self.services.epoch_manager.subscribe().await?;

loop {
tokio::select! {
// Wait until killed
Expand All @@ -86,10 +81,6 @@ impl DanNode {
error!(target: LOG_TARGET, "Error handling hotstuff event: {}", err);
},

Ok(event) = epoch_manager_events.recv() => {
self.handle_epoch_manager_event(event).await?;
}

Err(err) = self.services.on_any_exit() => {
error!(target: LOG_TARGET, "Error in service: {}", err);
return Err(err);
Expand Down Expand Up @@ -142,34 +133,6 @@ impl DanNode {
Ok(())
}

async fn handle_epoch_manager_event(&self, event: EpochManagerEvent) -> Result<(), anyhow::Error> {
match event {
EpochManagerEvent::EpochChanged(epoch) => {
info!(target: LOG_TARGET, "📅 Epoch changed to {}", epoch);
let sync_service = CommitteeStateSync::new(
self.services.epoch_manager.clone(),
self.services.validator_node_client_factory.clone(),
self.services.state_store.clone(),
self.services.global_db.clone(),
self.services.comms.node_identity().public_key().clone(),
);

// EpochChanged should only happen once per epoch and the event is not emitted during initial sync. So
// spawning state sync for each event should be ok.
task::spawn(async move {
if let Err(e) = sync_service.sync_state(epoch).await {
error!(
target: LOG_TARGET,
"Failed to sync peers state for epoch {}: {}", epoch, e
);
}
});
},
EpochManagerEvent::ThisValidatorIsRegistered { .. } => {},
}
Ok(())
}

async fn dial_local_shard_peers(&mut self) -> Result<(), anyhow::Error> {
let epoch = self.services.epoch_manager.current_epoch().await?;
let res = self
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/p2p/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod service_impl;
mod sync_task;

pub use service_impl::ValidatorNodeRpcServiceImpl;
use tari_common_types::types::PublicKey;
Expand Down
Loading

0 comments on commit b799536

Please sign in to comment.