Skip to content

Commit

Permalink
Combine all compaction meta store writes
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Aug 18, 2024
1 parent 0d4d741 commit e62b887
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
33 changes: 23 additions & 10 deletions src/disk_store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,15 @@ impl Storage {
}

// Combine set of partitions into single new partition.
pub fn compact(
pub fn prepare_compact(
&self,
table: &str,
id: PartitionID,
metadata: Vec<SubpartitionMetadata>,
subpartitions: Vec<Vec<Arc<Column>>>,
old_partitions: &[PartitionID],
offset: usize,
) {
) -> Vec<(u64, String)> {
log::debug!(
"compacting {} parititions into {} for table {}",
old_partitions.len(),
Expand All @@ -278,6 +278,7 @@ impl Storage {
.map(move |name| (name, i))
})
.collect();

// Persist new partition files
let partition = PartitionMetadata {
id,
Expand All @@ -289,7 +290,7 @@ impl Storage {
};
self.write_subpartitions(&partition, subpartitions);

// Atomically update metastore
// Update metastore
let mut meta_store = self.meta_store.write().unwrap();
let all_partitions = meta_store.partitions.get_mut(table).unwrap();
let to_delete: Vec<(u64, String)> = old_partitions
Expand All @@ -309,15 +310,27 @@ impl Storage {
.get_mut(table)
.unwrap()
.insert(partition.id, partition);
drop(meta_store);
let meta_store = self.meta_store.read().unwrap();
self.write_metastore(&meta_store);

to_delete
}

/// Persists the metastore once, then deletes the partition files listed in `to_delete`.
///
/// `to_delete` pairs a table name with the `(id, key)` entries of the files to remove
/// for that table. Each file is looked up at
/// `tables_path/<sanitize_table_name(table)>/<partition_filename(id, key)>`.
///
/// # Panics
///
/// Panics if acquiring the metastore read lock fails or if deleting a file fails,
/// since both results are unwrapped.
pub fn commit_compacts(
&self,
to_delete: Vec<(String, Vec<(u64, String)>)>,
) {
// Persist metastore
// The inner block ends the read guard's lifetime before any files are deleted.
{
let meta_store = self.meta_store.read().unwrap();
self.write_metastore(&meta_store);
}

// Delete old partition files
let table_dir = self.tables_path.join(sanitize_table_name(table));
for (id, key) in to_delete {
let path = table_dir.join(partition_filename(id, &key));
self.writer.delete(&path).unwrap();
for (table, to_delete) in &to_delete {
for (id, key) in to_delete {
let table_dir = self.tables_path.join(sanitize_table_name(table));
let path = table_dir.join(partition_filename(*id, key));
self.writer.delete(&path).unwrap();
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/mem_store/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl Table {
buffer.push_untyped_cols(columns);
}

/// Creates a new partition from current buffer and returns it.
pub(crate) fn batch(&self) -> Option<Arc<Partition>> {
let mut buffer = self.buffer.lock().unwrap();
if buffer.len() == 0 {
Expand Down
9 changes: 8 additions & 1 deletion src/scheduler/inner_locustdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl InnerLocustDB {
let mut compactions = Vec::new();
let start_time_batching = Instant::now();
for table in tables.values() {
// TODO: use double buffering to quickly create new open buffer that unblocks ingestion
if let Some(partition) = table.batch() {
let columns: Vec<_> = partition
.col_handles()
Expand Down Expand Up @@ -326,6 +327,7 @@ impl InnerLocustDB {
}

let start_time_compaction = Instant::now();
let mut partitions_to_delete = Vec::new();
for (table, id, (range, parts)) in compactions {
// get table, create new merged partition/sub-partitions (not registered with table)
// - get names of all columns
Expand Down Expand Up @@ -373,14 +375,19 @@ impl InnerLocustDB {
let (metadata, subpartitions) = subpartition(&self.opts, columns.clone());
// write subpartitions to disk and update the in-memory metastore, unlinking old partitions; the metastore is persisted and old partition files are deleted later, in commit_compacts
if let Some(storage) = self.storage.as_ref() {
storage.compact(table, id, metadata, subpartitions, &parts, range.start);
let to_delete = storage.prepare_compact(table, id, metadata, subpartitions, &parts, range.start);
partitions_to_delete.push((table.to_string(), to_delete));
}

// replace old partitions with new partition
tables[table].compact(id, range.start, columns, &parts);
}
let time_compaction = start_time_compaction.elapsed();

if let Some(storage) = self.storage.as_ref() {
storage.commit_compacts(partitions_to_delete);
}

let total_time = start_time.elapsed();
match persist_timings {
None =>
Expand Down

0 comments on commit e62b887

Please sign in to comment.