Skip to content

Commit

Permalink
prune outputs at end of horizon sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 26, 2021
1 parent 4b9a4a7 commit 4b1e913
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ pub struct HorizonStateSynchronization<'a, B: BlockchainBackend> {
prover: Arc<RangeProofService>,
num_kernels: u64,
num_outputs: u64,
kernel_sum: Commitment,
utxo_sum: Commitment,
full_bitmap: Option<Bitmap>,
}

impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Expand All @@ -89,8 +88,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
prover,
num_kernels: 0,
num_outputs: 0,
kernel_sum: Default::default(),
utxo_sum: Default::default(),
full_bitmap: None,
}
}

Expand Down Expand Up @@ -135,47 +133,36 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
// debug!(target: LOG_TARGET, "Initializing");
// self.initialize().await?;
debug!(target: LOG_TARGET, "Initializing");
self.initialize().await?;
debug!(target: LOG_TARGET, "Synchronizing kernels");
self.synchronize_kernels(client, to_header).await?;
debug!(target: LOG_TARGET, "Synchronizing outputs");
self.synchronize_outputs(client, to_header).await?;
Ok(())
}

// async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
// let db = self.db();
// let local_metadata = db.get_chain_metadata().await?;
//
// let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
// if local_metadata.pruned_height() < new_prune_height {
// debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
// db.prune_to_height(new_prune_height).await?;
// }
//
// // prune_to_height updates horizon data
// let horizon_data = db.fetch_horizon_data().await?;
//
// debug!(
// target: LOG_TARGET,
// "Loaded from horizon data utxo_sum = {}, kernel_sum = {}",
// horizon_data.utxo_sum().to_hex(),
// horizon_data.kernel_sum().to_hex(),
// );
// self.utxo_sum = horizon_data.utxo_sum().clone();
// self.kernel_sum = horizon_data.kernel_sum().clone();
//
// Ok(())
// }
async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
let db = self.db();
let local_metadata = db.get_chain_metadata().await?;

let new_prune_height = cmp::min(local_metadata.height_of_longest_chain(), self.horizon_sync_height);
if local_metadata.pruned_height() < new_prune_height {
debug!(target: LOG_TARGET, "Pruning block chain to height {}", new_prune_height);
db.prune_to_height(new_prune_height + 1).await?;
}

self.full_bitmap = Some(db.fetch_deleted_bitmap_at_tip().await?.into_bitmap());

Ok(())
}

async fn synchronize_kernels(
&mut self,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;
let metadata = self.db().get_chain_metadata().await?;

let remote_num_kernels = to_header.kernel_mmr_size;
self.num_kernels = remote_num_kernels;
Expand Down Expand Up @@ -235,7 +222,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.map_err(HorizonSyncError::InvalidKernelSignature)?;

kernel_hashes.push(kernel.hash());
self.kernel_sum = &self.kernel_sum + &kernel.excess;

txn.insert_kernel_via_horizon_sync(kernel, current_header.hash().clone(), mmr_position as u32);
if mmr_position == current_header.header().kernel_mmr_size - 1 {
Expand Down Expand Up @@ -276,13 +262,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
txn.update_block_accumulated_data_via_horizon_sync(
current_header.hash().clone(),
UpdateBlockAccumulatedData {
kernel_sum: Some(self.kernel_sum.clone()),
kernel_hash_set: Some(kernel_hash_set),
..Default::default()
},
);
debug!(target: LOG_TARGET, "Setting kernel sum = {}", self.kernel_sum.to_hex());
txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());

txn.commit().await?;
debug!(
Expand Down Expand Up @@ -323,8 +306,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
) -> Result<(), HorizonSyncError> {
let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;

let metadata = self.db().get_chain_metadata().await?;

let remote_num_outputs = to_header.output_mmr_size;
self.num_outputs = remote_num_outputs;

Expand Down Expand Up @@ -390,7 +371,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.fetch_block_accumulated_data(current_header.header().prev_hash.clone())
.await?;
let (_, output_pruned_set, witness_pruned_set, _) = block_data.dissolve();
let mut full_bitmap = self.db().fetch_deleted_bitmap_at_tip().await?.into_bitmap();

let mut output_mmr = MerkleMountainRange::<HashDigest, _>::new(output_pruned_set);
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);
Expand Down Expand Up @@ -425,7 +405,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

output_mmr.push(output.hash())?;
witness_mmr.push(output.witness_hash())?;
self.utxo_sum = &self.utxo_sum + &output.commitment;

txn.insert_output_via_horizon_sync(
output,
Expand Down Expand Up @@ -489,11 +468,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

// Merge the differences into the final bitmap so that we can commit to the entire spend state
// in the output MMR
full_bitmap.or_inplace(&diff_bitmap);
full_bitmap.run_optimize();
let bitmap = self.full_bitmap_mut();
bitmap.or_inplace(&diff_bitmap);
bitmap.run_optimize();

let pruned_output_set = output_mmr.get_pruned_hash_set()?;
let output_mmr = MutableMmr::<HashDigest, _>::new(pruned_output_set.clone(), full_bitmap.clone())?;
let output_mmr = MutableMmr::<HashDigest, _>::new(pruned_output_set.clone(), bitmap.clone())?;

let mmr_root = output_mmr.get_merkle_root()?;
if mmr_root != current_header.header().output_mr {
Expand Down Expand Up @@ -523,14 +503,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
txn.update_block_accumulated_data_via_horizon_sync(
current_header.hash().clone(),
UpdateBlockAccumulatedData {
utxo_sum: Some(self.utxo_sum.clone()),
utxo_hash_set: Some(pruned_output_set),
witness_hash_set: Some(witness_hash_set),
deleted_diff: Some(diff_bitmap.into()),
..Default::default()
},
);
txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());
txn.commit().await?;

debug!(
Expand Down Expand Up @@ -588,6 +566,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
"Sync node did not send all utxos requested".to_string(),
));
}

Ok(())
}

Expand Down Expand Up @@ -632,7 +611,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
)));

let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
// TODO: Use cumulative kernel and utxo sums
let (calc_utxo_sum, calc_kernel_sum) = self.calculate_commitment_sums(&header).await?;

self.shared
Expand All @@ -659,29 +637,42 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
header.accumulated_data().total_accumulated_difficulty,
metadata.best_block().clone(),
)
.set_pruned_height(header.height(), self.kernel_sum.clone(), self.utxo_sum.clone())
.set_pruned_height(header.height(), calc_kernel_sum, calc_utxo_sum)
.commit()
.await?;

Ok(())
}

fn take_final_bitmap(&mut self) -> Arc<Bitmap> {
self.full_bitmap
.take()
.map(Arc::new)
.expect("take_full_bitmap called before initialize")
}

fn full_bitmap_mut(&mut self) -> &mut Bitmap {
self.full_bitmap
.as_mut()
.expect("full_bitmap_mut called before initialize")
}

/// (UTXO sum, Kernel sum)
async fn calculate_commitment_sums(
&self,
&mut self,
header: &ChainHeader,
) -> Result<(Commitment, Commitment), HorizonSyncError> {
let mut pruned_utxo_sum = HomomorphicCommitment::default();
let mut pruned_kernel_sum = HomomorphicCommitment::default();

let mut prev_mmr = 0;
let mut prev_kernel_mmr = 0;
let bitmap = Arc::new(
self.db()
.fetch_complete_deleted_bitmap_at(header.hash().clone())
.await?
.into_bitmap(),
);

let bitmap = self.take_final_bitmap();
let mut txn = self.db().write_transaction();
let mut utxo_mmr_position = 0;
let mut prune_positions = vec![];

for h in 0..=header.height() {
let curr_header = self.db().fetch_chain_header(h).await?;

Expand All @@ -693,10 +684,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
prev_mmr,
curr_header.header().output_mmr_size - 1
);
let (utxos, _) = self
.db()
.fetch_utxos_in_block(curr_header.hash().clone(), bitmap.clone())
.await?;
let (utxos, _) = self.db().fetch_utxos_in_block(curr_header.hash().clone(), None).await?;
trace!(
target: LOG_TARGET,
"Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
Expand All @@ -711,12 +699,22 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
for u in utxos {
match u {
PrunedOutput::NotPruned { output } => {
pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
if bitmap.contains(utxo_mmr_position) {
debug!(
target: LOG_TARGET,
"Found output that needs pruning at height: {} position: {}", h, utxo_mmr_position
);
prune_positions.push(utxo_mmr_position);
prune_counter += 1;
} else {
pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
}
},
_ => {
prune_counter += 1;
},
}
utxo_mmr_position += 1;
}
if prune_counter > 0 {
trace!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
Expand All @@ -730,14 +728,23 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
prev_kernel_mmr = curr_header.header().kernel_mmr_size;

trace!(
target: LOG_TARGET,
"Height: {} Kernel sum:{:?} Pruned UTXO sum: {:?}",
h,
pruned_kernel_sum,
pruned_utxo_sum
);
if h % 1000 == 0 {
debug!(
target: LOG_TARGET,
"Final Validation: {:.2}% complete. Height: {}, mmr_position: {} ",
(h as f32 / header.height() as f32) * 100.0,
h,
utxo_mmr_position,
);
}
}

if !prune_positions.is_empty() {
debug!(target: LOG_TARGET, "Pruning {} spent outputs", prune_positions.len());
txn.prune_output_at_positions(prune_positions);
txn.commit().await?;
}

Ok((pruned_utxo_sum, pruned_kernel_sum))
}

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ where B: BlockchainBackend + 'static

let (utxos, deleted_diff) = self
.db
.fetch_utxos_in_block(current_header.hash(), bitmap.clone())
.fetch_utxos_in_block(current_header.hash(), Some(bitmap.clone()))
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
debug!(
Expand Down
23 changes: 7 additions & 16 deletions base_layer/core/src/blocks/accumulated_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,13 @@ use tari_mmr::{pruned_hashset::PrunedHashSet, ArrayLike};

const LOG_TARGET: &str = "c::bn::acc_data";

#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BlockAccumulatedData {
pub(crate) kernels: PrunedHashSet,
pub(crate) outputs: PrunedHashSet,
pub(crate) witness: PrunedHashSet,
pub(crate) deleted: DeletedBitmap,
pub(crate) cumulative_kernel_sum: Commitment,
pub(crate) cumulative_utxo_sum: Commitment,
pub(crate) kernel_sum: Commitment,
}

impl BlockAccumulatedData {
Expand All @@ -65,16 +64,14 @@ impl BlockAccumulatedData {
outputs: PrunedHashSet,
witness: PrunedHashSet,
deleted: Bitmap,
cumulative_kernel_sum: Commitment,
cumulative_utxo_sum: Commitment,
total_kernel_sum: Commitment,
) -> Self {
Self {
kernels,
outputs,
witness,
deleted: DeletedBitmap { deleted },
cumulative_kernel_sum,
cumulative_utxo_sum,
kernel_sum: total_kernel_sum,
}
}

Expand All @@ -91,12 +88,8 @@ impl BlockAccumulatedData {
(self.kernels, self.outputs, self.witness, self.deleted.deleted)
}

pub fn cumulative_kernel_sum(&self) -> &Commitment {
&self.cumulative_kernel_sum
}

pub fn cumulative_utxo_sum(&self) -> &Commitment {
&self.cumulative_utxo_sum
pub fn kernel_sum(&self) -> &Commitment {
&self.kernel_sum
}
}

Expand All @@ -109,8 +102,7 @@ impl Default for BlockAccumulatedData {
deleted: Bitmap::create(),
},
witness: Default::default(),
cumulative_kernel_sum: Default::default(),
cumulative_utxo_sum: Default::default(),
kernel_sum: Default::default(),
}
}
}
Expand All @@ -134,7 +126,6 @@ pub struct UpdateBlockAccumulatedData {
pub utxo_hash_set: Option<PrunedHashSet>,
pub witness_hash_set: Option<PrunedHashSet>,
pub deleted_diff: Option<DeletedBitmap>,
pub utxo_sum: Option<Commitment>,
pub kernel_sum: Option<Commitment>,
}

Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_utxos_and_mined_info(hashes: Vec<HashOutput>) -> Vec<Option<UtxoMinedInfo>>, "fetch_utxos_and_mined_info");

make_async_fn!(fetch_utxos_in_block(hash: HashOutput, deleted: Arc<Bitmap>) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_in_block");
make_async_fn!(fetch_utxos_in_block(hash: HashOutput, deleted: Option<Arc<Bitmap>>) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_in_block");

//---------------------------------- Kernel --------------------------------------------//
make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig");
Expand Down Expand Up @@ -367,6 +367,11 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> {
self
}

pub fn prune_output_at_positions(&mut self, positions: Vec<u32>) -> &mut Self {
self.transaction.prune_outputs_at_positions(positions);
self
}

pub async fn commit(&mut self) -> Result<(), ChainStorageError> {
let transaction = mem::take(&mut self.transaction);
self.db.write(transaction).await
Expand Down
Loading

0 comments on commit 4b1e913

Please sign in to comment.