Skip to content

Commit

Permalink
feat: integrate batched consensus
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 14, 2023
1 parent dc906bf commit d3dd815
Show file tree
Hide file tree
Showing 286 changed files with 5,359 additions and 15,381 deletions.
189 changes: 30 additions & 159 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ members = [
"clients/base_node_client",
"clients/validator_node_client",
"clients/wallet_daemon_client",
"dan_layer/core",
"dan_layer/consensus",
"dan_layer/consensus_tests",
"dan_layer/epoch_manager",
"dan_layer/indexer_lib",
"dan_layer/integration_tests",
"dan_layer/p2p",
"dan_layer/state_store_sqlite",
"dan_layer/storage_lmdb",
"dan_layer/storage_sqlite",
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tari_crypto = { version = "0.17" }
tari_shutdown = { git = "https://github.com/tari-project/tari.git", tag = "v0.51.0-pre.4" }

tari_dan_common_types = { path = "../../dan_layer/common_types" }
tari_dan_core = { path = "../../dan_layer/core" }
tari_state_store_sqlite = { path = "../../dan_layer/state_store_sqlite" }
tari_dan_engine = { path = "../../dan_layer/engine" }
tari_dan_storage = { path = "../../dan_layer/storage" }
tari_dan_storage_sqlite = { path = "../../dan_layer/storage_sqlite" }
Expand Down
85 changes: 53 additions & 32 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,28 @@ use tari_core::transactions::transaction_components::{
ValidatorNodeRegistration,
};
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_common_types::{optional::Optional, ShardId};
use tari_dan_core::consensus_constants::ConsensusConstants;
use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, SubstateRecord},
global::{GlobalDb, MetadataKey},
ShardStore,
ShardStoreWriteTransaction,
StateStore,
StorageError,
};
use tari_dan_storage_sqlite::{
error::SqliteStorageError,
global::SqliteGlobalDbAdapter,
sqlite_shard_store_factory::SqliteShardStore,
};
use tari_dan_storage_sqlite::{error::SqliteStorageError, global::SqliteGlobalDbAdapter};
use tari_engine_types::{
confidential::UnclaimedConfidentialOutput,
substate::{Substate, SubstateAddress, SubstateValue},
};
use tari_epoch_manager::{
base_layer::{EpochManagerError, EpochManagerHandle},
EpochManager,
substate::{SubstateAddress, SubstateValue},
};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerError, EpochManagerReader};
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
use tari_template_lib::models::{EncryptedData, TemplateAddress, UnclaimedConfidentialOutputAddress};
use tokio::{task, task::JoinHandle, time};

use crate::template_manager::interface::{TemplateManagerError, TemplateManagerHandle, TemplateRegistration};
use crate::{
consensus_constants::ConsensusConstants,
template_manager::interface::{TemplateManagerError, TemplateManagerHandle, TemplateRegistration},
};

const LOG_TARGET: &str = "tari::dan::base_layer_scanner";

Expand All @@ -73,7 +69,7 @@ pub fn spawn(
template_manager: TemplateManagerHandle,
shutdown: ShutdownSignal,
consensus_constants: ConsensusConstants,
shard_store: SqliteShardStore,
shard_store: SqliteStateStore,
scan_base_layer: bool,
base_layer_scanning_interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
Expand Down Expand Up @@ -106,9 +102,10 @@ pub struct BaseLayerScanner {
template_manager: TemplateManagerHandle,
shutdown: ShutdownSignal,
consensus_constants: ConsensusConstants,
shard_store: SqliteShardStore,
state_store: SqliteStateStore,
scan_base_layer: bool,
base_layer_scanning_interval: Duration,
has_attempted_scan: bool,
}

impl BaseLayerScanner {
Expand All @@ -119,7 +116,7 @@ impl BaseLayerScanner {
template_manager: TemplateManagerHandle,
shutdown: ShutdownSignal,
consensus_constants: ConsensusConstants,
state_store: SqliteShardStore,
state_store: SqliteStateStore,
scan_base_layer: bool,
base_layer_scanning_interval: Duration,
) -> Self {
Expand All @@ -134,9 +131,10 @@ impl BaseLayerScanner {
template_manager,
shutdown,
consensus_constants,
shard_store: state_store,
state_store,
scan_base_layer,
base_layer_scanning_interval,
has_attempted_scan: false,
}
}

Expand Down Expand Up @@ -200,7 +198,7 @@ impl BaseLayerScanner {
self.sync_blockchain().await?;
},
BlockchainProgression::Reorged => {
warn!(
error!(
target: LOG_TARGET,
"⚠️ Base layer reorg detected. Rescanning from genesis."
);
Expand All @@ -211,9 +209,16 @@ impl BaseLayerScanner {
},
BlockchainProgression::NoProgress => {
trace!(target: LOG_TARGET, "No new blocks to scan.");
// If no progress has been made since restarting, we still need to tell the epoch manager that scanning
// is done
if !self.has_attempted_scan {
self.epoch_manager.notify_scanning_complete().await?;
}
},
}

self.has_attempted_scan = false;

Ok(())
}

Expand Down Expand Up @@ -319,7 +324,7 @@ impl BaseLayerScanner {
output_hash,
output.commitment.as_public_key()
);
self.register_burnt_utxo(&output)?;
self.register_burnt_utxo(&output).await?;
},
}
}
Expand Down Expand Up @@ -356,23 +361,39 @@ impl BaseLayerScanner {
Ok(())
}

fn register_burnt_utxo(&mut self, output: &TransactionOutput) -> Result<(), BaseLayerScannerError> {
async fn register_burnt_utxo(&mut self, output: &TransactionOutput) -> Result<(), BaseLayerScannerError> {
let address = SubstateAddress::UnclaimedConfidentialOutput(
UnclaimedConfidentialOutputAddress::try_from_commitment(output.commitment.as_bytes()).map_err(|e|
// Technically impossible, but anyway
BaseLayerScannerError::InvalidSideChainUtxoResponse(format!("Invalid commitment: {}", e)))?,
);

let substate = Substate::new(
0,
SubstateValue::UnclaimedConfidentialOutput(UnclaimedConfidentialOutput {
commitment: output.commitment.clone(),
encrypted_data: EncryptedData(output.encrypted_data.to_bytes()),
}),
);
let shard_id = ShardId::from_address(&address, 0);
self.shard_store
.with_write_tx(|tx| tx.save_burnt_utxo(&substate, address, shard_id))
let substate = SubstateValue::UnclaimedConfidentialOutput(UnclaimedConfidentialOutput {
commitment: output.commitment.clone(),
encrypted_data: EncryptedData(output.encrypted_data.to_bytes()),
});
let epoch = self.epoch_manager.current_epoch().await?;
self.state_store
.with_write_tx(|tx| {
let genesis = Block::genesis(epoch);

SubstateRecord {
address,
version: 0,
substate_value: substate,
state_hash: Default::default(),
created_by_transaction: Default::default(),
created_justify: *genesis.justify().id(),
created_block: *genesis.id(),
created_height: NodeHeight::zero(),
destroyed_by_transaction: None,
destroyed_justify: None,
destroyed_by_block: None,
created_at_epoch: Epoch(0),
destroyed_at_epoch: None,
}
.create(tx)
})
.map_err(|source| BaseLayerScannerError::CouldNotRegisterBurntUtxo {
commitment: Box::new(output.commitment.clone()),
source,
Expand Down
File renamed without changes.
3 changes: 2 additions & 1 deletion applications/tari_dan_app_utilities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod base_layer_scanner;
pub mod payload_processor;
pub mod consensus_constants;
pub mod template_manager;
pub mod transaction_executor;
101 changes: 101 additions & 0 deletions applications/tari_dan_app_utilities/src/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::sync::Arc;

use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_common_types::services::template_provider::TemplateProvider;
use tari_dan_engine::{
fees::{FeeModule, FeeTable},
packager::LoadedTemplate,
runtime::{AuthParams, ConsensusContext, RuntimeModule},
state_store::{memory::MemoryStateStore, StateStoreError},
transaction::{TransactionError, TransactionProcessor},
};
use tari_dan_storage::consensus_models::ExecutedTransaction;
use tari_engine_types::commit_result::{ExecuteResult, FinalizeResult, RejectReason};
use tari_template_lib::{crypto::RistrettoPublicKeyBytes, prelude::NonFungibleAddress};
use tari_transaction::Transaction;

pub trait TransactionExecutor {
type Error: Send + Sync + 'static;

fn execute(
&self,
transaction: Transaction,
state_store: MemoryStateStore,
consensus_context: ConsensusContext,
) -> Result<ExecutedTransaction, Self::Error>;
}

#[derive(Debug, Clone)]
pub struct TariDanTransactionProcessor<TTemplateProvider> {
template_provider: Arc<TTemplateProvider>,
fee_table: FeeTable,
}

impl<TTemplateProvider> TariDanTransactionProcessor<TTemplateProvider> {
pub fn new(template_provider: TTemplateProvider, fee_table: FeeTable) -> Self {
Self {
template_provider: Arc::new(template_provider),
fee_table,
}
}
}

impl<TTemplateProvider> TransactionExecutor for TariDanTransactionProcessor<TTemplateProvider>
where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
{
type Error = TransactionProcessorError;

fn execute(
&self,
transaction: Transaction,
state_store: MemoryStateStore,
consensus_context: ConsensusContext,
) -> Result<ExecutedTransaction, Self::Error> {
// Include ownership token for the signers of this in the auth scope
let owner_token = get_auth_token(transaction.signer_public_key());
let auth_params = AuthParams {
initial_ownership_proofs: vec![owner_token],
};

let initial_cost = 0;
let modules: Vec<Arc<dyn RuntimeModule<TTemplateProvider>>> =
vec![Arc::new(FeeModule::new(initial_cost, self.fee_table.clone()))];

let processor = TransactionProcessor::new(
self.template_provider.clone(),
state_store,
auth_params,
consensus_context,
modules,
);
let tx_id = transaction.hash();
let result = match processor.execute(transaction.clone()) {
Ok(result) => result,
Err(err) => ExecuteResult {
finalize: FinalizeResult::reject(tx_id, RejectReason::ExecutionFailure(err.to_string())),
transaction_failure: None,
fee_receipt: None,
},
};

Ok(ExecutedTransaction::new(transaction, result))
}
}

fn get_auth_token(public_key: &PublicKey) -> NonFungibleAddress {
let public_key =
RistrettoPublicKeyBytes::from_bytes(public_key.as_bytes()).expect("Expected public key to be 32 bytes");
NonFungibleAddress::from_public_key(public_key)
}

#[derive(Debug, thiserror::Error)]
pub enum TransactionProcessorError {
#[error(transparent)]
TransactionError(#[from] TransactionError),
#[error(transparent)]
StateStoreError(#[from] StateStoreError),
}
2 changes: 1 addition & 1 deletion applications/tari_dan_wallet_cli/src/command/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub async fn handle_reveal_funds(args: RevealFundsArgs, client: &mut WalletDaemo
})
.await?;

println!("Transaction: {}", resp.hash);
println!("Transaction: {}", resp.transaction_id);
println!("Fee: {}", resp.fee);
println!();
summarize_finalize_result(&resp.result);
Expand Down
Loading

0 comments on commit d3dd815

Please sign in to comment.