task resources is deleted after call to publish_dataset on the task future since 2023.4.0 #7859

Closed · sanderegg opened this issue May 25, 2023 · 16 comments · Fixed by #8577

@sanderegg commented May 25, 2023

Describe the issue:
After upgrading dask[distributed] from 2023.3.x to the latest 2023.5.0, my tests detected a change in the behavior of task resources.
In my system a user can start a computational task, log out, and check the status of the computation later. To this end I rely heavily on the dask client's submit and publish_dataset methods. Since version 2023.4.0, publishing the dataset seems to have the side effect of completely removing the task-defined resources (e.g. CPU, RAM), which breaks my code.

Am I using a feature that was never supposed to work? I see in the changelog of 2023.4 that there were apparently big changes, with the dask-scheduler now needing similar "software and hardware". Is this issue related to that change?
Thanks a lot for the great work on dask, it's an awesome platform!
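
For context, a minimal sketch of the workflow described above (my_task, the scheduler address, and the dataset name are placeholders, not code from my system):

from distributed import Client


def my_task() -> int:  # placeholder for the real computation
    return 42


client = Client("tcp://my-scheduler:8786")  # placeholder address
future = client.submit(my_task, resources={"CPU": 1, "RAM": 4e9})
# publish so that the future survives this client's session
client.publish_dataset(future, name="my-computation")

# ... later, possibly from a different process ...
client2 = Client("tcp://my-scheduler:8786")
future2 = client2.get_dataset("my-computation")
result = future2.result()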

Minimal Complete Verifiable Example:

# requirements.txt
dask[distributed]
pytest
pytest-asyncio

# test_publish_dataset.py
from typing import AsyncIterator, Callable, Coroutine
from distributed import Client, Scheduler, SpecCluster, Worker, get_worker
import pytest


@pytest.fixture
async def dask_spec_local_cluster(
    unused_tcp_port_factory: Callable,
) -> AsyncIterator[SpecCluster]:
    # in this mode we can precisely create a specific cluster
    workers = {
        "cpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 2,
                "resources": {"CPU": 2, "RAM": 48e9},
            },
        },
        "gpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 1,
                    "GPU": 1,
                    "RAM": 48e9,
                },
            },
        },
        "bigcpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 8,
                    "RAM": 768e9,
                },
            },
        },
    }
    scheduler = {
        "cls": Scheduler,
        "options": {
            "port": unused_tcp_port_factory(),
            "dashboard_address": f":{unused_tcp_port_factory()}",
        },
    }

    async with SpecCluster(
        workers=workers, scheduler=scheduler, asynchronous=True, name="pytest_cluster"
    ) as cluster:
        yield cluster


@pytest.fixture
async def dask_client(dask_spec_local_cluster: SpecCluster) -> AsyncIterator[Client]:
    async with Client(
        dask_spec_local_cluster.scheduler_address, asynchronous=True
    ) as client:
        yield client


def _retrieve_annotations() -> dict | None:
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    return task.annotations


RESOURCES = {"CPU": 1.0, "RAM": 123423}


async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}


async def test_submit_future_with_resources_and_published_dataset(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    await dask_client.publish_dataset(future, name="myfuture")
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}

Run with the following commands:

pip install -r requirements.txt
pytest --asyncio-mode=auto -xv

Anything else we need to know?:

  • Using pip install dask[distributed]==2023.3 makes both tests pass, whereas pip install dask[distributed]==2023.4 fails the second test (same with 2023.5).

Environment:

  • Dask version: 2023.3.x vs 2023.4.x, 2023.5.x

  • Python version: 3.10.10

  • Operating System: Ubuntu 22.04, 20.04; Windows 11

  • Install method (conda, pip, source): pip

@quasiben (Member)

I can reproduce, and I think I triaged the issue down to #7564. I don't have a lot of time to figure out what's going on. I'm not very familiar with this area of the code, but shouldn't the tasks on the scheduler have the annotations, not the worker?

@fjetter (Member) commented May 25, 2023

> shouldn't the tasks on the scheduler have the annotations, not the worker?

Indeed. The relevant annotations are on the scheduler side. However, AFAIK we're just forwarding the scheduler annotations to the worker, so I assume something broke.
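
For reference, a minimal sketch of how the scheduler-side annotations can be inspected, assuming the asynchronous dask_client and future from the reproducer above (_scheduler_annotations is just an illustrative helper):

def _scheduler_annotations(key, dask_scheduler=None):
    # the scheduler keeps a TaskState per key; its .annotations carry the resources
    ts = dask_scheduler.tasks.get(key)
    return ts.annotations if ts is not None else None


annotations = await dask_client.run_on_scheduler(_scheduler_annotations, future.key)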

@sanderegg (Author)

@quasiben @fjetter thank you for your answers.
So this is indeed a bug, it seems. Should I use another API to get the resources from inside the worker?

@quasiben (Member)

Would using total_resources or available_resources work for you? I believe these now live in the worker's state machine:

In [6]: client.run(lambda dask_worker: dask_worker.state.total_resources)
Out[6]: {'tcp://127.0.0.1:57326': {'GPU': 2.0}}

In [7]: client.run(lambda dask_worker: dask_worker.state.available_resources)
Out[7]: {'tcp://127.0.0.1:57326': {'GPU': 2.0}}

@sanderegg (Author)

Thanks for your answer @quasiben. I tried what you propose, and the exact same issue arises.
As soon as I use publish_dataset, available_resources == total_resources. Without calling publish_dataset it works.
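
For reference, a sketch (not my exact code) of that check with the asynchronous client from the reproducer above:

total = await dask_client.run(lambda dask_worker: dask_worker.state.total_resources)
available = await dask_client.run(lambda dask_worker: dask_worker.state.available_resources)
# while the submitted task is still running, the worker executing it should
# report available_resources lower than total_resources; after publish_dataset
# the two dictionaries come back identical
print(total, available)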

@sanderegg (Author)

This is still an issue then, and for us it means we cannot upgrade dask anymore, since we do run computations in the background.

@sanderegg (Author)

@fjetter, @quasiben: sorry to insist, but is there any news or plan for this issue?

@sanderegg (Author)

I am now testing with distributed==2023.7.0 and it got worse.

Now even the first test fails when the dask_client fixture is tearing down.
The error I get is:

ERROR test_publish_dataset.py::test_submit_future_with_resources - ValueError: <Token var=<ContextVar name='_current_client' default=None at 0x7f036fc2cb80> at 0x7f036f08e6c0> was created in a different Context

I have a feeling this might have to do with #6527. Was this expected?
Any info on this, @fjetter? How should I change the test to make it work again?

@fjetter (Member) commented Aug 2, 2023

Sorry for the slow response time. I'm struggling a bit to understand what your problem is about.

Is this about _retrieve_annotations not returning the correct annotations? A minimal reproducer (i.e. without the pytest stuff) would be helpful.

@sanderegg (Author)

@fjetter sorry, I was off for some time and missed your answer. Yes, the problem is that the task annotations disappear from the worker as soon as I use publish_dataset.
In the code above the difference is:

async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES} # THIS WORKS


async def test_submit_future_with_resources_and_published_dataset(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    await dask_client.publish_dataset(future, name="myfuture") # CALLING THIS MAKES THE ANNOTATIONS IN WORKER DISAPPEAR
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES} # THIS DOES NOT WORK

Yes, _retrieve_annotations fails when I use publish_dataset.

After some thinking, I guess I could also manually pass the resources as an argument of the function instead; then I would not rely on the Worker code anymore (see the sketch below). But I guess that is not the intention of that code.
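
A minimal sketch of that workaround (names are illustrative; it reuses RESOURCES and dask_client from the tests above):

def _task_with_explicit_resources(resources: dict) -> dict:
    # the resources are passed in explicitly, so nothing is read back
    # from worker.state / task annotations
    return {"resources": resources}


future = dask_client.submit(
    _task_with_explicit_resources, RESOURCES, resources=RESOURCES
)
assert await future.result() == {"resources": RESOURCES}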

@sanderegg (Author)

Just for info, I upgraded today to version 2024.1.1 and the issue described here still happens, i.e. the resources assigned to a task are not retrievable from the worker once the future is published on the scheduler.
Any news here, @fjetter, @quasiben?

@sanderegg (Author) commented Jan 31, 2024

Here is a simplified reproducer that shows the issue:

from distributed import Client, SpecCluster, Worker, get_worker


def _retrieve_annotations() -> dict | None:
    import time

    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    print(f"retrieved {task=}")
    print(f"{task.annotations=}")
    print("going to sleep 60 seconds")
    time.sleep(60)
    return task.annotations


TASK_RESOURCES = {"RAM": 123}
WORKER_RESOURCES = {"CPU": 2.0, "RAM": 223423}

if __name__ == "__main__":
    cluster = SpecCluster(
        workers={
            "cpu-worker": {
                "cls": Worker,
                "options": {
                    # "nthreads": 2,
                    "resources": WORKER_RESOURCES,
                },
            }
        }
    )
    dask_client = Client(cluster)
    print(dask_client.dashboard_link)
    future = dask_client.submit(_retrieve_annotations, resources=TASK_RESOURCES)
    assert future
    dask_client.publish_dataset(future, name="myfuture")
    result = future.result()

    assert result == {"resources": TASK_RESOURCES}

Looking at the dashboard, you can see that the Consumed Resources show 0:

[screenshot: worker dashboard, Consumed Resources at 0]

If you comment out dask_client.publish_dataset(future, name="myfuture"), then it works as it should and shows the correct consumed resources. So I think something is badly broken in the resource assignment when combined with publishing a dataset.

For info, this is with:
Python 3.11.7
distributed 2024.1.1

@sanderegg (Author)

Hey @fjetter, sorry to bother again, but is there some way I can help solve this issue? My company and I have been unable to upgrade dask past 2023.3 because of it.

@sanderegg (Author)

@fjetter @quasiben I created a test inside the distributed repository to demonstrate the issue:

# distributed/tests/test_publish_dataset_issue.py
from distributed.client import Client
from distributed.scheduler import Scheduler
from distributed.utils_test import gen_cluster

from distributed.worker import get_worker

def _retrieve_annotations() -> dict | None:
    print("starting task")
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    print("finished task")
    return task.annotations

RESOURCES = {"CPU": 1.0, "RAM": 123423}


@gen_cluster(client=True, worker_kwargs={"resources": {"CPU": 2, "RAM": 48e9}})
async def test_submit(c: Client, s: Scheduler, a, b):
    future = c.submit(_retrieve_annotations, resources=RESOURCES, pure=False)
    
    await c.publish_dataset(future, name="thefailing")
    assert await c.list_datasets() == ("thefailing",)

    result = await future.result()
    assert result == {"resources": RESOURCES}
Steps:

  1. git checkout 2023.3.2, then run the test --> all is fine
  2. git checkout 2023.4.0, then run the test --> fails
  3. git checkout main, then run the test --> fails
  4. Interestingly, if I add await asyncio.sleep(2) before publishing, then it works, but I am not sure whether that is useful (see the sketch below).
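
A sketch of that timing variant, for illustration only (it needs import asyncio at the top of the test file, and it only papers over the problem rather than fixing it):

import asyncio


@gen_cluster(client=True, worker_kwargs={"resources": {"CPU": 2, "RAM": 48e9}})
async def test_submit_with_sleep(c: Client, s: Scheduler, a, b):
    future = c.submit(_retrieve_annotations, resources=RESOURCES, pure=False)
    # give the scheduler time to register the task before publishing
    await asyncio.sleep(2)
    await c.publish_dataset(future, name="thefailing")
    result = await future.result()
    assert result == {"resources": RESOURCES}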

@fjetter (Member) commented Mar 13, 2024

Sorry for the silence. We've been busy with the latest release, since that changed a lot. The reproducer was quite helpful, and I was able to track the issue down and fix it; see #8577.

It's a subtle race condition. As you already noticed yourself, if you sleep briefly before the publish, everything works as expected.

@sanderegg (Author)

@fjetter thank you! That is great, and I'm looking forward to seeing this in the next release and testing it!
