Skip to content

Commit

Permalink
Remove allow-pickle option
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Dec 13, 2023
1 parent 05ba316 commit 4e899a8
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 72 deletions.
17 changes: 0 additions & 17 deletions distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,23 +402,6 @@ def start(self, scheduler):
assert s.foo == "bar"


@gen_cluster(client=True, config={"distributed.scheduler.pickle": False})
async def test_register_plugin_pickle_disabled(c, s, a, b):
class Dummy1(SchedulerPlugin):
def start(self, scheduler):
scheduler.foo = "bar"

n_plugins = len(s.plugins)
with pytest.raises(ValueError) as excinfo:
await c.register_plugin(Dummy1())

msg = str(excinfo.value)
assert "disallowed from deserializing" in msg
assert "distributed.scheduler.pickle" in msg

assert n_plugins == len(s.plugins)


@gen_cluster(nthreads=[])
async def test_unregister_scheduler_plugin(s):
class Plugin(SchedulerPlugin):
Expand Down
10 changes: 0 additions & 10 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,6 @@ properties:
If we don't receive a heartbeat faster than this then we assume that the worker has died.
pickle:
type: boolean
description: |
Is the scheduler allowed to deserialize arbitrary bytestrings?
The scheduler almost never deserializes user data.
However there are some cases where the user can submit functions to run directly on the scheduler.
This can be convenient for debugging, but also introduces some security risk.
By setting this to false we ensure that the user is unable to run arbitrary code on the scheduler.
preload:
type: array
description: |
Expand Down
1 change: 0 additions & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ distributed:
work-stealing-interval: 100ms # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: [] # Run custom modules with Scheduler
preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html
unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h")
Expand Down
12 changes: 1 addition & 11 deletions distributed/protocol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import msgpack

import dask.config

from distributed.protocol import pickle
from distributed.protocol.compression import decompress, maybe_compress
from distributed.protocol.serialize import (
Expand Down Expand Up @@ -117,8 +115,6 @@ def _encode_default(obj):
def loads(frames, deserialize=True, deserializers=None):
"""Transform bytestream back into Python value"""

allow_pickle = dask.config.get("distributed.scheduler.pickle")

try:

def _decode_default(obj):
Expand Down Expand Up @@ -148,13 +144,7 @@ def _decode_default(obj):
sub_frames = frames[offset : offset + sub_header["num-sub-frames"]]
if "compression" in sub_header:
sub_frames = decompress(sub_header, sub_frames)
if allow_pickle:
return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
else:
raise ValueError(
"Unpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`"
)

return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)

Check warning on line 147 in distributed/protocol/core.py

View check run for this annotation

Codecov / codecov/patch

distributed/protocol/core.py#L147

Added line #L147 was not covered by tests
return msgpack_decode_default(obj)

return msgpack.loads(
Expand Down
4 changes: 0 additions & 4 deletions distributed/protocol/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,6 @@ class ToPickle(Generic[T]):
Both the scheduler and workers with automatically unpickle this
object on arrival.
Notice, this requires that the scheduler is allowed to use pickle.
If the configuration option "distributed.scheduler.pickle" is set
to False, the scheduler will raise an exception instead.
"""

data: T
Expand Down
27 changes: 5 additions & 22 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3533,6 +3533,10 @@ def __init__(
jupyter=False,
**kwargs,
):
if dask.config.get("distributed.scheduler.pickle", default=True) is False:
raise RuntimeError(

Check warning on line 3537 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L3536-L3537

Added lines #L3536 - L3537 were not covered by tests
"Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler."
)
if loop is not None:
warnings.warn(
"the loop kwarg to Scheduler is deprecated",
Expand Down Expand Up @@ -5868,13 +5872,6 @@ async def register_scheduler_plugin(
idempotent: bool | None = None,
) -> None:
"""Register a plugin on the scheduler."""
if not dask.config.get("distributed.scheduler.pickle"):
raise ValueError(
"Cannot register a scheduler plugin as the scheduler "
"has been explicitly disallowed from deserializing "
"arbitrary bytestrings using pickle via the "
"'distributed.scheduler.pickle' configuration setting."
)
if idempotent is None:
warnings.warn(
"The signature of `Scheduler.register_scheduler_plugin` now requires "
Expand Down Expand Up @@ -6922,7 +6919,7 @@ def workers_to_close(

if key is None:
key = operator.attrgetter("address")
if isinstance(key, bytes) and dask.config.get("distributed.scheduler.pickle"):
if isinstance(key, bytes):

Check warning on line 6922 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L6922

Added line #L6922 was not covered by tests
key = pickle.loads(key)

groups = groupby(key, self.workers.values())
Expand Down Expand Up @@ -7262,14 +7259,6 @@ async def feed(
Caution: this runs arbitrary Python code on the scheduler. This should
eventually be phased out. It is mostly used by diagnostics.
"""
if not dask.config.get("distributed.scheduler.pickle"):
logger.warning(
"Tried to call 'feed' route with custom functions, but "
"pickle is disallowed. Set the 'distributed.scheduler.pickle'"
"config value to True to use the 'feed' route (this is mostly "
"commonly used with progress bars)"
)
return

interval = parse_timedelta(interval)
if function:
Expand Down Expand Up @@ -7484,12 +7473,6 @@ def run_function(
"""
from distributed.worker import run

if not dask.config.get("distributed.scheduler.pickle"):
raise ValueError(
"Cannot run function as the scheduler has been explicitly disallowed from "
"deserializing arbitrary bytestrings using pickle via the "
"'distributed.scheduler.pickle' configuration setting."
)
kwargs = kwargs or {}
self.log_event("all", {"action": "run-function", "function": function})
return run(self, comm, function=function, args=args, kwargs=kwargs, wait=wait)
Expand Down
12 changes: 5 additions & 7 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1799,13 +1799,11 @@ def f(dask_scheduler=None):
assert response == s.address


@gen_cluster(client=True, config={"distributed.scheduler.pickle": False})
async def test_run_on_scheduler_disabled(c, s, a, b):
def f(dask_scheduler=None):
return dask_scheduler.address

with pytest.raises(ValueError, match="disallowed from deserializing"):
await c._run_on_scheduler(f)
@gen_test()
async def test_allow_pickle_false():
with dask.config.set({"distributed.scheduler.pickle": False}):
with pytest.raises(RuntimeError, match="Pickling can no longer be disabled"):
await Scheduler()


@gen_cluster()
Expand Down

0 comments on commit 4e899a8

Please sign in to comment.