Distributed client closing with ValueError, client was created in a different Context #8031

Closed
sanderegg opened this issue Jul 24, 2023 · 1 comment

Comments

@sanderegg

Describe the issue:
After upgrading to distributed 2023.7.0 (and also 2023.7.1), tests started failing with the following error:

ERROR test_publish_dataset.py::test_submit_future_with_resources - ValueError: <Token var=<ContextVar name='_current_client' default=None at 0x7f13e2dbb330> at 0x7f13e206a940> was created in a different Context

This happens in the provided code when the client is closed in the pytest fixture.

I had already opened another issue about resources being broken (see #7859), but this appeared as an additional problem with 2023.7. I am not sure how I should modify the code to work around it, or whether this is a real issue that should be fixed on the dask/distributed side.
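
For context, the ValueError itself comes from Python's contextvars: a Token returned by ContextVar.set() can only be used to reset the variable from the same Context in which it was created. A minimal illustration of that mechanism, independent of distributed (hypothetical names, just to show where the error text comes from):

import contextvars

_current_client = contextvars.ContextVar("_current_client", default=None)

def _set_client():
    # set() returns a Token bound to the Context it runs in
    return _current_client.set("client")

# Create the token inside a copied Context ...
token = contextvars.copy_context().run(_set_client)

# ... and try to reset it from the outer Context:
try:
    _current_client.reset(token)
except ValueError as exc:
    print(exc)  # <Token ...> was created in a different Context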

Minimal Complete Verifiable Example:

from typing import AsyncIterator, Callable, Coroutine
from distributed import Client, Scheduler, SpecCluster, Worker, get_worker
import pytest


@pytest.fixture
async def dask_spec_local_cluster(
    unused_tcp_port_factory: Callable,
) -> AsyncIterator[SpecCluster]:
    # in this mode we can precisely create a specific cluster
    workers = {
        "cpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 2,
                "resources": {"CPU": 2, "RAM": 48e9},
            },
        },
        "gpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 1,
                    "GPU": 1,
                    "RAM": 48e9,
                },
            },
        },
        "bigcpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 8,
                    "RAM": 768e9,
                },
            },
        },
    }
    scheduler = {
        "cls": Scheduler,
        "options": {
            "port": unused_tcp_port_factory(),
            "dashboard_address": f":{unused_tcp_port_factory()}",
        },
    }

    async with SpecCluster(
        workers=workers, scheduler=scheduler, asynchronous=True, name="pytest_cluster"
    ) as cluster:
        yield cluster


@pytest.fixture
async def dask_client(dask_spec_local_cluster: SpecCluster) -> AsyncIterator[Client]:
    async with Client(
        dask_spec_local_cluster.scheduler_address, asynchronous=True, set_as_default=True
    ) as client:
        client.as_current()
        yield client
        client.as_current()


def _retrieve_annotations() -> dict | None:
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    return task.annotations


RESOURCES = {"CPU": 1.0, "RAM": 123423}


async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}

Anything else we need to know?:

Environment:

  • Dask version: 2023.7.X
  • Python version: 3.10.X, 3.11.X
  • Operating System: Windows(WSL2) and Ubuntu 22.04
  • Install method (conda, pip, source): pip
@hendrikmakait
Member

@sanderegg: Thanks for reporting this issue. This appears to be a duplicate of #7984, which contains some guidance on how to avoid the problem. I would suggest moving further conversation over to that issue and will therefore close this for now.
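
For readers landing here: one possible way to sidestep the error, assuming it is triggered because the async fixture's setup and teardown run in different asyncio tasks (and therefore different contextvars Contexts), is to keep the client's entire lifecycle inside a single coroutine. This is only a sketch based on that assumption, not necessarily the guidance given in #7984:

async def test_submit_future_with_resources(dask_spec_local_cluster: SpecCluster):
    # Enter and exit the Client in the same task, so the ContextVar token
    # set on creation can be reset on close without a Context mismatch.
    async with Client(
        dask_spec_local_cluster.scheduler_address, asynchronous=True
    ) as dask_client:
        future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
        assert await future.result() == {"resources": RESOURCES}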
