Skip to content

Commit

Permalink
Write separate "doneprocessing" files, and check "done_all_processing…
Browse files Browse the repository at this point in the history
…" as part of #548.
  • Loading branch information
donkirkby committed Mar 27, 2020
1 parent 0bcfd06 commit ec07d2f
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 15 deletions.
24 changes: 15 additions & 9 deletions micall/monitor/kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def scan_samples(raw_data_folder, pipeline_version, sample_queue, wait):
if error_path.exists():
continue
done_path = (run_path /
f"Results/version_{pipeline_version}/doneprocessing")
f"Results/version_{pipeline_version}/done_all_processing")
if done_path.exists():
continue
base_calls_path = run_path / "Data/Intensities/BaseCalls"
Expand Down Expand Up @@ -277,6 +277,17 @@ def get_scratch_path(results_path, pipeline_group):
return scratch_path


def get_collated_path(results_path, pipeline_group):
if pipeline_group == PipelineType.MAIN:
target_path = results_path
elif pipeline_group == PipelineType.DENOVO_MAIN:
target_path = results_path / "denovo"
else:
assert pipeline_group == PipelineType.MIXED_HCV_MAIN
target_path = results_path / "mixed_hcv"
return target_path


class KiveWatcher:
def __init__(self,
config=None,
Expand Down Expand Up @@ -516,7 +527,7 @@ def check_completed_folders(self):
if (results_path / "coverage_scores.csv").exists():
self.qai_upload_queue.put(results_path)
if not folder_watcher.active_pipeline_groups:
(results_path / "done_all").touch()
(results_path / "done_all_processing").touch()
self.folder_watchers.pop(folder)
if not self.folder_watchers:
logger.info('No more folders to process.')
Expand Down Expand Up @@ -548,16 +559,11 @@ def collate_folder(self, folder_watcher, pipeline_group):
if pipeline_group == PipelineType.FILTER_QUALITY:
return results_path
scratch_path = get_scratch_path(results_path, pipeline_group)
if pipeline_group == PipelineType.MAIN:
target_path = results_path
elif pipeline_group == PipelineType.DENOVO_MAIN:
target_path = results_path / "denovo"
else:
assert pipeline_group == PipelineType.MIXED_HCV_MAIN
target_path = results_path / "mixed_hcv"
target_path = get_collated_path(results_path, pipeline_group)
logger.info('Collating results in %s', target_path)
self.copy_outputs(folder_watcher, scratch_path, target_path)
shutil.rmtree(scratch_path)
(target_path / 'doneprocessing').touch()
return results_path

def copy_outputs(self,
Expand Down
3 changes: 0 additions & 3 deletions micall/monitor/update_qai.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ def process_folder(result_folder,
all_results_path, _ = os.path.split(os.path.normpath(result_folder))
run_path, _ = os.path.split(all_results_path)
sample_sheet_file = os.path.join(run_path, "SampleSheet.csv")
done_path = os.path.join(result_folder, 'doneprocessing')
with open(sample_sheet_file, "rU") as f:
sample_sheet = sample_sheet_parser.sample_sheet_parser(f)

Expand Down Expand Up @@ -399,8 +398,6 @@ def process_folder(result_folder,
conseqs,
session,
pipeline_version)
with open(done_path, "w"):
pass
logger.info('Upload success!')
break
except Exception:
Expand Down
112 changes: 109 additions & 3 deletions micall/tests/test_kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def test_skip_done_runs(raw_data_with_two_runs):
done_run = raw_data_with_two_runs / "MiSeq/runs/140201_M01234"
results_path = done_run / "Results/version_0-dev"
results_path.mkdir(parents=True)
done_path = results_path / "doneprocessing"
done_path = results_path / "done_all_processing"
done_path.touch()
pipeline_version = '0-dev'
sample_queue = DummyQueueSink()
Expand All @@ -483,7 +483,8 @@ def test_skip_done_runs(raw_data_with_two_runs):
find_samples(raw_data_with_two_runs,
pipeline_version,
sample_queue,
wait=False)
wait=False,
retry=False)

sample_queue.verify()

Expand Down Expand Up @@ -2140,6 +2141,8 @@ def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_con
expected_coverage_map_path = (
results_path / "coverage_maps/2110A-V3LOOP_S13.R1_coverage.txt")
expected_mutations_path = results_path / "mutations.csv"
expected_done_path = results_path / "doneprocessing"
expected_all_done_path = results_path / "done_all_processing"
expected_resistance_path = results_path / "resistance.csv"
expected_resistance_content = """\
sample,url,n
Expand All @@ -2157,6 +2160,9 @@ def test_folder_completed(raw_data_with_two_samples, mock_open_kive, default_con
assert not expected_mutations_path.exists()
assert expected_resistance_content == expected_resistance_path.read_text()
assert expected_coverage_map_content == expected_coverage_map_path.read_bytes()
assert expected_done_path.exists()
assert expected_all_done_path.exists()
assert kive_watcher.is_idle()


def test_folder_completed_except_denovo(raw_data_with_two_samples, mock_open_kive, default_config):
Expand Down Expand Up @@ -2212,6 +2218,7 @@ def test_folder_completed_except_denovo(raw_data_with_two_samples, mock_open_kiv
sample_scratch_path.mkdir(parents=True)
denovo_scratch_path = results_path / "scratch_denovo" / "2110A-V3LOOP_S13"
expected_done_path = results_path / "doneprocessing"
expected_all_done_path = results_path / "done_all_processing"
expected_mutations_path = results_path / "mutations.csv"
expected_resistance_path = results_path / "resistance.csv"
expected_amino_path = denovo_scratch_path / "amino.csv"
Expand All @@ -2221,8 +2228,10 @@ def test_folder_completed_except_denovo(raw_data_with_two_samples, mock_open_kiv
assert not scratch_path.exists()
assert not expected_mutations_path.exists()
assert expected_resistance_path.exists()
assert not expected_done_path.exists()
assert expected_done_path.exists()
assert not expected_all_done_path.exists()
assert expected_amino_path.exists()
assert not kive_watcher.is_idle()


def test_folder_completed_with_fasta(raw_data_with_two_samples, mock_open_kive, default_config):
Expand Down Expand Up @@ -2608,3 +2617,100 @@ def test_calculate_retry_wait():
assert timedelta(minutes=9) == calculate_retry_wait(min_wait,
max_wait,
attempt_count=10000)


def test_collate_main_results(raw_data_with_two_samples, default_config, mock_open_kive):
run_folder = raw_data_with_two_samples / "MiSeq/runs/140101_M01234"
base_calls = run_folder / "Data/Intensities/BaseCalls"
results_path = run_folder / "Results"
results_path.mkdir(parents=True)
version_folder: Path = results_path / 'version_0-dev'
version_folder.mkdir()

sample1_scratch = version_folder / "scratch" / "2120A-PR_S14"
sample1_scratch.mkdir(parents=True)
(sample1_scratch / "cascade.csv").write_text("col1,col2\nval1.1,val2.1\n")
sample2_scratch = version_folder / "scratch" / "2110A-V3LOOP_S13"
sample2_scratch.mkdir(parents=True)
(sample2_scratch / "cascade.csv").write_text("col1,col2\nval1.2,val2.2\n")

expected_cascade_path = version_folder / "cascade.csv"
expected_cascade_text = "sample,col1,col2\n2120A-PR_S14,val1.1,val2.1\n2110A-V3LOOP_S13,val1.2,val2.2\n"
expected_done_path = version_folder / "doneprocessing"

denovo_scratch_path = version_folder / "scratch_denovo"
denovo_scratch_path.mkdir()

kive_watcher = KiveWatcher(default_config)
folder_watcher = kive_watcher.add_folder(base_calls)
kive_watcher.add_sample_group(base_calls, SampleGroup('2120A', ('2120A-PR_S14_L001_R1_001.fastq.gz', None)))
kive_watcher.add_sample_group(base_calls, SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None)))

kive_watcher.collate_folder(folder_watcher, PipelineType.MAIN)

cascade_text = expected_cascade_path.read_text()
assert cascade_text == expected_cascade_text
assert expected_done_path.exists()
assert denovo_scratch_path.exists()


def test_collate_denovo_results(raw_data_with_two_samples, default_config, mock_open_kive):
run_folder = raw_data_with_two_samples / "MiSeq/runs/140101_M01234"
base_calls = run_folder / "Data/Intensities/BaseCalls"
results_path = run_folder / "Results"
results_path.mkdir(parents=True)
version_folder: Path = results_path / 'version_0-dev'
version_folder.mkdir()

sample1_scratch = version_folder / "scratch_denovo" / "2120A-PR_S14"
sample1_scratch.mkdir(parents=True)
(sample1_scratch / "cascade.csv").write_text("col1,col2\n")
sample2_scratch = version_folder / "scratch_denovo" / "2110A-V3LOOP_S13"
sample2_scratch.mkdir(parents=True)
(sample2_scratch / "cascade.csv").write_text("col1,col2\n")

expected_cascade_path = version_folder / "denovo" / "cascade.csv"
expected_done_path = version_folder / "denovo" / "doneprocessing"

main_scratch_path = version_folder / "scratch"
main_scratch_path.mkdir()

kive_watcher = KiveWatcher(default_config)
folder_watcher = kive_watcher.add_folder(base_calls)
kive_watcher.add_sample_group(base_calls, SampleGroup('2120A', ('2120A-PR_S14_L001_R1_001.fastq.gz', None)))
kive_watcher.add_sample_group(base_calls, SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None)))

kive_watcher.collate_folder(folder_watcher, PipelineType.DENOVO_MAIN)

assert expected_cascade_path.exists()
assert expected_done_path.exists()
assert main_scratch_path.exists()


def test_collate_mixed_hcv_results(raw_data_with_two_samples, default_config, mock_open_kive):
run_folder = raw_data_with_two_samples / "MiSeq/runs/140101_M01234"
base_calls = run_folder / "Data/Intensities/BaseCalls"
results_path = run_folder / "Results"
results_path.mkdir(parents=True)
version_folder: Path = results_path / 'version_0-dev'
version_folder.mkdir()

sample1_scratch = version_folder / "scratch_mixed_hcv" / "2120A-PR_S14"
sample1_scratch.mkdir(parents=True)
(sample1_scratch / "mixed_counts.csv").write_text("col1,col2\n")
sample2_scratch = version_folder / "scratch_mixed_hcv" / "2110A-V3LOOP_S13"
sample2_scratch.mkdir(parents=True)
(sample2_scratch / "mixed_counts.csv").write_text("col1,col2\n")

expected_cascade_path = version_folder / "mixed_hcv" / "mixed_counts.csv"
expected_done_path = version_folder / "mixed_hcv" / "doneprocessing"

kive_watcher = KiveWatcher(default_config)
folder_watcher = kive_watcher.add_folder(base_calls)
kive_watcher.add_sample_group(base_calls, SampleGroup('2120A', ('2120A-PR_S14_L001_R1_001.fastq.gz', None)))
kive_watcher.add_sample_group(base_calls, SampleGroup('2110A', ('2110A-V3LOOP_S13_L001_R1_001.fastq.gz', None)))

kive_watcher.collate_folder(folder_watcher, PipelineType.MIXED_HCV_MAIN)

assert expected_cascade_path.exists()
assert expected_done_path.exists()

0 comments on commit ec07d2f

Please sign in to comment.