Parallelized buffer flush and compaction
cswinter committed Aug 18, 2024
1 parent 78a9f91 commit 1a616f3
Showing 10 changed files with 184 additions and 108 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Generated file; diff not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
```diff
@@ -34,7 +34,7 @@ fnv = "1.0"
 futures = "0.3"
 google-cloud-storage = { version = "0.16", features = ["rustls-tls", "auth"], default-features = false }
 hex = "0.3"
-itertools = "0.5"
+itertools = "0.13"
 lazy_static = "1.4.0"
 locustdb-compression-utils = {path = "./locustdb-compression-utils", version = "0.2.0"}
 locustdb-derive = {path = "./locustdb-derive", version = "0.2.1"}
@@ -63,6 +63,7 @@ structopt = "0.3"
 systemstat = "0.1.8"
 tempfile = "3"
 tera = "1"
+threadpool = "1.8.1"
 time = "0.2"
 tokio = { version = "1", features = ["full"] }
 tokio-util = "0.7"
```
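The new `threadpool` dependency supplies the worker pool behind the parallelized flush and compaction; the `itertools` bump from 0.5 to 0.13 explains the `.collect()` additions in the files below. As a rough sketch of the fan-out/wait pattern the `threadpool` crate supports (function and variable names here are hypothetical, not the commit's actual flush code, which isn't rendered in this view):

```rust
use std::sync::mpsc::channel;
use threadpool::ThreadPool;

// Hypothetical sketch: run per-table flush/compaction jobs on a fixed-size
// pool, then block until every job has reported back.
fn compact_tables(table_names: Vec<String>, n_threads: usize) {
    let pool = ThreadPool::new(n_threads);
    let (tx, rx) = channel();
    let n_jobs = table_names.len();
    for name in table_names {
        let tx = tx.clone();
        pool.execute(move || {
            // ... batch buffered rows and compact partitions for `name` ...
            tx.send(name).expect("receiver outlives all jobs");
        });
    }
    // Collect one completion message per job before returning.
    for finished in rx.iter().take(n_jobs) {
        println!("flushed and compacted {finished}");
    }
}
```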
6 changes: 6 additions & 0 deletions src/bin/repl/main.rs
```diff
@@ -102,6 +102,10 @@ struct Opt {
     /// Maximum length of temporary buffer used in streaming stages during query execution
     #[structopt(long, default_value = "1024")]
     batch_size: usize,
+
+    /// Number of parallel threads used during WAL flush table batching and compacting
+    #[structopt(long, default_value = "1")]
+    wal_flush_compaction_threads: usize,
 }
 
 fn main() {
@@ -128,6 +132,7 @@ fn main() {
         cors_allow_origin,
         addrs,
         batch_size,
+        wal_flush_compaction_threads,
     } = Opt::from_args();
 
     let options = locustdb::Options {
@@ -143,6 +148,7 @@ fn main() {
         partition_combine_factor,
         batch_size,
         max_partition_length: 1024 * 1024,
+        wal_flush_compaction_threads,
     };
 
     if options.readahead > options.mem_size_limit_tables {
```
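Since structopt derives kebab-case flag names by default, the new option is passed to the repl binary as `--wal-flush-compaction-threads <N>`; the default of 1 keeps the previous single-threaded flush behavior.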
3 changes: 2 additions & 1 deletion src/disk_store/meta_store.rs
```diff
@@ -1,4 +1,5 @@
 use capnp::serialize_packed;
+use itertools::Itertools;
 use locustdb_serialization::{dbmeta_capnp, default_reader_options};
 use lz4_flex::block::{compress_prepend_size, decompress_size_prepended};
 use pco::standalone::{simple_decompress, simpler_compress};
@@ -100,7 +101,7 @@ impl MetaStore {
             let mut subpartition_builder = subpartitions_builder.reborrow().get(i as u32);
             subpartition_builder.set_size_bytes(subpartition.size_bytes);
             subpartition_builder.set_subpartition_key(&subpartition.subpartition_key);
-            let subpartition_column_ids_sorted = itertools::Itertools::sorted(subpartition_index_to_column_names[i].iter().cloned());
+            let subpartition_column_ids_sorted: Vec<_> = subpartition_index_to_column_names[i].iter().cloned().sorted().collect();
             let all_column_ids_compressed = simpler_compress(&subpartition_column_ids_sorted, DEFAULT_COMPRESSION_LEVEL).unwrap();
             subpartition_builder.set_compressed_interned_columns(&all_column_ids_compressed[..]);
         }
```
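The rewrite above, like the `.collect()` additions in query_task.rs and table.rs below, falls out of the itertools upgrade: in itertools 0.5, `Itertools::sorted` returned a `Vec` directly, while recent versions return an iterator over the sorted items, so call sites that want a `Vec` now collect explicitly. A minimal illustration against itertools 0.13:

```rust
use itertools::Itertools;

fn main() {
    // sorted() yields an iterator in itertools 0.13, so materializing
    // a Vec takes an explicit collect().
    let names: Vec<String> = ["b", "a", "c"].iter().map(|s| s.to_string()).sorted().collect();
    assert_eq!(names, ["a", "b", "c"]);
}
```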
1 change: 1 addition & 0 deletions src/engine/execution/query_task.rs
```diff
@@ -105,6 +105,7 @@ impl QueryTask {
                 name,
             })
             .sorted_by(|a, b| a.name.cmp(&b.name))
+            .collect()
     }
 
     let referenced_cols = query.find_referenced_cols();
```
4 changes: 2 additions & 2 deletions src/ingest/csv_loader.rs
```diff
@@ -136,7 +136,7 @@ impl Options {
     }
 }
 
-pub fn ingest_file(ldb: &InnerLocustDB, opts: &Options) -> Result<(), String> {
+pub fn ingest_file(ldb: &Arc<InnerLocustDB>, opts: &Options) -> Result<(), String> {
     // Can't combine these two branches because csv::Reader takes a type param which differs for creating from Reader/File
     if opts.unzip {
         let f = File::open(&opts.filename).map_err(|x| x.to_string())?;
@@ -173,7 +173,7 @@ pub fn ingest_file(ldb: &InnerLocustDB, opts: &Options) -> Result<(), String> {
     }
 }
 
 fn auto_ingest<T>(
-    ldb: &InnerLocustDB,
+    ldb: &Arc<InnerLocustDB>,
     records: T,
     colnames: &[String],
     opts: &Options,
```
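The switch from `&InnerLocustDB` to `&Arc<InnerLocustDB>` fits the parallelization theme: a callee that hands work to background threads needs an owned, clonable handle rather than a plain borrow. A minimal sketch of the pattern (with a stand-in type, not the real `InnerLocustDB`):

```rust
use std::sync::Arc;
use std::thread;

struct InnerDb; // stand-in for the real InnerLocustDB

// Taking &Arc<InnerDb> lets the callee clone an owned handle and move it
// into a worker thread, which a plain &InnerDb borrow would not allow.
fn ingest(db: &Arc<InnerDb>) -> thread::JoinHandle<()> {
    let db = Arc::clone(db);
    thread::spawn(move || {
        // ... ingest on the worker thread using `db` ...
        let _handle = db;
    })
}

fn main() {
    let db = Arc::new(InnerDb);
    ingest(&db).join().unwrap();
}
```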
3 changes: 3 additions & 0 deletions src/locustdb.rs
```diff
@@ -216,6 +216,8 @@ pub struct Options {
     pub batch_size: usize,
     /// Maximum number of rows in a partitions. Not implemented.
     pub max_partition_length: usize,
+    /// Number of parallel threads used during WAL flush table batching and compacting
+    pub wal_flush_compaction_threads: usize,
 }
 
 impl Default for Options {
@@ -233,6 +235,7 @@ impl Default for Options {
             partition_combine_factor: 4,
             batch_size: 1024,
             max_partition_length: 1024 * 1024,
+            wal_flush_compaction_threads: 1,
         }
     }
 }
```
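For embedders constructing `Options` directly, the new field slots in via struct update syntax with the `Default` implementation shown above; a small sketch:

```rust
let options = locustdb::Options {
    // Use four worker threads for WAL flush table batching and compaction.
    wal_flush_compaction_threads: 4,
    ..Default::default()
};
```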
11 changes: 7 additions & 4 deletions src/mem_store/table.rs
```diff
@@ -285,7 +285,8 @@ impl Table {
         let by_offset: Vec<Arc<Partition>> = partitions
             .values()
             .cloned()
-            .sorted_by(|p1, p2| p1.range().start.cmp(&p2.range().start));
+            .sorted_by(|p1, p2| p1.range().start.cmp(&p2.range().start))
+            .collect();
         let cumulative = by_offset
             .iter()
             .rev()
@@ -400,7 +401,7 @@ impl Table {
                 columns.insert(col.clone());
             }
         }
-        columns.into_iter().sorted()
+        columns.into_iter().sorted().collect()
     }
 
     pub fn search_column_names(&self, pattern: &str) -> Vec<String> {
@@ -410,12 +411,14 @@ impl Table {
                 .iter()
                 .filter(|col| re.is_match(col))
                 .cloned()
-                .sorted(),
+                .sorted()
+                .collect(),
             Err(_) => column_names
                 .iter()
                 .filter(|col| col.contains(pattern))
                 .cloned()
-                .sorted(),
+                .sorted()
+                .collect(),
         }
     }
 
```
Diffs for the remaining two changed files are not rendered here.
