From b799536a3bd11940de1548bff5e0d76460e77f33 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 18 Sep 2023 16:41:06 +0400 Subject: [PATCH] wip --- Cargo.lock | 41 ++- Cargo.toml | 1 + .../src/base_layer_scanner.rs | 5 +- applications/tari_validator_node/Cargo.toml | 1 + .../tari_validator_node/src/bootstrap.rs | 11 +- .../tari_validator_node/src/consensus/mod.rs | 8 +- .../tari_validator_node/src/consensus/spec.rs | 2 + .../src/consensus/state_manager.rs | 10 +- .../tari_validator_node/src/dan_node.rs | 43 +-- .../tari_validator_node/src/p2p/rpc/mod.rs | 1 + .../src/p2p/rpc/service_impl.rs | 231 +++++++++----- .../src/p2p/services/mod.rs | 2 +- dan_layer/common_types/src/committee.rs | 10 - dan_layer/comms_rpc_state_sync/Cargo.toml | 24 ++ dan_layer/comms_rpc_state_sync/src/error.rs | 28 ++ dan_layer/comms_rpc_state_sync/src/lib.rs | 18 ++ dan_layer/comms_rpc_state_sync/src/manager.rs | 281 ++++++++++++++++++ dan_layer/consensus/Cargo.toml | 3 + dan_layer/consensus/src/hotstuff/common.rs | 39 +-- dan_layer/consensus/src/hotstuff/error.rs | 2 + .../src/hotstuff/on_receive_new_view.rs | 8 +- .../src/hotstuff/on_receive_proposal.rs | 6 +- .../consensus/src/hotstuff/on_receive_vote.rs | 7 +- .../src/hotstuff/state_machine/check_sync.rs | 20 +- .../src/hotstuff/state_machine/syncing.rs | 13 +- .../src/hotstuff/state_machine/worker.rs | 11 +- dan_layer/consensus/src/hotstuff/worker.rs | 14 +- dan_layer/consensus/src/traits/mod.rs | 3 + dan_layer/consensus/src/traits/sync.rs | 19 ++ dan_layer/consensus_tests/Cargo.toml | 8 +- dan_layer/consensus_tests/src/support/mod.rs | 1 + dan_layer/consensus_tests/src/support/spec.rs | 2 + dan_layer/consensus_tests/src/support/sync.rs | 23 ++ .../src/support/validator/builder.rs | 2 + .../base_layer/base_layer_epoch_manager.rs | 4 + dan_layer/state_store_sqlite/src/error.rs | 12 +- dan_layer/state_store_sqlite/src/reader.rs | 58 ++++ .../src/sql_models/block.rs | 1 + .../src/sql_models/substate.rs | 42 ++- 
dan_layer/state_store_sqlite/src/writer.rs | 11 +- .../storage/src/consensus_models/block.rs | 47 ++- .../storage/src/consensus_models/command.rs | 12 + .../consensus_models/quorum_certificate.rs | 37 +++ .../storage/src/consensus_models/substate.rs | 171 +++++++++-- dan_layer/storage/src/error.rs | 16 +- dan_layer/storage/src/state_store/mod.rs | 6 + .../validator_node_rpc/proto/consensus.proto | 33 +- dan_layer/validator_node_rpc/proto/rpc.proto | 52 ++++ .../src/conversions/consensus.rs | 81 ++++- .../validator_node_rpc/src/conversions/rpc.rs | 123 +++++--- .../validator_node_rpc/src/rpc_service.rs | 21 +- 51 files changed, 1268 insertions(+), 357 deletions(-) create mode 100644 dan_layer/comms_rpc_state_sync/Cargo.toml create mode 100644 dan_layer/comms_rpc_state_sync/src/error.rs create mode 100644 dan_layer/comms_rpc_state_sync/src/lib.rs create mode 100644 dan_layer/comms_rpc_state_sync/src/manager.rs create mode 100644 dan_layer/consensus/src/traits/sync.rs create mode 100644 dan_layer/consensus_tests/src/support/sync.rs diff --git a/Cargo.lock b/Cargo.lock index e87245c27c..d6c37ccc07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,9 +618,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" -version = "0.1.68" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", @@ -1537,7 +1537,7 @@ dependencies = [ [[package]] name = "consensus_tests" -version = "0.1.0" +version = "0.50.0-pre.0" dependencies = [ "anyhow", "async-trait", @@ -4140,9 +4140,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" 
+checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ "serde", "value-bag", @@ -5735,9 +5735,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -7417,6 +7417,24 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tari_comms_rpc_state_sync" +version = "0.50.0-pre.0" +dependencies = [ + "anyhow", + "async-trait", + "futures 0.3.28", + "log", + "tari_comms", + "tari_consensus", + "tari_dan_common_types", + "tari_dan_storage", + "tari_epoch_manager", + "tari_transaction", + "tari_validator_node_rpc", + "thiserror", +] + [[package]] name = "tari_consensus" version = "0.50.0-pre.0" @@ -8322,6 +8340,7 @@ dependencies = [ "tari_common_types", "tari_comms", "tari_comms_logging", + "tari_comms_rpc_state_sync", "tari_consensus", "tari_core", "tari_crypto", @@ -8524,18 +8543,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 69a780c071..c4160ee98e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "clients/base_node_client", 
"clients/validator_node_client", "clients/wallet_daemon_client", + "dan_layer/comms_rpc_state_sync", "dan_layer/consensus", "dan_layer/consensus_tests", "dan_layer/epoch_manager", diff --git a/applications/tari_dan_app_utilities/src/base_layer_scanner.rs b/applications/tari_dan_app_utilities/src/base_layer_scanner.rs index c261e8d287..d57c759a6c 100644 --- a/applications/tari_dan_app_utilities/src/base_layer_scanner.rs +++ b/applications/tari_dan_app_utilities/src/base_layer_scanner.rs @@ -386,11 +386,8 @@ impl BaseLayerScanner { created_justify: *genesis.justify().id(), created_block: *genesis.id(), created_height: NodeHeight::zero(), - destroyed_by_transaction: None, - destroyed_justify: None, - destroyed_by_block: None, created_at_epoch: Epoch(0), - destroyed_at_epoch: None, + destroyed: None, } .create(tx) }) diff --git a/applications/tari_validator_node/Cargo.toml b/applications/tari_validator_node/Cargo.toml index 9d8a51553b..c04ee93425 100644 --- a/applications/tari_validator_node/Cargo.toml +++ b/applications/tari_validator_node/Cargo.toml @@ -34,6 +34,7 @@ minotari_wallet_grpc_client = { git = "https://github.com/tari-project/tari.git" tari_base_node_client = { path = "../../clients/base_node_client" } tari_epoch_manager = { path = "../../dan_layer/epoch_manager", features = ["base_layer"] } tari_indexer_lib = { path = "../../dan_layer/indexer_lib" } +tari_comms_rpc_state_sync = { path = "../../dan_layer/comms_rpc_state_sync" } tari_consensus = { path = "../../dan_layer/consensus" } tari_state_store_sqlite = { path = "../../dan_layer/state_store_sqlite" } diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 1357ad28bc..740807835f 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -245,6 +245,7 @@ pub async fn spawn_services( rx_consensus_message, outbound_messaging, mempool.clone(), + 
validator_node_client_factory.clone(), shutdown.clone(), ) .await; @@ -383,11 +384,8 @@ where created_justify: *genesis_block.justify().id(), created_block: *genesis_block.id(), created_height: NodeHeight(0), - destroyed_by_transaction: None, - destroyed_justify: None, - destroyed_by_block: None, created_at_epoch: Epoch(0), - destroyed_at_epoch: None, + destroyed: None, } .create(tx)?; } @@ -404,11 +402,8 @@ where created_justify: *genesis_block.justify().id(), created_block: *genesis_block.id(), created_height: NodeHeight(0), - destroyed_by_transaction: None, - destroyed_justify: None, - destroyed_by_block: None, created_at_epoch: Epoch(0), - destroyed_at_epoch: None, + destroyed: None, } .create(tx)?; } diff --git a/applications/tari_validator_node/src/consensus/mod.rs b/applications/tari_validator_node/src/consensus/mod.rs index 33498af97e..df1be0e561 100644 --- a/applications/tari_validator_node/src/consensus/mod.rs +++ b/applications/tari_validator_node/src/consensus/mod.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use tari_common_types::types::PublicKey; use tari_comms::{types::CommsPublicKey, NodeIdentity}; +use tari_comms_rpc_state_sync::CommsRpcStateSyncManager; use tari_consensus::{ hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffEvent, HotstuffWorker}, messages::HotstuffMessage, @@ -16,6 +17,7 @@ use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; use tari_transaction::{Transaction, TransactionId}; +use tari_validator_node_rpc::client::TariCommsValidatorNodeClientFactory; use tokio::{ sync::{broadcast, mpsc}, task::JoinHandle, @@ -45,6 +47,7 @@ pub async fn spawn( rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage)>, outbound_messaging: OutboundMessaging, mempool: MempoolHandle, + client_factory: TariCommsValidatorNodeClientFactory, shutdown_signal: ShutdownSignal, ) -> (JoinHandle>, EventSubscription) { let (tx_broadcast, rx_broadcast) = 
mpsc::channel(10); @@ -64,7 +67,7 @@ pub async fn spawn( validator_addr, rx_new_transactions, rx_hs_message, - store, + store.clone(), epoch_manager.clone(), leader_strategy, signing_service, @@ -78,9 +81,10 @@ pub async fn spawn( ); let context = ConsensusWorkerContext { - epoch_manager, + epoch_manager: epoch_manager.clone(), epoch_events, hotstuff: hotstuff_worker, + state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory), }; let handle = ConsensusWorker::new(shutdown_signal).spawn(context); diff --git a/applications/tari_validator_node/src/consensus/spec.rs b/applications/tari_validator_node/src/consensus/spec.rs index 27e211465f..a37c081b34 100644 --- a/applications/tari_validator_node/src/consensus/spec.rs +++ b/applications/tari_validator_node/src/consensus/spec.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use tari_comms::types::CommsPublicKey; +use tari_comms_rpc_state_sync::CommsRpcStateSyncManager; use tari_consensus::traits::ConsensusSpec; use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_state_store_sqlite::SqliteStateStore; @@ -20,5 +21,6 @@ impl ConsensusSpec for TariConsensusSpec { type LeaderStrategy = RoundRobinLeaderStrategy; type StateManager = TariStateManager; type StateStore = SqliteStateStore; + type SyncManager = CommsRpcStateSyncManager; type VoteSignatureService = TariSignatureService; } diff --git a/applications/tari_validator_node/src/consensus/state_manager.rs b/applications/tari_validator_node/src/consensus/state_manager.rs index 08fca59743..a44d9d3cd2 100644 --- a/applications/tari_validator_node/src/consensus/state_manager.rs +++ b/applications/tari_validator_node/src/consensus/state_manager.rs @@ -6,7 +6,6 @@ use tari_dan_common_types::ShardId; use tari_dan_storage::{ consensus_models::{Block, ExecutedTransaction, SubstateRecord}, StateStore, - StateStoreWriteTransaction, StorageError, }; @@ -35,7 +34,14 @@ impl StateManager for TariStateManager { let down_shards = diff 
.down_iter() .map(|(addr, version)| ShardId::from_address(addr, *version)); - tx.substate_down_many(down_shards, block.epoch(), block.id(), transaction.id())?; + SubstateRecord::destroy_many( + tx, + down_shards, + block.epoch(), + block.id(), + block.justify().id(), + transaction.id(), + )?; let to_up = diff.up_iter().map(|(addr, substate)| { SubstateRecord::new( diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 5d7072a572..851625c10c 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -26,14 +26,11 @@ use log::*; use tari_comms::{connection_manager::LivenessStatus, connectivity::ConnectivityEvent, peer_manager::NodeId}; use tari_consensus::hotstuff::HotstuffEvent; use tari_dan_storage::{consensus_models::Block, StateStore}; -use tari_epoch_manager::{EpochManagerError, EpochManagerEvent, EpochManagerReader}; +use tari_epoch_manager::{EpochManagerError, EpochManagerReader}; use tari_shutdown::ShutdownSignal; -use tokio::{task, time, time::MissedTickBehavior}; +use tokio::{time, time::MissedTickBehavior}; -use crate::{ - p2p::services::{committee_state_sync::CommitteeStateSync, networking::NetworkingService}, - Services, -}; +use crate::{p2p::services::networking::NetworkingService, Services}; const LOG_TARGET: &str = "tari::validator_node::dan_node"; @@ -64,8 +61,6 @@ impl DanNode { let mut tick = time::interval(Duration::from_secs(10)); tick.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut epoch_manager_events = self.services.epoch_manager.subscribe().await?; - loop { tokio::select! 
{ // Wait until killed @@ -86,10 +81,6 @@ impl DanNode { error!(target: LOG_TARGET, "Error handling hotstuff event: {}", err); }, - Ok(event) = epoch_manager_events.recv() => { - self.handle_epoch_manager_event(event).await?; - } - Err(err) = self.services.on_any_exit() => { error!(target: LOG_TARGET, "Error in service: {}", err); return Err(err); @@ -142,34 +133,6 @@ impl DanNode { Ok(()) } - async fn handle_epoch_manager_event(&self, event: EpochManagerEvent) -> Result<(), anyhow::Error> { - match event { - EpochManagerEvent::EpochChanged(epoch) => { - info!(target: LOG_TARGET, "📅 Epoch changed to {}", epoch); - let sync_service = CommitteeStateSync::new( - self.services.epoch_manager.clone(), - self.services.validator_node_client_factory.clone(), - self.services.state_store.clone(), - self.services.global_db.clone(), - self.services.comms.node_identity().public_key().clone(), - ); - - // EpochChanged should only happen once per epoch and the event is not emitted during initial sync. So - // spawning state sync for each event should be ok. - task::spawn(async move { - if let Err(e) = sync_service.sync_state(epoch).await { - error!( - target: LOG_TARGET, - "Failed to sync peers state for epoch {}: {}", epoch, e - ); - } - }); - }, - EpochManagerEvent::ThisValidatorIsRegistered { .. } => {}, - } - Ok(()) - } - async fn dial_local_shard_peers(&mut self) -> Result<(), anyhow::Error> { let epoch = self.services.epoch_manager.current_epoch().await?; let res = self diff --git a/applications/tari_validator_node/src/p2p/rpc/mod.rs b/applications/tari_validator_node/src/p2p/rpc/mod.rs index 0ec308b799..0afa457dae 100644 --- a/applications/tari_validator_node/src/p2p/rpc/mod.rs +++ b/applications/tari_validator_node/src/p2p/rpc/mod.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
mod service_impl; +mod sync_task; pub use service_impl::ValidatorNodeRpcServiceImpl; use tari_common_types::types::PublicKey; diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 1295993600..56ddead862 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; // Copyright 2021, The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that @@ -28,8 +29,10 @@ use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_dan_common_types::{optional::Optional, NodeAddressable, ShardId}; use tari_dan_p2p::PeerProvider; use tari_dan_storage::{ - consensus_models::{SubstateRecord, TransactionRecord}, + consensus_models::{Block, BlockId, HighQc, LockedBlock, QuorumCertificate, SubstateRecord, TransactionRecord}, StateStore, + StateStoreReadTransaction, + StorageError, }; use tari_engine_types::virtual_substate::VirtualSubstateAddress; use tari_epoch_manager::base_layer::EpochManagerHandle; @@ -39,14 +42,16 @@ use tari_transaction::{Transaction, TransactionId}; use tari_validator_node_rpc::{ proto, proto::rpc::{ + GetHighQcRequest, + GetHighQcResponse, GetSubstateRequest, GetSubstateResponse, GetTransactionResultRequest, GetTransactionResultResponse, PayloadResultStatus, SubstateStatus, - VnStateSyncRequest, - VnStateSyncResponse, + SyncBlocksRequest, + SyncBlocksResponse, }, rpc_service::ValidatorNodeRpcService, }; @@ -138,71 +143,6 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static Ok(Streaming::new(rx)) } - async fn vn_state_sync( - &self, - request: Request, - ) -> Result, RpcStatus> { - let (tx, rx) = mpsc::channel(100); - let msg = request.into_message(); - - let start_shard_id = msg - .start_shard_id - .and_then(|s| ShardId::try_from(s).ok()) 
- .ok_or_else(|| RpcStatus::bad_request("start_shard_id malformed or not provided"))?; - let end_shard_id = msg - .end_shard_id - .and_then(|s| ShardId::try_from(s).ok()) - .ok_or_else(|| RpcStatus::bad_request("end_shard_id malformed or not provided"))?; - - let excluded_shards = msg - .inventory - .iter() - .map(|s| ShardId::try_from(s.bytes.as_slice()).map_err(|_| RpcStatus::bad_request("invalid shard_id"))) - .collect::, RpcStatus>>()?; - - let shard_db = self.shard_state_store.clone(); - - task::spawn(async move { - let shards_substates_data = shard_db.with_read_tx(|tx| { - SubstateRecord::get_many_within_range(tx, start_shard_id..=end_shard_id, excluded_shards.as_slice()) - }); - - let substates = match shards_substates_data { - Ok(s) => s, - Err(err) => { - error!(target: LOG_TARGET, "{}", err); - let _ignore = tx.send(Err(RpcStatus::general(&err))).await; - return; - }, - }; - - if substates.is_empty() { - return; - } - - // select data from db where shard_id <= end_shard_id and shard_id >= start_shard_id - for substate in substates { - match proto::rpc::VnStateSyncResponse::try_from(substate) { - Ok(r) => { - if tx.send(Ok(r)).await.is_err() { - debug!( - target: LOG_TARGET, - "Peer stream closed by client before completing. 
Aborting" - ); - break; - } - }, - Err(e) => { - error!(target: LOG_TARGET, "{}", e); - let _ignore = tx.send(Err(RpcStatus::general(&e))).await; - return; - }, - } - } - }); - Ok(Streaming::new(rx)) - } - async fn get_substate(&self, req: Request) -> Result, RpcStatus> { let req = req.into_message(); @@ -239,13 +179,13 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static version: substate.version(), created_transaction_hash: substate.created_by_transaction().into_array().to_vec(), destroyed_transaction_hash: substate - .destroyed_by_transaction() - .map(|id| id.into_array().to_vec()) + .destroyed() + .map(|destroyed| destroyed.by_transaction.as_bytes().to_vec()) .unwrap_or_default(), quorum_certificates: Some(created_qc) .into_iter() .chain(destroyed_qc) - .map(Into::into) + .map(|qc| (&qc).into()) .collect(), ..Default::default() } @@ -257,7 +197,7 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static substate: substate.substate_value().to_bytes(), created_transaction_hash: substate.created_by_transaction().into_array().to_vec(), destroyed_transaction_hash: vec![], - quorum_certificates: vec![created_qc.into()], + quorum_certificates: vec![(&created_qc).into()], } }; @@ -328,4 +268,151 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static abort_details, })) } + + async fn sync_blocks( + &self, + request: Request, + ) -> Result, RpcStatus> { + let req = request.into_message(); + let store = self.shard_state_store.clone(); + + let (sender, receiver) = mpsc::channel(10); + + let start_block_id = BlockId::try_from(req.start_block_id) + .map_err(|e| RpcStatus::bad_request(&format!("Invalid encoded block id: {}", e)))?; + // Check if we have the blocks + let start_block = store + .with_read_tx(|tx| Block::get(tx, &start_block_id).optional()) + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? 
+ .ok_or_else(|| RpcStatus::not_found("start_block_id not found"))?; + + task::spawn(async move { + let mut buffer = Vec::with_capacity(10); + let mut current_block_id = *start_block.id(); + loop { + let result = store.with_read_tx(|tx| { + loop { + let children = tx.blocks_get_all_by_parent(¤t_block_id)?; + let Some(child) = children.into_iter().find(|b| b.is_committed()) else { + break; + }; + + current_block_id = *child.id(); + let all_qcs = child + .commands() + .iter() + .flat_map(|cmd| cmd.evidence().qc_ids_iter()) + .collect::>(); + let certificates = QuorumCertificate::get_all(tx, all_qcs)?; + let updates = child.get_substate_updates(tx)?; + buffer.push((child, certificates, updates)); + if buffer.len() == buffer.capacity() { + break; + } + } + Ok::<_, StorageError>(()) + }); + + if let Err(err) = result { + if sender + .send(Err(RpcStatus::log_internal_error(LOG_TARGET)(err))) + .await + .is_err() + { + debug!( + target: LOG_TARGET, + "Peer stream closed by client before completing. Aborting" + ); + break; + } + } + + let num_items = buffer.len(); + + for (block, quorum_certificates, updates) in buffer.drain(..) { + if sender + .send(Ok(SyncBlocksResponse { + block: Some(block.into()), + quorum_certificates: quorum_certificates.iter().map(Into::into).collect(), + substate_updates: updates.into_iter().map(Into::into).collect(), + })) + .await + .is_err() + { + debug!( + target: LOG_TARGET, + "Peer stream closed by client before completing. 
Aborting" + ); + break; + } + } + + // If we didnt fill up the buffer, send the final blocks + if num_items < buffer.capacity() { + store + .with_read_tx(|tx| { + let mut current = LockedBlock::get(tx)?.get_block(tx)?; + loop { + let all_qcs = current + .commands() + .iter() + .flat_map(|cmd| cmd.evidence().qc_ids_iter()) + .collect::>(); + let certificates = QuorumCertificate::get_all(tx, all_qcs)?; + let updates = current.get_substate_updates(tx)?; + + let parent = current.get_parent(tx)?; + buffer.push((current, certificates, updates)); + if *parent.id() == current_block_id { + break; + } + current = parent; + } + + buffer.reverse(); + Ok::<_, StorageError>(()) + // TODO: unwrap + }) + .unwrap(); + + for (block, quorum_certificates, updates) in buffer.drain(..) { + if sender + .send(Ok(SyncBlocksResponse { + block: Some(block.into()), + quorum_certificates: quorum_certificates.iter().map(Into::into).collect(), + substate_updates: updates.into_iter().map(Into::into).collect(), + })) + .await + .is_err() + { + debug!( + target: LOG_TARGET, + "Peer stream closed by client before completing. Aborting" + ); + break; + } + } + break; + } + } + }); + Ok(Streaming::new(receiver)) + } + + async fn get_high_qc(&self, _request: Request) -> Result, RpcStatus> { + let high_qc = self + .shard_state_store + .with_read_tx(|tx| { + HighQc::get(tx) + .optional()? + .map(|hqc| hqc.get_quorum_certificate(tx)) + .transpose() + }) + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? 
+ .unwrap_or_else(QuorumCertificate::genesis); + + Ok(Response::new(GetHighQcResponse { + high_qc: Some((&high_qc).into()), + })) + } } diff --git a/applications/tari_validator_node/src/p2p/services/mod.rs b/applications/tari_validator_node/src/p2p/services/mod.rs index 7c8eb741aa..9cbf07e12f 100644 --- a/applications/tari_validator_node/src/p2p/services/mod.rs +++ b/applications/tari_validator_node/src/p2p/services/mod.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -pub mod committee_state_sync; +// pub mod committee_state_sync; pub mod comms_peer_provider; pub mod mempool; pub mod messaging; diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index e8b1bc611b..77d6d35933 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -31,16 +31,6 @@ impl Committee { &self.members } - /// Returns n - f where n is the number of committee members and f is the tolerated failure nodes. 
- pub fn consensus_threshold(&self) -> usize { - let len = self.members.len(); - if len == 0 { - return 0; - } - let max_failures = (len - 1) / 3; - len - max_failures - } - pub fn max_failures(&self) -> usize { let len = self.members.len(); if len == 0 { diff --git a/dan_layer/comms_rpc_state_sync/Cargo.toml b/dan_layer/comms_rpc_state_sync/Cargo.toml new file mode 100644 index 0000000000..3ee6a790f8 --- /dev/null +++ b/dan_layer/comms_rpc_state_sync/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "tari_comms_rpc_state_sync" +version = "0.50.0-pre.0" +edition = "2021" +authors = ["The Tari Development Community"] +description = "Tari comms RPC state sync protocol for the Tari DAN" +repository = "https://github.com/tari-project/tari-dan" +license = "BSD-3-Clause" + +[dependencies] +tari_epoch_manager = { path = "../epoch_manager" } +tari_dan_storage = { path = "../storage" } +tari_validator_node_rpc = { path = "../validator_node_rpc" } +tari_consensus = { path = "../consensus" } +tari_dan_common_types = { path = "../common_types" } +tari_transaction = { path = "../transaction" } + +tari_comms = { git = "https://github.com/tari-project/tari.git", branch = "feature-dan2" } + +anyhow = "1.0.75" +async-trait = "0.1.73" +futures = "0.3.28" +log = "0.4.20" +thiserror = "1.0.48" diff --git a/dan_layer/comms_rpc_state_sync/src/error.rs b/dan_layer/comms_rpc_state_sync/src/error.rs new file mode 100644 index 0000000000..ef8368d948 --- /dev/null +++ b/dan_layer/comms_rpc_state_sync/src/error.rs @@ -0,0 +1,28 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use tari_comms::protocol::rpc::RpcError; +use tari_consensus::hotstuff::HotStuffError; +use tari_dan_storage::StorageError; +use tari_epoch_manager::EpochManagerError; +use tari_validator_node_rpc::ValidatorNodeRpcClientError; + +#[derive(Debug, thiserror::Error)] +pub enum CommsRpcConsensusSyncError { + #[error("Epoch manager error: {0}")] + EpochManagerError(#[from] EpochManagerError), + #[error("RPC error: 
{0}")] + RpcError(#[from] RpcError), + #[error("Storage error: {0}")] + StorageError(#[from] StorageError), + #[error("Validator node client error: {0}")] + ValidatorNodeClientError(#[from] ValidatorNodeRpcClientError), + #[error("Invalid response: {0}")] + InvalidResponse(anyhow::Error), +} + +impl From for HotStuffError { + fn from(value: CommsRpcConsensusSyncError) -> Self { + HotStuffError::SyncError(value.into()) + } +} diff --git a/dan_layer/comms_rpc_state_sync/src/lib.rs b/dan_layer/comms_rpc_state_sync/src/lib.rs new file mode 100644 index 0000000000..e0196d033a --- /dev/null +++ b/dan_layer/comms_rpc_state_sync/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +//! # Comms RPC State Sync Protocol +//! +//! ```mermaid +//! sequenceDiagram +//! participant A as Client +//! participant B as Server +//! A->>B: CheckSync +//! B->>A: SyncStatus +//! ``` + +mod error; +mod manager; + +pub use error::*; +pub use manager::*; diff --git a/dan_layer/comms_rpc_state_sync/src/manager.rs b/dan_layer/comms_rpc_state_sync/src/manager.rs new file mode 100644 index 0000000000..bfe4cd1eea --- /dev/null +++ b/dan_layer/comms_rpc_state_sync/src/manager.rs @@ -0,0 +1,281 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::ops::DerefMut; + +use async_trait::async_trait; +use futures::StreamExt; +use log::*; +use tari_comms::{protocol::rpc::RpcError, types::CommsPublicKey}; +use tari_consensus::traits::{SyncManager, SyncStatus}; +use tari_dan_common_types::{committee::Committee, optional::Optional, Epoch, NodeHeight}; +use tari_dan_storage::{ + consensus_models::{Block, HighQc, QuorumCertificate, SubstateUpdate}, + StateStore, + StateStoreWriteTransaction, +}; +use tari_epoch_manager::EpochManagerReader; +use tari_validator_node_rpc::{ + client::{TariCommsValidatorNodeClientFactory, ValidatorNodeClientFactory}, + proto::rpc::{GetHighQcRequest, SyncBlocksRequest}, + 
rpc_service::ValidatorNodeRpcClient, +}; + +use crate::error::CommsRpcConsensusSyncError; + +const LOG_TARGET: &str = "tari::dan::comms_rpc_state_sync"; + +pub struct CommsRpcStateSyncManager { + epoch_manager: TEpochManager, + state_store: TStateStore, + client_factory: TariCommsValidatorNodeClientFactory, +} + +impl CommsRpcStateSyncManager +where + TEpochManager: EpochManagerReader, + TStateStore: StateStore, +{ + pub fn new( + epoch_manager: TEpochManager, + state_store: TStateStore, + client_factory: TariCommsValidatorNodeClientFactory, + ) -> Self { + Self { + epoch_manager, + state_store, + client_factory, + } + } + + async fn get_sync_peers( + &self, + current_epoch: Epoch, + ) -> Result, CommsRpcConsensusSyncError> { + let this_vn = self.epoch_manager.get_our_validator_node(current_epoch).await?; + let mut committee = self.epoch_manager.get_local_committee(current_epoch).await?; + committee.members.retain(|m| *m != this_vn.address); + committee.shuffle(); + Ok(committee) + } + + async fn sync_with_peer(&self, addr: &CommsPublicKey, high_qc: &HighQc) -> Result<(), CommsRpcConsensusSyncError> { + self.create_genesis_block_if_required()?; + let mut rpc_client = self.client_factory.create_client(addr); + let mut client = rpc_client.client_connection().await?; + + self.sync_blocks(&mut client, high_qc).await?; + + Ok(()) + } + + fn create_genesis_block_if_required(&self) -> Result<(), CommsRpcConsensusSyncError> { + let mut tx = self.state_store.create_write_tx()?; + + // The parent for genesis blocks refer to this zero block + let zero_block = Block::zero_block(); + if !zero_block.exists(tx.deref_mut())? 
{ + debug!(target: LOG_TARGET, "Creating zero block"); + zero_block.justify().insert(&mut tx)?; + zero_block.insert(&mut tx)?; + zero_block.as_locked().set(&mut tx)?; + zero_block.as_leaf_block().set(&mut tx)?; + zero_block.as_last_executed().set(&mut tx)?; + zero_block.justify().as_high_qc().set(&mut tx)?; + zero_block.commit(&mut tx)?; + } + + tx.commit()?; + + Ok(()) + } + + async fn sync_blocks( + &self, + client: &mut ValidatorNodeRpcClient, + high_qc: &HighQc, + ) -> Result<(), CommsRpcConsensusSyncError> { + let mut stream = client + .sync_blocks(SyncBlocksRequest { + start_block_id: high_qc.block_id.as_bytes().to_vec(), + }) + .await?; + + info!(target: LOG_TARGET, "🌐 Syncing blocks from {}", high_qc.block_id); + + while let Some(resp) = stream.next().await { + let msg = resp.map_err(RpcError::from)?; + let block = msg + .block + .map(Block::::try_from) + .transpose() + .map_err(CommsRpcConsensusSyncError::InvalidResponse)? + .ok_or_else(|| { + CommsRpcConsensusSyncError::InvalidResponse(anyhow::anyhow!( + "Peer returned an empty block response" + )) + })?; + let qcs = msg + .quorum_certificates + .into_iter() + .map(QuorumCertificate::::try_from) + .collect::, _>>() + .map_err(CommsRpcConsensusSyncError::InvalidResponse)?; + let updates = msg + .substate_updates + .into_iter() + .map(SubstateUpdate::try_from) + .collect::, _>>() + .map_err(CommsRpcConsensusSyncError::InvalidResponse)?; + + debug!( + target: LOG_TARGET, + "🌐 Received block {}, {} qcs and {} substate updates", + block, + qcs.len(), + updates.len(), + ); + self.commit_block(block, qcs, updates)?; + } + + info!(target: LOG_TARGET, "🌐 Sync complete"); + + Ok(()) + } + + fn commit_block( + &self, + block: Block, + qcs: Vec>, + updates: Vec>, + ) -> Result<(), CommsRpcConsensusSyncError> { + self.state_store.with_write_tx(|tx| { + for qc in qcs { + qc.save(tx)?; + } + for update in updates { + update.apply(tx, &block)?; + } + block.justify().save(tx)?; + block.justify().update_high_qc(tx)?; + 
block.save(tx)?; + block.commit(tx)?; + Ok(()) + }) + } +} + +#[async_trait] +impl SyncManager for CommsRpcStateSyncManager +where + TEpochManager: EpochManagerReader + Send + Sync + 'static, + TStateStore: StateStore + Send + Sync + 'static, +{ + type Error = CommsRpcConsensusSyncError; + + async fn check_sync(&self) -> Result { + let current_epoch = self.epoch_manager.current_epoch().await?; + let committee = self.get_sync_peers(current_epoch).await?; + let mut highest_qc: Option> = None; + let mut num_succeeded = 0; + let max_failures = committee.max_failures(); + for addr in committee { + let mut rpc_client = self.client_factory.create_client(&addr); + let mut client = match rpc_client.client_connection().await { + Ok(client) => client, + Err(err) => { + warn!(target: LOG_TARGET, "Failed to connect to peer {}: {}", addr, err); + continue; + }, + }; + let result = client + .get_high_qc(GetHighQcRequest {}) + .await + .map_err(CommsRpcConsensusSyncError::RpcError) + .and_then(|resp| { + resp.high_qc + .map(QuorumCertificate::::try_from) + .transpose() + .map_err(CommsRpcConsensusSyncError::InvalidResponse)? 
+ .ok_or_else(|| { + CommsRpcConsensusSyncError::InvalidResponse(anyhow::anyhow!( + "Peer returned an empty high qc" + )) + }) + }); + let remote_high_qc = match result { + Ok(resp) => resp, + Err(err) => { + warn!(target: LOG_TARGET, "Failed to get high qc from peer {}: {}", addr, err); + continue; + }, + }; + + num_succeeded += 1; + if highest_qc + .as_ref() + .map(|qc| qc.block_height() < remote_high_qc.block_height()) + .unwrap_or(true) + { + // TODO: validate + + highest_qc = Some(remote_high_qc); + } + + if num_succeeded == max_failures { + break; + } + } + + if let Some(highest_qc) = highest_qc { + let local_high_qc = self.state_store.with_read_tx(|tx| HighQc::get(tx).optional())?; + let local_height = local_high_qc + .as_ref() + .map(|qc| qc.block_height()) + .unwrap_or(NodeHeight(0)); + if highest_qc.block_height() > local_height { + info!( + target: LOG_TARGET, + "Highest QC from peers is at height {} and local high QC is at height {}", + highest_qc.block_height(), + local_height, + ); + return Ok(SyncStatus::Behind); + } + } + + Ok(SyncStatus::UpToDate) + } + + async fn sync(&self) -> Result<(), Self::Error> { + let current_epoch = self.epoch_manager.current_epoch().await?; + let committee = self.get_sync_peers(current_epoch).await?; + + let mut sync_error = None; + for member in committee { + // Refresh the HighQC each time because a partial sync could have been achieved from a peer + let high_qc = self + .state_store + .with_read_tx(|tx| HighQc::get(tx).optional())? 
+ .unwrap_or_else(|| QuorumCertificate::::genesis().as_high_qc()); + + match self.sync_with_peer(&member, &high_qc).await { + Ok(()) => { + sync_error = None; + break; + }, + Err(err) => { + warn!(target: LOG_TARGET, "Failed to sync with peer {}: {}", member, err); + sync_error = Some(err); + continue; + }, + } + } + + if let Some(err) = sync_error { + return Err(err); + } + + Ok(()) + } +} diff --git a/dan_layer/consensus/Cargo.toml b/dan_layer/consensus/Cargo.toml index 0b12fdd071..790f7949a6 100644 --- a/dan_layer/consensus/Cargo.toml +++ b/dan_layer/consensus/Cargo.toml @@ -2,6 +2,9 @@ name = "tari_consensus" version = "0.50.0-pre.0" edition = "2021" +authors = ["The Tari Development Community"] +description = "Tari DAN consensus implementation" +repository = "https://github.com/tari-project/tari-dan" license = "BSD-3-Clause" [dependencies] diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs index cf05be0b8b..6cd0fb18ed 100644 --- a/dan_layer/consensus/src/hotstuff/common.rs +++ b/dan_layer/consensus/src/hotstuff/common.rs @@ -1,16 +1,9 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::ops::DerefMut; - use log::*; use tari_dan_common_types::{committee::Committee, Epoch, NodeAddressable, NodeHeight}; -use tari_dan_storage::{ - consensus_models::{Block, HighQc, QuorumCertificate, QuorumDecision}, - StateStoreReadTransaction, - StateStoreWriteTransaction, - StorageError, -}; +use tari_dan_storage::consensus_models::{Block, QuorumCertificate, QuorumDecision}; use crate::{messages::HotstuffMessage, traits::LeaderStrategy}; @@ -24,36 +17,6 @@ pub const EXHAUST_DIVISOR: u64 = 0; // To avoid clippy::type_complexity pub(super) type CommitteeAndMessage = (Committee, HotstuffMessage); -pub fn update_high_qc( - tx: &mut TTx, - qc: &QuorumCertificate, -) -> Result<(), StorageError> -where - TTx: StateStoreWriteTransaction + DerefMut, - TTx::Target: StateStoreReadTransaction, -{ - let high_qc 
= HighQc::get(tx.deref_mut())?; - let high_qc = high_qc.get_quorum_certificate(tx.deref_mut())?; - - if high_qc.block_height() < qc.block_height() { - debug!( - target: LOG_TARGET, - "🔥 UPDATE_HIGH_QC (node: {} {}, previous high QC: {} {})", - qc.id(), - qc.block_height(), - high_qc.block_id(), - high_qc.block_height(), - ); - - qc.save(tx)?; - // This will fail if the block doesnt exist - qc.as_leaf_block().set(tx)?; - qc.as_high_qc().set(tx)?; - } - - Ok(()) -} - pub fn calculate_dummy_blocks>( epoch: Epoch, high_qc: &QuorumCertificate, diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index 2afc7d717d..28c2301b41 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -54,6 +54,8 @@ pub enum HotStuffError { }, #[error("BUG Invariant error occurred: {0}")] InvariantError(String), + #[error("Sync error: {0}")] + SyncError(anyhow::Error), } impl From for HotStuffError { diff --git a/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs b/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs index 9b5fd97003..e4a8bed932 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs @@ -12,11 +12,7 @@ use tari_dan_storage::{ use tari_epoch_manager::EpochManagerReader; use crate::{ - hotstuff::{ - common::{calculate_dummy_blocks, update_high_qc}, - error::HotStuffError, - pacemaker_handle::PaceMakerHandle, - }, + hotstuff::{common::calculate_dummy_blocks, error::HotStuffError, pacemaker_handle::PaceMakerHandle}, messages::NewViewMessage, traits::{ConsensusSpec, LeaderStrategy}, }; @@ -93,7 +89,7 @@ where TConsensusSpec: ConsensusSpec self.validate_qc(&high_qc)?; - self.store.with_write_tx(|tx| update_high_qc(tx, &high_qc))?; + self.store.with_write_tx(|tx| high_qc.update_high_qc(tx))?; let local_committee = self.epoch_manager.get_local_committee(epoch).await?; let leader = self diff --git 
a/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs index ba2256f1a3..42b1cc921a 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs @@ -43,7 +43,7 @@ use tokio::sync::{broadcast, mpsc}; use crate::{ hotstuff::{ - common::{update_high_qc, BlockDecision, EXHAUST_DIVISOR}, + common::{BlockDecision, EXHAUST_DIVISOR}, error::HotStuffError, event::HotstuffEvent, pacemaker_handle::PaceMakerHandle, @@ -806,7 +806,7 @@ where TConsensusSpec: ConsensusSpec block: &Block, local_committee_shard: &CommitteeShard, ) -> Result<(), HotStuffError> { - update_high_qc(tx, block.justify())?; + block.justify().update_high_qc(tx)?; // b'' <- b*.justify.node let Some(commit_node) = block.justify().get_block(tx.deref_mut()).optional()? else { @@ -1096,7 +1096,7 @@ where TConsensusSpec: ConsensusSpec .into()); } - update_high_qc(tx, candidate_block.justify())?; + candidate_block.justify().update_high_qc(tx)?; // if candidate_block.height().saturating_sub(justify_block.height()).0 > local_committee.max_failures() as u64 // { TODO: We should maybe relax this constraint during GST, before the first block, many leaders might diff --git a/dan_layer/consensus/src/hotstuff/on_receive_vote.rs b/dan_layer/consensus/src/hotstuff/on_receive_vote.rs index a1063d04ae..2f01a38f84 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_vote.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_vote.rs @@ -12,10 +12,9 @@ use tari_dan_storage::{ StateStoreWriteTransaction, }; use tari_epoch_manager::EpochManagerReader; -use thiserror::__private::DisplayAsDisplay; use crate::{ - hotstuff::{common::update_high_qc, error::HotStuffError, pacemaker_handle::PaceMakerHandle}, + hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle}, messages::VoteMessage, traits::{ConsensusSpec, LeaderStrategy, VoteSignatureService}, }; @@ -194,7 +193,7 @@ where 
TConsensusSpec: ConsensusSpec info!(target: LOG_TARGET, "🔥 New QC {}", qc); - update_high_qc(&mut tx, &qc)?; + qc.update_high_qc(&mut tx)?; tx.commit()?; } self.on_beat.beat(); @@ -236,7 +235,7 @@ where TConsensusSpec: ConsensusSpec .create_challenge(sender_leaf_hash, &message.block_id, &message.decision); if !self.vote_signature_service.verify(&message.signature, &challenge) { return Err(HotStuffError::InvalidVoteSignature { - signer_public_key: message.signature.public_key().as_display().to_string(), + signer_public_key: message.signature.public_key().to_string(), }); } Ok(()) diff --git a/dan_layer/consensus/src/hotstuff/state_machine/check_sync.rs b/dan_layer/consensus/src/hotstuff/state_machine/check_sync.rs index 53ce822e34..6c55c84aa1 100644 --- a/dan_layer/consensus/src/hotstuff/state_machine/check_sync.rs +++ b/dan_layer/consensus/src/hotstuff/state_machine/check_sync.rs @@ -3,8 +3,6 @@ use std::marker::PhantomData; -use log::*; - use crate::{ hotstuff::{ state_machine::{ @@ -15,21 +13,27 @@ use crate::{ }, HotStuffError, }, - traits::ConsensusSpec, + traits::{ConsensusSpec, SyncManager, SyncStatus}, }; -const LOG_TARGET: &str = "tari::dan::consensus::sm::check_sync"; +const _LOG_TARGET: &str = "tari::dan::consensus::sm::check_sync"; #[derive(Debug, Clone)] pub struct CheckSync(PhantomData); -impl CheckSync { +impl CheckSync +where + TSpec: ConsensusSpec, + HotStuffError: From<::Error>, +{ pub(super) async fn on_enter( &self, - _context: &mut ConsensusWorkerContext, + context: &mut ConsensusWorkerContext, ) -> Result { - warn!(target: LOG_TARGET, "CheckSync not implemented"); - Ok(ConsensusStateEvent::Ready) + match context.state_sync.check_sync().await? 
{ + SyncStatus::UpToDate => Ok(ConsensusStateEvent::Ready), + SyncStatus::Behind => Ok(ConsensusStateEvent::NeedSync), + } } } diff --git a/dan_layer/consensus/src/hotstuff/state_machine/syncing.rs b/dan_layer/consensus/src/hotstuff/state_machine/syncing.rs index e17f88604c..9cf4adfe14 100644 --- a/dan_layer/consensus/src/hotstuff/state_machine/syncing.rs +++ b/dan_layer/consensus/src/hotstuff/state_machine/syncing.rs @@ -9,19 +9,22 @@ use crate::{ ConsensusWorkerContext, HotStuffError, }, - traits::ConsensusSpec, + traits::{ConsensusSpec, SyncManager}, }; #[derive(Debug)] pub struct Syncing(PhantomData); -impl Syncing { +impl Syncing +where + TSpec: ConsensusSpec, + HotStuffError: From<::Error>, +{ pub(super) async fn on_enter( &self, - _context: &mut ConsensusWorkerContext, + context: &mut ConsensusWorkerContext, ) -> Result { - // let mut sync = SyncWorker::new(context); - // sync.start().await?; + context.state_sync.sync().await?; Ok(ConsensusStateEvent::SyncComplete) } } diff --git a/dan_layer/consensus/src/hotstuff/state_machine/worker.rs b/dan_layer/consensus/src/hotstuff/state_machine/worker.rs index 196123dde1..450210b3e4 100644 --- a/dan_layer/consensus/src/hotstuff/state_machine/worker.rs +++ b/dan_layer/consensus/src/hotstuff/state_machine/worker.rs @@ -14,7 +14,7 @@ use crate::{ HotStuffError, HotstuffWorker, }, - traits::ConsensusSpec, + traits::{ConsensusSpec, SyncManager}, }; const LOG_TARGET: &str = "tari::dan::consensus::sm::worker"; @@ -30,9 +30,14 @@ pub struct ConsensusWorkerContext { pub epoch_manager: TSpec::EpochManager, pub epoch_events: broadcast::Receiver, pub hotstuff: HotstuffWorker, + pub state_sync: TSpec::SyncManager, } -impl ConsensusWorker { +impl ConsensusWorker +where + TSpec: ConsensusSpec, + HotStuffError: From<::Error>, +{ pub fn new(shutdown_signal: ShutdownSignal) -> Self { Self { shutdown_signal, @@ -50,7 +55,7 @@ impl ConsensusWorker { ConsensusState::CheckSync(state) => 
self.result_or_shutdown(state.on_enter(context)).await, ConsensusState::Syncing(state) => self.result_or_shutdown(state.on_enter(context)).await, ConsensusState::Sleeping => { - time::sleep(Duration::from_secs(1)).await; + time::sleep(Duration::from_secs(5)).await; ConsensusStateEvent::Resume }, ConsensusState::Running(state) => state diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index bc04295aba..ae8589481a 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -7,7 +7,7 @@ use std::{ }; use log::*; -use tari_dan_common_types::{Epoch, NodeHeight}; +use tari_dan_common_types::NodeHeight; use tari_dan_storage::{ consensus_models::{Block, HighQc, LeafBlock, TransactionPool}, StateStore, @@ -150,7 +150,7 @@ impl HotstuffWorker where TConsensusSpec: ConsensusSpec { pub async fn start(&mut self) -> Result<(), HotStuffError> { - self.create_genesis_block_if_required(Epoch(0))?; + self.create_genesis_block_if_required()?; let (leaf_block, high_qc) = self .state_store .with_read_tx(|tx| Ok::<_, HotStuffError>((LeafBlock::get(tx)?, HighQc::get(tx)?)))?; @@ -344,10 +344,10 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - fn create_genesis_block_if_required(&mut self, epoch: Epoch) -> Result<(), HotStuffError> { + fn create_genesis_block_if_required(&self) -> Result<(), HotStuffError> { let mut tx = self.state_store.create_write_tx()?; - // The parent for all genesis blocks refer to this zero block + // The parent for genesis blocks refer to this zero block let zero_block = Block::zero_block(); if !zero_block.exists(tx.deref_mut())? 
{ debug!(target: LOG_TARGET, "Creating zero block"); @@ -373,12 +373,6 @@ where TConsensusSpec: ConsensusSpec tx.commit()?; - info!( - target: LOG_TARGET, - "🚀 Epoch changed to {}", - epoch - ); - Ok(()) } diff --git a/dan_layer/consensus/src/traits/mod.rs b/dan_layer/consensus/src/traits/mod.rs index e729489a75..975f80284e 100644 --- a/dan_layer/consensus/src/traits/mod.rs +++ b/dan_layer/consensus/src/traits/mod.rs @@ -4,10 +4,12 @@ mod leader_strategy; mod signing_service; mod state_manager; +mod sync; pub use leader_strategy::*; use serde::Serialize; pub use state_manager::*; +pub use sync::*; use tari_dan_common_types::NodeAddressable; use tari_dan_storage::StateStore; use tari_epoch_manager::EpochManagerReader; @@ -22,4 +24,5 @@ pub trait ConsensusSpec: Send + Sync + 'static { type LeaderStrategy: LeaderStrategy + Send + Sync + 'static; type VoteSignatureService: VoteSignatureService + Send + Sync + 'static; type StateManager: StateManager + Send + Sync + 'static; + type SyncManager: SyncManager + Send + Sync + 'static; } diff --git a/dan_layer/consensus/src/traits/sync.rs b/dan_layer/consensus/src/traits/sync.rs new file mode 100644 index 0000000000..018c69676f --- /dev/null +++ b/dan_layer/consensus/src/traits/sync.rs @@ -0,0 +1,19 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use async_trait::async_trait; + +#[async_trait] +pub trait SyncManager { + type Error: std::error::Error + Send + Sync + 'static; + + async fn check_sync(&self) -> Result; + + async fn sync(&self) -> Result<(), Self::Error>; +} + +#[derive(Debug, Clone, Copy)] +pub enum SyncStatus { + UpToDate, + Behind, +} diff --git a/dan_layer/consensus_tests/Cargo.toml b/dan_layer/consensus_tests/Cargo.toml index f916301380..de58eb668e 100644 --- a/dan_layer/consensus_tests/Cargo.toml +++ b/dan_layer/consensus_tests/Cargo.toml @@ -1,9 +1,11 @@ [package] name = "consensus_tests" -version = "0.1.0" +version = "0.50.0-pre.0" edition = "2021" - -# See more keys 
and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +authors = ["The Tari Development Community"] +description = "Tari DAN consensus tests" +repository = "https://github.com/tari-project/tari-dan" +license = "BSD-3-Clause" [dependencies] diff --git a/dan_layer/consensus_tests/src/support/mod.rs b/dan_layer/consensus_tests/src/support/mod.rs index 7ffdeff199..8b9f785ca2 100644 --- a/dan_layer/consensus_tests/src/support/mod.rs +++ b/dan_layer/consensus_tests/src/support/mod.rs @@ -14,6 +14,7 @@ mod network; mod signing_service; mod spec; mod state_manager; +mod sync; mod transaction; mod validator; diff --git a/dan_layer/consensus_tests/src/support/spec.rs b/dan_layer/consensus_tests/src/support/spec.rs index 20f1708e9e..3c815cfd1e 100644 --- a/dan_layer/consensus_tests/src/support/spec.rs +++ b/dan_layer/consensus_tests/src/support/spec.rs @@ -8,6 +8,7 @@ use crate::support::{ address::TestAddress, epoch_manager::TestEpochManager, signing_service::TestVoteSignatureService, + sync::NoopStateSyncManager, NoopStateManager, RoundRobinLeaderStrategy, }; @@ -20,5 +21,6 @@ impl ConsensusSpec for TestConsensusSpec { type LeaderStrategy = RoundRobinLeaderStrategy; type StateManager = NoopStateManager; type StateStore = SqliteStateStore; + type SyncManager = NoopStateSyncManager; type VoteSignatureService = TestVoteSignatureService; } diff --git a/dan_layer/consensus_tests/src/support/sync.rs b/dan_layer/consensus_tests/src/support/sync.rs new file mode 100644 index 0000000000..b9cbcf9a8f --- /dev/null +++ b/dan_layer/consensus_tests/src/support/sync.rs @@ -0,0 +1,23 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use async_trait::async_trait; +use tari_consensus::{ + hotstuff::HotStuffError, + traits::{SyncManager, SyncStatus}, +}; + +pub struct NoopStateSyncManager; + +#[async_trait] +impl SyncManager for NoopStateSyncManager { + type Error = HotStuffError; + + async fn check_sync(&self) -> Result { + 
Ok(SyncStatus::UpToDate) + } + + async fn sync(&self) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index c766f39b91..e059bff446 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -12,6 +12,7 @@ use crate::support::{ address::TestAddress, epoch_manager::TestEpochManager, signing_service::TestVoteSignatureService, + sync::NoopStateSyncManager, NoopStateManager, RoundRobinLeaderStrategy, TestConsensusSpec, @@ -106,6 +107,7 @@ impl ValidatorBuilder { epoch_manager, epoch_events: rx_epoch_events, hotstuff: worker, + state_sync: NoopStateSyncManager, }; let mut worker = ConsensusWorker::new(shutdown_signal); diff --git a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs index a7846dc342..18c92b8ae8 100644 --- a/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs +++ b/dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs @@ -279,6 +279,10 @@ impl BaseLayerEpochManager { public_key: &CommsPublicKey, ) -> Result>, EpochManagerError> { let (start_epoch, end_epoch) = self.get_epoch_range(epoch)?; + debug!( + target: LOG_TARGET, + "get_validator_node: epoch {}-{} with public key {}", start_epoch, end_epoch, public_key, + ); let mut tx = self.global_db.create_transaction()?; let vn = self .global_db diff --git a/dan_layer/state_store_sqlite/src/error.rs b/dan_layer/state_store_sqlite/src/error.rs index 225dc170a1..1ff0d22d1e 100644 --- a/dan_layer/state_store_sqlite/src/error.rs +++ b/dan_layer/state_store_sqlite/src/error.rs @@ -29,6 +29,12 @@ pub enum SqliteStorageError { NotAllTransactionsFound { operation: &'static str, details: String }, #[error("[{operation}] Not all queried substates were found: {details}")] NotAllSubstatesFound { 
operation: &'static str, details: String }, + #[error("[{operation}] Not all {items} were found: {details}")] + NotAllItemsFound { + items: &'static str, + operation: &'static str, + details: String, + }, #[error("[{operation}] One or more substates were are write locked")] SubstatesWriteLocked { operation: &'static str }, #[error("[{operation}] lock error: {details}")] @@ -60,12 +66,6 @@ impl From for StorageError { } } -// impl From for SqliteStorageError { -// fn from(_: FixedHashSizeError) -> Self { -// SqliteStorageError::MalformedHashData -// } -// } - impl IsNotFoundError for SqliteStorageError { fn is_not_found_error(&self) -> bool { matches!(self, SqliteStorageError::DieselError { source, .. } if matches!(source, diesel::result::Error::NotFound)) diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 4f3e543fbd..18a5b1317a 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -475,6 +475,39 @@ impl StateStoreReadTransa deserialize_json(&qc_json) } + fn quorum_certificates_get_all<'a, I: IntoIterator>( + &mut self, + qc_ids: I, + ) -> Result>, StorageError> { + use crate::schema::quorum_certificates; + + let qc_ids: Vec = qc_ids.into_iter().map(serialize_hex).collect(); + + let qc_json = quorum_certificates::table + .select(quorum_certificates::json) + .filter(quorum_certificates::qc_id.eq_any(&qc_ids)) + .get_results::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "quorum_certificates_get_all", + source: e, + })?; + + if qc_json.len() != qc_ids.len() { + return Err(SqliteStorageError::NotAllItemsFound { + items: "QCs", + operation: "quorum_certificates_get_all", + details: format!( + "quorum_certificates_get_all: expected {} quorum certificates, got {}", + qc_ids.len(), + qc_json.len() + ), + } + .into()); + } + + qc_json.iter().map(|j| deserialize_json(j)).collect() + } + fn quorum_certificates_get_by_block_id( &mut 
self, block_id: &BlockId, @@ -742,6 +775,31 @@ impl StateStoreReadTransa substates.into_iter().map(TryInto::try_into).collect() } + + fn substates_get_all_for_block(&mut self, block_id: &BlockId) -> Result, StorageError> { + use crate::schema::substates; + + let block_id_hex = serialize_hex(block_id); + + let substates = substates::table + .filter( + substates::created_block + .eq(&block_id_hex) + .or(substates::destroyed_by_block.eq(Some(&block_id_hex))), + ) + .get_results::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "substates_get_all_for_block", + source: e, + })?; + + let substates = substates + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?; + + Ok(substates) + } } #[derive(QueryableByName)] diff --git a/dan_layer/state_store_sqlite/src/sql_models/block.rs b/dan_layer/state_store_sqlite/src/sql_models/block.rs index 6269f9666e..8f9f25db5c 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/block.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/block.rs @@ -50,6 +50,7 @@ impl Block { self.total_leader_fees as u64, self.is_dummy, self.is_processed, + self.is_committed, )) } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/substate.rs b/dan_layer/state_store_sqlite/src/sql_models/substate.rs index 983edbddc6..28032a3928 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/substate.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/substate.rs @@ -3,7 +3,7 @@ use diesel::Queryable; use tari_dan_common_types::{Epoch, NodeHeight}; -use tari_dan_storage::{consensus_models, StorageError}; +use tari_dan_storage::{consensus_models, consensus_models::SubstateDestroyed, StorageError}; use time::PrimitiveDateTime; use crate::serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string}; @@ -36,6 +36,30 @@ impl TryFrom for consensus_models::SubstateRecord { type Error = StorageError; fn try_from(value: SubstateRecord) -> Result { + let destroyed = value + .destroyed_by_transaction 
+ .map(|tx_id| { + Ok::<_, StorageError>(SubstateDestroyed { + by_transaction: deserialize_hex_try_from(&tx_id)?, + justify: deserialize_hex_try_from(value.destroyed_justify.as_deref().ok_or_else(|| { + StorageError::DataInconsistency { + details: "destroyed_justify not provided".to_string(), + } + })?)?, + by_block: deserialize_hex_try_from(value.destroyed_by_block.as_deref().ok_or_else(|| { + StorageError::DataInconsistency { + details: "destroyed_by_block not provided".to_string(), + } + })?)?, + at_epoch: value.destroyed_at_epoch.map(|x| Epoch(x as u64)).ok_or_else(|| { + StorageError::DataInconsistency { + details: "destroyed_at_epoch not provided".to_string(), + } + })?, + }) + }) + .transpose()?; + Ok(Self { address: parse_from_string(&value.address)?, version: value.version as u32, @@ -45,22 +69,8 @@ impl TryFrom for consensus_models::SubstateRecord { created_justify: deserialize_hex_try_from(&value.created_justify)?, created_block: deserialize_hex_try_from(&value.created_block)?, created_height: NodeHeight(value.created_height as u64), - destroyed_by_transaction: value - .destroyed_by_transaction - .map(|x| deserialize_hex_try_from(&x)) - .transpose()?, - destroyed_justify: value - .destroyed_justify - .as_deref() - .map(deserialize_hex_try_from) - .transpose()?, - destroyed_by_block: value - .destroyed_by_block - .as_deref() - .map(deserialize_hex_try_from) - .transpose()?, + destroyed, created_at_epoch: Epoch(value.created_at_epoch as u64), - destroyed_at_epoch: value.destroyed_at_epoch.map(|x| Epoch(x as u64)), }) } } diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 27a86d18c6..e4cc101fe2 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -31,6 +31,7 @@ use tari_dan_storage::{ LeafBlock, LockedBlock, LockedOutput, + QcId, QuorumCertificate, SubstateLockFlag, SubstateLockState, @@ -919,6 +920,7 @@ impl StateStoreWriteTransaction for 
SqliteStateStoreWrit epoch: Epoch, destroyed_block_id: &BlockId, destroyed_transaction_id: &TransactionId, + destroyed_qc_id: &QcId, ) -> Result<(), StorageError> { use crate::schema::substates; @@ -956,6 +958,7 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit substates::destroyed_by_transaction.eq(Some(serialize_hex(destroyed_transaction_id))), substates::destroyed_by_block.eq(Some(serialize_hex(destroyed_block_id))), substates::destroyed_at_epoch.eq(Some(epoch.as_u64() as i64)), + substates::destroyed_justify.eq(Some(serialize_hex(destroyed_qc_id))), ); diesel::update(substates::table) @@ -983,11 +986,11 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit substates::created_justify.eq(serialize_hex(substate.created_justify)), substates::created_block.eq(serialize_hex(substate.created_block)), substates::created_height.eq(substate.created_height.as_u64() as i64), - substates::destroyed_by_transaction.eq(substate.destroyed_by_transaction.as_ref().map(serialize_hex)), - substates::destroyed_justify.eq(substate.destroyed_justify.as_ref().map(serialize_hex)), - substates::destroyed_by_block.eq(substate.destroyed_by_block.as_ref().map(serialize_hex)), substates::created_at_epoch.eq(substate.created_at_epoch.as_u64() as i64), - substates::destroyed_at_epoch.eq(substate.destroyed_at_epoch.map(|e| e.as_u64() as i64)), + substates::destroyed_by_transaction.eq(substate.destroyed().map(|d| serialize_hex(d.by_transaction))), + substates::destroyed_justify.eq(substate.destroyed().map(|d| serialize_hex(d.justify))), + substates::destroyed_by_block.eq(substate.destroyed().map(|d| serialize_hex(d.by_block))), + substates::destroyed_at_epoch.eq(substate.destroyed().map(|d| d.at_epoch.as_u64() as i64)), ); diesel::insert_into(substates::table) diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index e516cbabf8..4f3242e072 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ 
b/dan_layer/storage/src/consensus_models/block.rs @@ -14,7 +14,17 @@ use tari_transaction::TransactionId; use super::QuorumCertificate; use crate::{ - consensus_models::{Command, LastExecuted, LastProposed, LastVoted, LeafBlock, LockedBlock, Vote}, + consensus_models::{ + Command, + LastExecuted, + LastProposed, + LastVoted, + LeafBlock, + LockedBlock, + SubstateCreatedProof, + SubstateUpdate, + Vote, + }, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, @@ -40,6 +50,8 @@ pub struct Block { is_dummy: bool, /// Flag that indicates that the block locked objects and made transaction stage transitions. is_processed: bool, + /// Flag that indicates that the block has been committed. + is_committed: bool, } impl Block { @@ -65,6 +77,7 @@ impl Block { total_leader_fee, is_dummy: false, is_processed: false, + is_committed: false, }; block.id = block.calculate_hash().into(); block @@ -81,6 +94,7 @@ impl Block { total_leader_fee: u64, is_dummy: bool, is_processed: bool, + is_committed: bool, ) -> Self { Self { id, @@ -95,6 +109,7 @@ impl Block { total_leader_fee, is_dummy, is_processed, + is_committed, } } @@ -124,6 +139,7 @@ impl Block { total_leader_fee: 0, is_dummy: false, is_processed: false, + is_committed: true, } } @@ -248,6 +264,10 @@ impl Block { pub fn is_processed(&self) -> bool { self.is_processed } + + pub fn is_committed(&self) -> bool { + self.is_committed + } } impl Block { @@ -371,6 +391,31 @@ impl Block { ) -> Result, StorageError> { tx.blocks_get_any_with_epoch_range(range, validator_public_key) } + + pub fn get_substate_updates>( + &self, + tx: &mut TTx, + ) -> Result>, StorageError> { + let substates = tx.substates_get_all_for_block(self.id())?; + let mut updates = Vec::with_capacity(substates.len()); + for substate in substates { + let update = if let Some(destroyed) = substate.destroyed() { + SubstateUpdate::Destroy { + shard_id: substate.to_shard_id(), + proof: QuorumCertificate::get(tx, &destroyed.justify)?, + 
destroyed_by_transaction: destroyed.by_transaction, + } + } else { + SubstateUpdate::Create(SubstateCreatedProof { + created_qc: substate.get_created_quorum_certificate(tx)?, + substate: substate.into(), + }) + }; + updates.push(update); + } + + Ok(updates) + } } impl Display for Block { diff --git a/dan_layer/storage/src/consensus_models/command.rs b/dan_layer/storage/src/consensus_models/command.rs index 20839e62db..f68c947e43 100644 --- a/dan_layer/storage/src/consensus_models/command.rs +++ b/dan_layer/storage/src/consensus_models/command.rs @@ -56,6 +56,10 @@ impl Evidence { pub fn shards_iter(&self) -> impl Iterator + '_ { self.evidence.keys() } + + pub fn qc_ids_iter(&self) -> impl Iterator + '_ { + self.evidence.values().flatten() + } } impl FromIterator<(ShardId, Vec)> for Evidence { @@ -141,6 +145,14 @@ impl Command { Command::Accept(tx) => tx.evidence.shards_iter(), } } + + pub fn evidence(&self) -> &Evidence { + match self { + Command::Prepare(tx) => &tx.evidence, + Command::LocalPrepared(tx) => &tx.evidence, + Command::Accept(tx) => &tx.evidence, + } + } } impl PartialOrd for Command { diff --git a/dan_layer/storage/src/consensus_models/quorum_certificate.rs b/dan_layer/storage/src/consensus_models/quorum_certificate.rs index aff9ce6832..e98edfc6ec 100644 --- a/dan_layer/storage/src/consensus_models/quorum_certificate.rs +++ b/dan_layer/storage/src/consensus_models/quorum_certificate.rs @@ -3,6 +3,7 @@ use std::{fmt::Display, ops::DerefMut}; +use log::*; use serde::{Deserialize, Serialize}; use tari_common_types::types::{FixedHash, FixedHashSizeError}; use tari_dan_common_types::{ @@ -27,6 +28,8 @@ use crate::{ StorageError, }; +const LOG_TARGET: &str = "tari::dan::storage::quorum_certificate"; + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct QuorumCertificate { qc_id: QcId, @@ -163,6 +166,13 @@ impl QuorumCertificate { tx.quorum_certificates_get(qc_id) } + pub fn get_all<'a, TTx: StateStoreReadTransaction + ?Sized, I: IntoIterator>( + tx: 
&mut TTx, + qc_ids: I, + ) -> Result, StorageError> { + tx.quorum_certificates_get_all(qc_ids) + } + pub fn get_block( &self, tx: &mut TTx, @@ -178,6 +188,33 @@ impl QuorumCertificate { Ok(tx.quorum_certificates_get(&self.qc_id).optional()?.is_some()) } + pub fn update_high_qc(&self, tx: &mut TTx) -> Result<(), StorageError> + where + TTx: StateStoreWriteTransaction + DerefMut, + TTx::Target: StateStoreReadTransaction, + { + let high_qc = HighQc::get(tx.deref_mut())?; + let high_qc = high_qc.get_quorum_certificate(tx.deref_mut())?; + + if high_qc.block_height() < self.block_height() { + debug!( + target: LOG_TARGET, + "🔥 UPDATE_HIGH_QC (node: {} {}, previous high QC: {} {})", + self.id(), + self.block_height(), + high_qc.block_id(), + high_qc.block_height(), + ); + + self.save(tx)?; + // This will fail if the block doesnt exist + self.as_leaf_block().set(tx)?; + self.as_high_qc().set(tx)?; + } + + Ok(()) + } + pub fn save(&self, tx: &mut TTx) -> Result where TTx: StateStoreWriteTransaction + DerefMut, diff --git a/dan_layer/storage/src/consensus_models/substate.rs b/dan_layer/storage/src/consensus_models/substate.rs index deafd7abd6..62ef04e9a6 100644 --- a/dan_layer/storage/src/consensus_models/substate.rs +++ b/dan_layer/storage/src/consensus_models/substate.rs @@ -1,7 +1,12 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{borrow::Borrow, collections::HashSet, ops::RangeInclusive}; +use std::{ + borrow::Borrow, + collections::HashSet, + iter, + ops::{DerefMut, RangeInclusive}, +}; use serde::{Deserialize, Serialize}; use tari_common_types::types::FixedHash; @@ -10,7 +15,7 @@ use tari_engine_types::substate::{Substate, SubstateAddress, SubstateValue}; use tari_transaction::TransactionId; use crate::{ - consensus_models::{BlockId, QcId, QuorumCertificate}, + consensus_models::{Block, BlockId, QcId, QuorumCertificate}, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, @@ -26,11 +31,16 @@ pub struct 
SubstateRecord { pub created_justify: QcId, pub created_block: BlockId, pub created_height: NodeHeight, - pub destroyed_by_transaction: Option, - pub destroyed_justify: Option, - pub destroyed_by_block: Option, pub created_at_epoch: Epoch, - pub destroyed_at_epoch: Option, + pub destroyed: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubstateDestroyed { + pub by_transaction: TransactionId, + pub justify: QcId, + pub by_block: BlockId, + pub at_epoch: Epoch, } impl SubstateRecord { @@ -51,13 +61,10 @@ impl SubstateRecord { state_hash: Default::default(), created_height, created_justify, - destroyed_justify: None, - destroyed_by_block: None, created_at_epoch, - destroyed_at_epoch: None, created_by_transaction, created_block, - destroyed_by_transaction: None, + destroyed: None, } } @@ -93,10 +100,6 @@ impl SubstateRecord { self.created_height } - pub fn destroyed_by_block(&self) -> Option { - self.destroyed_by_block - } - pub fn created_block(&self) -> BlockId { self.created_block } @@ -105,20 +108,16 @@ impl SubstateRecord { self.created_by_transaction } - pub fn destroyed_by_transaction(&self) -> Option { - self.destroyed_by_transaction - } - pub fn created_justify(&self) -> &QcId { &self.created_justify } - pub fn destroyed_justify(&self) -> Option<&QcId> { - self.destroyed_justify.as_ref() + pub fn destroyed(&self) -> Option<&SubstateDestroyed> { + self.destroyed.as_ref() } pub fn is_destroyed(&self) -> bool { - self.destroyed_by_transaction.is_some() + self.destroyed.is_some() } } @@ -215,10 +214,136 @@ impl SubstateRecord { &self, tx: &mut TTx, ) -> Result>, StorageError> { - self.destroyed_justify() - .map(|justify| tx.quorum_certificates_get(justify)) + self.destroyed() + .map(|destroyed| tx.quorum_certificates_get(&destroyed.justify)) .transpose() } + + pub fn destroy>( + &mut self, + tx: &mut TTx, + epoch: Epoch, + block_id: BlockId, + destroyed_qc: QcId, + destroyed_by_transaction: TransactionId, + ) -> Result<(), StorageError> 
{ + self.destroyed = Some(SubstateDestroyed { + justify: destroyed_qc, + by_block: block_id, + by_transaction: destroyed_by_transaction, + at_epoch: epoch, + }); + + tx.substate_down_many( + iter::once(self.to_shard_id()), + epoch, + &block_id, + &destroyed_by_transaction, + &destroyed_qc, + ) + } + + pub fn destroy_many>( + tx: &mut TTx, + shard_ids: I, + epoch: Epoch, + destroyed_by_block: &BlockId, + destroyed_justify: &QcId, + destroyed_by_transaction: &TransactionId, + ) -> Result<(), StorageError> { + tx.substate_down_many( + shard_ids, + epoch, + destroyed_by_block, + destroyed_by_transaction, + destroyed_justify, + ) + } +} + +#[derive(Debug, Clone)] +pub struct SubstateCreatedProof { + pub substate: SubstateData, + pub created_qc: QuorumCertificate, +} + +#[derive(Debug, Clone)] +pub struct SubstateData { + pub address: SubstateAddress, + pub version: u32, + pub substate_value: SubstateValue, + pub created_by_transaction: TransactionId, +} + +impl From for SubstateData { + fn from(value: SubstateRecord) -> Self { + Self { + address: value.address, + version: value.version, + substate_value: value.substate_value, + created_by_transaction: value.created_by_transaction, + } + } +} + +#[derive(Debug, Clone)] +pub enum SubstateUpdate { + Create(SubstateCreatedProof), + Destroy { + shard_id: ShardId, + proof: QuorumCertificate, + destroyed_by_transaction: TransactionId, + }, +} + +impl SubstateUpdate { + pub fn apply(self, tx: &mut TTx, block: &Block) -> Result<(), StorageError> + where + TTx: StateStoreWriteTransaction + DerefMut, + TTx::Target: StateStoreReadTransaction, + { + match self { + Self::Create(proof) => { + proof.created_qc.save(tx)?; + SubstateRecord { + address: proof.substate.address, + version: proof.substate.version, + substate_value: proof.substate.substate_value, + state_hash: Default::default(), + created_by_transaction: proof.substate.created_by_transaction, + created_justify: *proof.created_qc.id(), + created_block: *block.id(), + 
created_height: block.height(), + created_at_epoch: block.epoch(), + destroyed: None, + } + .create(tx)?; + }, + Self::Destroy { + shard_id, + proof, + destroyed_by_transaction, + } => { + proof.save(tx)?; + SubstateRecord::destroy_many( + tx, + iter::once(shard_id), + block.epoch(), + block.id(), + proof.id(), + &destroyed_by_transaction, + )?; + }, + } + + Ok(()) + } +} + +impl From> for SubstateUpdate { + fn from(value: SubstateCreatedProof) -> Self { + Self::Create(value) + } } /// Substate lock flags diff --git a/dan_layer/storage/src/error.rs b/dan_layer/storage/src/error.rs index e679821b68..4e50058bd0 100644 --- a/dan_layer/storage/src/error.rs +++ b/dan_layer/storage/src/error.rs @@ -20,24 +20,17 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::io; - use tari_common_types::types::FixedHashSizeError; use tari_dan_common_types::optional::IsNotFoundError; -use tari_utilities::ByteArrayError; #[derive(Debug, thiserror::Error)] pub enum StorageError { #[error("Could not connect to storage:{reason}")] ConnectionError { reason: String }, - #[error("IO Error: {0}")] - Io(#[from] io::Error), #[error("Query error:{reason}")] QueryError { reason: String }, #[error("Migration error: {reason}")] MigrationError { reason: String }, - #[error("Invalid unit of work tracker type")] - InvalidUnitOfWorkTrackerType, #[error("Not found: item: {item}, key: {key}")] NotFound { item: String, key: String }, #[error("Not found in operation {operation}: {source}")] @@ -63,15 +56,10 @@ pub enum StorageError { FixedHashSizeError(#[from] FixedHashSizeError), #[error("Invalid integer cast")] InvalidIntegerCast, - #[error("Invalid ByteArray conversion: `{0}`")] - InvalidByteArrayConversion(#[from] ByteArrayError), - #[error("Invalid type cast: {reason}")] - InvalidTypeCasting { reason: String }, - + #[error("Data inconsistency: 
{details}")] + DataInconsistency { details: String }, #[error("General storage error: {details}")] General { details: String }, - #[error("Error converting substate type: {substate_type}")] - InvalidSubStateType { substate_type: String }, } impl IsNotFoundError for StorageError { diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index e46aed5124..6b0ee39c10 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -116,6 +116,10 @@ pub trait StateStoreReadTransaction { ) -> Result>, StorageError>; fn quorum_certificates_get(&mut self, qc_id: &QcId) -> Result, StorageError>; + fn quorum_certificates_get_all<'a, I: IntoIterator>( + &mut self, + qc_ids: I, + ) -> Result>, StorageError>; fn quorum_certificates_get_by_block_id( &mut self, block_id: &BlockId, @@ -166,6 +170,7 @@ pub trait StateStoreReadTransaction { &mut self, tx_id: &TransactionId, ) -> Result, StorageError>; + fn substates_get_all_for_block(&mut self, block_id: &BlockId) -> Result, StorageError>; } pub trait StateStoreWriteTransaction { @@ -259,6 +264,7 @@ pub trait StateStoreWriteTransaction { epoch: Epoch, destroyed_block_id: &BlockId, destroyed_transaction_id: &TransactionId, + destroyed_qc_id: &QcId, ) -> Result<(), StorageError>; fn substates_create(&mut self, substate: SubstateRecord) -> Result<(), StorageError>; // -------------------------------- Locked Outputs -------------------------------- // diff --git a/dan_layer/validator_node_rpc/proto/consensus.proto b/dan_layer/validator_node_rpc/proto/consensus.proto index c18b83d7e9..165e0fcb51 100644 --- a/dan_layer/validator_node_rpc/proto/consensus.proto +++ b/dan_layer/validator_node_rpc/proto/consensus.proto @@ -147,13 +147,34 @@ message DownState { } message RequestMissingTransactionsMessage { - uint64 epoch = 1; - bytes block_id = 2; - repeated bytes transaction_ids = 3; + uint64 epoch = 1; + bytes block_id = 2; + repeated bytes transaction_ids = 3; } 
message RequestedTransactionMessage { - uint64 epoch = 1; - bytes block_id = 2; - repeated tari.dan.transaction.Transaction transactions = 3; + uint64 epoch = 1; + bytes block_id = 2; + repeated tari.dan.transaction.Transaction transactions = 3; +} + +message Substate { + bytes address = 1; + uint32 version = 2; + bytes substate = 3; + + uint64 created_epoch = 4; + uint64 created_height = 5; + bytes created_block = 6; + bytes created_transaction = 7; + bytes created_justify = 8; + + SubstateDestroyed destroyed = 10; +} + +message SubstateDestroyed { + tari.dan.common.Epoch epoch = 9; + bytes block = 10; + bytes transaction = 11; + bytes justify = 12; } diff --git a/dan_layer/validator_node_rpc/proto/rpc.proto b/dan_layer/validator_node_rpc/proto/rpc.proto index b83dfbc493..cbc47c27a6 100644 --- a/dan_layer/validator_node_rpc/proto/rpc.proto +++ b/dan_layer/validator_node_rpc/proto/rpc.proto @@ -152,3 +152,55 @@ message GetVirtualSubstateResponse { bytes substate = 1; repeated tari.dan.consensus.QuorumCertificate quorum_certificates = 2; } + +message SyncStateRequest { + bytes start_block_id = 1; + uint64 end_epoch = 2; +} + +message SyncStateResponse { + SubstateUpdate update = 1; +} + +message SubstateCreatedProof { + SubstateData substate = 1; + tari.dan.consensus.QuorumCertificate created_justify = 2; +} + +// Minimal substate data +message SubstateData { + bytes address = 1; + uint32 version = 2; + bytes substate_value = 3; + bytes created_transaction = 7; +} + +message SubstateDestroyedProof { + bytes shard_id = 1; + tari.dan.consensus.QuorumCertificate destroyed_justify = 2; + bytes destroyed_by_transaction = 3; +} + +message SubstateUpdate { + oneof update { + SubstateCreatedProof create = 1; + SubstateDestroyedProof destroy = 2; + } +} + +message SyncBlocksRequest { + bytes start_block_id = 1; +} + +message SyncBlocksResponse { + tari.dan.consensus.Block block = 1; + repeated tari.dan.consensus.QuorumCertificate quorum_certificates = 2; + repeated 
SubstateUpdate substate_updates = 3; +} + +message GetHighQcRequest { } + +message GetHighQcResponse { + tari.dan.consensus.QuorumCertificate high_qc = 1; +} + diff --git a/dan_layer/validator_node_rpc/src/conversions/consensus.rs b/dan_layer/validator_node_rpc/src/conversions/consensus.rs index 7b217dd1b7..4f0c6dacb9 100644 --- a/dan_layer/validator_node_rpc/src/conversions/consensus.rs +++ b/dan_layer/validator_node_rpc/src/conversions/consensus.rs @@ -42,8 +42,11 @@ use tari_dan_storage::consensus_models::{ Evidence, QuorumCertificate, QuorumDecision, + SubstateDestroyed, + SubstateRecord, TransactionAtom, }; +use tari_engine_types::substate::{SubstateAddress, SubstateValue}; use tari_transaction::TransactionId; use crate::proto; @@ -92,7 +95,7 @@ impl TryFrom From> for proto::consensus::NewViewMessage { fn from(value: NewViewMessage) -> Self { Self { - high_qc: Some(value.high_qc.into()), + high_qc: Some((&value.high_qc).into()), new_height: value.new_height.0, epoch: value.epoch.as_u64(), } @@ -230,7 +233,7 @@ impl From for Evidence { // -------------------------------- QuorumCertificate -------------------------------- // -impl From> for proto::consensus::QuorumCertificate { - fn from(source: QuorumCertificate) -> Self { - // let source = value.borrow(); +impl From<&QuorumCertificate> for proto::consensus::QuorumCertificate { + fn from(source: &QuorumCertificate) -> Self { // TODO: unwrap let merged_merkle_proof = encode(&source.merged_proof()).unwrap(); Self { @@ -436,3 +438,72 @@ impl TryFrom for ValidatorMetadata { }) } } + +// -------------------------------- Substate -------------------------------- // + +impl TryFrom for SubstateRecord { + type Error = anyhow::Error; + + fn try_from(value: proto::consensus::Substate) -> Result { + Ok(Self { + address: SubstateAddress::from_bytes(&value.address)?, + version: value.version, + substate_value: SubstateValue::from_bytes(&value.substate)?, + state_hash: Default::default(), + + created_at_epoch: 
Epoch(value.created_epoch), + created_by_transaction: value.created_transaction.try_into()?, + created_justify: value.created_justify.try_into()?, + created_block: value.created_block.try_into()?, + created_height: NodeHeight(value.created_height), + + destroyed: value.destroyed.map(TryInto::try_into).transpose()?, + }) + } +} + +impl From for proto::consensus::Substate { + fn from(value: SubstateRecord) -> Self { + Self { + address: value.address.to_bytes(), + version: value.version, + substate: value.substate_value.to_bytes(), + + created_transaction: value.created_by_transaction.as_bytes().to_vec(), + created_justify: value.created_justify.as_bytes().to_vec(), + created_block: value.created_block.as_bytes().to_vec(), + created_height: value.created_height.as_u64(), + created_epoch: value.created_at_epoch.as_u64(), + + destroyed: value.destroyed.map(Into::into), + } + } +} + +// -------------------------------- SubstateDestroyed -------------------------------- // +impl TryFrom for SubstateDestroyed { + type Error = anyhow::Error; + + fn try_from(value: proto::consensus::SubstateDestroyed) -> Result { + Ok(Self { + by_transaction: value.transaction.try_into()?, + justify: value.justify.try_into()?, + by_block: value.block.try_into()?, + at_epoch: value + .epoch + .map(Into::into) + .ok_or_else(|| anyhow!("Epoch not provided"))?, + }) + } +} + +impl From for proto::consensus::SubstateDestroyed { + fn from(value: SubstateDestroyed) -> Self { + Self { + transaction: value.by_transaction.as_bytes().to_vec(), + justify: value.justify.as_bytes().to_vec(), + block: value.by_block.as_bytes().to_vec(), + epoch: Some(value.at_epoch.into()), + } + } +} diff --git a/dan_layer/validator_node_rpc/src/conversions/rpc.rs b/dan_layer/validator_node_rpc/src/conversions/rpc.rs index 42ab3359f3..d168309c8d 100644 --- a/dan_layer/validator_node_rpc/src/conversions/rpc.rs +++ b/dan_layer/validator_node_rpc/src/conversions/rpc.rs @@ -3,71 +3,100 @@ use std::convert::{TryFrom, TryInto}; 
-use tari_dan_common_types::{Epoch, NodeHeight}; -use tari_dan_storage::consensus_models::SubstateRecord; +use anyhow::anyhow; +use tari_dan_common_types::NodeAddressable; +use tari_dan_storage::consensus_models::{SubstateCreatedProof, SubstateData, SubstateUpdate}; use tari_engine_types::substate::{SubstateAddress, SubstateValue}; use crate::proto; -impl TryFrom for SubstateRecord { +impl TryFrom for SubstateCreatedProof { type Error = anyhow::Error; - fn try_from(value: proto::rpc::VnStateSyncResponse) -> Result { + fn try_from(value: proto::rpc::SubstateCreatedProof) -> Result { + Ok(Self { + substate: value + .substate + .map(TryInto::try_into) + .transpose()? + .ok_or_else(|| anyhow!("substate not provided"))?, + created_qc: value + .created_justify + .map(TryInto::try_into) + .transpose()? + .ok_or_else(|| anyhow!("created_justify not provided"))?, + }) + } +} + +impl From> for proto::rpc::SubstateCreatedProof { + fn from(value: SubstateCreatedProof) -> Self { + Self { + substate: Some(value.substate.into()), + created_justify: Some((&value.created_qc).into()), + } + } +} + +impl TryFrom for SubstateUpdate { + type Error = anyhow::Error; + + fn try_from(value: proto::rpc::SubstateUpdate) -> Result { + let update = value.update.ok_or_else(|| anyhow!("update not provided"))?; + match update { + proto::rpc::substate_update::Update::Create(substate_proof) => Ok(Self::Create(substate_proof.try_into()?)), + proto::rpc::substate_update::Update::Destroy(proof) => Ok(Self::Destroy { + shard_id: proof.shard_id.try_into()?, + proof: proof + .destroyed_justify + .map(TryInto::try_into) + .transpose()? 
+ .ok_or_else(|| anyhow!("destroyed_justify not provided"))?, + destroyed_by_transaction: proof.destroyed_by_transaction.try_into()?, + }), + } + } +} + +impl From> for proto::rpc::SubstateUpdate { + fn from(value: SubstateUpdate) -> Self { + let update = match value { + SubstateUpdate::Create(proof) => proto::rpc::substate_update::Update::Create(proof.into()), + SubstateUpdate::Destroy { + proof, + shard_id, + destroyed_by_transaction, + } => proto::rpc::substate_update::Update::Destroy(proto::rpc::SubstateDestroyedProof { + shard_id: shard_id.as_bytes().to_vec(), + destroyed_justify: Some((&proof).into()), + destroyed_by_transaction: destroyed_by_transaction.as_bytes().to_vec(), + }), + }; + + Self { update: Some(update) } + } +} + +impl TryFrom for SubstateData { + type Error = anyhow::Error; + + fn try_from(value: proto::rpc::SubstateData) -> Result { Ok(Self { address: SubstateAddress::from_bytes(&value.address)?, version: value.version, - substate_value: SubstateValue::from_bytes(&value.substate)?, - state_hash: Default::default(), - - created_at_epoch: Epoch(value.created_epoch), + substate_value: SubstateValue::from_bytes(&value.substate_value)?, created_by_transaction: value.created_transaction.try_into()?, - created_justify: value.created_justify.try_into()?, - created_block: value.created_block.try_into()?, - created_height: NodeHeight(value.created_height), - - destroyed_by_transaction: Some(value.destroyed_transaction) - .filter(|v| !v.is_empty()) - .map(TryInto::try_into) - .transpose()?, - destroyed_justify: Some(value.destroyed_justify) - .filter(|v| !v.is_empty()) - .map(TryInto::try_into) - .transpose()?, - destroyed_by_block: Some(value.destroyed_block) - .filter(|v| !v.is_empty()) - .map(TryInto::try_into) - .transpose()?, - destroyed_at_epoch: value.destroyed_epoch.map(Into::into), }) } } -impl From for proto::rpc::VnStateSyncResponse { - fn from(value: SubstateRecord) -> Self { +impl From for proto::rpc::SubstateData { + fn from(value: 
SubstateData) -> Self { Self { address: value.address.to_bytes(), version: value.version, - substate: value.substate_value.to_bytes(), - + substate_value: value.substate_value.to_bytes(), created_transaction: value.created_by_transaction.as_bytes().to_vec(), - created_justify: value.created_justify.as_bytes().to_vec(), - created_block: value.created_block.as_bytes().to_vec(), - created_height: value.created_height.as_u64(), - created_epoch: value.created_at_epoch.as_u64(), - - destroyed_transaction: value - .destroyed_by_transaction - .map(|s| s.as_bytes().to_vec()) - .unwrap_or_default(), - destroyed_justify: value - .destroyed_justify - .map(|id| id.as_bytes().to_vec()) - .unwrap_or_default(), - destroyed_block: value - .destroyed_by_block - .map(|s| s.as_bytes().to_vec()) - .unwrap_or_default(), - destroyed_epoch: value.destroyed_at_epoch.map(Into::into), } } } diff --git a/dan_layer/validator_node_rpc/src/rpc_service.rs b/dan_layer/validator_node_rpc/src/rpc_service.rs index ce06ef1eb4..71d0b8c56a 100644 --- a/dan_layer/validator_node_rpc/src/rpc_service.rs +++ b/dan_layer/validator_node_rpc/src/rpc_service.rs @@ -20,11 +20,11 @@ pub trait ValidatorNodeRpcService: Send + Sync + 'static { request: Request, ) -> Result, RpcStatus>; - #[rpc(method = 3)] - async fn vn_state_sync( - &self, - request: Request, - ) -> Result, RpcStatus>; + // #[rpc(method = 3)] + // async fn vn_state_sync( + // &self, + // request: Request, + // ) -> Result, RpcStatus>; #[rpc(method = 4)] async fn get_substate( @@ -43,4 +43,15 @@ pub trait ValidatorNodeRpcService: Send + Sync + 'static { &self, req: Request, ) -> Result, RpcStatus>; + + #[rpc(method = 7)] + async fn sync_blocks( + &self, + request: Request, + ) -> Result, RpcStatus>; + #[rpc(method = 8)] + async fn get_high_qc( + &self, + request: Request, + ) -> Result, RpcStatus>; }