diff --git a/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs index b08f87899a..eea6e8b5aa 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_proposal.rs @@ -11,6 +11,7 @@ use log::*; use tari_dan_common_types::{ committee::{Committee, CommitteeShard}, optional::Optional, + NodeAddressable, NodeHeight, }; use tari_dan_storage::{ @@ -23,6 +24,7 @@ use tari_dan_storage::{ HighQc, LastExecuted, LastVoted, + LeafBlock, LockedBlock, LockedOutput, QuorumDecision, @@ -152,7 +154,7 @@ where TConsensusSpec: ConsensusSpec }, HotStuffError::ProposalValidationError(err) => { warn!(target: LOG_TARGET, "❌ Block failed validation: {}", err); - // A bad block should not disrupt consensus + // A bad block should not cause a FAILURE state transition tx.rollback()?; return Ok(()); }, @@ -181,11 +183,13 @@ where TConsensusSpec: ConsensusSpec ); } - let missing_tuple = self.block_get_missing_transaction(&mut tx, &block)?; + let missing_tuple = self.handle_missing_transaction(&mut tx, &block)?; tx.commit()?; missing_tuple }; + let high_qc = self.store.with_read_tx(|tx| HighQc::get(tx))?; + if !missing_tx_ids.is_empty() { self.send_to_leader( &local_committee, @@ -196,12 +200,16 @@ where TConsensusSpec: ConsensusSpec transactions: missing_tx_ids, }), ) - .await?; + .await; + self.pacemaker + .reset_leader_timeout(block.height(), high_qc.block_height()) + .await?; return Ok(()); } if awaiting_execution.is_empty() { - if let Some(vote) = self.process_block(&local_committee_shard, &block).await? { + if let Some(decision) = self.decide_on_block(&local_committee_shard, &block)? { + let vote = self.generate_vote_message(&block, decision).await?; let high_qc = self.store.with_write_tx(|tx| { block.as_last_voted().set(tx)?; HighQc::get(tx.deref_mut()) @@ -209,14 +217,18 @@ where TConsensusSpec: ConsensusSpec self.pacemaker .reset_leader_timeout(block.height(), high_qc.block_height()) .await?; - self.send_vote_to_leader(&local_committee, vote, block.height()).await?; + self.send_vote_to_leader(&local_committee, vote, block.height()).await; } + } else { + self.pacemaker + .reset_leader_timeout(block.height(), high_qc.block_height()) + .await?; } Ok(()) } - fn block_get_missing_transaction( + fn handle_missing_transaction( &self, tx: &mut ::WriteTransaction<'_>, block: &Block, @@ -261,52 +273,28 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - async fn process_block( + fn decide_on_block( &self, local_committee_shard: &CommitteeShard, block: &Block, - ) -> Result>, HotStuffError> { - let mut maybe_decision = None; - { - let mut tx = self.store.create_write_tx()?; + ) -> Result, HotStuffError> { + self.store.with_write_tx(|tx| { + let mut maybe_decision = None; let should_vote = self.should_vote(tx.deref_mut(), block)?; - let mut abort_transactions = Vec::new(); if should_vote { - (maybe_decision, abort_transactions) = - self.decide_what_to_vote(&mut tx, block, local_committee_shard)?; + maybe_decision = self.decide_what_to_vote(tx, block, local_committee_shard)?; } if maybe_decision.is_some() { block.update_nodes( - &mut tx, - |tx, _, block| self.on_lock_block(tx, block), + tx, + |tx, locked, block| self.on_lock_block(tx, locked, block, local_committee_shard), |tx, last_exec, commit_block| self.on_commit(tx, last_exec, commit_block, local_committee_shard), )?; - - tx.commit()?; - } else { - tx.rollback()?; - debug!( - target: LOG_TARGET, - "❌ ROLLBACK block {}. {} aborted transaction(s)", block, abort_transactions.len(), - ); - if !abort_transactions.is_empty() { - self.store.with_write_tx(|tx| { - for mut tx_rec in abort_transactions { - tx_rec.update_local_decision(tx, Decision::Abort)?; - } - Ok::<_, HotStuffError>(()) - })?; - } } - }; - - let mut maybe_vote = None; - if let Some(decision) = maybe_decision { - maybe_vote = Some(self.generate_vote_message(block, decision).await?); - } - Ok(maybe_vote) + Ok::<_, HotStuffError>(maybe_decision) + }) } /// if b_new .height > vheight && (b_new extends b_lock || b_new .justify.node.height > b_lock .height) @@ -370,7 +358,8 @@ where TConsensusSpec: ConsensusSpec let Some(t) = cmd.local_prepared() else { continue; }; - let Some(mut tx_rec) = self.transaction_pool.get(tx, &t.id).optional()? else { + let leaf = LeafBlock::get(tx.deref_mut())?; + let Some(mut tx_rec) = self.transaction_pool.get(tx, leaf, &t.id).optional()? else { continue; }; @@ -383,22 +372,30 @@ where TConsensusSpec: ConsensusSpec continue; } - tx_rec.update_evidence(tx, foreign_committee_shard, *block.justify().id())?; - let change_to_abort = cmd.decision().is_abort() && tx_rec.original_decision().is_commit(); - if change_to_abort { + let remote_decision = cmd.decision(); + let local_decision = tx_rec.current_local_decision(); + if remote_decision.is_abort() && local_decision.is_commit() { info!( target: LOG_TARGET, - "⚠️ Foreign shard ABORT {}. Update decision to ABORT", + "⚠️ Foreign shard ABORT {}. Update overall decision to ABORT", tx_rec.transaction_id() ); - tx_rec.update_remote_decision(tx, Decision::Abort)?; } + tx_rec + .add_evidence(foreign_committee_shard, *block.justify().id()) + .set_remote_decision(remote_decision); + // If all shards are complete and we've already received our LocalPrepared, we can set out LocalPrepared // transaction as ready to propose ACCEPT. If we have not received the local LocalPrepared, the transition // will happen when we receive the local block. if tx_rec.current_stage().is_local_prepared() && tx_rec.transaction().evidence.all_shards_complete() { - tx_rec.pending_transition(tx, TransactionPoolStage::LocalPrepared, true)?; + tx_rec.add_pending_status_update( + tx, + block.as_leaf_block(), + TransactionPoolStage::LocalPrepared, + true, + )?; } } @@ -410,14 +407,14 @@ where TConsensusSpec: ConsensusSpec local_committee: &Committee, height: NodeHeight, message: HotstuffMessage, - ) -> Result<(), HotStuffError> { + ) { let leader = self.leader_strategy.get_leader(local_committee, height); - self.tx_leader - .send((leader.clone(), message)) - .await - .map_err(|_| HotStuffError::InternalChannelClosed { - context: "tx_leader in OnReceiveProposalHandler::send_to_leader", - }) + if self.tx_leader.send((leader.clone(), message)).await.is_err() { + debug!( + target: LOG_TARGET, + "tx_leader in OnReceiveProposalHandler::send_to_leader is closed", + ); + } } async fn send_vote_to_leader( @@ -425,7 +422,7 @@ where TConsensusSpec: ConsensusSpec local_committee: &Committee, vote: VoteMessage, height: NodeHeight, - ) -> Result<(), HotStuffError> { + ) { let leader = self.leader_strategy.get_leader_for_next_block(local_committee, height); info!( target: LOG_TARGET, @@ -434,12 +431,17 @@ where TConsensusSpec: ConsensusSpec vote.block_id, leader, ); - self.tx_leader + if self + .tx_leader .send((leader.clone(), HotstuffMessage::Vote(vote))) .await - .map_err(|_| HotStuffError::InternalChannelClosed { - context: "tx_leader in OnReceiveProposalHandler::send_vote_to_leader", - }) + .is_err() + { + debug!( + target: LOG_TARGET, + "tx_leader in OnReceiveProposalHandler::send_vote_to_leader is closed", + ); + } } #[allow(clippy::too_many_lines)] @@ -448,26 +450,41 @@ where TConsensusSpec: ConsensusSpec tx: &mut ::WriteTransaction<'_>, block: &Block, local_committee_shard: &CommitteeShard, - ) -> Result<(Option, Vec), HotStuffError> { + ) -> Result, HotStuffError> { let mut total_leader_fee = 0; - let mut abort_transactions = Vec::new(); let mut decision = BlockDecision::vote_accept(); for cmd in block.commands() { - let Some(mut tx_rec) = self.transaction_pool.get(tx, cmd.transaction_id()).optional()? else { + let Some(mut tx_rec) = self + .transaction_pool + .get(tx, block.as_leaf_block(), cmd.transaction_id()) + .optional()? + else { warn!( target: LOG_TARGET, - "⚠️ Local proposal received ({}) for transaction {} which is not in the pool. Ignoring.", + "⚠️ BUG(should_not_happen): Local proposal received ({}) for transaction {} which is not in the pool. Not voting.", block, cmd.transaction_id(), ); decision.dont_vote(); continue; }; + + if !tx_rec.is_ready() { + warn!( + target: LOG_TARGET, + "⚠️ Local proposal received ({}) for transaction {} which is not ready. Not voting.", + block, + cmd.transaction_id(), + ); + decision.dont_vote(); + continue; + } + // TODO: we probably need to provide the all/some of the QCs referenced in local transactions as // part of the proposal DanMessage so that there is no race condition between receiving the // proposed block and receiving the foreign proposals. Because this is only added on locked block, // this should be less common. - tx_rec.update_evidence(tx, local_committee_shard, *block.justify().id())?; + tx_rec.add_evidence(local_committee_shard, *block.justify().id()); debug!( target: LOG_TARGET, @@ -502,54 +519,55 @@ where TConsensusSpec: ConsensusSpec } if tx_rec.current_decision() == t.decision { - // We allow blocks to ask us to prepare more than once - only lock objects if the stage is New - if tx_rec.current_stage().is_new() { - if tx_rec.current_decision().is_commit() { - let transaction = ExecutedTransaction::get(tx.deref_mut(), cmd.transaction_id())?; - // Lock all inputs for the transaction as part of LocalPrepare - if !self.lock_inputs(tx, transaction.transaction(), local_committee_shard)? { - // Unable to lock all inputs - do not vote - warn!( - target: LOG_TARGET, - "❌ Unable to lock all inputs for transaction {} in block {}. Leader proposed {}, we decided {}", - block.id(), - transaction.id(), - t.decision, - Decision::Abort - ); - // We change our decision to ABORT so that the next time we propose/receive a - // proposal we will check for ABORT. It may - // happen that the transaction causing the lock failure - // is ABORTED too and the locks released allowing this transaction to succeed. - // Currently, the client would have to resubmit the transaction to resolve this. - // tx_rec.update_local_decision(tx, Decision::Abort)?; - abort_transactions.push(tx_rec); - // This brings up an interesting problem. If we decide to abstain from voting, then - // object conflicts essentially induce leader failures. This is problematic since it - // puts leader failure under the control of users and potentially malicious parties. - decision.dont_vote(); - continue; - } - if !self.lock_outputs(tx, block.id(), &transaction)? { - // Unable to lock all outputs - do not vote - warn!( - target: LOG_TARGET, - "❌ Unable to lock all outputs for transaction {} in block {}. Leader proposed {}, we decided {}", - block.id(), - transaction.id(), - t.decision, - Decision::Abort - ); - // We change our decision to ABORT so that the next time we propose/receive a - // proposal we will check for ABORT - abort_transactions.push(tx_rec); - // tx_rec.update_local_decision(tx, Decision::Abort)?; - decision.dont_vote(); - continue; - } + if tx_rec.current_stage().is_new() && tx_rec.current_decision().is_commit() { + let transaction = ExecutedTransaction::get(tx.deref_mut(), cmd.transaction_id())?; + // Lock all inputs for the transaction as part of LocalPrepare + if !self.lock_inputs(tx, transaction.transaction(), local_committee_shard)? { + // Unable to lock all inputs - do not vote + warn!( + target: LOG_TARGET, + "❌ Unable to lock all inputs for transaction {} in block {}. Leader proposed {}, we decided {}", + block.id(), + transaction.id(), + t.decision, + Decision::Abort + ); + // We change our decision to ABORT so that the next time we propose/receive a + // proposal we will check for ABORT. It may + // happen that the transaction causing the lock failure + // is ABORTED too and the locks released allowing this transaction to succeed. + // Currently, the client would have to resubmit the transaction to resolve this. + // tx_rec.set_local_decision(Decision::Abort)?; + + // abort_transactions.push(tx_rec); + // This brings up an interesting problem. If we decide to abstain from voting, then + // object conflicts essentially induce leader failures. This is problematic since it + // puts leader failure under the control of users and potentially malicious parties. + decision.dont_vote(); + } else if !self.lock_outputs(tx, block.id(), &transaction)? { + // Unable to lock all outputs - do not vote + warn!( + target: LOG_TARGET, + "❌ Unable to lock all outputs for transaction {} in block {}. Leader proposed {}, we decided {}", + block.id(), + transaction.id(), + t.decision, + Decision::Abort + ); + // We change our decision to ABORT so that the next time we propose/receive a + // proposal we will check for ABORT + // tx_rec.set_local_decision(Decision::Abort)?; + decision.dont_vote(); } + } - tx_rec.pending_transition(tx, TransactionPoolStage::Prepared, true)?; + if decision.is_accept() { + tx_rec.add_pending_status_update( + tx, + block.as_leaf_block(), + TransactionPoolStage::Prepared, + true, + )?; } } else { // If we disagree with any local decision we abstain from voting @@ -561,7 +579,6 @@ where TConsensusSpec: ConsensusSpec tx_rec.current_decision() ); decision.dont_vote(); - continue; } }, Command::LocalPrepared(t) => { @@ -569,16 +586,16 @@ where TConsensusSpec: ConsensusSpec // We only mark the next step (Accept) as ready to propose once all shards have reported // LocalPrepared. - if !tx_rec.current_stage().is_prepared() { + if !tx_rec.current_stage().is_prepared() && !tx_rec.current_stage().is_local_prepared() { warn!( target: LOG_TARGET, - "❌ Stage disagreement in block {} for transaction {}. Leader proposed LocalPrepared, but local stage is {}", + "{} ❌ Stage disagreement in block {} for transaction {}. Leader proposed LocalPrepared, but local stage is {}", + self.validator_addr, block.id(), tx_rec.transaction_id(), tx_rec.current_stage() ); decision.dont_vote(); - continue; } // We check that the leader decision is the same as our local decision. // We disregard the remote decision because not all validators may have received the foreign @@ -594,7 +611,6 @@ where TConsensusSpec: ConsensusSpec ); decision.dont_vote(); - continue; } if tx_rec.transaction().transaction_fee != t.transaction_fee { @@ -606,19 +622,21 @@ where TConsensusSpec: ConsensusSpec tx_rec.transaction().transaction_fee ); decision.dont_vote(); - continue; } - tx_rec.pending_transition( - tx, - TransactionPoolStage::LocalPrepared, - tx_rec.transaction().evidence.all_shards_complete(), - )?; + if decision.is_accept() { + tx_rec.add_pending_status_update( + tx, + block.as_leaf_block(), + TransactionPoolStage::LocalPrepared, + tx_rec.transaction().evidence.all_shards_complete(), + )?; + } }, Command::Accept(t) => { // Happy path: We've validated all the QCs and therefore are convinced that everyone also received // LocalPrepare. We then propose new blocks until we have a 3-chain - if !tx_rec.current_stage().is_local_prepared() { + if !tx_rec.current_stage().is_local_prepared() && !tx_rec.current_stage().is_accepted() { warn!( target: LOG_TARGET, "❌ Stage disagreement for tx {} in block {}. Leader proposed Accept, local stage {}", @@ -693,9 +711,19 @@ where TConsensusSpec: ConsensusSpec // and we decide COMMIT, we set SomePrepared, otherwise AllPrepared. There are no further stages // after these, so these MUST never be ready to propose. if tx_rec.remote_decision().map(|d| d.is_abort()).unwrap_or(false) { - tx_rec.pending_transition(tx, TransactionPoolStage::SomePrepared, false)?; + tx_rec.add_pending_status_update( + tx, + block.as_leaf_block(), + TransactionPoolStage::SomePrepared, + false, + )?; } else { - tx_rec.pending_transition(tx, TransactionPoolStage::AllPrepared, false)?; + tx_rec.add_pending_status_update( + tx, + block.as_leaf_block(), + TransactionPoolStage::AllPrepared, + false, + )?; } }, } @@ -715,7 +743,7 @@ where TConsensusSpec: ConsensusSpec decision.dont_vote(); } - Ok((decision.as_quorum_decision(), abort_transactions)) + Ok(decision.as_quorum_decision()) } fn lock_inputs( @@ -765,6 +793,51 @@ where TConsensusSpec: ConsensusSpec Ok(true) } + fn check_lock_inputs( + &self, + tx: &mut ::ReadTransaction<'_>, + transaction: &Transaction, + local_committee_shard: &CommitteeShard, + ) -> Result { + let state = SubstateRecord::check_lock_all( + tx, + local_committee_shard.filter(transaction.inputs().iter().chain(transaction.filled_inputs())), + SubstateLockFlag::Write, + )?; + if !state.is_acquired() { + warn!( + target: LOG_TARGET, + "❌ Unable to write lock all inputs for transaction {}: {:?}", + transaction.id(), + state, + ); + return Ok(false); + } + let state = SubstateRecord::check_lock_all( + tx, + local_committee_shard.filter(transaction.input_refs()), + SubstateLockFlag::Read, + )?; + + if !state.is_acquired() { + warn!( + target: LOG_TARGET, + "❌ Unable to read lock all input refs for transaction {}: {:?}", + transaction.id(), + state, + ); + return Ok(false); + } + + debug!( + target: LOG_TARGET, + "🔒️ Locked inputs for transaction {}", + transaction.id(), + ); + + Ok(true) + } + fn unlock_inputs( &self, tx: &mut ::WriteTransaction<'_>, @@ -809,6 +882,20 @@ where TConsensusSpec: ConsensusSpec Ok(true) } + fn check_lock_outputs( + &self, + tx: &mut ::ReadTransaction<'_>, + transaction: &ExecutedTransaction, + ) -> Result { + let state = LockedOutput::check_locks(tx, transaction.resulting_outputs())?; + + if !state.is_acquired() { + return Ok(false); + } + + Ok(true) + } + fn unlock_outputs( &self, tx: &mut ::WriteTransaction<'_>, @@ -874,33 +961,46 @@ where TConsensusSpec: ConsensusSpec fn on_lock_block( &self, tx: &mut ::WriteTransaction<'_>, + locked: &LockedBlock, block: &Block, + local_committee_shard: &CommitteeShard, ) -> Result<(), HotStuffError> { - info!( - target: LOG_TARGET, - "🔒️ LOCKED BLOCK: {} {}", - block.height(), - block.id() - ); - - // self.process_commands(tx, local_committee_shard, block)?; + if locked.height < block.height() { + info!( + target: LOG_TARGET, + "🔒️ LOCKED BLOCK: {} {}", + block.height(), + block.id() + ); - // This moves the stage update from pending to current for all transactions on on the locked block - self.transaction_pool - .confirm_all_transitions(tx, block.all_transaction_ids())?; + let parent = block.get_parent(tx.deref_mut())?; + self.on_lock_block(tx, locked, &parent, local_committee_shard)?; + + // self.processed_locked_commands(tx, local_committee_shard, block)?; + // This moves the stage update from pending to current for all transactions on on the locked block + self.transaction_pool.confirm_all_transitions( + tx, + locked, + &block.as_locked_block(), + block.all_transaction_ids(), + )?; + } Ok(()) } - // fn process_commands( + // fn processed_locked_commands( // &self, // tx: &mut ::WriteTransaction<'_>, // local_committee_shard: &CommitteeShard, // block: &Block, // ) -> Result<(), HotStuffError> { for cmd in block.commands() { let mut tx_rec = self.transaction_pool.get(tx, - // cmd.transaction_id())?; // TODO: we probably need to provide the all/some of the QCs referenced in local - // transactions as // part of the proposal DanMessage so that there is no race condition between receiving - // the // proposed block and receiving the foreign proposals. Because this is only added on locked block, // - // this should be less common. tx_rec.update_evidence(tx, local_committee_shard, *block.justify().id())?; + // cmd.transaction_id())?; + // + // // TODO: we probably need to provide the all/some of the QCs referenced in local transactions as + // // part of the proposal DanMessage so that there is no race condition between receiving the + // // proposed block and receiving the foreign proposals. Because this is only added on locked block, + // // this should be less common. + // tx_rec.update_evidence(tx, local_committee_shard, *block.justify().id())?; // // match cmd { // Command::Prepare(_) => { @@ -950,7 +1050,9 @@ where TConsensusSpec: ConsensusSpec // TODO: Check if it's ok to unlock the inputs for ABORT at this point }, Command::Accept(t) => { - let tx_rec = self.transaction_pool.get(tx, cmd.transaction_id())?; + let tx_rec = self + .transaction_pool + .get(tx, block.as_leaf_block(), cmd.transaction_id())?; debug!( target: LOG_TARGET, "Transaction {} is finalized ({})", tx_rec.transaction_id(), t.decision @@ -1031,6 +1133,14 @@ where TConsensusSpec: ConsensusSpec } self.validate_proposed_block(from, candidate_block)?; + if Block::has_been_processed(tx.deref_mut(), candidate_block.id())? { + return Err(ProposalValidationError::BlockAlreadyProcessed { + block_id: *candidate_block.id(), + height: candidate_block.height(), + } + .into()); + } + // Check that details included in the justify match previously added blocks let Some(justify_block) = candidate_block.justify().get_block(tx.deref_mut()).optional()? else { // This will trigger a sync @@ -1071,15 +1181,15 @@ where TConsensusSpec: ConsensusSpec } // Part of the safenode predicate. Exclude this block early if this is the case - let locked_block = LockedBlock::get(tx.deref_mut())?; - if !locked_block.block_id.is_genesis() && candidate_block.justify().block_height() <= locked_block.height() { - return Err(ProposalValidationError::CandidateBlockNotHigherThanLockedBlock { - proposed_by: from.to_string(), - locked_block, - candidate_block: candidate_block.as_leaf_block(), - } - .into()); - } + // let locked_block = LockedBlock::get(tx.deref_mut())?; + // if !locked_block.block_id.is_genesis() && candidate_block.justify().block_height() <= locked_block.height() { + // return Err(ProposalValidationError::CandidateBlockNotHigherThanLockedBlock { + // proposed_by: from.to_string(), + // locked_block, + // candidate_block: candidate_block.as_leaf_block(), + // } + // .into()); + // } // candidate_block.justify().update_high_qc(tx)?; @@ -1175,11 +1285,11 @@ where TConsensusSpec: ConsensusSpec // if child_block.is_processed() { // self.transaction_pool.rollback_pending_stages(tx, &child_block)?; // } - // if *child_block.proposed_by() == self.validator_addr { - // child_block.as_last_proposed().unset(tx)?; - // } - // child_block.as_last_voted().unset(tx)?; - // child_block.as_leaf_block().unset(tx)?; + // // if *child_block.proposed_by() == self.validator_addr { + // // child_block.as_last_proposed().unset(tx)?; + // // } + // // child_block.as_last_voted().unset(tx)?; + // // child_block.as_leaf_block().unset(tx)?; // child_block.unset_as_processed(tx)?; // // self.undo_other_chains(tx, &child_block)?; diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index b7b827320b..425b3cb286 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -242,8 +242,12 @@ async fn multi_shard_propose_blocks_with_new_transactions_until_all_committed() async fn foreign_shard_decides_to_abort() { setup_logger(); let mut test = Test::builder() - .add_committee(0, vec!["1", "3", "4"]) - .add_committee(1, vec!["2", "5", "6"]) + .debug_sql("/tmp/test{}.db") + .disable_timeout() + .add_committee(0, vec!["1"]) + .add_committee(1, vec!["2"]) + // .add_committee(0, vec!["1", "3", "4"]) + // .add_committee(1, vec!["2", "5", "6"]) .start() .await; @@ -380,7 +384,7 @@ async fn leader_failure_node_goes_down() { break; } - if committed_height > NodeHeight(20) { + if committed_height > NodeHeight(40) { panic!("Not all transaction committed after {} blocks", committed_height); } } diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index 3d7ca1f083..45754597a1 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -35,7 +35,7 @@ pub struct Test { _leader_strategy: RoundRobinLeaderStrategy, epoch_manager: TestEpochManager, shutdown: Shutdown, - timeout: Duration, + timeout: Option, } impl Test { @@ -74,9 +74,13 @@ impl Test { pub async fn on_block_committed(&mut self) -> (BlockId, NodeHeight) { loop { - let event = timeout(self.timeout, self.on_hotstuff_event()) - .await - .unwrap_or_else(|_| panic!("Timeout waiting for Hotstuff event")); + let event = if let Some(timeout) = self.timeout { + tokio::time::timeout(timeout, self.on_hotstuff_event()) + .await + .unwrap_or_else(|_| panic!("Timeout waiting for Hotstuff event")) + } else { + self.on_hotstuff_event().await + }; match event { HotstuffEvent::BlockCommitted { block_id, height } => return (block_id, height), HotstuffEvent::Failure { message } => panic!("Consensus failure: {}", message), @@ -280,7 +284,7 @@ impl Test { pub struct TestBuilder { committees: HashMap>, sql_address: String, - timeout: Duration, + timeout: Option, debug_sql_file: Option, } @@ -289,11 +293,17 @@ impl TestBuilder { Self { committees: HashMap::new(), sql_address: ":memory:".to_string(), - timeout: Duration::from_secs(10), + timeout: Some(Duration::from_secs(10)), debug_sql_file: None, } } + #[allow(dead_code)] + pub fn disable_timeout(&mut self) -> &mut Self { + self.timeout = None; + self + } + #[allow(dead_code)] pub fn debug_sql>(&mut self, path: P) -> &mut Self { self.debug_sql_file = Some(path.into()); @@ -307,7 +317,7 @@ impl TestBuilder { } pub fn with_test_timeout(&mut self, timeout: Duration) -> &mut Self { - self.timeout = timeout; + self.timeout = Some(timeout); self } diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index dc0e9e989a..e0bd1aec38 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -16,7 +16,7 @@ create table blocks ( id integer not null primary key AUTOINCREMENT, block_id text not NULL, - parent_block_id text NULL, + parent_block_id text not NULL, height bigint not NULL, epoch bigint not NULL, proposed_by text not NULL, @@ -159,19 +159,22 @@ create table transaction_pool create unique index transaction_pool_uniq_idx_transaction_id on transaction_pool (transaction_id); create index transaction_pool_idx_is_ready on transaction_pool (is_ready); -create table transaction_pool_status +create table transaction_pool_state_updates ( - id integer not null primary key AUTOINCREMENT, - block_id text not null, - block_height bigint not null, - transaction_id text not null, - stage text not null, - is_ready boolean not null, - created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + id integer not null primary key AUTOINCREMENT, + block_id text not null, + block_height bigint not null, + transaction_id text not null, + stage text not null, + evidence text not null, + is_ready boolean not null, + local_decision text not null, + remote_decision text null, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (block_id) REFERENCES blocks (block_id), FOREIGN KEY (transaction_id) REFERENCES transactions (transaction_id) ); -create index transaction_pool_idx_block_id_transaction_id on transaction_pool_status (block_id, transaction_id); +create unique index transaction_pool_uniq_block_id_transaction_id on transaction_pool_state_updates (block_id, transaction_id); create table locked_outputs ( @@ -214,3 +217,66 @@ CREATE TABLE missing_transactions is_awaiting_execution boolean not NULL, created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP ); + + +-- Debug Triggers +CREATE TABLE transaction_pool_history +( + history_id INTEGER PRIMARY KEY, + id integer not null, + transaction_id text not null, + original_decision text not null, + local_decision text null, + remote_decision text null, + evidence text not null, + transaction_fee bigint not null, + leader_fee bigint not null, + stage text not null, + new_stage text not null, + pending_stage text null, + new_pending_stage text null, + is_ready boolean not null, + new_is_ready boolean not null, + updated_at timestamp NOT NULL, + created_at timestamp NOT NULL, + change_time DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) +); + +CREATE TRIGGER copy_transaction_pool_history + AFTER UPDATE + ON transaction_pool + FOR EACH ROW +BEGIN + INSERT INTO transaction_pool_history (id, + transaction_id, + original_decision, + local_decision, + remote_decision, + evidence, + transaction_fee, + leader_fee, + stage, + new_stage, + pending_stage, + new_pending_stage, + is_ready, + new_is_ready, + updated_at, + created_at) + VALUES (OLD.id, + OLD.transaction_id, + OLD.original_decision, + OLD.local_decision, + OLD.remote_decision, + OLD.evidence, + OLD.transaction_fee, + OLD.leader_fee, + OLD.stage, + NEW.stage, + OLD.pending_stage, + NEW.pending_stage, + OLD.is_ready, + NEW.is_ready, + OLD.updated_at, + OLD.created_at); +END; \ No newline at end of file diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 7f27ec323e..aaee28c44d 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -1,10 +1,16 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{borrow::Borrow, collections::HashSet, marker::PhantomData, ops::RangeInclusive}; +use std::{ + borrow::Borrow, + collections::{HashMap, HashSet}, + marker::PhantomData, + ops::RangeInclusive, +}; use bigdecimal::{BigDecimal, ToPrimitive}; use diesel::{ + query_builder::SqlQuery, sql_query, sql_types::{BigInt, Text}, BoolExpressionMethods, @@ -32,6 +38,8 @@ use tari_dan_storage::{ LockedBlock, QcId, QuorumCertificate, + SubstateLockFlag, + SubstateLockState, SubstateRecord, TransactionPoolRecord, TransactionPoolStage, @@ -79,6 +87,149 @@ impl<'a, TAddr> SqliteStateStoreReadTransaction<'a, TAddr> { } } +impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned> SqliteStateStoreReadTransaction<'a, TAddr> { + pub(super) fn get_transaction_atom_state_updates_between_blocks<'i, ITx>( + &mut self, + from_block_id: &BlockId, + to_block_id: &BlockId, + transaction_ids: ITx, + ) -> Result, SqliteStorageError> + where + ITx: Iterator + ExactSizeIterator, + { + if transaction_ids.len() == 0 { + return Ok(HashMap::new()); + } + + let applicable_block_ids = self.get_block_ids_that_change_state_between(from_block_id, to_block_id)?; + + debug!( + target: LOG_TARGET, + "get_transaction_atom_state_updates_between_blocks: from_block_id={}, to_block_id={}, len(applicable_block_ids)={}", + from_block_id, + to_block_id, + applicable_block_ids.len()); + + if applicable_block_ids.is_empty() { + return Ok(HashMap::new()); + } + + self.create_transaction_atom_updates_query(transaction_ids, applicable_block_ids.iter().map(|s| s.as_str())) + .load_iter::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "transaction_pool_get_many_ready", + source: e, + })? + .map(|update| update.map(|u| (u.transaction_id.clone(), u))) + .collect::>>() + .map_err(|e| SqliteStorageError::DieselError { + operation: "transaction_pool_get_many_ready", + source: e, + }) + } + + /// Creates a query to select the latest transaction pool state updates for the given transaction ids and block ids. + /// WARNING: This method does not protect against SQL-injection, Be sure that the transaction ids and block ids + /// strings are what they are meant to be. + fn create_transaction_atom_updates_query< + 'i1, + 'i2, + IBlk: Iterator + ExactSizeIterator, + ITx: Iterator + ExactSizeIterator, + >( + &mut self, + transaction_ids: ITx, + block_ids: IBlk, + ) -> SqlQuery { + // Unfortunate hack. Binding array types in diesel is only supported for postgres. + sql_query(format!( + r#" + WITH RankedResults AS ( + SELECT + tpsu.*, + ROW_NUMBER() OVER (PARTITION BY tpsu.transaction_id ORDER BY tpsu.block_height DESC) AS `rank` + FROM + transaction_pool_state_updates AS tpsu + WHERE + tpsu.block_id in ({}) + AND tpsu.transaction_id in ({}) + ) + SELECT + id, + block_id, + block_height, + transaction_id, + stage, + evidence, + is_ready, + local_decision, + remote_decision, + created_at + FROM + RankedResults + WHERE + rank = 1; + "#, + self.sql_frag_for_in_statement(block_ids, BlockId::byte_size() * 2), + self.sql_frag_for_in_statement(transaction_ids, TransactionId::byte_size() * 2), + )) + } + + fn sql_frag_for_in_statement<'i, I: Iterator + ExactSizeIterator>( + &self, + values: I, + item_size: usize, + ) -> String { + let len = values.len(); + let mut sql_frag = String::with_capacity(len * item_size + len * 3 + len - 1); + for (i, value) in values.enumerate() { + sql_frag.push('"'); + sql_frag.push_str(value); + sql_frag.push('"'); + if i < len - 1 { + sql_frag.push(','); + } + } + sql_frag + } + + fn get_block_ids_that_change_state_between( + &mut self, + start_block: &BlockId, + end_block: &BlockId, + ) -> Result, SqliteStorageError> { + let applicable_block_ids = sql_query( + r#" + WITH RECURSIVE tree(bid, parent, is_dummy, command_count) AS ( + SELECT block_id, parent_block_id, is_dummy, command_count FROM blocks where block_id = ? + UNION ALL + SELECT block_id, parent_block_id, blocks.is_dummy, blocks.command_count + FROM blocks JOIN tree ON + block_id = tree.parent + AND tree.bid != ? + LIMIT 1000 + ) + SELECT bid FROM tree where is_dummy = 0 AND command_count > 0"#, + ) + .bind::(serialize_hex(end_block)) + .bind::(serialize_hex(start_block)) + .load_iter::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "get_block_ids_that_change_state_between", + source: e, + })?; + + applicable_block_ids + .map(|b| { + b.map(|b| b.bid).map_err(|e| SqliteStorageError::DieselError { + operation: "get_block_ids_that_change_state_between", + source: e, + }) + }) + .collect() + } +} + impl StateStoreReadTransaction for SqliteStateStoreReadTransaction<'_, TAddr> { @@ -701,32 +852,104 @@ impl StateStoreReadTransa Vote::try_from(vote) } - fn transaction_pool_get(&mut self, transaction_id: &TransactionId) -> Result { + fn transaction_pool_get( + &mut self, + from_block_id: &BlockId, + to_block_id: &BlockId, + transaction_id: &TransactionId, + ) -> Result { use crate::schema::transaction_pool; - transaction_pool::table - .filter(transaction_pool::transaction_id.eq(serialize_hex(transaction_id))) + let transaction_id = serialize_hex(transaction_id); + let mut updates = self.get_transaction_atom_state_updates_between_blocks( + from_block_id, + to_block_id, + std::iter::once(transaction_id.as_str()), + )?; + + debug!( + target: LOG_TARGET, + "transaction_pool_get: from_block_id={}, to_block_id={}, transaction_id={}, updates={}", + from_block_id, + to_block_id, + transaction_id, + updates.len() + ); + + let mut rec = transaction_pool::table + .filter(transaction_pool::transaction_id.eq(&transaction_id)) .first::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "transaction_pool_get", source: e, - })? - .try_into() + })?; + + if let Some(update) = updates.remove(&transaction_id) { + rec.merge_update(update); + } + rec.try_into() + } + + fn transaction_pool_exists(&mut self, transaction_id: &TransactionId) -> Result { + use crate::schema::transaction_pool; + + let count = transaction_pool::table + .count() + .filter(transaction_pool::transaction_id.eq(serialize_hex(transaction_id))) + .first::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "transaction_pool_get", + source: e, + })?; + + Ok(count > 0) } fn transaction_pool_get_many_ready(&mut self, max_txs: usize) -> Result, StorageError> { use crate::schema::transaction_pool; - transaction_pool::table - .filter(transaction_pool::is_ready.eq(true)) - .limit(max_txs as i64) + let ready_txs = transaction_pool::table + // .filter(transaction_pool::is_ready.eq(true)) + // .limit(max_txs as i64) .get_results::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "transaction_pool_get_many_ready", source: e, - })? + })?; + + if ready_txs.is_empty() { + return Ok(Vec::new()); + } + + // Fetch all applicable block ids between the locked block and the given block + let locked = self.locked_block_get()?; + let leaf = self.leaf_block_get()?; + + let mut updates = self.get_transaction_atom_state_updates_between_blocks( + &locked.block_id, + &leaf.block_id, + ready_txs.iter().map(|s| s.transaction_id.as_str()), + )?; + + debug!( + target: LOG_TARGET, + "transaction_pool_get: from_block_id={}, to_block_id={}, len(ready_txs)={}, updates={}", + locked.block_id, + leaf.block_id, + ready_txs.len(), + updates.len() + ); + + ready_txs .into_iter() - .map(TryInto::try_into) + .filter_map(|mut rec| { + if let Some(update) = updates.remove(&rec.transaction_id) { + rec.merge_update(update); + } + + Some(rec).filter(|r| r.is_ready).map(|r| r.try_into()) + }) + .take(max_txs) .collect() } @@ -916,6 +1139,77 @@ impl StateStoreReadTransa Ok(substates) } + + fn substates_check_lock_many<'a, I: IntoIterator>( + &mut self, + objects: I, + lock_flag: SubstateLockFlag, + ) -> Result { + use crate::schema::substates; + + // Lock unique shards + let objects: HashSet = objects.into_iter().map(serialize_hex).collect(); + + let locked_details = substates::table + .select((substates::is_locked_w, substates::destroyed_by_transaction)) + .filter(substates::shard_id.eq_any(&objects)) + .get_results::<(bool, Option)>(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "transactions_try_lock_many", + source: e, + })?; + if locked_details.len() < objects.len() { + return Err(SqliteStorageError::NotAllSubstatesFound { + operation: "substates_try_lock_all", + details: format!( + "{:?}: Found {} substates, but {} were requested", + lock_flag, + locked_details.len(), + objects.len() + ), + } + .into()); + } + + if locked_details.iter().any(|(w, _)| *w) { + return Ok(SubstateLockState::SomeAlreadyWriteLocked); + } + + if locked_details.iter().any(|(_, downed)| downed.is_some()) { + return Ok(SubstateLockState::SomeDestroyed); + } + + Ok(SubstateLockState::LockAcquired) + } + + // -------------------------------- LockedOutputs -------------------------------- // + fn locked_outputs_check_all(&mut self, output_shards: I) -> Result + where + I: IntoIterator, + B: Borrow, + { + use crate::schema::locked_outputs; + + let outputs_hex = output_shards + .into_iter() + .map(|shard_id| serialize_hex(shard_id.borrow())) + .collect::>(); + + let has_conflict = locked_outputs::table + .count() + .filter(locked_outputs::shard_id.eq_any(outputs_hex)) + .first::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "locked_outputs_check_all", + source: e, + })?; + + if has_conflict > 0 { + Ok(SubstateLockState::SomeAlreadyWriteLocked) + } else { + Ok(SubstateLockState::LockAcquired) + } + } } #[derive(QueryableByName)] @@ -923,3 +1217,9 @@ struct Count { #[diesel(sql_type = diesel::sql_types::BigInt)] pub count: i64, } + +#[derive(QueryableByName)] +struct BlockIdSqlValue { + #[diesel(sql_type = diesel::sql_types::Text)] + pub bid: String, +} diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 06cbf0b1b7..3c4b435717 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -158,13 +158,39 @@ diesel::table! { } diesel::table! { - transaction_pool_status (id) { + transaction_pool_history (history_id) { + history_id -> Nullable, + id -> Integer, + transaction_id -> Text, + original_decision -> Text, + local_decision -> Nullable, + remote_decision -> Nullable, + evidence -> Text, + transaction_fee -> BigInt, + leader_fee -> BigInt, + stage -> Text, + new_stage -> Text, + pending_stage -> Nullable, + new_pending_stage -> Nullable, + is_ready -> Bool, + new_is_ready -> Bool, + updated_at -> Timestamp, + created_at -> Timestamp, + change_time -> Nullable, + } +} + +diesel::table! { + transaction_pool_state_updates (id) { id -> Integer, block_id -> Text, block_height -> BigInt, transaction_id -> Text, stage -> Text, + evidence -> Text, is_ready -> Bool, + local_decision -> Text, + remote_decision -> Nullable, created_at -> Timestamp, } } @@ -217,7 +243,8 @@ diesel::allow_tables_to_appear_in_same_query!( quorum_certificates, substates, transaction_pool, - transaction_pool_status, + transaction_pool_history, + transaction_pool_state_updates, transactions, votes, ); diff --git a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs index 041ab2a5f1..8fb8f1b3e3 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs @@ -1,7 +1,7 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use diesel::Queryable; +use diesel::{Queryable, QueryableByName}; use tari_dan_storage::{consensus_models, consensus_models::TransactionAtom, StorageError}; use time::PrimitiveDateTime; @@ -25,6 +25,17 @@ pub struct TransactionPoolRecord { pub created_at: PrimitiveDateTime, } +impl TransactionPoolRecord { + pub fn merge_update(&mut self, update: TransactionPoolStateUpdate) -> &mut Self { + self.pending_stage = Some(update.stage); + self.evidence = update.evidence; + self.is_ready = update.is_ready; + self.local_decision = update.local_decision; + self.remote_decision = update.remote_decision; + self + } +} + impl TryFrom for consensus_models::TransactionPoolRecord { type Error = StorageError; @@ -46,13 +57,27 @@ impl TryFrom for consensus_models::TransactionPoolRecord } } -#[derive(Debug, Clone, Queryable)] -pub struct TransactionPoolState { +#[derive(Debug, Clone, Queryable, QueryableByName)] +#[diesel(table_name = transaction_pool_state_updates)] +pub struct TransactionPoolStateUpdate { + #[diesel(sql_type = diesel::sql_types::Integer)] pub id: i32, + #[diesel(sql_type = diesel::sql_types::Text)] pub block_id: String, + #[diesel(sql_type = diesel::sql_types::BigInt)] pub block_height: i64, + #[diesel(sql_type = diesel::sql_types::Text)] pub transaction_id: String, + #[diesel(sql_type = diesel::sql_types::Text)] pub stage: String, + #[diesel(sql_type = diesel::sql_types::Text)] + pub evidence: String, + #[diesel(sql_type = diesel::sql_types::Bool)] pub is_ready: bool, + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub local_decision: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub remote_decision: Option, + #[diesel(sql_type = diesel::sql_types::Timestamp)] pub created_at: PrimitiveDateTime, } diff --git a/dan_layer/state_store_sqlite/src/store.rs b/dan_layer/state_store_sqlite/src/store.rs index 638941470a..959138e726 100644 --- a/dan_layer/state_store_sqlite/src/store.rs +++ b/dan_layer/state_store_sqlite/src/store.rs @@ -51,6 +51,16 @@ impl SqliteStateStore { _addr: PhantomData, }) } + + pub fn foreign_keys_off(&self) -> Result<(), StorageError> { + sql_query("PRAGMA foreign_keys = OFF;") + .execute(&mut *self.connection.lock().unwrap()) + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "set pragma", + })?; + Ok(()) + } } // Manually implement the Debug implementation because `SqliteConnection` does not implement the Debug trait diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index f4f4430ec8..6c6d0397e6 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -7,23 +7,13 @@ use std::{ ops::{Deref, DerefMut}, }; -use diesel::{ - AsChangeset, - ExpressionMethods, - NullableExpressionMethods, - OptionalExtension, - QueryDsl, - RunQueryDsl, - SqliteConnection, -}; +use diesel::{AsChangeset, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SqliteConnection}; use log::*; use tari_dan_common_types::{optional::Optional, Epoch, NodeAddressable, ShardId}; use tari_dan_storage::{ consensus_models::{ Block, BlockId, - Decision, - Evidence, HighQc, LastExecuted, LastProposed, @@ -38,6 +28,7 @@ use tari_dan_storage::{ SubstateRecord, TransactionAtom, TransactionPoolStage, + TransactionPoolStatusUpdate, TransactionRecord, Vote, }, @@ -617,46 +608,57 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit Ok(()) } - fn transaction_pool_update( - &mut self, - transaction_id: &TransactionId, - evidence: Option<&Evidence>, - pending_stage: Option>, - local_decision: Option, - remote_decision: Option, - is_ready: Option, - ) -> Result<(), StorageError> { - use crate::schema::transaction_pool; - - #[derive(AsChangeset)] - #[diesel(table_name=transaction_pool)] - struct Changes { - evidence: Option, - pending_stage: Option>, - local_decision: Option, - remote_decision: Option, - is_ready: Option, - updated_at: PrimitiveDateTime, - } + fn transaction_pool_add_pending_update(&mut self, update: TransactionPoolStatusUpdate) -> Result<(), StorageError> { + use crate::schema::{transaction_pool, transaction_pool_state_updates}; - let change_set = Changes { - evidence: evidence.map(serialize_json).transpose()?, - pending_stage: pending_stage.map(|s| s.map(|s| s.to_string())), - local_decision: local_decision.map(|d| d.to_string()), - remote_decision: remote_decision.map(|d| d.to_string()), - is_ready, - updated_at: now(), - }; + let values = ( + transaction_pool_state_updates::block_id.eq(serialize_hex(update.block_id())), + transaction_pool_state_updates::block_height.eq(update.block_height().as_u64() as i64), + transaction_pool_state_updates::transaction_id.eq(serialize_hex(update.transaction_id())), + transaction_pool_state_updates::evidence.eq(serialize_json(update.evidence())?), + transaction_pool_state_updates::stage.eq(update.stage().to_string()), + transaction_pool_state_updates::local_decision.eq(update.local_decision().to_string()), + transaction_pool_state_updates::remote_decision.eq(update.remote_decision().map(|d| d.to_string())), + transaction_pool_state_updates::is_ready.eq(update.is_ready()), + ); - diesel::update(transaction_pool::table) - .filter(transaction_pool::transaction_id.eq(serialize_hex(transaction_id))) - .set(change_set) + diesel::insert_into(transaction_pool_state_updates::table) + .values(values) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "transaction_pool_update", + operation: "transaction_pool_add_pending_update", source: e, })?; + // #[derive(AsChangeset)] + // #[diesel(table_name=transaction_pool)] + // struct Changes { + // evidence: Option, + // pending_stage: Option>, + // local_decision: Option, + // remote_decision: Option, + // is_ready: Option, + // updated_at: PrimitiveDateTime, + // } + // + // let change_set = Changes { + // evidence: Some(serialize_json(&update.evidence())?), + // pending_stage: Some(Some(update.stage().to_string())), + // local_decision: Some(update.local_decision().to_string()), + // remote_decision: update.remote_decision().map(|d| d.to_string()), + // is_ready: Some(update.is_ready()), + // updated_at: now(), + // }; + // + // diesel::update(transaction_pool::table) + // .filter(transaction_pool::transaction_id.eq(serialize_hex(update.transaction_id()))) + // .set(change_set) + // .execute(self.connection()) + // .map_err(|e| SqliteStorageError::DieselError { + // operation: "transaction_pool_update", + // source: e, + // })?; + Ok(()) } @@ -683,9 +685,11 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit fn transaction_pool_set_all_transitions<'a, I: IntoIterator>( &mut self, + locked_block: &LockedBlock, + new_locked_block: &LockedBlock, tx_ids: I, ) -> Result<(), StorageError> { - use crate::schema::transaction_pool; + use crate::schema::{transaction_pool, transaction_pool_state_updates}; let tx_ids = tx_ids.into_iter().map(serialize_hex).collect::>(); @@ -706,29 +710,44 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit .into()); } - diesel::update(transaction_pool::table) - .filter(transaction_pool::transaction_id.eq_any(&tx_ids)) - .filter(transaction_pool::pending_stage.is_not_null()) - .set(transaction_pool::stage.eq(transaction_pool::pending_stage.assume_not_null())) - .execute(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "transaction_pool_set_all_transitions", - source: e, - })?; + let updates = self.get_transaction_atom_state_updates_between_blocks( + locked_block.block_id(), + new_locked_block.block_id(), + tx_ids.iter().map(|s| s.as_str()), + )?; - diesel::update(transaction_pool::table) - .filter(transaction_pool::transaction_id.eq_any(&tx_ids)) - .filter(transaction_pool::pending_stage.is_not_null()) - .set(( - transaction_pool::pending_stage.eq(None::), - transaction_pool::updated_at.eq(now()), - )) + debug!( + target: LOG_TARGET, + "transaction_pool_set_all_transitions: locked_block={}, new_locked_block={}, {} transactions, {} updates", locked_block, new_locked_block, tx_ids.len(), updates.len() + ); + + diesel::delete(transaction_pool_state_updates::table) + .filter(transaction_pool_state_updates::transaction_id.eq_any(&tx_ids)) + .filter(transaction_pool_state_updates::block_id.eq(&new_locked_block)) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "transaction_pool_set_all_transitions", source: e, })?; + for update in updates.into_values() { + diesel::update(transaction_pool::table) + .filter(transaction_pool::transaction_id.eq(&update.transaction_id)) + .set(( + transaction_pool::stage.eq(update.stage), + transaction_pool::local_decision.eq(update.local_decision), + transaction_pool::remote_decision.eq(update.remote_decision), + transaction_pool::evidence.eq(update.evidence), + transaction_pool::is_ready.eq(update.is_ready), + transaction_pool::updated_at.eq(now()), + )) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "transaction_pool_set_all_transitions", + source: e, + })?; + } + Ok(()) } diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index 59ae232c50..e927d683bb 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -1,5 +1,129 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -#[test] -fn todo_write_some_tests() {} +use rand::{rngs::OsRng, RngCore}; +use tari_dan_common_types::{Epoch, NodeHeight}; +use tari_dan_storage::{ + consensus_models::{Block, Command, Decision, TransactionAtom, TransactionPoolStage, TransactionPoolStatusUpdate}, + StateStore, + StateStoreReadTransaction, + StateStoreWriteTransaction, +}; +use tari_state_store_sqlite::SqliteStateStore; +use tari_transaction::TransactionId; + +fn create_db() -> SqliteStateStore { + SqliteStateStore::connect(":memory:").unwrap() +} + +fn create_tx_atom() -> TransactionAtom { + let mut bytes = [0u8; 32]; + OsRng.fill_bytes(&mut bytes); + TransactionAtom { + id: TransactionId::new(bytes), + decision: Decision::Commit, + evidence: Default::default(), + transaction_fee: 0, + leader_fee: 0, + } +} + +mod confirm_all_transitions { + + use super::*; + + #[test] + fn it_sets_pending_stage_to_stage() { + let db = create_db(); + db.foreign_keys_off().unwrap(); + let mut tx = db.create_write_tx().unwrap(); + + let atom1 = create_tx_atom(); + let atom2 = create_tx_atom(); + let atom3 = create_tx_atom(); + + let zero_block = Block::zero_block(); + zero_block.insert(&mut tx).unwrap(); + let block1 = Block::new( + *zero_block.id(), + zero_block.justify().clone(), + NodeHeight(1), + Epoch(0), + Default::default(), + // Need to have a command in, otherwise this block will not be included internally in the query because it + // cannot cause a state change without any commands + [Command::Prepare(atom1.clone())].into_iter().collect(), + Default::default(), + ); + block1.insert(&mut tx).unwrap(); + + tx.transaction_pool_insert(atom1.clone(), TransactionPoolStage::New, false) + .unwrap(); + tx.transaction_pool_insert(atom2.clone(), TransactionPoolStage::New, false) + .unwrap(); + tx.transaction_pool_insert(atom3.clone(), TransactionPoolStage::New, false) + .unwrap(); + let block_id = *block1.id(); + + tx.transaction_pool_add_pending_update(TransactionPoolStatusUpdate { + block_id, + block_height: NodeHeight(1), + transaction_id: atom1.id, + stage: TransactionPoolStage::LocalPrepared, + evidence: Default::default(), + is_ready: false, + local_decision: Decision::Commit, + remote_decision: None, + }) + .unwrap(); + tx.transaction_pool_add_pending_update(TransactionPoolStatusUpdate { + block_id, + block_height: NodeHeight(1), + transaction_id: atom2.id, + stage: TransactionPoolStage::Prepared, + evidence: Default::default(), + is_ready: false, + local_decision: Decision::Commit, + remote_decision: None, + }) + .unwrap(); + tx.transaction_pool_add_pending_update(TransactionPoolStatusUpdate { + block_id, + block_height: NodeHeight(1), + transaction_id: atom3.id, + stage: TransactionPoolStage::Prepared, + evidence: Default::default(), + is_ready: false, + local_decision: Decision::Commit, + remote_decision: None, + }) + .unwrap(); + + let rec = tx.transaction_pool_get(zero_block.id(), &block_id, &atom1.id).unwrap(); + assert!(rec.stage().is_new()); + assert!(rec.pending_stage().unwrap().is_local_prepared()); + + let rec = tx.transaction_pool_get(zero_block.id(), &block_id, &atom2.id).unwrap(); + assert!(rec.stage().is_new()); + assert!(rec.pending_stage().unwrap().is_prepared()); + + tx.transaction_pool_set_all_transitions(&zero_block.as_locked_block(), &block1.as_locked_block(), &[ + atom1.id, atom3.id, + ]) + .unwrap(); + + let rec = tx.transaction_pool_get(zero_block.id(), &block_id, &atom1.id).unwrap(); + assert!(rec.stage().is_local_prepared()); + assert!(rec.pending_stage().is_none()); + + let rec = tx.transaction_pool_get(zero_block.id(), &block_id, &atom2.id).unwrap(); + assert!(rec.stage().is_new()); + assert!(rec.pending_stage().unwrap().is_prepared()); + + let rec = tx.transaction_pool_get(zero_block.id(), &block_id, &atom3.id).unwrap(); + assert!(rec.stage().is_prepared()); + assert!(rec.pending_stage().is_none()); + + tx.rollback().unwrap(); + } +} diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 14e1b87645..5061cf3886 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -612,6 +612,10 @@ impl BlockId { pub fn is_genesis(&self) -> bool { self.0.iter().all(|b| *b == 0) } + + pub const fn byte_size() -> usize { + FixedHash::byte_size() + } } impl AsRef<[u8]> for BlockId { diff --git a/dan_layer/storage/src/consensus_models/locked_output.rs b/dan_layer/storage/src/consensus_models/locked_output.rs index f77d7f43de..f48a8801a7 100644 --- a/dan_layer/storage/src/consensus_models/locked_output.rs +++ b/dan_layer/storage/src/consensus_models/locked_output.rs @@ -37,6 +37,14 @@ impl LockedOutput { tx.locked_outputs_acquire_all(block_id, transaction_id, output_shards) } + pub fn check_locks(tx: &mut TTx, output_shards: &[ShardId]) -> Result + where TTx: StateStoreReadTransaction { + if tx.substates_any_exist(output_shards)? { + return Ok(SubstateLockState::SomeOutputSubstatesExist); + } + tx.locked_outputs_check_all(output_shards) + } + pub fn try_release_all(tx: &mut TTx, output_shards: I) -> Result, StorageError> where TTx: StateStoreWriteTransaction, diff --git a/dan_layer/storage/src/consensus_models/mod.rs b/dan_layer/storage/src/consensus_models/mod.rs index 0653767fbe..89f9da35fa 100644 --- a/dan_layer/storage/src/consensus_models/mod.rs +++ b/dan_layer/storage/src/consensus_models/mod.rs @@ -17,6 +17,7 @@ mod substate; mod transaction; mod transaction_decision; mod transaction_pool; +mod transaction_pool_status_update; mod vote; mod vote_signature; @@ -36,5 +37,6 @@ pub use substate::*; pub use transaction::*; pub use transaction_decision::*; pub use transaction_pool::*; +pub use transaction_pool_status_update::*; pub use vote::*; pub use vote_signature::*; diff --git a/dan_layer/storage/src/consensus_models/substate.rs b/dan_layer/storage/src/consensus_models/substate.rs index dfc161b3b7..03fbc1335c 100644 --- a/dan_layer/storage/src/consensus_models/substate.rs +++ b/dan_layer/storage/src/consensus_models/substate.rs @@ -134,6 +134,14 @@ impl SubstateRecord { tx.substates_try_lock_many(locked_by_tx, inputs, lock_flag) } + pub fn check_lock_all<'a, TTx: StateStoreReadTransaction, I: IntoIterator>( + tx: &mut TTx, + inputs: I, + lock_flag: SubstateLockFlag, + ) -> Result { + tx.substates_check_lock_many(inputs, lock_flag) + } + pub fn try_unlock_many<'a, TTx: StateStoreWriteTransaction, I: IntoIterator>( tx: &mut TTx, locked_by_tx: &TransactionId, diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index ec6aa2ff53..7721c8d3e0 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -5,7 +5,6 @@ use std::{ fmt::{Display, Formatter}, marker::PhantomData, num::NonZeroU64, - ops::DerefMut, str::FromStr, }; @@ -17,14 +16,14 @@ use tari_dan_common_types::{ use tari_transaction::TransactionId; use crate::{ - consensus_models::{Block, Command, Decision, QcId, TransactionAtom}, + consensus_models::{BlockId, Decision, LeafBlock, LockedBlock, QcId, TransactionAtom, TransactionPoolStatusUpdate}, StateStore, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, }; -const LOG_TARGET: &str = "tari::dan::storage::transaction_pool"; +const _LOG_TARGET: &str = "tari::dan::storage::transaction_pool"; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TransactionPoolStage { @@ -120,9 +119,20 @@ impl TransactionPool { pub fn get( &self, tx: &mut TStateStore::ReadTransaction<'_>, + leaf: LeafBlock, id: &TransactionId, ) -> Result { - let rec = tx.transaction_pool_get(id)?; + // We always want to fetch the state at the current leaf block until the leaf block + // let leaf = LeafBlock::get(tx)?; + let locked = LockedBlock::get(tx)?; + debug!( + target: _LOG_TARGET, + "TransactionPool::get: transaction_id {}, leaf block {} and locked block {}", + id, + leaf, + locked, + ); + let rec = tx.transaction_pool_get(locked.block_id(), leaf.block_id(), id)?; Ok(rec) } @@ -131,9 +141,8 @@ impl TransactionPool { tx: &mut TStateStore::ReadTransaction<'_>, id: &TransactionId, ) -> Result { - // TODO: optimise - let rec = tx.transaction_pool_get(id).optional()?; - Ok(rec.is_some()) + let exists = tx.transaction_pool_exists(id)?; + Ok(exists) } pub fn insert( @@ -192,53 +201,11 @@ impl TransactionPool { pub fn confirm_all_transitions<'a, I: IntoIterator>( &self, tx: &mut TStateStore::WriteTransaction<'_>, + locked_block: &LockedBlock, + new_locked_block: &LockedBlock, tx_ids: I, ) -> Result<(), TransactionPoolError> { - tx.transaction_pool_set_all_transitions(tx_ids)?; - Ok(()) - } - - pub fn rollback_pending_stages( - &self, - tx: &mut TStateStore::WriteTransaction<'_>, - block: &Block, - ) -> Result<(), TransactionPoolError> { - for cmd in block.commands() { - let transaction = self.get(tx.deref_mut(), cmd.transaction_id())?; - - debug!( - target: LOG_TARGET, - "↩️ Rolling back pending stage for transaction {} from {:?} ", - cmd.transaction_id(), - transaction.pending_stage, - ); - match cmd { - Command::Prepare(t) => { - tx.transaction_pool_update(t.id(), None, Some(None), None, None, Some(true))?; - }, - Command::LocalPrepared(t) => { - // TODO: We can never go back from to Prepared - tx.transaction_pool_update( - t.id(), - None, - Some(Some(TransactionPoolStage::Prepared)), - None, - None, - Some(true), - )?; - }, - Command::Accept(t) => { - tx.transaction_pool_update( - t.id(), - None, - Some(Some(TransactionPoolStage::LocalPrepared)), - None, - None, - Some(true), - )?; - }, - } - } + tx.transaction_pool_set_all_transitions(locked_block, new_locked_block, tx_ids)?; Ok(()) } } @@ -365,12 +332,24 @@ impl TransactionPoolRecord { self.local_decision = Some(decision); self } + + pub fn add_evidence(&mut self, committee_shard: &CommitteeShard, qc_id: QcId) -> &mut Self { + let evidence = &mut self.transaction.evidence; + for (shard, qcs_mut) in evidence.iter_mut() { + if committee_shard.includes_shard(shard) { + qcs_mut.push(qc_id); + } + } + + self + } } impl TransactionPoolRecord { - pub fn pending_transition( + pub fn add_pending_status_update( &mut self, tx: &mut TTx, + block: LeafBlock, pending_stage: TransactionPoolStage, is_ready: bool, ) -> Result<(), TransactionPoolError> { @@ -391,57 +370,51 @@ impl TransactionPoolRecord { }, } - tx.transaction_pool_update( - &self.transaction.id, - None, - Some(Some(pending_stage)), - None, - None, - Some(is_ready), - )?; - self.pending_stage = Some(pending_stage); - - Ok(()) - } + let update = TransactionPoolStatusUpdate { + block_id: block.block_id, + block_height: block.height, + transaction_id: self.transaction.id, + stage: pending_stage, + evidence: self.transaction.evidence.clone(), + is_ready, + local_decision: self.current_local_decision(), + remote_decision: self.remote_decision, + }; - pub fn update_remote_decision( - &mut self, - tx: &mut TTx, - decision: Decision, - ) -> Result<(), TransactionPoolError> { - tx.transaction_pool_update(&self.transaction.id, None, None, None, Some(decision), None)?; - self.set_remote_decision(decision); - Ok(()) - } + tx.transaction_pool_add_pending_update(update)?; + self.pending_stage = Some(pending_stage); - pub fn update_local_decision( - &mut self, - tx: &mut TTx, - decision: Decision, - ) -> Result<(), TransactionPoolError> { - if self.local_decision.map(|d| d != decision).unwrap_or(true) { - self.set_local_decision(decision); - tx.transaction_pool_update(&self.transaction.id, None, None, Some(decision), None, None)?; - } Ok(()) } - pub fn update_evidence( - &mut self, - tx: &mut TTx, - committee_shard: &CommitteeShard, - qc_id: QcId, - ) -> Result<(), TransactionPoolError> { - let evidence = &mut self.transaction.evidence; - for (shard, qcs_mut) in evidence.iter_mut() { - if committee_shard.includes_shard(shard) { - qcs_mut.push(qc_id); - } - } - tx.transaction_pool_update(&self.transaction.id, Some(evidence), None, None, None, None)?; - - Ok(()) - } + // pub fn update_remote_decision( + // &mut self, + // tx: &mut TTx, + // decision: Decision, + // ) -> Result<(), TransactionPoolError> { tx.transaction_pool_add_pending_update(&self.transaction.id, None, None, + // None, Some(decision), None)?; self.set_remote_decision(decision); Ok(()) + // } + + // pub fn update_local_decision( + // &mut self, + // tx: &mut TTx, + // decision: Decision, + // ) -> Result<(), TransactionPoolError> { if self.local_decision.map(|d| d != decision).unwrap_or(true) { + // self.set_local_decision(decision); tx.transaction_pool_add_pending_update(&self.transaction.id, None, None, + // Some(decision), None, None)?; } Ok(()) + // } + + // pub fn update_evidence( + // &mut self, + // tx: &mut TTx, + // committee_shard: &CommitteeShard, + // qc_id: QcId, + // ) -> Result<(), TransactionPoolError> { let evidence = &mut self.transaction.evidence; for (shard, qcs_mut) in + // evidence.iter_mut() { if committee_shard.includes_shard(shard) { qcs_mut.push(qc_id); } } + // tx.transaction_pool_add_pending_update(&self.transaction.id, Some(evidence), None, None, None, None)?; + // + // Ok(()) + // } pub fn remove(&self, tx: &mut TTx) -> Result<(), TransactionPoolError> { tx.transaction_pool_remove(&self.transaction.id)?; diff --git a/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs b/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs new file mode 100644 index 0000000000..2e38277a8b --- /dev/null +++ b/dan_layer/storage/src/consensus_models/transaction_pool_status_update.rs @@ -0,0 +1,53 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use tari_dan_common_types::NodeHeight; +use tari_transaction::TransactionId; + +use crate::consensus_models::{BlockId, Decision, Evidence, TransactionPoolStage}; + +#[derive(Debug, Clone)] +pub struct TransactionPoolStatusUpdate { + pub block_id: BlockId, + pub block_height: NodeHeight, + pub transaction_id: TransactionId, + pub stage: TransactionPoolStage, + pub evidence: Evidence, + pub is_ready: bool, + pub local_decision: Decision, + pub remote_decision: Option, +} + +impl TransactionPoolStatusUpdate { + pub fn block_id(&self) -> &BlockId { + &self.block_id + } + + pub fn block_height(&self) -> NodeHeight { + self.block_height + } + + pub fn transaction_id(&self) -> &TransactionId { + &self.transaction_id + } + + pub fn stage(&self) -> TransactionPoolStage { + self.stage + } + + pub fn evidence(&self) -> &Evidence { + &self.evidence + } + + pub fn is_ready(&self) -> bool { + self.is_ready + } + + pub fn local_decision(&self) -> Decision { + self.local_decision + } + + pub fn remote_decision(&self) -> Option { + self.remote_decision + } +} diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 4a43952e60..ad399d4cc5 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -15,8 +15,6 @@ use crate::{ consensus_models::{ Block, BlockId, - Decision, - Evidence, HighQc, LastExecuted, LastProposed, @@ -32,6 +30,7 @@ use crate::{ TransactionAtom, TransactionPoolRecord, TransactionPoolStage, + TransactionPoolStatusUpdate, TransactionRecord, Vote, }, @@ -132,7 +131,13 @@ pub trait StateStoreReadTransaction { ) -> Result, StorageError>; // -------------------------------- Transaction Pools -------------------------------- // - fn transaction_pool_get(&mut self, transaction_id: &TransactionId) -> Result; + fn transaction_pool_get( + &mut self, + from_block_id: &BlockId, + to_block_id: &BlockId, + transaction_id: &TransactionId, + ) -> Result; + fn transaction_pool_exists(&mut self, transaction_id: &TransactionId) -> Result; fn transaction_pool_get_many_ready(&mut self, max_txs: usize) -> Result, StorageError>; fn transaction_pool_count( &mut self, @@ -181,6 +186,16 @@ pub trait StateStoreReadTransaction { &mut self, transaction_id: &TransactionId, ) -> Result, StorageError>; + fn substates_check_lock_many<'a, I: IntoIterator>( + &mut self, + objects: I, + lock_flag: SubstateLockFlag, + ) -> Result; + + fn locked_outputs_check_all(&mut self, output_shards: I) -> Result + where + I: IntoIterator, + B: Borrow; } pub trait StateStoreWriteTransaction { @@ -222,18 +237,15 @@ pub trait StateStoreWriteTransaction { stage: TransactionPoolStage, is_ready: bool, ) -> Result<(), StorageError>; - fn transaction_pool_update( + fn transaction_pool_add_pending_update( &mut self, - transaction_id: &TransactionId, - evidence: Option<&Evidence>, - pending_stage: Option>, - local_decision: Option, - remote_decision: Option, - is_ready: Option, + pool_update: TransactionPoolStatusUpdate, ) -> Result<(), StorageError>; fn transaction_pool_remove(&mut self, transaction_id: &TransactionId) -> Result<(), StorageError>; fn transaction_pool_set_all_transitions<'a, I: IntoIterator>( &mut self, + locked_block: &LockedBlock, + new_locked_block: &LockedBlock, tx_ids: I, ) -> Result<(), StorageError>; diff --git a/dan_layer/transaction/src/transaction_id.rs b/dan_layer/transaction/src/transaction_id.rs index 8214dd1da4..b3d25d5e60 100644 --- a/dan_layer/transaction/src/transaction_id.rs +++ b/dan_layer/transaction/src/transaction_id.rs @@ -36,6 +36,10 @@ impl TransactionId { let bytes = from_hex(hex).map_err(|_| FixedHashSizeError)?; Self::try_from(bytes.as_slice()) } + + pub const fn byte_size() -> usize { + 32 + } } impl AsRef<[u8]> for TransactionId {