
Commit

updated convert to zarr processing
beatfactor committed Aug 11, 2024
1 parent 25c8dee commit f42f70b
Showing 2 changed files with 26 additions and 14 deletions.
23 changes: 15 additions & 8 deletions oceanstream/process/file_processor.py
@@ -239,7 +239,7 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None
         file_location = file_name
         store = _get_chunk_store(config_data['cloud_storage'], file_location)
         echodata.to_zarr(save_path=store, overwrite=True, parallel=False)
-        output_zarr_path = store
+
         output_dest = config_data['cloud_storage']['container_name'] + "/" + file_location
     else:
         if relative_path:
@@ -248,17 +248,15 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None
         else:
             output_path = Path(config_data["output_folder"])

-        output_zarr_path = output_path / file_name
-        output_dest = output_zarr_path
-        echodata.to_zarr(save_path=output_zarr_path, overwrite=True, parallel=False)
+        output_dest = output_path / file_name
+        echodata.to_zarr(save_path=output_dest, overwrite=True, parallel=False)

     if progress_queue:
         progress_queue.put(file_path)

     print(
         f"[blue]✅ Converted raw file {file_path} to Zarr and wrote output to: {output_dest} [/blue]")
     logging.debug("Finished processing of file: %s", file_path)

-    return output_zarr_path
+    return output_dest


 def write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data=None, output_path=None):
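Note: with this change, convert_raw_file returns output_dest in both branches: a "container/file_name" string in the cloud-storage branch and a pathlib.Path in the local branch. A minimal sketch of how a caller might handle either form; the config values and file name here are hypothetical, not from this repository:

    from pathlib import Path

    # Hypothetical local-output config; a cloud config would carry a
    # 'cloud_storage' section instead of 'output_folder'.
    config_data = {"output_folder": "/tmp/converted", "sonar_model": "EK60"}

    dest = convert_raw_file("D20240811-T120000.raw", config_data)

    # dest is a str for cloud output and a Path for local output,
    # so normalize before string handling.
    print(f"Zarr written to: {dest if isinstance(dest, str) else dest.as_posix()}")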
@@ -274,8 +272,17 @@ def _get_chunk_store(storage_config, path):
         from oceanstream.process.azure.blob_storage import get_azfs
         azfs = get_azfs(storage_config)

+        container_name = storage_config['container_name']
+
+        if not azfs.exists(container_name):
+            try:
+                azfs.mkdir(container_name)
+            except Exception as e:
+                logging.error(f"Error creating container {container_name}: {e}")
+                raise
+
         if azfs:
-            return azfs.get_mapper(f"{storage_config['container_name']}/{path}")
+            return azfs.get_mapper(f"{container_name}/{path}")

     raise ValueError(f"Unsupported storage type: {storage_config['storage_type']}")
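Note: the new container check relies on the filesystem object returned by get_azfs. Assuming get_azfs wraps adlfs.AzureBlobFileSystem (an fsspec implementation, so exists, mkdir, and get_mapper are all available), the mapper it returns is a dict-like chunk store that Zarr can write to directly. A standalone sketch of the same flow, with a connection string standing in for the repository's storage_config:

    import logging
    from adlfs import AzureBlobFileSystem

    def get_chunk_store_sketch(connection_string, container_name, path):
        # fsspec-compatible filesystem over Azure Blob Storage
        azfs = AzureBlobFileSystem(connection_string=connection_string)

        # Create the container up front so the first to_zarr write
        # does not fail on a missing container.
        if not azfs.exists(container_name):
            try:
                azfs.mkdir(container_name)
            except Exception as e:
                logging.error(f"Error creating container {container_name}: {e}")
                raise

        # get_mapper returns an fsspec FSMap, a MutableMapping view of
        # "container/path" that zarr accepts as a store, e.g.
        # echodata.to_zarr(save_path=store, ...)
        return azfs.get_mapper(f"{container_name}/{path}")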

17 changes: 11 additions & 6 deletions oceanstream/process/folder_processor.py
@@ -21,6 +21,7 @@
 from rich.progress import Progress, TimeElapsedColumn, BarColumn, TextColumn
 from oceanstream.plot import plot_sv_data
 from oceanstream.echodata import get_campaign_metadata, read_file
+from tqdm.auto import tqdm

 from .combine_zarr import read_zarr_files
 from .process import compute_sv
@@ -147,6 +148,14 @@ def find_raw_files(base_dir):
     return raw_files


+def update_convert_raw(pgr_queue, pb):
+    while True:
+        item = pgr_queue.get()
+        if item is None:
+            break
+        pb.update(item)
+
+
 def convert_raw_files(config_data, workers_count=os.cpu_count()):
     global pool
@@ -163,7 +172,7 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()):
         logging.error("No raw files found in directory: %s", dir_path)
         return

-    logging.debug("Found %d raw files in directory: %s", len(raw_files), dir_path)
+    print(f"Found {len(raw_files)} raw files in directory.")

     for file in raw_files:
         creation_time = from_filename(file)
@@ -178,10 +187,9 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()):
     filtered_file_info.sort(key=lambda x: x[1])

     sorted_files = [file for file, _ in filtered_file_info]
-    logging.debug("Sorted files by creation time")

     campaign_id, date, sonar_model, metadata, _ = get_campaign_metadata(sorted_files[0])
-    if config_data['sonar_model'] is None:
+    if sonar_model is None and config_data['sonar_model'] is None:
         config_data['sonar_model'] = sonar_model

     with Manager() as manager:
@@ -192,7 +200,6 @@ def convert_raw_files(config_data, workers_count=os.cpu_count()):
         process_func = partial(convert_raw_file, config_data=config_data, progress_queue=progress_queue,
                                base_path=dir_path)

-        # Run the progress updater in a separate process
         progress_updater = Process(target=update_progress, args=(progress_queue, len(sorted_files), log_level))
         progress_updater.start()

@@ -229,7 +236,6 @@ def process_single_zarr_file(file_path, config_data, base_path=None, chunks=None


 def process_zarr_files(config_data, client, workers_count=None, chunks=None, plot_echogram=False, waveform_mode="CW", depth_offset=0):
-    from tqdm.auto import tqdm
     dir_path = config_data['raw_path']
     zarr_files = read_zarr_files(dir_path)

@@ -295,7 +301,6 @@ def worker():


 def export_location_from_zarr_files(config_data, client=None, workers_count=os.cpu_count(), chunks=None):
-    from tqdm.auto import tqdm
     semaphore = Semaphore(max_leases=workers_count)

     tasks = []
