From bfc10f580bae33dbf6c3f890fb0bcdc2e92315a6 Mon Sep 17 00:00:00 2001 From: Erik Bernhardsson Date: Fri, 12 Jul 2024 18:00:51 +0000 Subject: [PATCH 1/2] Add programmatic retrieval of image logs --- modal/image.py | 18 ++++++++++++++++-- test/conftest.py | 5 +++-- test/image_test.py | 12 ++++++++++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/modal/image.py b/modal/image.py index 610fbb0fc..39b462fb0 100644 --- a/modal/image.py +++ b/modal/image.py @@ -9,7 +9,7 @@ from dataclasses import dataclass from inspect import isfunction from pathlib import Path, PurePosixPath -from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Set, Tuple, Union, get_args +from typing import Any, AsyncGenerator, Callable, Dict, List, Literal, Optional, Sequence, Set, Tuple, Union, get_args from google.protobuf.message import Message from grpclib.exceptions import GRPCError, StreamTerminatedError @@ -28,7 +28,7 @@ from .gpu import GPU_T, parse_gpu_config from .mount import _Mount, python_standalone_mount_name from .network_file_system import _NetworkFileSystem -from .object import _Object +from .object import _Object, live_method_gen from .secret import _Secret from .volume import _Volume @@ -1675,5 +1675,19 @@ def run_inside(self): """ deprecation_error((2023, 12, 15), Image.run_inside.__doc__) + @live_method_gen + async def logs(self) -> AsyncGenerator[str, None]: + last_entry_id: Optional[str] = None + + request = api_pb2.ImageJoinStreamingRequest(image_id=self._object_id, timeout=55, last_entry_id=last_entry_id) + async for response in unary_stream(self._client.stub.ImageJoinStreaming, request): + if response.result.status: + return + if response.entry_id: + last_entry_id = response.entry_id + for task_log in response.task_logs: + if task_log.data: + yield task_log.data + Image = synchronize_api(_Image) diff --git a/test/conftest.py b/test/conftest.py index 59905acf1..00be2f2ac 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -853,13 +853,14 @@ async def ImageJoinStreaming(self, stream): if self.image_join_sleep_duration is not None: await asyncio.sleep(self.image_join_sleep_duration) - task_log_1 = api_pb2.TaskLogs(data="hello, world\n", file_descriptor=api_pb2.FILE_DESCRIPTOR_INFO) + task_log_1 = api_pb2.TaskLogs(data="build starting\n", file_descriptor=api_pb2.FILE_DESCRIPTOR_INFO) task_log_2 = api_pb2.TaskLogs( task_progress=api_pb2.TaskProgress( len=1, pos=0, progress_type=api_pb2.IMAGE_SNAPSHOT_UPLOAD, description="xyz" ) ) - await stream.send_message(api_pb2.ImageJoinStreamingResponse(task_logs=[task_log_1, task_log_2])) + task_log_3 = api_pb2.TaskLogs(data="build finished\n", file_descriptor=api_pb2.FILE_DESCRIPTOR_INFO) + await stream.send_message(api_pb2.ImageJoinStreamingResponse(task_logs=[task_log_1, task_log_2, task_log_3])) await stream.send_message( api_pb2.ImageJoinStreamingResponse( result=api_pb2.GenericResult(status=api_pb2.GenericResult.GENERIC_STATUS_SUCCESS) diff --git a/test/image_test.py b/test/image_test.py index 91be27078..233cc33ed 100644 --- a/test/image_test.py +++ b/test/image_test.py @@ -1120,3 +1120,15 @@ async def MockImageJoinStreaming(self, stream): ctx.set_responder("ImageJoinStreaming", MockImageJoinStreaming) with parallel_app.run(client=client): pass + + +@pytest.mark.asyncio +async def test_logs(servicer, client): + app = App() + image = Image.debian_slim().pip_install("foobarbaz") + app.function(image=image)(dummy) + async with app.run.aio(client=client): + pass + + logs = [data async for data in image.logs.aio()] + assert logs == ["build starting\n", "build finished\n"] From 71e429a350355013f914ea59f777f928a576d8e4 Mon Sep 17 00:00:00 2001 From: Erik Bernhardsson Date: Fri, 12 Jul 2024 18:23:23 +0000 Subject: [PATCH 2/2] Set bool on proto that we want all logs --- modal/image.py | 4 +++- modal_proto/api.proto | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/modal/image.py b/modal/image.py index 39b462fb0..27c2a3fc5 100644 --- a/modal/image.py +++ b/modal/image.py @@ -1679,7 +1679,9 @@ def run_inside(self): async def logs(self) -> AsyncGenerator[str, None]: last_entry_id: Optional[str] = None - request = api_pb2.ImageJoinStreamingRequest(image_id=self._object_id, timeout=55, last_entry_id=last_entry_id) + request = api_pb2.ImageJoinStreamingRequest( + image_id=self._object_id, timeout=55, last_entry_id=last_entry_id, include_logs_for_finished=True + ) async for response in unary_stream(self._client.stub.ImageJoinStreaming, request): if response.result.status: return diff --git a/modal_proto/api.proto b/modal_proto/api.proto index 2fc0a6020..44ccf773e 100644 --- a/modal_proto/api.proto +++ b/modal_proto/api.proto @@ -1359,6 +1359,7 @@ message ImageJoinStreamingRequest { string image_id = 1; float timeout = 2; string last_entry_id = 3; + bool include_logs_for_finished = 4; } message ImageJoinStreamingResponse {