Skip to content

Commit

Permalink
updated cloud storage connection
Browse files Browse the repository at this point in the history
  • Loading branch information
beatfactor committed Aug 10, 2024
1 parent bfa2091 commit 25c8dee
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
13 changes: 8 additions & 5 deletions oceanstream/process/azure/blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ def list_zarr_files(path, azfs=None):
return zarr_files


def get_azfs(storage_config=None):
    """Build an ``AzureBlobFileSystem`` for Azure Blob Storage access.

    Resolution order:
      1. The ``AZURE_STORAGE_CONNECTION_STRING`` environment variable, if set.
      2. ``storage_config['storage_options']``, when ``storage_config`` is
         provided and its ``storage_type`` is ``'azure'``.

    Args:
        storage_config (dict | None): Optional mapping with keys
            ``'storage_type'`` and ``'storage_options'`` (kwargs passed
            straight to ``AzureBlobFileSystem``).

    Returns:
        AzureBlobFileSystem | None: A filesystem object, or ``None`` when no
        credentials are available from either source.
    """
    connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')

    # Environment variable takes precedence over an explicit storage_config.
    if connection_string:
        return AzureBlobFileSystem(connection_string=connection_string)

    if storage_config and storage_config['storage_type'] == 'azure':
        return AzureBlobFileSystem(**storage_config['storage_options'])

    return None


def open_zarr_store(store_name, azfs=None, chunks=None):
Expand Down
12 changes: 5 additions & 7 deletions oceanstream/process/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import traceback
import shutil
from asyncio import CancelledError

import echopype as ep

import xarray as xr
Expand Down Expand Up @@ -221,7 +220,6 @@ async def process_raw_file_with_progress(config_data, plot_echogram, waveform_mo
def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None):
logging.debug("Starting processing of file: %s", file_path)


file_path_obj = Path(file_path)
file_config_data = {**config_data, 'raw_path': file_path_obj}

Expand Down Expand Up @@ -273,13 +271,13 @@ def write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data=None, o


def _get_chunk_store(storage_config, path):
    """Return a zarr chunk store (fsspec mapper) for the configured backend.

    Args:
        storage_config (dict): Must contain ``'storage_type'``,
            ``'container_name'`` and, for Azure, ``'storage_options'``
            (consumed by :func:`get_azfs`).
        path (str): Store path relative to the container root.

    Returns:
        MutableMapping: An ``fsspec`` mapper rooted at
        ``<container_name>/<path>``.

    Raises:
        ValueError: If the storage type is not supported (i.e. no
        filesystem could be created).
    """
    # Local import avoids a circular dependency between the process modules.
    from oceanstream.process.azure.blob_storage import get_azfs
    azfs = get_azfs(storage_config)

    if azfs:
        return azfs.get_mapper(f"{storage_config['container_name']}/{path}")

    raise ValueError(f"Unsupported storage type: {storage_config['storage_type']}")


def _get_chunk_sizes(var_dims, chunk_sizes):
Expand Down

0 comments on commit 25c8dee

Please sign in to comment.