diff --git a/oceanstream/process/file_processor.py b/oceanstream/process/file_processor.py index 920b203..beabbfb 100644 --- a/oceanstream/process/file_processor.py +++ b/oceanstream/process/file_processor.py @@ -239,7 +239,7 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None file_location = file_name store = _get_chunk_store(config_data['cloud_storage'], file_location) echodata.to_zarr(save_path=store, overwrite=True, parallel=False) - output_zarr_path = store + output_dest = config_data['cloud_storage']['container_name'] + "/" + file_location else: if relative_path: @@ -248,17 +248,15 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None else: output_path = Path(config_data["output_folder"]) - output_zarr_path = output_path / file_name - output_dest = output_zarr_path - echodata.to_zarr(save_path=output_zarr_path, overwrite=True, parallel=False) + output_dest = output_path / file_name + echodata.to_zarr(save_path=output_dest, overwrite=True, parallel=False) if progress_queue: progress_queue.put(file_path) - print( - f"[blue]✅ Converted raw file {file_path} to Zarr and wrote output to: {output_dest} [/blue]") + logging.debug("Finished processing of file: %s", file_path) - return output_zarr_path + return output_dest def write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data=None, output_path=None): @@ -274,8 +272,17 @@ def _get_chunk_store(storage_config, path): from oceanstream.process.azure.blob_storage import get_azfs azfs = get_azfs(storage_config) + container_name = storage_config['container_name'] + + if not azfs.exists(container_name): + try: + azfs.mkdir(container_name) + except Exception as e: + logging.error(f"Error creating container {container_name}: {e}") + raise + if azfs: - return azfs.get_mapper(f"{storage_config['container_name']}/{path}") + return azfs.get_mapper(f"{container_name}/{path}") raise ValueError(f"Unsupported storage type: {storage_config['storage_type']}") diff --git a/oceanstream/process/folder_processor.py b/oceanstream/process/folder_processor.py index 1c92024..2a70b03 100644 --- a/oceanstream/process/folder_processor.py +++ b/oceanstream/process/folder_processor.py @@ -21,6 +21,7 @@ from rich.progress import Progress, TimeElapsedColumn, BarColumn, TextColumn from oceanstream.plot import plot_sv_data from oceanstream.echodata import get_campaign_metadata, read_file +from tqdm.auto import tqdm from .combine_zarr import read_zarr_files from .process import compute_sv @@ -147,6 +148,14 @@ def find_raw_files(base_dir): return raw_files +def update_convert_raw(pgr_queue, pb): + while True: + item = pgr_queue.get() + if item is None: + break + pb.update(item) + + def convert_raw_files(config_data, workers_count=os.cpu_count()): global pool @@ -163,7 +172,7 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()): logging.error("No raw files found in directory: %s", dir_path) return - logging.debug("Found %d raw files in directory: %s", len(raw_files), dir_path) + print(f"Found {len(raw_files)} raw files in directory.") for file in raw_files: creation_time = from_filename(file) @@ -178,10 +187,9 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()): filtered_file_info.sort(key=lambda x: x[1]) sorted_files = [file for file, _ in filtered_file_info] - logging.debug("Sorted files by creation time") campaign_id, date, sonar_model, metadata, _ = get_campaign_metadata(sorted_files[0]) - if config_data['sonar_model'] is None: + if sonar_model is None and config_data['sonar_model'] is None: config_data['sonar_model'] = sonar_model with Manager() as manager: @@ -192,7 +200,6 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()): process_func = partial(convert_raw_file, config_data=config_data, progress_queue=progress_queue, base_path=dir_path) - # Run the progress updater in a separate process progress_updater = Process(target=update_progress, args=(progress_queue, len(sorted_files), log_level)) progress_updater.start() @@ -229,7 +236,6 @@ def process_single_zarr_file(file_path, config_data, base_path=None, chunks=None def process_zarr_files(config_data, client, workers_count=None, chunks=None, plot_echogram=False, waveform_mode="CW", depth_offset=0): - from tqdm.auto import tqdm dir_path = config_data['raw_path'] zarr_files = read_zarr_files(dir_path) @@ -295,7 +301,6 @@ def worker(): def export_location_from_zarr_files(config_data, client=None, workers_count=os.cpu_count(), chunks=None): - from tqdm.auto import tqdm semaphore = Semaphore(max_leases=workers_count) tasks = []