wip
sdbondi committed Sep 18, 2023
1 parent 6cec238 commit 3ce4c9f
Showing 33 changed files with 831 additions and 53 deletions.
41 changes: 30 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -13,6 +13,7 @@ members = [
"clients/base_node_client",
"clients/validator_node_client",
"clients/wallet_daemon_client",
"dan_layer/comms_rpc_state_sync",
"dan_layer/consensus",
"dan_layer/consensus_tests",
"dan_layer/epoch_manager",
1 change: 1 addition & 0 deletions applications/tari_validator_node/Cargo.toml
@@ -34,6 +34,7 @@ minotari_wallet_grpc_client = { git = "https://github.com/tari-project/tari.git"
tari_base_node_client = { path = "../../clients/base_node_client" }
tari_epoch_manager = { path = "../../dan_layer/epoch_manager", features = ["base_layer"] }
tari_indexer_lib = { path = "../../dan_layer/indexer_lib" }
tari_comms_rpc_state_sync = { path = "../../dan_layer/comms_rpc_state_sync" }

tari_consensus = { path = "../../dan_layer/consensus" }
tari_state_store_sqlite = { path = "../../dan_layer/state_store_sqlite" }
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -245,6 +245,7 @@ pub async fn spawn_services(
rx_consensus_message,
outbound_messaging,
mempool.clone(),
validator_node_client_factory.clone(),
shutdown.clone(),
)
.await;
8 changes: 6 additions & 2 deletions applications/tari_validator_node/src/consensus/mod.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;

use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::{
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffEvent, HotstuffWorker},
messages::HotstuffMessage,
@@ -16,6 +17,7 @@ use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::client::TariCommsValidatorNodeClientFactory;
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
@@ -45,6 +47,7 @@ pub async fn spawn(
rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<PublicKey>)>,
outbound_messaging: OutboundMessaging,
mempool: MempoolHandle,
client_factory: TariCommsValidatorNodeClientFactory,
shutdown_signal: ShutdownSignal,
) -> (JoinHandle<Result<(), anyhow::Error>>, EventSubscription<HotstuffEvent>) {
let (tx_broadcast, rx_broadcast) = mpsc::channel(10);
@@ -64,7 +67,7 @@
validator_addr,
rx_new_transactions,
rx_hs_message,
store,
store.clone(),
epoch_manager.clone(),
leader_strategy,
signing_service,
@@ -78,9 +81,10 @@
);

let context = ConsensusWorkerContext {
epoch_manager,
epoch_manager: epoch_manager.clone(),
epoch_events,
hotstuff: hotstuff_worker,
state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory),
};

let handle = ConsensusWorker::new(shutdown_signal).spawn(context);
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/consensus/spec.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_comms::types::CommsPublicKey;
use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::traits::ConsensusSpec;
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_state_store_sqlite::SqliteStateStore;
@@ -20,5 +21,6 @@ impl ConsensusSpec for TariConsensusSpec {
type LeaderStrategy = RoundRobinLeaderStrategy;
type StateManager = TariStateManager;
type StateStore = SqliteStateStore<Self::Addr>;
type SyncManager = CommsRpcStateSyncManager<Self::EpochManager, Self::StateStore>;
type VoteSignatureService = TariSignatureService;
}
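The new SyncManager associated type follows the same pattern as the other items in this spec: ConsensusSpec only names the concrete types a consensus worker is assembled from, and CommsRpcStateSyncManager is now the concrete sync implementation for this node. A minimal, self-contained sketch of that associated-type wiring, using simplified stand-in traits (the trait bodies below are illustrative assumptions, not the real tari_consensus definitions):

// Illustrative stand-ins only; the real traits live in tari_consensus.
trait StateSyncManager {
    // Hypothetical method name; the actual sync API is defined by tari_consensus.
    fn check_and_sync(&mut self) -> Result<(), String>;
}

trait ConsensusSpec {
    // The spec names concrete types; the consensus worker is generic over the spec.
    type SyncManager: StateSyncManager;
}

#[derive(Default)]
struct RpcStateSync;

impl StateSyncManager for RpcStateSync {
    fn check_and_sync(&mut self) -> Result<(), String> {
        // A real implementation would fetch missing blocks and state from peers over RPC.
        Ok(())
    }
}

struct ExampleSpec;

impl ConsensusSpec for ExampleSpec {
    type SyncManager = RpcStateSync;
}

fn main() {
    // The worker can construct and drive whatever sync manager the spec names.
    let mut sync: <ExampleSpec as ConsensusSpec>::SyncManager = Default::default();
    sync.check_and_sync().expect("state sync failed");
}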
143 changes: 140 additions & 3 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
@@ -28,7 +28,7 @@ use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_dan_common_types::{optional::Optional, NodeAddressable, ShardId};
use tari_dan_p2p::PeerProvider;
use tari_dan_storage::{
consensus_models::{SubstateRecord, TransactionRecord},
consensus_models::{Block, BlockId, HighQc, SubstateRecord, TransactionRecord},
StateStore,
};
use tari_engine_types::virtual_substate::VirtualSubstateAddress;
@@ -39,12 +39,18 @@ use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::{
proto,
proto::rpc::{
GetHighQcRequest,
GetHighQcResponse,
GetSubstateRequest,
GetSubstateResponse,
GetTransactionResultRequest,
GetTransactionResultResponse,
PayloadResultStatus,
SubstateStatus,
SyncBlocksRequest,
SyncBlocksResponse,
SyncStateRequest,
SyncStateResponse,
VnStateSyncRequest,
VnStateSyncResponse,
},
@@ -245,7 +251,7 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static
quorum_certificates: Some(created_qc)
.into_iter()
.chain(destroyed_qc)
.map(Into::into)
.map(|qc| (&qc).into())
.collect(),
..Default::default()
}
@@ -257,7 +263,7 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static
substate: substate.substate_value().to_bytes(),
created_transaction_hash: substate.created_by_transaction().into_array().to_vec(),
destroyed_transaction_hash: vec![],
quorum_certificates: vec![created_qc.into()],
quorum_certificates: vec![(&created_qc).into()],
}
};

@@ -328,4 +334,135 @@ where TPeerProvider: PeerProvider + Clone + Send + Sync + 'static
abort_details,
}))
}

async fn sync_state(&self, request: Request<SyncStateRequest>) -> Result<Streaming<SyncStateResponse>, RpcStatus> {
let (tx, rx) = mpsc::channel(10);
let msg = request.into_message();

let start_shard_id = msg
.start_block_id
.and_then(|s| ShardId::try_from(s).ok())
.ok_or_else(|| RpcStatus::bad_request("start_shard_id malformed or not provided"))?;

let shard_db = self.shard_state_store.clone();

task::spawn(async move {
let shards_substates_data = shard_db.with_read_tx(|tx| {
SubstateRecord::get_many_within_range(tx, start_shard_id..=end_shard_id, excluded_shards.as_slice())
});

let substates = match shards_substates_data {
Ok(s) => s,
Err(err) => {
error!(target: LOG_TARGET, "{}", err);
let _ignore = tx.send(Err(RpcStatus::general(&err))).await;
return;
},
};

if substates.is_empty() {
return;
}

// select data from db where shard_id <= end_shard_id and shard_id >= start_shard_id
for substate in substates {
match proto::rpc::VnStateSyncResponse::try_from(substate) {
Ok(r) => {
if tx.send(Ok(r)).await.is_err() {
debug!(
target: LOG_TARGET,
"Peer stream closed by client before completing. Aborting"
);
break;
}
},
Err(e) => {
error!(target: LOG_TARGET, "{}", e);
let _ignore = tx.send(Err(RpcStatus::general(&e))).await;
return;
},
}
}
});
Ok(Streaming::new(rx))
}

async fn sync_blocks(
&self,
request: Request<SyncBlocksRequest>,
) -> Result<Streaming<SyncBlocksResponse>, RpcStatus> {
let req = request.into_message();
let store = self.shard_state_store.clone();

let (sender, receiver) = mpsc::channel(10);

let start_block_id = BlockId::try_from(req.start_block_id)
.map_err(|e| RpcStatus::bad_request(&format!("Invalid encoded block id: {}", e)))?;
// Check if we have the blocks
let start_block = store
.with_read_tx(|tx| Block::get(tx, &start_block_id).optional())
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("start_block_id not found"))?;

        task::spawn(async move {
            let mut buffer = Vec::with_capacity(10);
            let mut current_block = start_block;
            loop {
                let result = store
                    .with_read_tx(|tx| {
                        loop {
                            let children = current_block.get_child_blocks(tx)?;
                            let Some(child) = children.into_iter().find(|b| b.is_committed()) else {
                                break;
                            };

                            // Keep an owned copy so the next fetch continues from this block
                            current_block = child.clone();
                            buffer.push(child);
                            if buffer.len() == buffer.capacity() {
                                break;
                            }
                        }
                        Ok(())
                    })
                    .map_err(RpcStatus::log_internal_error(LOG_TARGET));

                if let Err(err) = result {
                    let _ignore = sender.send(Err(err)).await;
                    return;
                }

let num_items = buffer.len();

for block in buffer.drain(..) {
if sender
.send(Ok(SyncBlocksResponse {
block: Some(block.into()),
foreign_qcs: vec![],
transactions: vec![],
}))
.await
.is_err()
{
debug!(
target: LOG_TARGET,
"Peer stream closed by client before completing. Aborting"
);
break;
}
}

                // If we didn't fill up the buffer, we have no more to send, so we can end
if num_items < buffer.capacity() {
break;
}
}
});
Ok(Streaming::new(receiver))
}

async fn get_high_qc(&self, _request: Request<GetHighQcRequest>) -> Result<Response<GetHighQcResponse>, RpcStatus> {
let high_qc = self
.shard_state_store
.with_read_tx(|tx| HighQc::get(tx)?.get_quorum_certificate(tx))
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

Ok(Response::new(GetHighQcResponse {
high_qc: Some((&high_qc).into()),
}))
}
}
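The sync_blocks handler above streams committed blocks in batches of up to 10 and ends the stream once a batch comes back short of the buffer capacity. A small, self-contained sketch of that batch-and-stream pattern over a tokio channel (plain integers stand in for blocks, and fetch_batch is an illustrative helper, not part of the RPC service):

use tokio::sync::mpsc;

const BATCH_SIZE: usize = 10;

// Stand-in for "read up to BATCH_SIZE committed child blocks from the store".
fn fetch_batch(source: &[u64], offset: usize) -> Vec<u64> {
    source.iter().skip(offset).take(BATCH_SIZE).copied().collect()
}

#[tokio::main]
async fn main() {
    let blocks: Vec<u64> = (1..=23).collect();
    let (sender, mut receiver) = mpsc::channel(BATCH_SIZE);

    tokio::spawn(async move {
        let mut offset = 0;
        loop {
            let batch = fetch_batch(&blocks, offset);
            let num_items = batch.len();
            offset += num_items;

            for block in batch {
                if sender.send(block).await.is_err() {
                    // Receiver dropped: the client closed the stream early.
                    return;
                }
            }

            // A short (or empty) batch means there is nothing more to send.
            if num_items < BATCH_SIZE {
                break;
            }
        }
    });

    while let Some(block) = receiver.recv().await {
        println!("received block {block}");
    }
}

With 23 blocks this sends two full batches and a final batch of 3, then ends the stream by dropping the sender, which terminates the receive loop.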
10 changes: 0 additions & 10 deletions dan_layer/common_types/src/committee.rs
@@ -31,16 +31,6 @@ impl<TAddr: NodeAddressable> Committee<TAddr> {
&self.members
}

/// Returns n - f where n is the number of committee members and f is the tolerated failure nodes.
pub fn consensus_threshold(&self) -> usize {
let len = self.members.len();
if len == 0 {
return 0;
}
let max_failures = (len - 1) / 3;
len - max_failures
}

pub fn max_failures(&self) -> usize {
let len = self.members.len();
if len == 0 {
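For reference, the removed helper encoded the standard BFT quorum arithmetic: a committee of n members tolerates f = (n - 1) / 3 failures (integer division), and n - f matching votes form a quorum; max_failures, which is kept, computes the same f. A standalone worked example of those formulas (illustrative, not the crate's API):

fn max_failures(n: usize) -> usize {
    if n == 0 {
        return 0;
    }
    (n - 1) / 3
}

fn consensus_threshold(n: usize) -> usize {
    if n == 0 {
        return 0;
    }
    n - max_failures(n)
}

fn main() {
    // n = 1 -> f = 0, threshold 1; n = 4 -> f = 1, threshold 3; n = 7 -> f = 2, threshold 5
    for n in [1, 4, 7, 10] {
        println!("n = {n}: f = {}, threshold = {}", max_failures(n), consensus_threshold(n));
    }
}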