Skip to content

Commit

Permalink
nfs uploader
Browse files Browse the repository at this point in the history
  • Loading branch information
kramstrom committed Jul 12, 2024
1 parent 8a99dad commit 941b08f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
3 changes: 1 addition & 2 deletions modal/cli/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from click import UsageError

from modal._utils.async_utils import TaskContext
from modal.config import logger
from modal.network_file_system import _NetworkFileSystem
from modal.volume import FileEntry, FileEntryType, _Volume

Expand Down Expand Up @@ -82,7 +81,7 @@ async def consumer():
async for chunk in volume.read_file(entry.path):
b += fp.write(chunk)
progress_cb(task_id=progress_task_id, advance=len(chunk))
logger.debug(f"Wrote {b} bytes to {output_path}", file=sys.stderr)
print(f"Wrote {b} bytes to {output_path}", file=sys.stderr)
progress_cb(task_id=progress_task_id, complete=True)
elif entry.type == FileEntryType.DIRECTORY:
output_path.mkdir(parents=True, exist_ok=True)
Expand Down
6 changes: 3 additions & 3 deletions modal/cli/network_file_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ async def put(
elif "*" in local_path:
raise UsageError("Glob uploads are currently not supported")
else:
spinner = step_progress(f"Uploading file '{local_path}' to '{remote_path}'...")
with Live(spinner, console=console):
written_bytes = await volume.add_local_file(local_path, remote_path)
progress_handler = ProgressHandler(type="upload", console=console)
with progress_handler.live:
written_bytes = await volume.add_local_file(local_path, remote_path, progress_cb=progress_handler.progress)
console.print(
step_completed(f"Uploaded file '{local_path}' to '{remote_path}' ({written_bytes} bytes written)")
)
Expand Down
18 changes: 13 additions & 5 deletions modal/network_file_system.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Copyright Modal Labs 2023
import functools
import os
import time
from pathlib import Path, PurePosixPath
from typing import AsyncIterator, BinaryIO, List, Optional, Tuple, Type, Union
from typing import AsyncIterator, BinaryIO, Callable, List, Optional, Tuple, Type, Union

import aiostream
from grpclib import GRPCError, Status
Expand Down Expand Up @@ -234,7 +235,7 @@ async def create_deployed(
return resp.shared_volume_id

@live_method
async def write_file(self, remote_path: str, fp: BinaryIO) -> int:
async def write_file(self, remote_path: str, fp: BinaryIO, progress_cb: Callable = None) -> int:
"""Write from a file object to a path on the network file system, atomically.
Will create any needed parent directories automatically.
Expand All @@ -247,7 +248,10 @@ async def write_file(self, remote_path: str, fp: BinaryIO) -> int:
data_size = fp.tell()
fp.seek(0)
if data_size > LARGE_FILE_LIMIT:
blob_id = await blob_upload_file(fp, self._client.stub)
progress_task_id = progress_cb(name=remote_path, size=data_size)
blob_id = await blob_upload_file(
fp, self._client.stub, progress_report_cb=functools.partial(progress_cb, progress_task_id)
)
req = api_pb2.SharedVolumePutFileRequest(
shared_volume_id=self.object_id,
path=remote_path,
Expand All @@ -256,6 +260,7 @@ async def write_file(self, remote_path: str, fp: BinaryIO) -> int:
resumable=True,
)
else:
# TODO: start task here too
data = fp.read()
req = api_pb2.SharedVolumePutFileRequest(
shared_volume_id=self.object_id, path=remote_path, data=data, resumable=True
Expand Down Expand Up @@ -301,7 +306,10 @@ async def iterdir(self, path: str) -> AsyncIterator[FileEntry]:

@live_method
async def add_local_file(
self, local_path: Union[Path, str], remote_path: Optional[Union[str, PurePosixPath, None]] = None
self,
local_path: Union[Path, str],
remote_path: Optional[Union[str, PurePosixPath, None]] = None,
progress_cb: Callable = None,
):
local_path = Path(local_path)
if remote_path is None:
Expand All @@ -310,7 +318,7 @@ async def add_local_file(
remote_path = PurePosixPath(remote_path).as_posix()

with local_path.open("rb") as local_file:
return await self.write_file(remote_path, local_file)
return await self.write_file(remote_path, local_file, progress_cb=progress_cb)

@live_method
async def add_local_dir(
Expand Down

0 comments on commit 941b08f

Please sign in to comment.