From 941b08f1a59c914c235d618153d360cee4cb80ab Mon Sep 17 00:00:00 2001 From: kramstrom Date: Fri, 12 Jul 2024 14:08:05 -0400 Subject: [PATCH] nfs uploader --- modal/cli/_download.py | 3 +-- modal/cli/network_file_system.py | 6 +++--- modal/network_file_system.py | 18 +++++++++++++----- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/modal/cli/_download.py b/modal/cli/_download.py index c8a4d5d6e..beaac8981 100644 --- a/modal/cli/_download.py +++ b/modal/cli/_download.py @@ -9,7 +9,6 @@ from click import UsageError from modal._utils.async_utils import TaskContext -from modal.config import logger from modal.network_file_system import _NetworkFileSystem from modal.volume import FileEntry, FileEntryType, _Volume @@ -82,7 +81,7 @@ async def consumer(): async for chunk in volume.read_file(entry.path): b += fp.write(chunk) progress_cb(task_id=progress_task_id, advance=len(chunk)) - logger.debug(f"Wrote {b} bytes to {output_path}", file=sys.stderr) + print(f"Wrote {b} bytes to {output_path}", file=sys.stderr) progress_cb(task_id=progress_task_id, complete=True) elif entry.type == FileEntryType.DIRECTORY: output_path.mkdir(parents=True, exist_ok=True) diff --git a/modal/cli/network_file_system.py b/modal/cli/network_file_system.py index a90af6bd9..f9d2f3d72 100644 --- a/modal/cli/network_file_system.py +++ b/modal/cli/network_file_system.py @@ -151,9 +151,9 @@ async def put( elif "*" in local_path: raise UsageError("Glob uploads are currently not supported") else: - spinner = step_progress(f"Uploading file '{local_path}' to '{remote_path}'...") - with Live(spinner, console=console): - written_bytes = await volume.add_local_file(local_path, remote_path) + progress_handler = ProgressHandler(type="upload", console=console) + with progress_handler.live: + written_bytes = await volume.add_local_file(local_path, remote_path, progress_cb=progress_handler.progress) console.print( step_completed(f"Uploaded file '{local_path}' to '{remote_path}' ({written_bytes} bytes written)") ) diff --git a/modal/network_file_system.py b/modal/network_file_system.py index d0fdd33ea..ddf46c832 100644 --- a/modal/network_file_system.py +++ b/modal/network_file_system.py @@ -1,8 +1,9 @@ # Copyright Modal Labs 2023 +import functools import os import time from pathlib import Path, PurePosixPath -from typing import AsyncIterator, BinaryIO, List, Optional, Tuple, Type, Union +from typing import AsyncIterator, BinaryIO, Callable, List, Optional, Tuple, Type, Union import aiostream from grpclib import GRPCError, Status @@ -234,7 +235,7 @@ async def create_deployed( return resp.shared_volume_id @live_method - async def write_file(self, remote_path: str, fp: BinaryIO) -> int: + async def write_file(self, remote_path: str, fp: BinaryIO, progress_cb: Callable = None) -> int: """Write from a file object to a path on the network file system, atomically. Will create any needed parent directories automatically. @@ -247,7 +248,10 @@ async def write_file(self, remote_path: str, fp: BinaryIO) -> int: data_size = fp.tell() fp.seek(0) if data_size > LARGE_FILE_LIMIT: - blob_id = await blob_upload_file(fp, self._client.stub) + progress_task_id = progress_cb(name=remote_path, size=data_size) + blob_id = await blob_upload_file( + fp, self._client.stub, progress_report_cb=functools.partial(progress_cb, progress_task_id) + ) req = api_pb2.SharedVolumePutFileRequest( shared_volume_id=self.object_id, path=remote_path, @@ -256,6 +260,7 @@ async def write_file(self, remote_path: str, fp: BinaryIO) -> int: resumable=True, ) else: + # TODO: start task here too data = fp.read() req = api_pb2.SharedVolumePutFileRequest( shared_volume_id=self.object_id, path=remote_path, data=data, resumable=True @@ -301,7 +306,10 @@ async def iterdir(self, path: str) -> AsyncIterator[FileEntry]: @live_method async def add_local_file( - self, local_path: Union[Path, str], remote_path: Optional[Union[str, PurePosixPath, None]] = None + self, + local_path: Union[Path, str], + remote_path: Optional[Union[str, PurePosixPath, None]] = None, + progress_cb: Callable = None, ): local_path = Path(local_path) if remote_path is None: @@ -310,7 +318,7 @@ async def add_local_file( remote_path = PurePosixPath(remote_path).as_posix() with local_path.open("rb") as local_file: - return await self.write_file(remote_path, local_file) + return await self.write_file(remote_path, local_file, progress_cb=progress_cb) @live_method async def add_local_dir(