Skip to content

Commit

Permalink
updated convert to zarr processing
Browse files: browse the repository at this point in the history
  • Loading branch information
beatfactor committed Aug 11, 2024
1 parent f42f70b commit f7176e5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
6 changes: 3 additions & 3 deletions oceanstream/process/file_processor.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -217,7 +217,7 @@ async def process_raw_file_with_progress(config_data, plot_echogram, waveform_mo
logging.exception(f"Error processing file {config_data['raw_path']}: {e}")


def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None):
def convert_raw_file(file_path, config_data, base_path=None, progress_counter=None, counter_lock=None):
logging.debug("Starting processing of file: %s", file_path)

file_path_obj = Path(file_path)
Expand Down Expand Up @@ -251,8 +251,8 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None
output_dest = output_path / file_name
echodata.to_zarr(save_path=output_dest, overwrite=True, parallel=False)

if progress_queue:
progress_queue.put(file_path)
with counter_lock:
progress_counter.value += 1

logging.debug("Finished processing of file: %s", file_path)

Expand Down
25 changes: 18 additions & 7 deletions oceanstream/process/folder_processor.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -192,28 +192,39 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()):
if sonar_model is None and config_data['sonar_model'] is None:
config_data['sonar_model'] = sonar_model

progress_bar = tqdm(total=len(sorted_files), desc="Processing Files", unit="file", ncols=100)

with Manager() as manager:
progress_counter = manager.Value('i', 0)
counter_lock = manager.Lock()
progress_queue = manager.Queue()
pool = Pool(processes=workers_count)

# Partial function with config_data, progress_queue and other arguments
process_func = partial(convert_raw_file, config_data=config_data, progress_queue=progress_queue,
base_path=dir_path)

progress_updater = Process(target=update_progress, args=(progress_queue, len(sorted_files), log_level))
progress_updater.start()
process_func = partial(convert_raw_file, config_data=config_data, base_path=dir_path,
progress_counter=progress_counter, counter_lock=counter_lock)

for file in sorted_files:
pool.apply_async(process_func, args=(file,))
# print("Processing file: ", file)
logging.debug("Started async processing for file: %s", file)

pool.close()

while progress_counter.value < len(sorted_files):
with counter_lock:
progress_bar.n = progress_counter.value
progress_bar.refresh()
pool.join()

with counter_lock:
progress_bar.n = progress_counter.value
progress_bar.refresh()
print(f"[green]✅ All files have been converted.[/green]")

# Wait for the progress updater to finish
progress_queue.put(None) # Signal the progress updater to finish
progress_updater.join()

progress_bar.close()
except KeyboardInterrupt:
logging.info("KeyboardInterrupt received, terminating processes...")
if pool:
Expand Down

0 comments on commit f7176e5

Please sign in to comment.