Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 31, 2023
1 parent f403336 commit ac342ee
Show file tree
Hide file tree
Showing 54 changed files with 770 additions and 128 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use crate::{
services::{
comms_peer_provider::CommsPeerProvider,
mempool,
mempool::{FeeTransactionValidator, MempoolHandle, TemplateExistsValidator, Validator},
mempool::{FeeTransactionValidator, InputRefsValidator, MempoolHandle, TemplateExistsValidator, Validator},
messaging,
messaging::DanMessageReceivers,
networking,
Expand Down Expand Up @@ -185,6 +185,8 @@ pub async fn spawn_services(
if !config.validator_node.no_fees {
validator = validator.and_then(FeeTransactionValidator).boxed();
}
let after_executed_validator = InputRefsValidator::new();

let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let scanner = SubstateScanner::new(epoch_manager.clone(), validator_node_client_factory.clone());
let substate_resolver = TariSubstateResolver::new(state_store.clone(), scanner);
Expand All @@ -197,6 +199,7 @@ pub async fn spawn_services(
payload_processor.clone(),
substate_resolver.clone(),
validator,
after_executed_validator,
state_store.clone(),
);
handles.push(join_handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
substate_resolver::SubstateResolverError,
};

pub fn spawn<TExecutor, TValidator, TSubstateResolver>(
pub fn spawn<TExecutor, TValidator, TExecutedValidator, TSubstateResolver>(
new_transactions: mpsc::Receiver<Transaction>,
outbound: OutboundMessaging,
tx_executed_transactions: mpsc::Sender<ExecutedTransaction>,
Expand All @@ -47,10 +47,12 @@ pub fn spawn<TExecutor, TValidator, TSubstateResolver>(
transaction_executor: TExecutor,
substate_resolver: TSubstateResolver,
validator: TValidator,
after_executed_validator: TExecutedValidator,
state_store: SqliteStateStore,
) -> (MempoolHandle, JoinHandle<anyhow::Result<()>>)
where
TValidator: Validator<Transaction, Error = MempoolError> + Send + Sync + 'static,
TExecutedValidator: Validator<ExecutedTransaction, Error = MempoolError> + Send + Sync + 'static,
TExecutor: TransactionExecutor<Error = TransactionProcessorError> + Clone + Send + Sync + 'static,
TSubstateResolver: SubstateResolver<Error = SubstateResolverError> + Clone + Send + Sync + 'static,
{
Expand All @@ -66,6 +68,7 @@ where
transaction_executor,
substate_resolver,
validator,
after_executed_validator,
state_store,
);
let handle = MempoolHandle::new(tx_mempool_request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ pub use handle::{MempoolHandle, MempoolRequest};
mod initializer;
pub use initializer::spawn;

mod and_then;
pub use and_then::AndThen;

mod error;
mod executor;
mod service;
Expand Down
114 changes: 85 additions & 29 deletions applications/tari_validator_node/src/p2p/services/mempool/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{collections::HashSet, fmt::Display, iter, sync::Arc};
use std::{
collections::{HashMap, HashSet},
fmt::Display,
iter,
sync::Arc,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use log::*;
use tari_comms::NodeIdentity;
use tari_dan_app_utilities::transaction_executor::{TransactionExecutor, TransactionProcessorError};
use tari_dan_common_types::{Epoch, ShardId};
use tari_dan_common_types::{Epoch, NodeAddressable, ShardId};
use tari_dan_engine::runtime::ConsensusContext;
use tari_dan_p2p::{DanMessage, OutboundService};
use tari_dan_storage::{
consensus_models::{ExecutedTransaction, TransactionRecord},
consensus_models::{Block, Command, ExecutedTransaction, TransactionRecord, ValidatorFee},
StateStore,
StateStoreWriteTransaction,
};
use tari_engine_types::{epoch::Epoch, fee_claim::FeeClaimAddress, instruction::Instruction};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_state_store_sqlite::SqliteStateStore;
use tari_template_lib::{models::Amount, prelude::RistrettoPublicKeyBytes};
use tari_transaction::{Transaction, TransactionId};
use tokio::sync::{mpsc, oneshot};

Expand All @@ -56,7 +63,7 @@ use crate::{
const LOG_TARGET: &str = "tari::validator_node::mempool::service";

#[derive(Debug)]
pub struct MempoolService<TValidator, TExecutor, TSubstateResolver> {
pub struct MempoolService<TValidator, TExecutedValidator, TExecutor, TSubstateResolver> {
transactions: HashSet<TransactionId>,
pending_executions: FuturesUnordered<BoxFuture<'static, Result<ExecutionResult, MempoolError>>>,
new_transactions: mpsc::Receiver<Transaction>,
Expand All @@ -65,15 +72,18 @@ pub struct MempoolService<TValidator, TExecutor, TSubstateResolver> {
tx_executed_transactions: mpsc::Sender<ExecutedTransaction>,
epoch_manager: EpochManagerHandle,
node_identity: Arc<NodeIdentity>,
validator: TValidator,
before_execute_validator: TValidator,
after_execute_validator: TExecutedValidator,
transaction_executor: TExecutor,
substate_resolver: TSubstateResolver,
state_store: SqliteStateStore,
}

impl<TValidator, TExecutor, TSubstateResolver> MempoolService<TValidator, TExecutor, TSubstateResolver>
impl<TValidator, TExecutedValidator, TExecutor, TSubstateResolver>
MempoolService<TValidator, TExecutedValidator, TExecutor, TSubstateResolver>
where
TValidator: Validator<Transaction, Error = MempoolError>,
TExecutedValidator: Validator<ExecutedTransaction, Error = MempoolError>,
TExecutor: TransactionExecutor<Error = TransactionProcessorError> + Clone + Send + Sync + 'static,
TSubstateResolver: SubstateResolver<Error = SubstateResolverError> + Clone + Send + Sync + 'static,
{
Expand All @@ -86,7 +96,8 @@ where
node_identity: Arc<NodeIdentity>,
transaction_executor: TExecutor,
substate_resolver: TSubstateResolver,
validator: TValidator,
before_execute_validator: TValidator,
after_execute_validator: TExecutedValidator,
state_store: SqliteStateStore,
) -> Self {
Self {
Expand All @@ -100,7 +111,8 @@ where
node_identity,
transaction_executor,
substate_resolver,
validator,
before_execute_validator,
after_execute_validator,
state_store,
}
}
Expand Down Expand Up @@ -177,7 +189,7 @@ where
return Ok(());
}

self.validator.validate(&transaction).await?;
self.before_execute_validator.validate(&transaction).await?;

self.state_store
.with_write_tx(|tx| tx.transactions_insert(&transaction))?;
Expand All @@ -197,7 +209,7 @@ where
info!(target: LOG_TARGET, "🎱 New transaction in mempool");
self.transactions.insert(*transaction.id());
let current_epoch = self.epoch_manager.current_epoch().await?;
self.queue_transaction_for_execution(transaction, current_epoch);
self.queue_transaction_for_execution(transaction, current_epoch)?;
} else {
info!(
target: LOG_TARGET,
Expand All @@ -217,15 +229,76 @@ where
Ok(())
}

fn queue_transaction_for_execution(&mut self, transaction: Transaction, current_epoch: Epoch) {
fn queue_transaction_for_execution(
&mut self,
transaction: Transaction,
current_epoch: Epoch,
) -> Result<(), MempoolError> {
let substate_resolver = self.substate_resolver.clone();
let executor = self.transaction_executor.clone();
let consensus_context = ConsensusContext {
current_epoch: current_epoch.as_u64(),
permitted_fee_claims: self.get_permitted_fee_claims_for(&transaction)?,
signer: RistrettoPublicKeyBytes::from_bytes(&transaction.signer_public_key().as_bytes()).unwrap(),
};

self.pending_executions
.push(execute_transaction(transaction, substate_resolver, executor, consensus_context).boxed());

Ok(())
}

async fn get_permitted_fee_claims_for(
&self,
transaction: &Transaction,
) -> Result<HashMap<Epoch, Amount>, MempoolError> {
let claim_instructions = transaction
.instructions()
.iter()
.filter_map(|instruction| {
if let Some(Instruction::ClaimValidatorFees {
epoch,
validator_public_key,
}) = instruction
{
Some((Epoch(*epoch), validator_public_key.clone()))
} else {
None
}
})
.collect::<Vec<_>>();

if claim_instructions.is_empty() {
return Ok(HashMap::new());
}

// let mut claims = Vec::with_capacity(claim_instructions.len());
// for (epoch, validator) in claim_instructions {
// let vn = self.epoch_manager.get_validator_node(*epoch, &validator).await?;
// // TODO: add this when we add the delegated claim public key to the validator node registration
// // An alternative approach is to allow the validator to sign a message and return it to a allowlisted
// wallet // if transactions.signer_public_key() != v.delegated_claim_public_key {
// // return Err(MempoolError::SignerNotPermittedToClaimFees{epoch, signer, validator})
// // }
// claims.push((epoch, vn.shard_key));
// }

let mut permitted_fee_claims = HashMap::with_capacity(claims.len());
self.state_store.with_read_tx(|tx| {
for (epoch, proposed_by) in claim_instructions {
let validator_fee = ValidatorFee::get_validator_fee_for_epoch(tx, epoch, &proposed_by)?;
if validator_fee > 0 {
permitted_fee_claims.insert(
epoch,
Amount::try_from(validator_fee).expect("Fee greater than Amount::MAX"),
);
}
}

Ok::<_, MempoolError>(())
})?;

Ok(permitted_fee_claims)
}

async fn handle_execution_complete(
Expand All @@ -245,8 +318,7 @@ where
executed.result().finalize.result,
executed.execution_time()
);
// We refuse to process the transaction if any input_refs are downed
self.check_input_refs(&executed)?;
self.after_execute_validator.validate(&executed).await?;
// Fill the outputs that were missing so that we can propagate to output shards
self.fill_outputs(&mut executed);

Expand Down Expand Up @@ -287,22 +359,6 @@ where
Ok(())
}

fn check_input_refs(&self, executed: &ExecutedTransaction) -> Result<(), MempoolError> {
let Some(diff) = executed.result().finalize.result.accept().cloned() else {
return Ok(());
};

let is_input_refs_downed = diff
.down_iter()
.map(|(s, v)| ShardId::from_address(s, *v))
.any(|s| executed.transaction().input_refs().contains(&s));

if is_input_refs_downed {
return Err(MempoolError::InputRefsDowned);
}
Ok(())
}

fn fill_outputs(&mut self, executed: &mut ExecutedTransaction) {
let Some(diff) = executed.result().finalize.result.accept().cloned() else {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use async_trait::async_trait;

use crate::p2p::services::mempool::Validator;
use super::Validator;

pub struct AndThen<A, U> {
first: A,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use async_trait::async_trait;
use tari_transaction::Transaction;

use crate::p2p::services::mempool::{MempoolError, Validator};

#[derive(Debug)]
pub struct FeeTransactionValidator;

#[async_trait]
impl Validator<Transaction> for FeeTransactionValidator {
type Error = MempoolError;

async fn validate(&self, transaction: &Transaction) -> Result<(), MempoolError> {
if transaction.fee_instructions().is_empty() {
return Err(MempoolError::NoFeeInstructions);
}
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_dan_common_types::ShardId;
use tari_dan_storage::consensus_models::ExecutedTransaction;

use crate::p2p::services::mempool::{MempoolError, Validator};

/// Refuse to process the transaction if any input_refs are downed
pub struct InputRefsValidator;

impl InputRefsValidator {
pub fn new() -> Self {
Self
}
}

impl Validator<ExecutedTransaction> for InputRefsValidator {
type Error = MempoolError;

async fn validate(&self, executed: &ExecutedTransaction) -> Result<(), Self::Error> {
let Some(diff) = executed.result().finalize.result.accept().cloned() else {
return Ok(());
};

let is_input_refs_downed = diff
.down_iter()
.map(|(s, v)| ShardId::from_address(s, *v))
.any(|s| executed.transaction().input_refs().contains(&s));

if is_input_refs_downed {
return Err(MempoolError::InputRefsDowned);
}
Ok(())
}
}
Loading

0 comments on commit ac342ee

Please sign in to comment.