Skip to content

Commit

Permalink
feat: deprecated some types, made parallel readers more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Patro committed Jul 12, 2024
1 parent 96a7c7a commit 5dd74da
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 27 deletions.
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@

use crate as libradicl;

use self::libradicl::chunk::CorrectedCbChunk;
use self::libradicl::rad_types::RadIntId;
use self::libradicl::record::AlevinFryReadRecord;
use self::libradicl::record::AtacSeqReadRecord;
use self::libradicl::schema::TempCellInfo;
#[allow(unused_imports)]
use ahash::{AHasher, RandomState};
use bio_types::strand::*;
use dashmap::DashMap;
use scroll::Pread;
use serde::{Deserialize, Serialize};

Expand Down
57 changes: 32 additions & 25 deletions src/readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ where
}
}

/// Continuation predicate that reports whether the reader `br` still has
/// data available, as determined by `utils::has_data_left`.
///
/// The `_ctr` argument is accepted but never consulted.
///
/// # Panics
///
/// Panics with "encountered error reading input file" if
/// `utils::has_data_left` returns an error.
fn until_eof<T: BufRead>(_ctr: usize, br: &mut T) -> bool {
    let data_left = utils::has_data_left(br);
    data_left.expect("encountered error reading input file")
}

/// This free function is used within the [ParallelRadReader] and [ParallelChunkReader] to
/// fill a work queue with [MetaChunk]s from the current file position until the end of the
/// file is reached. It applies the "filter" function to each chunk to determine if the chunk
Expand All @@ -153,7 +149,7 @@ fn until_eof<T: BufRead>(_ctr: usize, br: &mut T) -> bool {
/// placed
/// * `done_var` - An [AtomicBool] that will be set to true only once all of the [Chunk]s of the
/// underlying file have been read and added to the work queue.
fn fill_work_queue_filtered_until<
fn fill_work_queue_filtered<
R: MappedRecord,
T: BufRead,
ChunkIt: Iterator<Item = usize> + BufReadProvider<T> + LastChunkSignaler,
Expand Down Expand Up @@ -278,7 +274,6 @@ where
buf.resize(BUFSIZE, 0);
force_push = false;
}
//chunk_num += 1;
}
done_var.store(true, Ordering::SeqCst);
Ok(())
Expand All @@ -303,7 +298,7 @@ where
/// placed
/// * `done_var` - An [AtomicBool] that will be set to true only once all of the [Chunk]s of the
/// underlying file have been read and added to the work queue.
fn fill_work_queue_until<
fn fill_work_queue<
R: MappedRecord,
T: BufRead,
ChunkIt: Iterator<Item = usize> + BufReadProvider<T> + LastChunkSignaler,
Expand Down Expand Up @@ -419,7 +414,6 @@ where
buf.resize(BUFSIZE, 0);
force_push = false;
}
//chunk_num += 1;
}
done_var.store(true, Ordering::SeqCst);
Ok(())
Expand Down Expand Up @@ -544,15 +538,26 @@ impl<R: MappedRecord, T: BufRead + Seek> ParallelRadReader<R, T> {
}
}

/// This trait represents the behavior of being able to determine whether
/// we are currently looking at the last chunk in a RAD file.
///
/// The implementors visible alongside this trait answer the question either
/// by comparing a running chunk counter against a known chunk total, or by
/// probing the underlying reader for remaining data.
trait LastChunkSignaler {
/// Returns `true` if the current chunk under consideration is the
/// last chunk in a RAD file, and `false` otherwise.
///
/// This takes `&mut self` because an implementation may need mutable
/// access to its reader in order to check whether any data is left.
fn is_last_chunk(&mut self) -> bool;
}

/// This trait represents the behavior of being able to provide a shared
/// or mutable reference to some type `T` such that `T:` [BufRead].
trait BufReadProvider<T: BufRead> {
// The shared accessor is allowed to go unused, so the dead-code lint is
// silenced for it.
#[allow(dead_code)]
/// Returns a shared reference to the [BufRead].
fn get_buf_read(&self) -> &T;
/// Returns a mutable reference to the [BufRead].
fn get_mut_buf_read(&mut self) -> &mut T;
}

/// An iterator that will iterate over chunk IDs given the total number of
/// chunks to be parsed.
struct ChunkCountIterator<T: BufRead> {
num_chunks: usize,
current_chunk: usize,
Expand Down Expand Up @@ -584,6 +589,9 @@ impl<T: BufRead> ExactSizeIterator for ChunkCountIterator<T> {}

impl<T: BufRead> LastChunkSignaler for ChunkCountIterator<T> {
    /// Reports whether the chunk counter has moved past the total number
    /// of chunks.
    fn is_last_chunk(&mut self) -> bool {
        // By the time this is called the counter has already been
        // advanced (the iteration effectively covers 0..=num_chunks), so
        // the check is a strict inequality rather than an equality test.
        self.num_chunks < self.current_chunk
    }
}
Expand All @@ -597,12 +605,9 @@ impl<T: BufRead> BufReadProvider<T> for ChunkCountIterator<T> {
}
}

impl<T: BufRead> LastChunkSignaler for ReadUntilEOFIter<T> {
    /// Reports the last chunk once `utils::has_data_left` says the wrapped
    /// reader has nothing further to offer.
    ///
    /// # Panics
    ///
    /// Panics with "encountered error reading input file" if the check on
    /// the reader returns an error.
    fn is_last_chunk(&mut self) -> bool {
        let more_data = utils::has_data_left(&mut self.buf_reader)
            .expect("encountered error reading input file");
        !more_data
    }
}

/// An iterator that will iterate over chunk IDs until there is
/// no more data to be parsed from the underlying [BufRead]
/// object.
struct ReadUntilEOFIter<T: BufRead> {
current_chunk: usize,
buf_reader: T,
Expand Down Expand Up @@ -633,6 +638,12 @@ impl<T: BufRead> BufReadProvider<T> for ReadUntilEOFIter<T> {
}
}

impl<T: BufRead> LastChunkSignaler for ReadUntilEOFIter<T> {
    /// Reports the last chunk once `utils::has_data_left` says the wrapped
    /// reader has nothing further to offer.
    ///
    /// # Panics
    ///
    /// Panics with "encountered error reading input file" if the check on
    /// the reader returns an error.
    fn is_last_chunk(&mut self) -> bool {
        let more_data = utils::has_data_left(&mut self.buf_reader)
            .expect("encountered error reading input file");
        !more_data
    }
}

/// Allows reading chunks from the underlying RAD file chunks
/// in parallel by dedicating a single thread (the one running
/// functions on this structure) to filling a work queue.
Expand Down Expand Up @@ -698,8 +709,7 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
buf_reader: br,
};
// fill queue known number of chunks
println!("known number of chunks");
fill_work_queue_until(
fill_work_queue(
chunk_iter,
callback,
self.prelude,
Expand All @@ -711,9 +721,8 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
current_chunk: 0,
buf_reader: br,
};
// fill queue unknown
println!("unknown number of chunks");
fill_work_queue_until(
// fill queue unknown number of chunks
fill_work_queue(
chunk_iter,
callback,
self.prelude,
Expand Down Expand Up @@ -748,9 +757,8 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
current_chunk: 0,
buf_reader: br,
};
// fill queue known number of chunks
println!("known number of chunks");
fill_work_queue_filtered_until(
// fill queue known number of chunks filtered
fill_work_queue_filtered(
chunk_iter,
filter_fn,
callback,
Expand All @@ -763,9 +771,8 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> {
current_chunk: 0,
buf_reader: br,
};
// fill queue unknown
println!("unknown number of chunks");
fill_work_queue_filtered_until(
// fill queue unknown number of chunks filtered
fill_work_queue_filtered(
chunk_iter,
filter_fn,
callback,
Expand Down

0 comments on commit 5dd74da

Please sign in to comment.