fix(consensus): use temporary status updates for blocks > locked block
sdbondi committed Oct 2, 2023
1 parent 2b3b9ec commit 06115bc
Showing 52 changed files with 1,642 additions and 519 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -80,7 +80,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
let result = match processor.execute(transaction.clone()) {
Ok(result) => result,
Err(err) => ExecuteResult {
finalize: FinalizeResult::reject(tx_id, RejectReason::ExecutionFailure(err.to_string())),
finalize: FinalizeResult::new_rejected(tx_id, RejectReason::ExecutionFailure(err.to_string())),
transaction_failure: None,
fee_receipt: None,
},
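The one-line change above swaps the rejected-result constructor. A minimal, self-contained sketch of the pattern it sits in, where an execution error is folded into a rejected ExecuteResult instead of being propagated; the stand-in types below are assumptions, and only `FinalizeResult::new_rejected` and `RejectReason::ExecutionFailure` appear in the diff:

```rust
// Stand-in types; the real ones live in the Tari engine crates.
enum RejectReason {
    ExecutionFailure(String),
}

struct FinalizeResult {
    reject: Option<RejectReason>,
}

impl FinalizeResult {
    // Mirrors the constructor renamed in the hunk above.
    fn new_rejected(_tx_id: u64, reason: RejectReason) -> Self {
        Self { reject: Some(reason) }
    }
}

struct ExecuteResult {
    finalize: FinalizeResult,
    transaction_failure: Option<RejectReason>,
    fee_receipt: Option<u64>,
}

// An execution failure still yields an ExecuteResult: the error is recorded
// as a rejected finalize result rather than returned as Err.
fn result_or_rejection(tx_id: u64, outcome: Result<ExecuteResult, String>) -> ExecuteResult {
    match outcome {
        Ok(result) => result,
        Err(err) => ExecuteResult {
            finalize: FinalizeResult::new_rejected(tx_id, RejectReason::ExecutionFailure(err)),
            transaction_failure: None,
            fee_receipt: None,
        },
    }
}
```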
42 changes: 21 additions & 21 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -40,7 +40,6 @@ use tari_common::{
};
use tari_common_types::types::PublicKey;
use tari_comms::{protocol::rpc::RpcServer, types::CommsPublicKey, CommsNode, NodeIdentity, UnspawnedCommsNode};
use tari_consensus::hotstuff::HotstuffEvent;
use tari_dan_app_utilities::{
base_layer_scanner,
consensus_constants::ConsensusConstants,
@@ -76,8 +75,8 @@ use tokio::{sync::mpsc, task::JoinHandle};
use crate::{
comms,
consensus,
consensus::ConsensusHandle,
dry_run_transaction_processor::DryRunTransactionProcessor,
event_subscription::EventSubscription,
p2p::{
create_tari_validator_node_rpc_service,
services::{
@@ -194,9 +193,23 @@ pub async fn spawn_services(

let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(comms.connectivity());

// Consensus
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn(
state_store.clone(),
node_identity.clone(),
epoch_manager.clone(),
rx_executed_transaction,
rx_consensus_message,
outbound_messaging.clone(),
validator_node_client_factory.clone(),
shutdown.clone(),
)
.await;
handles.push(consensus_join_handle);

// Mempool
let virtual_substate_manager = VirtualSubstateManager::new(state_store.clone(), epoch_manager.clone());
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let scanner = SubstateScanner::new(epoch_manager.clone(), validator_node_client_factory.clone());
let substate_resolver = TariSubstateResolver::new(
state_store.clone(),
@@ -206,7 +219,7 @@
);
let (mempool, join_handle) = mempool::spawn(
rx_new_transaction_message,
outbound_messaging.clone(),
outbound_messaging,
tx_executed_transaction,
epoch_manager.clone(),
node_identity.clone(),
@@ -219,6 +232,8 @@
),
create_mempool_after_execute_validator(state_store.clone()),
state_store.clone(),
rx_consensus_to_mempool,
consensus_handle.clone(),
);
handles.push(join_handle);

@@ -236,21 +251,6 @@
);
handles.push(join_handle);

// Consensus
let (consensus_handle, consensus_events) = consensus::spawn(
state_store.clone(),
node_identity.clone(),
epoch_manager.clone(),
rx_executed_transaction,
rx_consensus_message,
outbound_messaging,
mempool.clone(),
validator_node_client_factory.clone(),
shutdown.clone(),
)
.await;
handles.push(consensus_handle);

let comms = setup_p2p_rpc(
config,
comms,
@@ -289,7 +289,7 @@ pub async fn spawn_services(
mempool,
epoch_manager,
template_manager: template_manager_service,
hotstuff_events: consensus_events,
consensus_handle,
global_db,
state_store,
dry_run_transaction_processor,
@@ -321,7 +321,7 @@ pub struct Services {
pub mempool: MempoolHandle,
pub epoch_manager: EpochManagerHandle,
pub template_manager: TemplateManagerHandle,
pub hotstuff_events: EventSubscription<HotstuffEvent>,
pub consensus_handle: ConsensusHandle,
pub global_db: GlobalDb<SqliteGlobalDbAdapter>,
pub dry_run_transaction_processor: DryRunTransactionProcessor,
pub validator_node_client_factory: TariCommsValidatorNodeClientFactory,
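The bootstrap changes above reorder startup so consensus is spawned before the mempool, which lets the mempool receive both the new ConsensusHandle and the receiver half of the consensus-to-mempool channel; Services now exposes `consensus_handle` instead of `hotstuff_events`. A self-contained sketch of the channel layout, using stand-in payload types rather than the real Transaction/TransactionId:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // mempool -> consensus: IDs of transactions that finished executing
    let (tx_executed, mut rx_executed) = mpsc::channel::<u64>(10);
    // consensus -> mempool: transactions consensus needs the mempool to execute
    let (tx_to_mempool, mut rx_to_mempool) = mpsc::unbounded_channel::<u64>();

    // "consensus" is spawned first, as in the reordered spawn_services
    let consensus = tokio::spawn(async move {
        tx_to_mempool.send(42).unwrap();
        let executed = rx_executed.recv().await;
        println!("consensus saw executed tx: {executed:?}");
    });

    // "mempool" is spawned second, holding the other ends of both channels
    let mempool = tokio::spawn(async move {
        if let Some(tx_id) = rx_to_mempool.recv().await {
            // pretend we executed it, then notify consensus
            tx_executed.send(tx_id).await.unwrap();
        }
    });

    let _ = tokio::join!(consensus, mempool);
}
```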
33 changes: 33 additions & 0 deletions applications/tari_validator_node/src/consensus/handle.rs
@@ -0,0 +1,33 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, HotstuffEvent};
use tokio::sync::{broadcast, watch};

use crate::event_subscription::EventSubscription;

#[derive(Debug, Clone)]
pub struct ConsensusHandle {
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
}

impl ConsensusHandle {
pub(super) fn new(
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
) -> Self {
Self {
rx_current_state,
events_subscription,
}
}

pub fn subscribe_to_hotstuff_events(&mut self) -> broadcast::Receiver<HotstuffEvent> {
self.events_subscription.subscribe()
}

pub fn get_current_state(&self) -> ConsensusCurrentState {
*self.rx_current_state.borrow()
}
}
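A hypothetical caller of the new handle (for example anything holding `Services::consensus_handle`); `subscribe_to_hotstuff_events` and `get_current_state` are the methods defined above, while the logging and Debug formatting are assumptions about the surrounding types:

```rust
use tokio::sync::broadcast::error::RecvError;

use crate::consensus::ConsensusHandle;

async fn watch_consensus(mut consensus_handle: ConsensusHandle) {
    let mut events = consensus_handle.subscribe_to_hotstuff_events();
    loop {
        match events.recv().await {
            Ok(event) => {
                // Reads the latest value from the watch channel without awaiting.
                let state = consensus_handle.get_current_state();
                log::info!("hotstuff event {:?} while consensus is {:?}", event, state);
            },
            Err(RecvError::Lagged(n)) => log::warn!("dropped {} hotstuff events", n),
            Err(RecvError::Closed) => break,
        }
    }
}
```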
36 changes: 21 additions & 15 deletions applications/tari_validator_node/src/consensus/mod.rs
@@ -7,7 +7,7 @@ use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::{
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffEvent, HotstuffWorker},
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker},
messages::HotstuffMessage,
};
use tari_dan_common_types::committee::Committee;
Expand All @@ -19,7 +19,7 @@ use tari_state_store_sqlite::SqliteStateStore;
use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::client::TariCommsValidatorNodeClientFactory;
use tokio::{
sync::{broadcast, mpsc},
sync::{broadcast, mpsc, watch},
task::JoinHandle,
};

@@ -31,25 +31,31 @@ use crate::{
state_manager::TariStateManager,
},
event_subscription::EventSubscription,
p2p::services::{mempool::MempoolHandle, messaging::OutboundMessaging},
p2p::services::messaging::OutboundMessaging,
};

mod handle;
mod leader_selection;
mod signature_service;
mod spec;
mod state_manager;

pub use handle::*;

pub async fn spawn(
store: SqliteStateStore<PublicKey>,
node_identity: Arc<NodeIdentity>,
epoch_manager: EpochManagerHandle,
rx_new_transactions: mpsc::Receiver<TransactionId>,
rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<PublicKey>)>,
outbound_messaging: OutboundMessaging,
mempool: MempoolHandle,
client_factory: TariCommsValidatorNodeClientFactory,
shutdown_signal: ShutdownSignal,
) -> (JoinHandle<Result<(), anyhow::Error>>, EventSubscription<HotstuffEvent>) {
) -> (
JoinHandle<Result<(), anyhow::Error>>,
ConsensusHandle,
mpsc::UnboundedReceiver<Transaction>,
) {
let (tx_broadcast, rx_broadcast) = mpsc::channel(10);
let (tx_leader, rx_leader) = mpsc::channel(10);
let (tx_mempool, rx_mempool) = mpsc::unbounded_channel();
@@ -59,7 +65,7 @@ pub async fn spawn(
let leader_strategy = RoundRobinLeaderStrategy::new();
let transaction_pool = TransactionPool::new();
let state_manager = TariStateManager::new();
let (tx_events, _) = broadcast::channel(100);
let (tx_hotstuff_events, _) = broadcast::channel(100);

let epoch_events = epoch_manager.subscribe().await.unwrap();

@@ -75,38 +81,40 @@
transaction_pool,
tx_broadcast,
tx_leader,
tx_events.clone(),
tx_hotstuff_events.clone(),
tx_mempool,
shutdown_signal.clone(),
);

let (tx_current_state, rx_current_state) = watch::channel(Default::default());
let context = ConsensusWorkerContext {
epoch_manager: epoch_manager.clone(),
epoch_events,
hotstuff: hotstuff_worker,
state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory),
tx_current_state,
};

let handle = ConsensusWorker::new(shutdown_signal).spawn(context);

ConsensusMessageWorker {
rx_broadcast,
rx_leader,
rx_mempool,
outbound_messaging,
mempool,
}
.spawn();

(handle, EventSubscription::new(tx_events))
(
handle,
ConsensusHandle::new(rx_current_state, EventSubscription::new(tx_hotstuff_events)),
rx_mempool,
)
}

struct ConsensusMessageWorker {
rx_broadcast: mpsc::Receiver<(Committee<CommsPublicKey>, HotstuffMessage<CommsPublicKey>)>,
rx_leader: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<CommsPublicKey>)>,
rx_mempool: mpsc::UnboundedReceiver<Transaction>,
outbound_messaging: OutboundMessaging,
mempool: MempoolHandle,
}

impl ConsensusMessageWorker {
@@ -126,9 +134,7 @@ impl ConsensusMessageWorker {
.await
.ok();
},
Some(tx) = self.rx_mempool.recv() => {
self.mempool.submit_transaction_without_propagating(tx).await.ok();
},

else => break,
}
}
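The spawn function above now also creates a watch channel whose sender goes into ConsensusWorkerContext as `tx_current_state` and whose receiver goes into ConsensusHandle. A self-contained sketch of that pattern with a stand-in state enum (not the real ConsensusCurrentState):

```rust
use tokio::sync::watch;

// Stand-in for ConsensusCurrentState.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
enum WorkerState {
    #[default]
    Syncing,
    Running,
}

#[tokio::main]
async fn main() {
    let (tx_state, rx_state) = watch::channel(WorkerState::default());

    // Worker side: publish a state change once sync completes.
    let worker = tokio::spawn(async move {
        // ... state sync happens here ...
        tx_state.send(WorkerState::Running).ok();
    });
    worker.await.unwrap();

    // Handle side: borrow() returns the most recently sent value without blocking.
    assert_eq!(*rx_state.borrow(), WorkerState::Running);
}
```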
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/dan_node.rs
@@ -44,7 +44,7 @@ impl DanNode {
}

pub async fn start(mut self, mut shutdown: ShutdownSignal) -> Result<(), anyhow::Error> {
let mut hotstuff_events = self.services.hotstuff_events.subscribe();
let mut hotstuff_events = self.services.consensus_handle.subscribe_to_hotstuff_events();

let mut connectivity_events = self.services.comms.connectivity().get_event_subscription();

1 change: 1 addition & 0 deletions applications/tari_validator_node/src/event_subscription.rs
@@ -27,6 +27,7 @@ use tokio::sync::broadcast;
///
/// We hold a sender because if we held a receiver then the broadcast buffer would always fill up because the receiver
/// isn't reading off of it.
#[derive(Debug)]
pub struct EventSubscription<T>(broadcast::Sender<T>);

impl<T> EventSubscription<T> {
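For reference, a minimal sketch of what the EventSubscription wrapper described in the doc comment above might look like in full; the struct definition and the Debug derive come from the diff, while the constructor and subscribe method are assumptions:

```rust
use tokio::sync::broadcast;

// Holding only the Sender means no receiver exists until someone subscribes,
// so an idle subscription can never cause the broadcast buffer to fill up.
#[derive(Debug)]
pub struct EventSubscription<T>(broadcast::Sender<T>);

impl<T> EventSubscription<T> {
    pub fn new(sender: broadcast::Sender<T>) -> Self {
        Self(sender)
    }

    // Each call creates a fresh Receiver that starts from the current position.
    pub fn subscribe(&mut self) -> broadcast::Receiver<T> {
        self.0.subscribe()
    }
}
```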
@@ -70,18 +70,6 @@ impl MempoolHandle {
rx.await?
}

pub async fn submit_transaction_without_propagating(&self, transaction: Transaction) -> Result<(), MempoolError> {
let (reply, rx) = oneshot::channel();
self.tx_mempool_request
.send(MempoolRequest::SubmitTransaction {
transaction: Box::new(transaction),
should_propagate: false,
reply,
})
.await?;
rx.await?
}

pub async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), MempoolError> {
self.tx_mempool_request
.send(MempoolRequest::RemoveTransaction { transaction_id })
@@ -33,6 +33,7 @@ use tari_transaction::{Transaction, TransactionId};
use tokio::{sync::mpsc, task, task::JoinHandle};

use crate::{
consensus::ConsensusHandle,
p2p::services::{
mempool::{handle::MempoolHandle, service::MempoolService, MempoolError, SubstateResolver, Validator},
messaging::OutboundMessaging,
@@ -51,6 +52,8 @@ pub fn spawn<TExecutor, TValidator, TExecutedValidator, TSubstateResolver>(
validator: TValidator,
after_executed_validator: TExecutedValidator,
state_store: SqliteStateStore<PublicKey>,
rx_consensus_to_mempool: mpsc::UnboundedReceiver<Transaction>,
consensus_handle: ConsensusHandle,
) -> (MempoolHandle, JoinHandle<anyhow::Result<()>>)
where
TValidator: Validator<Transaction, Error = MempoolError> + Send + Sync + 'static,
@@ -72,6 +75,8 @@
validator,
after_executed_validator,
state_store,
rx_consensus_to_mempool,
consensus_handle,
);
let handle = MempoolHandle::new(tx_mempool_request);

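The mempool spawn now takes `rx_consensus_to_mempool` and a ConsensusHandle. The MempoolService internals are not shown in this diff; below is a speculative sketch of one way a service loop could consume the new channel, replacing the removed `submit_transaction_without_propagating` path. All names except `rx_consensus_to_mempool` are assumptions:

```rust
use tokio::sync::mpsc;

// Stand-in for tari_transaction::Transaction.
struct Transaction;

async fn run_loop(
    mut rx_requests: mpsc::Receiver<String>,
    mut rx_consensus_to_mempool: mpsc::UnboundedReceiver<Transaction>,
) {
    loop {
        tokio::select! {
            Some(request) = rx_requests.recv() => {
                // Handle a normal mempool request (submit/remove/etc.).
                let _ = request;
            },
            Some(transaction) = rx_consensus_to_mempool.recv() => {
                // A transaction handed over directly by consensus: execute it
                // locally without re-propagating it to peers.
                let _ = transaction;
            },
            else => break,
        }
    }
}
```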
