Minor fixes for batch file locking
ethanbb committed Jun 11, 2024
1 parent 36a73d7 commit d7b1b1b
Showing 5 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion mesmerize_core/algorithms/cnmf.py
@@ -113,7 +113,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
Yr._mmap.close() # accessing private attr but windows is annoying otherwise
move_file(fname_new, cnmf_memmap_path)

- # save paths as realative path strings with forward slashes
+ # save paths as relative path strings with forward slashes
cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent)))
cnmf_memmap_path = str(PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)))
corr_img_path = str(PurePosixPath(corr_img_path.relative_to(output_dir.parent)))
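
The path handling above follows the same pattern in all three algorithm modules. A minimal sketch of the idea, using a made-up output directory (not taken from the repo): relative_to() drops the batch-directory prefix, and PurePosixPath renders the result with forward slashes on every OS, so the string stored in the batch file stays portable between Windows and POSIX machines.

from pathlib import Path, PurePosixPath

output_dir = Path("/data/my_batch/abc-123")    # hypothetical item output dir
cnmf_hdf5_path = output_dir / "abc-123.hdf5"   # hypothetical output file
# strip the batch-directory prefix, then force forward slashes
rel = str(PurePosixPath(cnmf_hdf5_path.relative_to(output_dir.parent)))
print(rel)  # -> "abc-123/abc-123.hdf5"
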
2 changes: 1 addition & 1 deletion mesmerize_core/algorithms/cnmfe.py
@@ -105,7 +105,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
Yr._mmap.close() # accessing private attr but windows is annoying otherwise
move_file(fname_new, cnmf_memmap_path)

- # save path as realative path strings with forward slashes
+ # save path as relative path strings with forward slashes
cnmfe_memmap_path = str(PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)))

d.update(
2 changes: 1 addition & 1 deletion mesmerize_core/algorithms/mcorr.py
@@ -123,7 +123,7 @@ def run_algo(batch_path, uuid, data_path: str = None):
shift_path = output_dir.joinpath(f"{uuid}_shifts.npy")
np.save(str(shift_path), shifts)

- # save paths as realative path strings with forward slashes
+ # save paths as relative path strings with forward slashes
cn_path = str(PurePosixPath(cn_path.relative_to(output_dir.parent)))
mcorr_memmap_path = str(PurePosixPath(mcorr_memmap_path.relative_to(output_dir.parent)))
shift_path = str(PurePosixPath(shift_path.relative_to(output_dir.parent)))
18 changes: 9 additions & 9 deletions mesmerize_core/batch_utils.py
@@ -261,8 +261,9 @@ class OverwriteError(IndexError):

class BatchLock:
"""Locks a batch file for safe writing, returning the dataframe in the target"""
- def __init__(self, batch_path: Union[Path, str], *args, **kwargs):
-     self.lock = SoftFileLock(str(batch_path) + ".lock", *args, **kwargs)
+ TIMEOUT = 30 # must be consistent or else nested re-entrant locks break
+ def __init__(self, batch_path: Union[Path, str]):
+     self.lock = SoftFileLock(str(batch_path) + ".lock", is_singleton=True, timeout=self.TIMEOUT)
self.batch_path = batch_path

def __enter__(self) -> pd.DataFrame:
@@ -273,20 +273,19 @@ def __exit__(self, exc_type, exc_value, traceback):
self.lock.__exit__(exc_type, exc_value, traceback)


- def open_batch_for_safe_writing(batch_path: Union[Path, str], lock_timeout: float = 30) -> BatchLock:
+ def open_batch_for_safe_writing(batch_path: Union[Path, str]) -> BatchLock:
"""Just a more self-documenting constructor"""
-     return BatchLock(batch_path, timeout=lock_timeout)
+     return BatchLock(batch_path)


- def save_results_safely(batch_path: Union[Path, str], uuid, results: dict,
-                         runtime: float, lock_timeout: float = 30):
+ def save_results_safely(batch_path: Union[Path, str], uuid, results: dict, runtime: float):
"""
Try to load the given batch and save results to the given item
Uses a file lock to ensure that no other process is writing to the same batch using this function,
which gives up after lock_timeout seconds (set to -1 to never give up)
"""
try:
-     with open_batch_for_safe_writing(batch_path, lock_timeout=lock_timeout) as df:
+     with open_batch_for_safe_writing(batch_path) as df:
# Add dictionary to output column of series
df.loc[df["uuid"] == uuid, "outputs"] = [results]
# Add ran timestamp to ran_time column of series
@@ -300,15 +300,15 @@ def save_results_safely(batch_path: Union[Path, str], uuid, results: dict,
# Print a message with details in lieu of writing to the batch file
msg = f"Batch file could not be written to"
if isinstance(e, Timeout):
msg += f" (file locked for {lock_timeout} seconds)"
msg += f" (file locked for {BatchLock.TIMEOUT} seconds)"
elif isinstance(e, OverwriteError):
msg += f" (items would be overwritten, even though file was locked)"

if results["success"]:
output_dir = Path(batch_path).parent.joinpath(str(uuid))
msg += f"\nRun succeeded; results are in {output_dir}."
else:
msg += f"Run failed. Traceback:\n"
msg += f"\nRun failed.\n"
msg += results["traceback"]

raise RuntimeError(msg)
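
The TIMEOUT class constant and is_singleton=True are what make the nested locking above work. A rough sketch of the behaviour, assuming a recent filelock release and a made-up lock path (neither taken from this repo): with is_singleton=True, filelock hands back the same lock object for a given file, so a nested acquire re-enters the lock instead of waiting on itself, and rebuilding the singleton with different arguments (such as a different timeout) is rejected, which is why every construction goes through one shared constant.

from filelock import SoftFileLock

LOCK_PATH = "batch.pickle.lock"   # hypothetical lock file next to the batch file
TIMEOUT = 30                      # must match everywhere the lock is constructed

outer = SoftFileLock(LOCK_PATH, is_singleton=True, timeout=TIMEOUT)
inner = SoftFileLock(LOCK_PATH, is_singleton=True, timeout=TIMEOUT)
assert outer is inner             # singleton: one lock object per lock file

with outer:                       # e.g. save_to_disk() taking the lock
    with inner:                   # e.g. a nested call re-acquiring it
        assert outer.is_locked    # re-entered, not deadlocked
assert not outer.is_locked        # released only after the outermost exit
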
4 changes: 2 additions & 2 deletions mesmerize_core/caiman_extensions/common.py
@@ -132,13 +132,13 @@ def add_item(self, algo: str, item_name: str, input_movie_path: Union[str, pd.Se
# Save DataFrame to disk
self.save_to_disk(max_index_diff=1)

-     def save_to_disk(self, max_index_diff: int = 0, lock_timeout: float = 1):
+     def save_to_disk(self, max_index_diff: int = 0):
"""
Saves DataFrame to disk, copies to a backup before overwriting existing file.
"""
path: Path = self._df.paths.get_batch_path()

-         with open_batch_for_safe_writing(path, lock_timeout=lock_timeout) as disk_df:
+         with open_batch_for_safe_writing(path) as disk_df:
# check that max_index_diff is not exceeded
if abs(disk_df.index.size - self._df.index.size) > max_index_diff:
raise OverwriteError(
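
For context on the max_index_diff check that save_to_disk runs while holding the lock, here is a toy illustration (not repo code): if the on-disk batch has gained or lost more rows than expected since the in-memory copy was made, writing would clobber another process's items, so the save raises OverwriteError (an IndexError subclass, per batch_utils above) instead of overwriting.

import pandas as pd

class OverwriteError(IndexError):   # stand-in for mesmerize_core.batch_utils.OverwriteError
    pass

def check_overwrite(disk_df: pd.DataFrame, mem_df: pd.DataFrame, max_index_diff: int = 0) -> None:
    # abort if the row counts have diverged by more than the caller allows
    if abs(disk_df.index.size - mem_df.index.size) > max_index_diff:
        raise OverwriteError("on-disk batch has diverged; refusing to overwrite")

disk_df = pd.DataFrame({"uuid": ["a", "b", "c"]})    # another process appended a row
mem_df = pd.DataFrame({"uuid": ["a", "b"]})
check_overwrite(disk_df, mem_df, max_index_diff=1)   # ok: differs by exactly one row
try:
    check_overwrite(disk_df, mem_df)                 # default of 0 -> raises
except OverwriteError as e:
    print(e)
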
