Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 publish port events to frontend #6396

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from enum import StrEnum, auto

from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from pydantic import BaseModel


class OutputStatus(StrEnum):
GitHK marked this conversation as resolved.
Show resolved Hide resolved
UPLOAD_STARTED = auto()
UPLOAD_WAS_ABORTED = auto()
UPLOAD_FINISHED_SUCCESSFULLY = auto()
UPLOAD_FINISHED_WITH_ERRROR = auto()


class InputStatus(StrEnum):
DOWNLOAD_STARTED = auto()
DOWNLOAD_WAS_ABORTED = auto()
DOWNLOAD_FINISHED_SUCCESSFULLY = auto()
DOWNLOAD_FINISHED_WITH_ERRROR = auto()


class _PortStatusCommon(BaseModel):
project_id: ProjectID
node_id: NodeID
port_key: ServicePortKey


class OutputPortStatus(_PortStatusCommon):
status: OutputStatus


class InputPortSatus(_PortStatusCommon):
status: InputStatus
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Final

SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage"
SOCKET_IO_STATE_OUTPUT_PORTS_EVENT: Final[str] = "stateOutputPorts"
SOCKET_IO_STATE_INPUT_PORTS_EVENT: Final[str] = "stateInputPorts"
63 changes: 48 additions & 15 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from abc import ABC, abstractmethod
from asyncio import CancelledError
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -27,6 +29,20 @@
log = logging.getLogger(__name__)


class OutputsCallbacks(ABC):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I have a few comments:

  1. this OutputsCallbacks, and actually set_multiple are only used by the dynamic sidecar, why are they coming with this >complex stuff inside the node ports package? Would it not make sense that you build on top instead of creating these weird constructs?
  2. The tests you added do not check anything regarding how this callbacks work,
  3. Sorry, but this looks like C++ man... ;)

@abstractmethod
async def aborted(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_succesfully(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_with_error(self, key: ServicePortKey) -> None:
pass


class Nodeports(BaseModel):
"""
Represents a node in a project and all its input/output ports
Expand Down Expand Up @@ -148,6 +164,7 @@ async def set_multiple(
],
*,
progress_bar: ProgressBarData,
outputs_callbacks: OutputsCallbacks | None,
) -> None:
"""
Sets the provided values to the respective input or output ports
Expand All @@ -156,26 +173,42 @@ async def set_multiple(

raises ValidationError
"""

async def _set_with_notifications(
port_key: ServicePortKey,
value: ItemConcreteValue | None,
set_kwargs: SetKWargs | None,
sub_progress: ProgressBarData,
) -> None:
assert outputs_callbacks is not None # nosec
try:
# pylint: disable=protected-access
await self.internal_outputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
await outputs_callbacks.finished_succesfully(port_key)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
# pylint: disable=protected-access
await self.internal_inputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
except CancelledError:
await outputs_callbacks.aborted(port_key)
raise
except Exception:
await outputs_callbacks.finished_with_error(port_key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: So if there is an **unexpected ** error, you want to notify simply that it failed? Since this reaches the user , I would use the technique of logging the error on the server side and sending an OEC to the front-end.

wait, you are reraising it ... so perhaps report it when you catch it?
BTW why do not you need to reraise?

raise

tasks = []
async with progress_bar.sub_progress(
steps=len(port_values.items()), description=IDStr("set multiple")
) as sub_progress:
for port_key, (value, set_kwargs) in port_values.items():
# pylint: disable=protected-access
try:
tasks.append(
self.internal_outputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
tasks.append(
self.internal_inputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
tasks.append(
_set_with_notifications(port_key, value, set_kwargs, sub_progress)
)

results = await logged_gather(*tasks)
await self.save_to_db_cb(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from collections.abc import Awaitable, Callable, Iterable
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock
from uuid import uuid4

import np_helpers
Expand Down Expand Up @@ -777,6 +778,7 @@ async def test_batch_update_inputs_outputs(
for k, port in enumerate((await PORTS.outputs).values())
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
# pylint: disable=protected-access
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001
Expand All @@ -786,6 +788,7 @@ async def test_batch_update_inputs_outputs(
for k, port in enumerate((await PORTS.inputs).values(), start=1000)
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001

Expand All @@ -807,4 +810,5 @@ async def test_batch_update_inputs_outputs(
await PORTS.set_multiple(
{ServicePortKey("missing_key_in_both"): (123132, None)},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from pathlib import Path
from typing import Any, Callable
from unittest.mock import AsyncMock

import pytest
from faker import Faker
Expand Down Expand Up @@ -138,6 +139,7 @@ async def mock_node_port_creator_cb(*args, **kwargs):
+ list(original_outputs.values())
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand here. you do not test anything of what you added right?

)
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ async def ports_inputs_pull_task(
request: Request,
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
app: Annotated[FastAPI, Depends(get_application)],
settings: Annotated[ApplicationSettings, Depends(get_settings)],
mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
inputs_state: Annotated[InputsState, Depends(get_inputs_state)],
port_keys: list[str] | None = None,
Expand All @@ -223,6 +224,7 @@ async def ports_inputs_pull_task(
port_keys=port_keys,
mounted_volumes=mounted_volumes,
app=app,
settings=settings,
inputs_pulling_enabled=inputs_state.inputs_pulling_enabled,
)
except TaskAlreadyRunningError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..modules.attribute_monitor import setup_attribute_monitor
from ..modules.inputs import setup_inputs
from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs
from ..modules.notifications import setup_notifications
from ..modules.outputs import setup_outputs
from ..modules.prometheus_metrics import setup_prometheus_metrics
from ..modules.resource_tracking import setup_resource_tracking
Expand Down Expand Up @@ -172,6 +173,7 @@ def create_app():
setup_rabbitmq(app)
setup_background_log_fetcher(app)
setup_resource_tracking(app)
setup_notifications(app)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this not already a copy paste of something else? I have the feeling this notifications thing comes up several time in the repo

setup_system_monitor(app)

setup_mounted_fs(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from servicelib.progress_bar import ProgressBarData
from servicelib.utils import logged_gather
from simcore_sdk.node_data import data_manager
from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relative imports

PortNotifier,
)
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.retry import retry_if_result
Expand Down Expand Up @@ -472,6 +475,7 @@ async def task_ports_inputs_pull(
port_keys: list[str] | None,
mounted_volumes: MountedVolumes,
app: FastAPI,
settings: ApplicationSettings,
*,
inputs_pulling_enabled: bool,
) -> int:
Expand Down Expand Up @@ -505,6 +509,12 @@ async def task_ports_inputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=PortNotifier(
app,
settings.DY_SIDECAR_USER_ID,
settings.DY_SIDECAR_PROJECT_ID,
settings.DY_SIDECAR_NODE_ID,
),
)
await post_sidecar_log_message(
app, "Finished pulling inputs", log_level=logging.INFO
Expand Down Expand Up @@ -541,6 +551,7 @@ async def task_ports_outputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=None,
)
await post_sidecar_log_message(
app, "Finished pulling outputs", log_level=logging.INFO
Expand Down
Loading
Loading