From 4e899a88141f3a894ebd3495ae1bf166300da03a Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 13 Dec 2023 10:33:01 +0100 Subject: [PATCH] Remove allow-pickle option --- .../tests/test_scheduler_plugin.py | 17 ------------ distributed/distributed-schema.yaml | 10 ------- distributed/distributed.yaml | 1 - distributed/protocol/core.py | 12 +-------- distributed/protocol/serialize.py | 4 --- distributed/scheduler.py | 27 ++++--------------- distributed/tests/test_scheduler.py | 12 ++++----- 7 files changed, 11 insertions(+), 72 deletions(-) diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 52e43ecfec..76c305e418 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -402,23 +402,6 @@ def start(self, scheduler): assert s.foo == "bar" -@gen_cluster(client=True, config={"distributed.scheduler.pickle": False}) -async def test_register_plugin_pickle_disabled(c, s, a, b): - class Dummy1(SchedulerPlugin): - def start(self, scheduler): - scheduler.foo = "bar" - - n_plugins = len(s.plugins) - with pytest.raises(ValueError) as excinfo: - await c.register_plugin(Dummy1()) - - msg = str(excinfo.value) - assert "disallowed from deserializing" in msg - assert "distributed.scheduler.pickle" in msg - - assert n_plugins == len(s.plugins) - - @gen_cluster(nthreads=[]) async def test_unregister_scheduler_plugin(s): class Plugin(SchedulerPlugin): diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 209cd3e255..3d7cd5ead8 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -130,16 +130,6 @@ properties: If we don't receive a heartbeat faster than this then we assume that the worker has died. - pickle: - type: boolean - description: | - Is the scheduler allowed to deserialize arbitrary bytestrings? - - The scheduler almost never deserializes user data. - However there are some cases where the user can submit functions to run directly on the scheduler. - This can be convenient for debugging, but also introduces some security risk. - By setting this to false we ensure that the user is unable to run arbitrary code on the scheduler. - preload: type: array description: | diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index ade0af311b..4f5b31249c 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -22,7 +22,6 @@ distributed: work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this - pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings preload: [] # Run custom modules with Scheduler preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h") diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 58740af62c..e698335aa3 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -4,8 +4,6 @@ import msgpack -import dask.config - from distributed.protocol import pickle from distributed.protocol.compression import decompress, maybe_compress from distributed.protocol.serialize import ( @@ -117,8 +115,6 @@ def _encode_default(obj): def loads(frames, deserialize=True, deserializers=None): """Transform bytestream back into Python value""" - allow_pickle = dask.config.get("distributed.scheduler.pickle") - try: def _decode_default(obj): @@ -148,13 +144,7 @@ def _decode_default(obj): sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] if "compression" in sub_header: sub_frames = decompress(sub_header, sub_frames) - if allow_pickle: - return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames) - else: - raise ValueError( - "Unpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`" - ) - + return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames) return msgpack_decode_default(obj) return msgpack.loads( diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index 98426332fe..3262032051 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -573,10 +573,6 @@ class ToPickle(Generic[T]): Both the scheduler and workers with automatically unpickle this object on arrival. - - Notice, this requires that the scheduler is allowed to use pickle. - If the configuration option "distributed.scheduler.pickle" is set - to False, the scheduler will raise an exception instead. """ data: T diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4d089a39ec..882bcf0a7b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3533,6 +3533,10 @@ def __init__( jupyter=False, **kwargs, ): + if dask.config.get("distributed.scheduler.pickle", default=True) is False: + raise RuntimeError( + "Pickling can no longer be disabled with the `distributed.scheduler.pickle` option. Please remove this configuration to start the scheduler." + ) if loop is not None: warnings.warn( "the loop kwarg to Scheduler is deprecated", @@ -5868,13 +5872,6 @@ async def register_scheduler_plugin( idempotent: bool | None = None, ) -> None: """Register a plugin on the scheduler.""" - if not dask.config.get("distributed.scheduler.pickle"): - raise ValueError( - "Cannot register a scheduler plugin as the scheduler " - "has been explicitly disallowed from deserializing " - "arbitrary bytestrings using pickle via the " - "'distributed.scheduler.pickle' configuration setting." - ) if idempotent is None: warnings.warn( "The signature of `Scheduler.register_scheduler_plugin` now requires " @@ -6922,7 +6919,7 @@ def workers_to_close( if key is None: key = operator.attrgetter("address") - if isinstance(key, bytes) and dask.config.get("distributed.scheduler.pickle"): + if isinstance(key, bytes): key = pickle.loads(key) groups = groupby(key, self.workers.values()) @@ -7262,14 +7259,6 @@ async def feed( Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics. """ - if not dask.config.get("distributed.scheduler.pickle"): - logger.warning( - "Tried to call 'feed' route with custom functions, but " - "pickle is disallowed. Set the 'distributed.scheduler.pickle'" - "config value to True to use the 'feed' route (this is mostly " - "commonly used with progress bars)" - ) - return interval = parse_timedelta(interval) if function: @@ -7484,12 +7473,6 @@ def run_function( """ from distributed.worker import run - if not dask.config.get("distributed.scheduler.pickle"): - raise ValueError( - "Cannot run function as the scheduler has been explicitly disallowed from " - "deserializing arbitrary bytestrings using pickle via the " - "'distributed.scheduler.pickle' configuration setting." - ) kwargs = kwargs or {} self.log_event("all", {"action": "run-function", "function": function}) return run(self, comm, function=function, args=args, kwargs=kwargs, wait=wait) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 0a668f0a0e..5485d44e63 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1799,13 +1799,11 @@ def f(dask_scheduler=None): assert response == s.address -@gen_cluster(client=True, config={"distributed.scheduler.pickle": False}) -async def test_run_on_scheduler_disabled(c, s, a, b): - def f(dask_scheduler=None): - return dask_scheduler.address - - with pytest.raises(ValueError, match="disallowed from deserializing"): - await c._run_on_scheduler(f) +@gen_test() +async def test_allow_pickle_false(): + with dask.config.set({"distributed.scheduler.pickle": False}): + with pytest.raises(RuntimeError, match="Pickling can no longer be disabled"): + await Scheduler() @gen_cluster()