diff --git a/src/lib.rs b/src/lib.rs index 30160c9..070999c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,6 @@ use crate as libradicl; -use self::libradicl::chunk::CorrectedCbChunk; use self::libradicl::rad_types::RadIntId; use self::libradicl::record::AlevinFryReadRecord; use self::libradicl::record::AtacSeqReadRecord; @@ -39,7 +38,6 @@ use self::libradicl::schema::TempCellInfo; #[allow(unused_imports)] use ahash::{AHasher, RandomState}; use bio_types::strand::*; -use dashmap::DashMap; use scroll::Pread; use serde::{Deserialize, Serialize}; diff --git a/src/readers.rs b/src/readers.rs index bc10e9a..882fe59 100644 --- a/src/readers.rs +++ b/src/readers.rs @@ -128,10 +128,6 @@ where } } -fn until_eof(_ctr: usize, br: &mut T) -> bool { - utils::has_data_left(br).expect("encountered error reading input file") -} - /// This free function is used within the [ParallelRadReader] and [ParallelChunkReader] to /// fill a work queue with [MetaChunk]s from the current file position until the end of the /// file is reached. It applies the "filter" function to each chunk to determine if the chunk @@ -153,7 +149,7 @@ fn until_eof(_ctr: usize, br: &mut T) -> bool { /// placed /// * `done_var` - An [AtomicBool] that will be set to true only once all of the [Chunk]s of the /// underlying file have been read and added to the work queue. -fn fill_work_queue_filtered_until< +fn fill_work_queue_filtered< R: MappedRecord, T: BufRead, ChunkIt: Iterator + BufReadProvider + LastChunkSignaler, @@ -278,7 +274,6 @@ where buf.resize(BUFSIZE, 0); force_push = false; } - //chunk_num += 1; } done_var.store(true, Ordering::SeqCst); Ok(()) @@ -303,7 +298,7 @@ where /// placed /// * `done_var` - An [AtomicBool] that will be set to true only once all of the [Chunk]s of the /// underlying file have been read and added to the work queue. -fn fill_work_queue_until< +fn fill_work_queue< R: MappedRecord, T: BufRead, ChunkIt: Iterator + BufReadProvider + LastChunkSignaler, @@ -419,7 +414,6 @@ where buf.resize(BUFSIZE, 0); force_push = false; } - //chunk_num += 1; } done_var.store(true, Ordering::SeqCst); Ok(()) @@ -544,15 +538,26 @@ impl ParallelRadReader { } } +/// This trait represents the behavior of being able to determine if we +/// are currently looking at the last chunk in a RAD file. trait LastChunkSignaler { + /// Returns true if the current chunk under consideration is the + /// last chunk in a RAD file, and false otherwise. fn is_last_chunk(&mut self) -> bool; } +/// This trait represents the behavior of being able to provide a shared +/// or mutable reference to some type 'T' such that `T :` [BufRead]. trait BufReadProvider { + #[allow(dead_code)] + /// return a shared reference to the [BufRead] fn get_buf_read(&self) -> &T; + /// return a mutable reference to the [BufRead] fn get_mut_buf_read(&mut self) -> &mut T; } +/// An iterator that will iterate over chunk IDs given the total number of +/// chunks to be parsed. struct ChunkCountIterator { num_chunks: usize, current_chunk: usize, @@ -584,6 +589,9 @@ impl ExactSizeIterator for ChunkCountIterator {} impl LastChunkSignaler for ChunkCountIterator { fn is_last_chunk(&mut self) -> bool { + // this is > instead of == because we have already + // incremented the chunk by the time we call this + // that is, our iteration loop is c in 0..=num_chunks self.current_chunk > self.num_chunks } } @@ -597,12 +605,9 @@ impl BufReadProvider for ChunkCountIterator { } } -impl LastChunkSignaler for ReadUntilEOFIter { - fn is_last_chunk(&mut self) -> bool { - !utils::has_data_left(&mut self.buf_reader).expect("encountered error reading input file") - } -} - +/// An iterator that will iterate over chunk IDs until there is +/// no more data to be parsed from the underlying [BufRead] +/// object. struct ReadUntilEOFIter { current_chunk: usize, buf_reader: T, @@ -633,6 +638,12 @@ impl BufReadProvider for ReadUntilEOFIter { } } +impl LastChunkSignaler for ReadUntilEOFIter { + fn is_last_chunk(&mut self) -> bool { + !utils::has_data_left(&mut self.buf_reader).expect("encountered error reading input file") + } +} + /// Allows reading chunks from the underlying RAD file chunks /// in parallel by dedicating a single thread (the one running /// functions on this structure) to filling a work queue. @@ -698,8 +709,7 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> { buf_reader: br, }; // fill queue known number of chunks - println!("known number of chunks"); - fill_work_queue_until( + fill_work_queue( chunk_iter, callback, self.prelude, @@ -711,9 +721,8 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> { current_chunk: 0, buf_reader: br, }; - // fill queue unknown - println!("unknown number of chunks"); - fill_work_queue_until( + // fill queue unknown number of chunks + fill_work_queue( chunk_iter, callback, self.prelude, @@ -748,9 +757,8 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> { current_chunk: 0, buf_reader: br, }; - // fill queue known number of chunks - println!("known number of chunks"); - fill_work_queue_filtered_until( + // fill queue known number of chunks filtered + fill_work_queue_filtered( chunk_iter, filter_fn, callback, @@ -763,9 +771,8 @@ impl<'a, R: MappedRecord> ParallelChunkReader<'a, R> { current_chunk: 0, buf_reader: br, }; - // fill queue unknown - println!("unknown number of chunks"); - fill_work_queue_filtered_until( + // fill queue unknown number of chunks filtered + fill_work_queue_filtered( chunk_iter, filter_fn, callback,