
Commit

fix: removed some unused code, adding filter version of rad and chunk reader
Rob Patro committed Jul 11, 2024
1 parent 621b96f commit 7bfc668
Showing 3 changed files with 202 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/chunk.rs
@@ -46,6 +46,7 @@ pub struct CorrectedCbChunk {
pub(crate) data: Cursor<Vec<u8>>,
}

/*
impl CorrectedCbChunk {
pub fn from_label_and_counter(corrected_bc_in: u64, num_remain: u32) -> CorrectedCbChunk {
let mut cc = CorrectedCbChunk {
@@ -60,6 +61,7 @@ impl CorrectedCbChunk {
cc
}
}
*/

#[deprecated(
since = "0.9.0",
29 changes: 15 additions & 14 deletions src/lib.rs
@@ -274,6 +274,7 @@ impl GlobalEqCellList {
}
}

/*
#[inline]
pub fn dump_chunk(v: &mut CorrectedCbChunk, owriter: &Mutex<BufWriter<File>>) {
v.data.set_position(0);
@@ -283,6 +284,7 @@ pub fn dump_chunk(v: &mut CorrectedCbChunk, owriter: &Mutex<BufWriter<File>>) {
v.data.write_all(&nrec.to_le_bytes()).unwrap();
owriter.lock().unwrap().write_all(v.data.get_ref()).unwrap();
}
*/

/// Given a [BufReader]`<T>` from which to read a set of records that
/// should reside in the same collated bucket, this function will
@@ -441,15 +443,14 @@ pub fn collate_temporary_bucket_twopass_atac<T: Read + Seek, U: Write>(
let mut total_bytes = 0usize;
let header_size = 2 * std::mem::size_of::<u32>() as u64;
let size_of_bc = bct.bytes_for_type();
let size_of_rec = std::mem::size_of::<u32>() +
std::mem::size_of::<u8>() +
std::mem::size_of::<u32>() +
std::mem::size_of::<u16>();
let size_of_rec = std::mem::size_of::<u32>()
+ std::mem::size_of::<u8>()
+ std::mem::size_of::<u32>()
+ std::mem::size_of::<u16>();

let size_of_u32 = std::mem::size_of::<u32>();
let calc_record_bytes = |num_aln: usize| -> usize {
size_of_u32 + size_of_bc + (size_of_rec * num_aln)
};
let calc_record_bytes =
|num_aln: usize| -> usize { size_of_u32 + size_of_bc + (size_of_rec * num_aln) };

// read each record
for _ in 0..(nrec as usize) {
@@ -569,6 +570,7 @@ pub fn collate_temporary_bucket_twopass_atac<T: Read + Seek, U: Write>(
cb_byte_map.len()
}

/*
pub fn collate_temporary_bucket<T: Read>(
reader: &mut T,
bct: &RadIntId,
@@ -670,6 +672,7 @@ pub fn process_corrected_cb_chunk<T: Read>(
}
}
}
*/

/// Represents a temporary bucket of barcodes whose records will
/// be written together and then collated later in memory.
@@ -846,11 +849,7 @@ pub fn dump_corrected_cb_chunk_to_temp_file_atac<T: Read>(
// if this record had a correct or correctable barcode
if let Some(corrected_id) = correct_map.get(&tup.0) {
// could be replaced with orientation
let rr = AtacSeqReadRecord::from_bytes_with_header(
reader,
tup.0,
tup.1
);
let rr = AtacSeqReadRecord::from_bytes_with_header(reader, tup.0, tup.1);

if rr.is_empty() {
continue;
@@ -887,7 +886,9 @@
bcursor.write_all(as_u8_slice(&rr.refs[..])).unwrap();
bcursor.write_all(as_u8_slice_u8(&rr.map_type[..])).unwrap();
bcursor.write_all(as_u8_slice(&rr.start_pos[..])).unwrap();
bcursor.write_all(as_u8_slice_u16(&rr.frag_lengths[..])).unwrap();
bcursor
.write_all(as_u8_slice_u16(&rr.frag_lengths[..]))
.unwrap();

// update number of written records
v.num_records_written.fetch_add(1, Ordering::SeqCst);
191 changes: 185 additions & 6 deletions src/readers.rs
@@ -128,6 +128,153 @@ where
}
}

/// This free function is used within the [ParallelRadReader] and [ParallelChunkReader] to
/// fill a work queue with [MetaChunk]s from the current file position until the end of the
/// file is reached. It applies the "filter" function to each chunk to determine if the chunk
/// should be included in the output (`filter_fn` returns `true`) or not (`filter_fn` returns
/// `false`).
///
/// <div class="warning">
/// NOTE: For this function to work correctly, it is assumed that, at the point this function is
/// invoked, the reader `br` is positioned at the start of the first [Chunk] in the file (directly
/// after the file-level tag values).
/// </div>
///
/// * `br` - The underlying reader from which the [Chunk]s are drawn
/// * `callback` - An optional callback to be invoked when each new [MetaChunk] is placed on the work
///   queue. The callback is given two values: the first is the number of bytes of the just-pushed
///   [MetaChunk] and the second is the number of [Chunk]s packed into the just-pushed [MetaChunk].
/// * `prelude` - A shared reference to the [RadPrelude] corresponding to the chunks in the file
/// * `meta_chunk_queue` - A parallel queue onto which the raw data for each [MetaChunk] will be
///   placed
/// * `done_var` - An [AtomicBool] that will be set to true only once all of the [Chunk]s of the
///   underlying file have been read and added to the work queue.
fn fill_work_queue_filtered_until_eof<R: MappedRecord, T: BufRead, FilterF, F: FnMut(u64, u64)>(
mut br: T,
filter_fn: FilterF,
mut callback: Option<F>,
prelude: &RadPrelude,
meta_chunk_queue: Arc<ArrayQueue<MetaChunk<R>>>,
done_var: Arc<AtomicBool>,
) -> anyhow::Result<()>
where
<R as MappedRecord>::ParsingContext: RecordContext,
<R as MappedRecord>::ParsingContext: Clone,
FilterF: Fn(&[u8], &<R as MappedRecord>::ParsingContext) -> bool,
{
const BUFSIZE: usize = 524208;
// the buffer that will hold our records
let mut buf = vec![0u8; BUFSIZE];
// the number of bytes currently packed into the meta chunk
let mut cbytes = 0u32;
// the number of records currently packed into the meta chunk
let mut crec = 0u32;
// the number of chunks in the current meta chunk
let mut chunks_in_meta_chunk = 0usize;
// the index of the first chunk in this meta chunk
let mut first_chunk = 0usize;
// if we had to expand the buffer already and should
// forcibly push the current buffer onto the queue
let mut force_push = false;
// the number of bytes and records in the next chunk header
let mut nbytes_chunk = 0u32;
let mut nrec_chunk = 0u32;
let mut last_chunk = false;

// we do not actually copy a chunk in the first iteration, since we have
// not yet read a chunk header (the header for the next chunk is read at
// the bottom of the loop body).
let mut chunk_num = 0;
let record_context = prelude
.get_record_context::<<R as MappedRecord>::ParsingContext>()
.unwrap();
while utils::has_data_left(&mut br).expect("encountered error reading input file") {
// in the first iteration we've not read a header yet
// so we can't fill a chunk, otherwise we read the header
// at the bottom of the previous iteration of this loop, and
// we will fill in the buffer appropriately here.
if chunk_num > 0 {
// if the current chunk (the chunk whose header we read in the last iteration of
// the loop) alone is too big for the buffer, then resize the buffer to be big enough
if nbytes_chunk as usize > buf.len() {
// if we had to resize the buffer to fit this cell, then make sure we push
// immediately in the next round
force_push = true;
let chunk_resize = nbytes_chunk as usize + cbytes as usize;
buf.resize(chunk_resize, 0);
}

// copy the data for the current chunk into the buffer
let boffset = cbytes as usize;
buf.pwrite::<u32>(nbytes_chunk, boffset)?;
buf.pwrite::<u32>(nrec_chunk, boffset + 4)?;
// read everything after the chunk header into the buffer
br.read_exact(&mut buf[(boffset + 8)..(boffset + nbytes_chunk as usize)])
.context("failed to read from work queue.")?;
// apply the filter
if filter_fn(&buf[boffset + 8..], &record_context) {
chunks_in_meta_chunk += 1;
cbytes += nbytes_chunk;
crec += nrec_chunk;
} else {
// if we are skipping this collated chunk, and it triggered a
// force_push, then undo that.
force_push = false;
}
}

// in the last iteration of the loop, we will have read all headers already
// and we are just filling up the buffer with the last chunk, and there will be no more
// headers left to read
if utils::has_data_left(&mut br).expect("encountered error reading input file") {
let (nc, nr) = Chunk::<R>::read_header(&mut br);
nbytes_chunk = nc;
nrec_chunk = nr;
} else {
last_chunk = true;
}

// determine if we should dump the current buffer to the work queue
if force_push // if we were told to push this meta chunk
|| // or if adding the next chunk to this meta chunk would exceed the buffer size
((cbytes + nbytes_chunk) as usize > buf.len() && chunks_in_meta_chunk > 0)
|| // or if this was the last chunk
last_chunk
{
// push this meta chunk onto the queue
let mut bclone = MetaChunk::<R>::new(
first_chunk,
chunks_in_meta_chunk,
cbytes,
crec,
record_context.clone(),
buf.clone(),
);
// keep trying until we can push this payload
while let Err(t) = meta_chunk_queue.push(bclone) {
bclone = t;
// no point trying to push if the queue is full
while meta_chunk_queue.is_full() {}
}
callback
.iter_mut()
.for_each(|f| f(cbytes as u64, chunks_in_meta_chunk as u64));

// index of the first chunk in the next meta chunk
first_chunk += chunks_in_meta_chunk;
// reset the counters
chunks_in_meta_chunk = 0;
cbytes = 0;
crec = 0;
buf.resize(BUFSIZE, 0);
force_push = false;
}
chunk_num += 1;
}
done_var.store(true, Ordering::SeqCst);
Ok(())
}
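
For orientation, a chunk-level predicate compatible with the `FilterF` bound above (`Fn(&[u8], &<R as MappedRecord>::ParsingContext) -> bool`) might look like the following minimal sketch. This is not code from this commit: the import path, the `keep_nonempty_chunks` name, and the trivial "keep anything non-empty" body are assumptions made purely for illustration. As at the call site above, the payload slice handed to the filter begins immediately after the 8-byte chunk header.

// Hypothetical example only: the import path and predicate body are assumptions,
// not code from this commit.
use libradicl::record::MappedRecord;

// Keep any chunk whose payload (the bytes following the 8-byte chunk header)
// actually carries record data; a real filter could instead decode just enough
// of the first record, using the shared parsing context, to make its decision.
fn keep_nonempty_chunks<R: MappedRecord>(
    payload: &[u8],
    _ctx: &<R as MappedRecord>::ParsingContext,
) -> bool {
    !payload.is_empty()
}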

/// This free function is used within the [ParallelRadReader] and [ParallelChunkReader] to
/// fill a work queue with [MetaChunk]s from the current file position until the end of the
/// file is reached.
@@ -349,6 +496,38 @@ impl<R: MappedRecord, T: BufRead + Seek> ParallelRadReader<R, T> {

pcr.start(&mut self.reader, callback)
}

/// This function starts the process of parsing the [Chunk]s of the underlying RAD
/// file into a work queue of [MetaChunk]s, which can then be consumed by multiple
/// worker threads in parallel. **Note**: This variant of the function will apply
/// the filter function `filter_fn`, and the resulting iterators returned to the
/// consumer will include only chunks for which `filter_fn(chunk)` is `true`.
/// <div class="warning">
/// NOTE: This function will attempt to populate the queue until the
/// file is exhausted (all Chunks have been placed on the queue). However, to control
/// potential memory use, we use a bounded work queue. Therefore, if the queue is not being
/// emptied by workers, this function will spin endlessly waiting to put the next MetaChunk
/// on the work queue. Since this is a blocking function, be sure to have the worker threads
/// obtain a reference to the queue (via the get_queue() method) before calling this function!
/// </div>
pub fn start_chunk_parsing_filtered<FilterFn, F: FnMut(u64, u64)>(
&mut self,
filter_fn: FilterFn,
callback: Option<F>,
) -> anyhow::Result<()>
where
<R as MappedRecord>::ParsingContext: RecordContext,
<R as MappedRecord>::ParsingContext: Clone,
FilterFn: Fn(&[u8], &<R as MappedRecord>::ParsingContext) -> bool,
{
let mut pcr = ParallelChunkReader::<R> {
prelude: &self.prelude,
meta_chunk_queue: self.meta_chunk_queue.clone(),
done_var: self.done_var.clone(),
};

pcr.start_filtered(&mut self.reader, filter_fn, callback)
}
}
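
To make the warning above concrete, here is a hypothetical usage sketch, not code from this commit: a worker takes a handle to the bounded queue via `get_queue()` before the blocking `start_chunk_parsing_filtered` call, so the producer is never left spinning on a full queue with no consumer. The record type, the reuse of the `keep_nonempty_chunks` predicate sketched earlier, and the simplified worker loop are assumptions for illustration only.

// Hypothetical usage sketch. Assumes `rad_reader` is an already-constructed
// ParallelRadReader<AtacSeqReadRecord, _> positioned after the prelude and
// file-level tag values, and that get_queue() returns a shared handle to the
// bounded work queue.
std::thread::scope(|s| {
    // obtain the queue handle *before* the blocking parse call (see the warning above)
    let q = rad_reader.get_queue();
    s.spawn(move || {
        // simplified consumer: a real worker would also check the reader's "done"
        // flag so it does not exit on a transiently empty queue
        while let Some(meta_chunk) = q.pop() {
            // process the records of this (filtered) meta chunk here
            let _ = meta_chunk;
        }
    });
    // producer: blocks until every chunk accepted by the filter has been queued
    rad_reader
        .start_chunk_parsing_filtered(
            keep_nonempty_chunks::<AtacSeqReadRecord>,
            None::<fn(u64, u64)>,
        )
        .expect("failed to parse RAD chunks");
});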

/// Allows reading chunks from the underlying RAD file chunks
@@ -432,14 +611,13 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
Ok(())
}

/*
/// Start this [ParallelChunkReader] processing input from the [BufRead] `br`.
/// Note that this reader should be positioned at the start of the chunks for this
/// RAD file, so that the prelude and file tag values have already been parsed/consumed.
/// The provided filter will be applied at the **chunk** level: chunks for which the
/// filter function returns `true` will be retained; others will be
/// discarded / skipped.
pub fn start<T: BufRead, FilterF, F: FnMut(u64, u64)>(
pub fn start_filtered<T: BufRead, FilterF, F: FnMut(u64, u64)>(
&mut self,
br: T,
filter_fn: FilterF,
@@ -448,13 +626,14 @@
where
<R as MappedRecord>::ParsingContext: RecordContext,
<R as MappedRecord>::ParsingContext: Clone,
FilterF: Fn(&Chunk<R>) -> bool
FilterF: Fn(&[u8], &<R as MappedRecord>::ParsingContext) -> bool,
{
if let Some(_nchunks) = self.prelude.hdr.num_chunks() {
// fill queue known number of chunks
println!("known number of chunks");
fill_work_queue_until_eof(
fill_work_queue_filtered_until_eof(
br,
filter_fn,
callback,
self.prelude,
self.meta_chunk_queue.clone(),
@@ -463,8 +642,9 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
} else {
// fill queue unknown
println!("unknown number of chunks");
fill_work_queue_until_eof(
fill_work_queue_filtered_until_eof(
br,
filter_fn,
callback,
self.prelude,
self.meta_chunk_queue.clone(),
@@ -473,5 +653,4 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
}
Ok(())
}
*/
}
