Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2020-03-12 23:05:52
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.3160 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed"

Thu Mar 12 23:05:52 2020 rev:26 rq:782795 version:2.12.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes	2020-02-24 15:53:30.439618955 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.3160/python-distributed.changes	2020-03-12 23:11:17.127299746 +0100
@@ -1,0 +2,37 @@
+Sun Mar  8 19:04:45 UTC 2020 - Arun Persaud <a...@gmx.de>
+
+- update to version 2.12.0:
+  * Update TaskGroup remove logic (GH#3557) James Bourbeau
+  * Fix-up CuPy sparse serialization (GH#3556) John Kirkham
+  * API docs for LocalCluster and SpecCluster (GH#3548) Tom Augspurger
+  * Serialize sparse arrays (GH#3545) John Kirkham
+  * Allow tasks with restrictions to be stolen (GH#3069) Stan Seibert
+  * Use UCX default configuration instead of raising (GH#3544) Peter
+    Andreas Entschev
+  * Support using other serializers with register_generic (GH#3536)
+    John Kirkham
+  * DOC: update to async await (GH#3543) Tom Augspurger
+  * Use pytest.raises in test_ucx_config.py (GH#3541) John Kirkham
+  * Fix/more ucx config options (GH#3539) Benjamin Zaitlen
+  * Update heartbeat CommClosedError error handling (GH#3529) James
+    Bourbeau
+  * Use makedirs when constructing local_directory (GH#3538) John
+    Kirkham
+  * Mark None as MessagePack serializable (GH#3537) John Kirkham
+  * Mark bool as MessagePack serializable (GH#3535) John Kirkham
+  * Use ‘temporary-directory’ from dask.config for Nanny’s directory
+    (GH#3531) John Kirkham
+  * Add try-except around getting source code in performance report
+    (GH#3505) Matthew Rocklin
+  * Fix typo in docstring (GH#3528) Davis Bennett
+  * Make work stealing callback time configurable (GH#3523) Lucas
+    Rademaker
+  * RMM/UCX Config Flags (GH#3515) Benjamin Zaitlen
+  * Revise develop-docs: conda env example (GH#3406) Darren Weber
+  * Remove import ucp from the top of ucx.py (GH#3510) Peter Andreas
+    Entschev
+  * Rename logs to get_logs (GH#3473) Jacob Tomlinson
+  * Stop keep alives when worker reconnecting to the scheduler
+    (GH#3493) Jacob Tomlinson
+

-------------------------------------------------------------------

Old:
----
  distributed-2.11.0.tar.gz

New:
----
  distributed-2.12.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.ipN7Jq/_old	2020-03-12 23:11:18.459300271 +0100
+++ /var/tmp/diff_new_pack.ipN7Jq/_new	2020-03-12 23:11:18.463300273 +0100
@@ -21,7 +21,7 @@
 # Test requires network connection
 %bcond_with test
 Name:           python-distributed
-Version:        2.11.0
+Version:        2.12.0
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-2.11.0.tar.gz -> distributed-2.12.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/PKG-INFO new/distributed-2.12.0/PKG-INFO
--- old/distributed-2.11.0/PKG-INFO	2020-02-19 18:46:22.634171200 +0100
+++ new/distributed-2.12.0/PKG-INFO	2020-03-06 21:17:16.877646200 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.11.0
+Version: 2.12.0
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin

diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/_version.py new/distributed-2.12.0/distributed/_version.py
--- old/distributed-2.11.0/distributed/_version.py	2020-02-19 18:46:22.636481300 +0100
+++ new/distributed-2.12.0/distributed/_version.py	2020-03-06 21:17:16.880402800 +0100
@@ -8,11 +8,11 @@
 version_json = '''
 {
-    "date": "2020-02-19T11:45:15-0600",
+    "date": "2020-03-06T14:16:22-0600",
     "dirty": false,
     "error": null,
-    "full-revisionid": "2a05299a934a2557b985dee93da1e0eff8689178",
-    "version": "2.11.0"
+    "full-revisionid": "15591929f3a6b7b390a7a5394e1f53fe6a6c16f4",
+    "version": "2.12.0"
 }
 '''  # END VERSION_JSON

diff -urN
'--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/client.py new/distributed-2.12.0/distributed/client.py --- old/distributed-2.11.0/distributed/client.py 2020-02-17 23:21:38.000000000 +0100 +++ new/distributed-2.12.0/distributed/client.py 2020-03-05 18:15:15.000000000 +0100 @@ -502,7 +502,7 @@ It is also common to create a Client without specifying the scheduler address , like ``Client()``. In this case the Client creates a - ``LocalCluster`` in the background and connects to that. Any extra + :class:`LocalCluster` in the background and connects to that. Any extra keywords are passed from Client to LocalCluster in this case. See the LocalCluster documentation for more information. @@ -569,7 +569,7 @@ See Also -------- distributed.scheduler.Scheduler: Internal scheduler - distributed.deploy.local.LocalCluster: + distributed.LocalCluster: """ _instances = weakref.WeakSet() @@ -4597,8 +4597,11 @@ async def __aexit__(self, typ, value, traceback, code=None): if not code: - frame = inspect.currentframe().f_back - code = inspect.getsource(frame) + try: + frame = inspect.currentframe().f_back + code = inspect.getsource(frame) + except Exception: + code = "" data = await get_client().scheduler.performance_report( start=self.start, code=code ) @@ -4609,8 +4612,11 @@ get_client().sync(self.__aenter__) def __exit__(self, typ, value, traceback): - frame = inspect.currentframe().f_back - code = inspect.getsource(frame) + try: + frame = inspect.currentframe().f_back + code = inspect.getsource(frame) + except Exception: + code = "" get_client().sync(self.__aexit__, type, value, traceback, code=code) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/comm/tests/test_ucx.py new/distributed-2.12.0/distributed/comm/tests/test_ucx.py --- old/distributed-2.11.0/distributed/comm/tests/test_ucx.py 2020-02-12 03:23:12.000000000 +0100 +++ 
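The client.py hunk above (GH#3505) wraps `inspect.getsource` in a try/except because source lookup fails in some environments (an interactive session or a frozen application, for example). A minimal standalone sketch of the same fallback; the helper name is illustrative, not part of distributed's API:

```python
import inspect


def calling_frame_source():
    # Mirrors the performance_report fix above: inspect.getsource raises
    # (e.g. OSError) when the caller's source is unavailable, so fall
    # back to an empty string instead of letting the report fail.
    try:
        frame = inspect.currentframe().f_back
        return inspect.getsource(frame)
    except Exception:
        return ""
```

Either way the caller gets a string to attach to the performance report, never an exception.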
new/distributed-2.12.0/distributed/comm/tests/test_ucx.py 2020-03-01 17:01:14.000000000 +0100 @@ -10,7 +10,7 @@ from distributed.protocol import to_serialize from distributed.deploy.local import LocalCluster from dask.dataframe.utils import assert_eq -from distributed.utils_test import gen_test, loop, inc, cleanup # noqa: 401 +from distributed.utils_test import gen_test, loop, inc, cleanup, popen # noqa: 401 from .test_comms import check_deserialize diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/comm/tests/test_ucx_config.py new/distributed-2.12.0/distributed/comm/tests/test_ucx_config.py --- old/distributed-2.11.0/distributed/comm/tests/test_ucx_config.py 1970-01-01 01:00:00.000000000 +0100 +++ new/distributed-2.12.0/distributed/comm/tests/test_ucx_config.py 2020-03-04 22:11:23.000000000 +0100 @@ -0,0 +1,98 @@ +import pytest +from time import sleep + +import dask +from dask.utils import format_bytes +from distributed import Client +from distributed.utils_test import gen_test, loop, inc, cleanup, popen # noqa: 401 +from distributed.utils import get_ip +from distributed.comm.ucx import _scrub_ucx_config + +try: + HOST = get_ip() +except Exception: + HOST = "127.0.0.1" + +ucp = pytest.importorskip("ucp") +rmm = pytest.importorskip("rmm") + + +@pytest.mark.asyncio +async def test_ucx_config(cleanup): + + ucx = { + "nvlink": True, + "infiniband": True, + "net-devices": "", + "tcp": True, + "cuda_copy": True, + } + + with dask.config.set(ucx=ucx): + ucx_config = _scrub_ucx_config() + assert ucx_config.get("TLS") == "rc,tcp,sockcm,cuda_copy,cuda_ipc" + assert ucx_config.get("NET_DEVICES") is None + + ucx = { + "nvlink": False, + "infiniband": True, + "net-devices": "mlx5_0:1", + "tcp": True, + "cuda_copy": False, + } + + with dask.config.set(ucx=ucx): + ucx_config = _scrub_ucx_config() + assert ucx_config.get("TLS") == "rc,tcp,sockcm" + assert ucx_config.get("NET_DEVICES") == "mlx5_0:1" + + ucx 
= { + "nvlink": False, + "infiniband": True, + "net-devices": "all", + "MEMTYPE_CACHE": "y", + "tcp": True, + "cuda_copy": True, + } + + with dask.config.set(ucx=ucx): + ucx_config = _scrub_ucx_config() + assert ucx_config.get("TLS") == "rc,tcp,sockcm,cuda_copy" + assert ucx_config.get("MEMTYPE_CACHE") == "y" + + +def test_ucx_config_w_env_var(cleanup, loop, monkeypatch): + size = "1000.00 MB" + monkeypatch.setenv("DASK_RMM__POOL_SIZE", size) + + dask.config.refresh() + + port = "13339" + sched_addr = "ucx://%s:%s" % (HOST, port) + + with popen( + ["dask-scheduler", "--no-dashboard", "--protocol", "ucx", "--port", port] + ) as sched: + with popen( + [ + "dask-worker", + sched_addr, + "--no-dashboard", + "--protocol", + "ucx", + "--no-nanny", + ] + ) as w: + with Client(sched_addr, loop=loop, timeout=10) as c: + while not c.scheduler_info()["workers"]: + sleep(0.1) + + # configured with 1G pool + rmm_usage = c.run_on_scheduler(rmm.get_info) + assert size == format_bytes(rmm_usage.free) + + # configured with 1G pool + worker_addr = list(c.scheduler_info()["workers"])[0] + worker_rmm_usage = c.run(rmm.get_info) + rmm_usage = worker_rmm_usage[worker_addr] + assert size == format_bytes(rmm_usage.free) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/comm/ucx.py new/distributed-2.12.0/distributed/comm/ucx.py --- old/distributed-2.11.0/distributed/comm/ucx.py 2020-02-14 16:29:09.000000000 +0100 +++ new/distributed-2.12.0/distributed/comm/ucx.py 2020-03-04 22:11:23.000000000 +0100 @@ -5,8 +5,6 @@ .. 
_UCX: https://github.com/openucx/ucx """ -import ucp - import logging import dask @@ -16,17 +14,23 @@ from .core import Comm, Connector, Listener, CommClosedError from .registry import Backend, backends from .utils import ensure_concrete_host, to_frames, from_frames -from ..utils import ensure_ip, get_ip, get_ipv6, nbytes, log_errors, CancelledError - -import dask -import numpy as np - +from ..utils import ( + ensure_ip, + get_ip, + get_ipv6, + nbytes, + log_errors, + CancelledError, + parse_bytes, +) logger = logging.getLogger(__name__) # In order to avoid double init when forking/spawning new processes (multiprocess), -# we make sure only to import and initialize UCX once at first use. +# we make sure only to import and initialize UCX once at first use. This is also +# required to ensure Dask configuration gets propagated to UCX, which needs +# variables to be set before being imported. ucp = None cuda_array = None @@ -39,7 +43,11 @@ import ucp as _ucp ucp = _ucp - ucp.init(options=dask.config.get("ucx"), env_takes_precedence=True) + + # remove/process dask.ucx flags for valid ucx options + ucx_config = _scrub_ucx_config() + + ucp.init(options=ucx_config, env_takes_precedence=True) # Find the function, `cuda_array()`, to use when allocating new CUDA arrays try: @@ -61,6 +69,13 @@ "In order to send/recv CUDA arrays, Numba or RMM is required" ) + pool_size_str = dask.config.get("rmm.pool-size") + if pool_size_str is not None: + pool_size = parse_bytes(pool_size_str) + rmm.reinitialize( + pool_allocator=True, managed_memory=False, initial_pool_size=pool_size + ) + class UCX(Comm): """Comm object using UCP. 
@@ -328,3 +343,58 @@ backends["ucx"] = UCXBackend() + + +def _scrub_ucx_config(): + """Function to scrub dask config options for valid UCX config options""" + + # configuration of UCX can happen in two ways: + # 1) high level on/off flags which correspond to UCX configuration + # 2) explicity defined UCX configuration flags + + # import does not initialize ucp -- this will occur outside this function + from ucp import get_config + + options = {} + + # if any of the high level flags are set, as long as they are not Null/None, + # we assume we should configure basic TLS settings for UCX, otherwise we + # leave UCX to its default configuration + if any( + [ + dask.config.get("ucx.tcp"), + dask.config.get("ucx.nvlink"), + dask.config.get("ucx.infiniband"), + ] + ): + tls = "tcp,sockcm" + tls_priority = "sockcm" + + # CUDA COPY can optionally be used with ucx -- we rely on the user + # to define when messages will include CUDA objects. Note: + # defining only the Infiniband flag will not enable cuda_copy + if any([dask.config.get("ucx.nvlink"), dask.config.get("ucx.cuda_copy")]): + tls = tls + ",cuda_copy" + + if dask.config.get("ucx.infiniband"): + tls = "rc," + tls + if dask.config.get("ucx.nvlink"): + tls = tls + ",cuda_ipc" + + options = {"TLS": tls, "SOCKADDR_TLS_PRIORITY": tls_priority} + + net_devices = dask.config.get("ucx.net-devices") + if net_devices is not None and net_devices != "": + options["NET_DEVICES"] = net_devices + + # ANY UCX options defined in config will overwrite high level dask.ucx flags + valid_ucx_keys = list(get_config().keys()) + for k, v in dask.config.get("ucx").items(): + if k in valid_ucx_keys: + options[k] = v + else: + logger.debug( + "Key: %s with value: %s not a valid UCX configuration option" % (k, v) + ) + + return options diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/deploy/adaptive_core.py new/distributed-2.12.0/distributed/deploy/adaptive_core.py --- 
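The `_scrub_ucx_config` function added above composes UCX's `TLS` string from dask's high-level on/off flags. A standalone sketch of that mapping, taking a plain dict in place of `dask.config` (the function name and dict layout are illustrative; the real function also passes through explicit UCX keys):

```python
def scrub_ucx_config(cfg):
    # Sketch of the flag-to-TLS composition in _scrub_ucx_config above.
    options = {}
    # Only configure TLS when at least one high-level flag is truthy;
    # otherwise leave UCX at its default configuration.
    if any(cfg.get(k) for k in ("tcp", "nvlink", "infiniband")):
        tls = "tcp,sockcm"
        tls_priority = "sockcm"
        # NVLink implies CUDA objects in messages, so it pulls in
        # cuda_copy as well; Infiniband alone does not.
        if cfg.get("nvlink") or cfg.get("cuda_copy"):
            tls += ",cuda_copy"
        if cfg.get("infiniband"):
            tls = "rc," + tls
        if cfg.get("nvlink"):
            tls += ",cuda_ipc"
        options = {"TLS": tls, "SOCKADDR_TLS_PRIORITY": tls_priority}
    net_devices = cfg.get("net-devices")
    if net_devices:  # both None and "" count as unset
        options["NET_DEVICES"] = net_devices
    return options
```

With all flags enabled this yields `"rc,tcp,sockcm,cuda_copy,cuda_ipc"`, matching the expectations in the new test_ucx_config.py above.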
old/distributed-2.11.0/distributed/deploy/adaptive_core.py 2019-11-11 22:08:57.000000000 +0100 +++ new/distributed-2.12.0/distributed/deploy/adaptive_core.py 2020-03-02 03:40:28.000000000 +0100 @@ -13,7 +13,7 @@ The core logic for adaptive deployments, with none of the cluster details This class controls our adaptive scaling behavior. It is intended to be - sued as a super-class or mixin. It expects the following state and methods: + used as a super-class or mixin. It expects the following state and methods: **State** diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/deploy/cluster.py new/distributed-2.12.0/distributed/deploy/cluster.py --- old/distributed-2.11.0/distributed/deploy/cluster.py 2020-02-14 16:29:06.000000000 +0100 +++ new/distributed-2.12.0/distributed/deploy/cluster.py 2020-03-01 17:01:14.000000000 +0100 @@ -1,6 +1,7 @@ import asyncio import logging import threading +import warnings from dask.utils import format_bytes @@ -159,11 +160,11 @@ else: return sync(self.loop, func, *args, **kwargs) - async def _logs(self, scheduler=True, workers=True): + async def _get_logs(self, scheduler=True, workers=True): logs = Logs() if scheduler: - L = await self.scheduler_comm.logs() + L = await self.scheduler_comm.get_logs() logs["Scheduler"] = Log("\n".join(line for level, line in L)) if workers: @@ -173,7 +174,7 @@ return logs - def logs(self, scheduler=True, workers=True): + def get_logs(self, scheduler=True, workers=True): """ Return logs for the scheduler and workers Parameters @@ -190,7 +191,11 @@ A dictionary of logs, with one item for the scheduler and one for each worker """ - return self.sync(self._logs, scheduler=scheduler, workers=workers) + return self.sync(self._get_logs, scheduler=scheduler, workers=workers) + + def logs(self, *args, **kwargs): + warnings.warn("logs is deprecated, use get_logs instead", DeprecationWarning) + return self.get_logs(*args, **kwargs) @property def 
dashboard_link(self): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/distributed.yaml new/distributed-2.12.0/distributed/distributed.yaml --- old/distributed-2.11.0/distributed/distributed.yaml 2020-02-14 16:29:06.000000000 +0100 +++ new/distributed-2.12.0/distributed/distributed.yaml 2020-03-04 22:11:23.000000000 +0100 @@ -19,6 +19,7 @@ idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" transition-log-length: 100000 work-stealing: True # workers should steal tasks from each other + work-stealing-interval: 100ms # Callback time for work stealing worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings preload: [] @@ -132,5 +133,12 @@ log-length: 10000 # default length of logs to keep in memory log-format: '%(name)s - %(levelname)s - %(message)s' pdb-on-err: False # enter debug mode on scheduling error +rmm: + pool-size: null +ucx: + tcp: null # enable tcp + nvlink: null # enable cuda_ipc + infiniband: null # enable Infiniband + cuda_copy: null # enable cuda-copy + net-devices: null # define which Infiniband device to use -ucx: {} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/nanny.py new/distributed-2.12.0/distributed/nanny.py --- old/distributed-2.11.0/distributed/nanny.py 2020-02-14 16:29:06.000000000 +0100 +++ new/distributed-2.12.0/distributed/nanny.py 2020-03-04 22:11:23.000000000 +0100 @@ -66,7 +66,7 @@ ncores=None, loop=None, local_dir=None, - local_directory="dask-worker-space", + local_directory=None, services=None, name=None, memory_limit="auto", @@ -150,6 +150,12 @@ warnings.warn("The local_dir keyword has moved to local_directory") local_directory = local_dir + if local_directory is None: + local_directory = dask.config.get("temporary-directory") or os.getcwd() + 
if not os.path.exists(local_directory): + os.makedirs(local_directory) + local_directory = os.path.join(local_directory, "dask-worker-space") + self.local_directory = local_directory self.services = services diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/__init__.py new/distributed-2.12.0/distributed/protocol/__init__.py --- old/distributed-2.11.0/distributed/protocol/__init__.py 2020-02-19 18:21:14.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/__init__.py 2020-03-06 21:01:52.000000000 +0100 @@ -30,6 +30,12 @@ from . import numpy +@dask_serialize.register_lazy("scipy") +@dask_deserialize.register_lazy("scipy") +def _register_scipy(): + from . import scipy + + @dask_serialize.register_lazy("h5py") @dask_deserialize.register_lazy("h5py") def _register_h5py(): @@ -72,6 +78,10 @@ @cuda_deserialize.register_lazy("cupy") @dask_serialize.register_lazy("cupy") @dask_deserialize.register_lazy("cupy") +@cuda_serialize.register_lazy("cupyx") +@cuda_deserialize.register_lazy("cupyx") +@dask_serialize.register_lazy("cupyx") +@dask_deserialize.register_lazy("cupyx") def _register_cupy(): from . import cupy diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/cuda.py new/distributed-2.12.0/distributed/protocol/cuda.py --- old/distributed-2.11.0/distributed/protocol/cuda.py 2019-11-07 20:29:36.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/cuda.py 2020-03-04 22:11:23.000000000 +0100 @@ -1,7 +1,7 @@ import dask from . 
import pickle -from .serialize import register_serialization_family +from .serialize import ObjectDictSerializer, register_serialization_family from dask.utils import typename cuda_serialize = dask.utils.Dispatch("cuda_serialize") @@ -29,3 +29,8 @@ register_serialization_family("cuda", cuda_dumps, cuda_loads) + + +cuda_object_with_dict_serializer = ObjectDictSerializer("cuda") + +cuda_deserialize.register(dict)(cuda_object_with_dict_serializer.deserialize) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/cupy.py new/distributed-2.12.0/distributed/protocol/cupy.py --- old/distributed-2.11.0/distributed/protocol/cupy.py 2020-02-19 18:21:14.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/cupy.py 2020-03-06 21:01:52.000000000 +0100 @@ -1,10 +1,12 @@ """ Efficient serialization GPU arrays. """ +import copyreg + import cupy from .cuda import cuda_deserialize, cuda_serialize -from .serialize import dask_deserialize, dask_serialize +from .serialize import dask_deserialize, dask_serialize, register_generic try: from .rmm import dask_deserialize_rmm_device_buffer as dask_deserialize_cuda_buffer @@ -80,3 +82,40 @@ frames = [dask_deserialize_cuda_buffer(header, frames)] arr = cuda_deserialize_cupy_ndarray(header, frames) return arr + + +try: + from cupy.cusparse import MatDescriptor + from cupyx.scipy.sparse import spmatrix +except ImportError: + MatDescriptor = None + spmatrix = None + + +if MatDescriptor is not None: + + def reduce_matdescriptor(other): + # Pickling MatDescriptor errors + # xref: https://github.com/cupy/cupy/issues/3061 + return cupy.cusparse.MatDescriptor.create, () + + copyreg.pickle(MatDescriptor, reduce_matdescriptor) + + @cuda_serialize.register(MatDescriptor) + @dask_serialize.register(MatDescriptor) + def serialize_cupy_matdescriptor(x): + header, frames = {}, [] + return header, frames + + @cuda_deserialize.register(MatDescriptor) + 
@dask_deserialize.register(MatDescriptor) + def deserialize_cupy_matdescriptor(header, frames): + return MatDescriptor.create() + + +if spmatrix is not None: + for n, s, d in [ + ("cuda", cuda_serialize, cuda_deserialize), + ("dask", dask_serialize, dask_deserialize), + ]: + register_generic(spmatrix, n, s, d) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/scipy.py new/distributed-2.12.0/distributed/protocol/scipy.py --- old/distributed-2.11.0/distributed/protocol/scipy.py 1970-01-01 01:00:00.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/scipy.py 2020-03-04 22:11:23.000000000 +0100 @@ -0,0 +1,30 @@ +""" +Efficient serialization of SciPy sparse matrices. +""" +import scipy + +from .serialize import dask_deserialize, dask_serialize, register_generic + +register_generic(scipy.sparse.spmatrix, "dask", dask_serialize, dask_deserialize) + + +@dask_serialize.register(scipy.sparse.dok.dok_matrix) +def serialize_scipy_sparse_dok(x): + x_coo = x.tocoo() + coo_header, coo_frames = dask_serialize(x.tocoo()) + + header = {"coo_header": coo_header} + frames = coo_frames + + return header, frames + + +@dask_deserialize.register(scipy.sparse.dok.dok_matrix) +def deserialize_scipy_sparse_dok(header, frames): + coo_header = header["coo_header"] + coo_frames = frames + x_coo = dask_deserialize(coo_header, coo_frames) + + x = x_coo.todok() + + return x diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/serialize.py new/distributed-2.12.0/distributed/protocol/serialize.py --- old/distributed-2.11.0/distributed/protocol/serialize.py 2020-02-17 23:21:38.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/serialize.py 2020-03-04 22:11:23.000000000 +0100 @@ -547,7 +547,9 @@ def _is_msgpack_serializable(v): typ = type(v) return ( - typ is str + v is None + or typ is str + or typ is bool or typ is 
int or typ is float or isinstance(v, dict) @@ -558,59 +560,69 @@ ) -def serialize_object_with_dict(est): - header = { - "serializer": "dask", - "type-serialized": pickle.dumps(type(est)), - "simple": {}, - "complex": {}, - } - frames = [] - - if isinstance(est, dict): - d = est - else: - d = est.__dict__ - - for k, v in d.items(): - if _is_msgpack_serializable(v): - header["simple"][k] = v - else: - if isinstance(v, dict): - h, f = serialize_object_with_dict(v) - else: - h, f = serialize(v) - header["complex"][k] = { - "header": h, - "start": len(frames), - "stop": len(frames) + len(f), - } - frames += f - return header, frames - - -def deserialize_object_with_dict(header, frames): - cls = pickle.loads(header["type-serialized"]) - if issubclass(cls, dict): - dd = obj = {} - else: - obj = object.__new__(cls) - dd = obj.__dict__ - dd.update(header["simple"]) - for k, d in header["complex"].items(): - h = d["header"] - f = frames[d["start"] : d["stop"]] - v = deserialize(h, f) - dd[k] = v - - return obj - - -dask_deserialize.register(dict)(deserialize_object_with_dict) +class ObjectDictSerializer: + def __init__(self, serializer): + self.serializer = serializer + + def serialize(self, est): + header = { + "serializer": self.serializer, + "type-serialized": pickle.dumps(type(est)), + "simple": {}, + "complex": {}, + } + frames = [] + if isinstance(est, dict): + d = est + else: + d = est.__dict__ -def register_generic(cls): - """ Register dask_(de)serialize to traverse through __dict__ + for k, v in d.items(): + if _is_msgpack_serializable(v): + header["simple"][k] = v + else: + if isinstance(v, dict): + h, f = self.serialize(v) + else: + h, f = serialize(v, serializers=(self.serializer, "pickle")) + header["complex"][k] = { + "header": h, + "start": len(frames), + "stop": len(frames) + len(f), + } + frames += f + return header, frames + + def deserialize(self, header, frames): + cls = pickle.loads(header["type-serialized"]) + if issubclass(cls, dict): + dd = obj = {} + 
else: + obj = object.__new__(cls) + dd = obj.__dict__ + dd.update(header["simple"]) + for k, d in header["complex"].items(): + h = d["header"] + f = frames[d["start"] : d["stop"]] + v = deserialize(h, f) + dd[k] = v + + return obj + + +dask_object_with_dict_serializer = ObjectDictSerializer("dask") + +dask_deserialize.register(dict)(dask_object_with_dict_serializer.deserialize) + + +def register_generic( + cls, + serializer_name="dask", + serialize_func=dask_serialize, + deserialize_func=dask_deserialize, +): + """ Register (de)serialize to traverse through __dict__ Normally when registering new classes for Dask's custom serialization you need to manage headers and frames, which can be tedious. If all you want @@ -641,5 +653,6 @@ dask_serialize dask_deserialize """ - dask_serialize.register(cls)(serialize_object_with_dict) - dask_deserialize.register(cls)(deserialize_object_with_dict) + object_with_dict_serializer = ObjectDictSerializer(serializer_name) + serialize_func.register(cls)(object_with_dict_serializer.serialize) + deserialize_func.register(cls)(object_with_dict_serializer.deserialize) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/tests/test_cupy.py new/distributed-2.12.0/distributed/protocol/tests/test_cupy.py --- old/distributed-2.11.0/distributed/protocol/tests/test_cupy.py 2020-02-19 18:21:14.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/tests/test_cupy.py 2020-03-06 21:01:52.000000000 +0100 @@ -1,6 +1,7 @@ -from distributed.protocol import serialize, deserialize import pickle + import pytest +from distributed.protocol import deserialize, serialize cupy = pytest.importorskip("cupy") numpy = pytest.importorskip("numpy") @@ -61,3 +62,40 @@ y = deserialize(header, frames, deserializers=("cuda", "dask", "pickle", "error")) assert (x_np == cupy.asnumpy(y)).all() + + +@pytest.mark.parametrize( + "sparse_name", ["coo_matrix", "csc_matrix", "csr_matrix", 
"dia_matrix",], +) +@pytest.mark.parametrize( + "dtype", + [numpy.dtype("<f4"), numpy.dtype(">f4"), numpy.dtype("<f8"), numpy.dtype(">f8"),], +) +@pytest.mark.parametrize("serializer", ["cuda", "dask", "pickle"]) +def test_serialize_cupy_sparse(sparse_name, dtype, serializer): + scipy_sparse = pytest.importorskip("scipy.sparse") + cupy_sparse = pytest.importorskip("cupyx.scipy.sparse") + + scipy_sparse_type = getattr(scipy_sparse, sparse_name) + cupy_sparse_type = getattr(cupy_sparse, sparse_name) + + a_host = numpy.array([[0, 1, 0], [2, 0, 3], [0, 4, 0]], dtype=dtype) + asp_host = scipy_sparse_type(a_host) + if sparse_name == "dia_matrix": + # CuPy `dia_matrix` cannot be created from SciPy one + # xref: https://github.com/cupy/cupy/issues/3158 + asp_dev = cupy_sparse_type( + (asp_host.data, asp_host.offsets), + shape=asp_host.shape, + dtype=asp_host.dtype, + ) + else: + asp_dev = cupy_sparse_type(asp_host) + + header, frames = serialize(asp_dev, serializers=[serializer]) + a2sp_dev = deserialize(header, frames) + + a2sp_host = a2sp_dev.get() + a2_host = a2sp_host.todense() + + assert (a_host == a2_host).all() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/tests/test_scipy.py new/distributed-2.12.0/distributed/protocol/tests/test_scipy.py --- old/distributed-2.11.0/distributed/protocol/tests/test_scipy.py 1970-01-01 01:00:00.000000000 +0100 +++ new/distributed-2.12.0/distributed/protocol/tests/test_scipy.py 2020-03-04 22:11:23.000000000 +0100 @@ -0,0 +1,37 @@ +import pytest +from distributed.protocol import deserialize, serialize + +numpy = pytest.importorskip("numpy") +scipy = pytest.importorskip("scipy") +scipy_sparse = pytest.importorskip("scipy.sparse") + + +@pytest.mark.parametrize( + "sparse_type", + [ + scipy_sparse.bsr_matrix, + scipy_sparse.coo_matrix, + scipy_sparse.csc_matrix, + scipy_sparse.csr_matrix, + scipy_sparse.dia_matrix, + scipy_sparse.dok_matrix, + 
scipy_sparse.lil_matrix, + ], +) +@pytest.mark.parametrize( + "dtype", + [numpy.dtype("<f4"), numpy.dtype(">f4"), numpy.dtype("<f8"), numpy.dtype(">f8"),], +) +def test_serialize_scipy_sparse(sparse_type, dtype): + a = numpy.array([[0, 1, 0], [2, 0, 3], [0, 4, 0]], dtype=dtype) + + anz = a.nonzero() + acoo = scipy_sparse.coo_matrix((a[anz], anz)) + asp = sparse_type(acoo) + + header, frames = serialize(asp, serializers=["dask"]) + asp2 = deserialize(header, frames) + + a2 = asp2.todense() + + assert (a == a2).all() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/scheduler.py new/distributed-2.12.0/distributed/scheduler.py --- old/distributed-2.11.0/distributed/scheduler.py 2020-02-19 18:21:08.000000000 +0100 +++ new/distributed-2.12.0/distributed/scheduler.py 2020-03-06 21:01:52.000000000 +0100 @@ -1267,6 +1267,7 @@ "call_stack": self.get_call_stack, "profile": self.get_profile, "performance_report": self.performance_report, + "get_logs": self.get_logs, "logs": self.get_logs, "worker_logs": self.get_worker_logs, "nbytes": self.get_nbytes, @@ -4648,7 +4649,7 @@ if ts.state == "forgotten": del self.tasks[ts.key] - if ts.state == "forgotten": + if ts.state == "forgotten" and ts.group.name in self.task_groups: # Remove TaskGroup if all tasks are in the forgotten state tg = ts.group if not any(tg.states.get(s) for s in ALL_TASK_STATES): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/stealing.py new/distributed-2.12.0/distributed/stealing.py --- old/distributed-2.11.0/distributed/stealing.py 2019-11-07 20:29:36.000000000 +0100 +++ new/distributed-2.12.0/distributed/stealing.py 2020-03-04 22:11:23.000000000 +0100 @@ -4,9 +4,10 @@ from time import time import dask +from .comm.addressing import get_address_host from .core import CommClosedError from .diagnostics.plugin import SchedulerPlugin -from .utils import log_errors, 
PeriodicCallback +from .utils import log_errors, parse_timedelta, PeriodicCallback try: from cytoolz import topk @@ -40,8 +41,15 @@ for worker in scheduler.workers: self.add_worker(worker=worker) + # `callback_time` is in milliseconds + callback_time = 1000 * parse_timedelta( + dask.config.get("distributed.scheduler.work-stealing-interval"), + default="ms", + ) pc = PeriodicCallback( - callback=self.balance, callback_time=100, io_loop=self.scheduler.loop + callback=self.balance, + callback_time=callback_time, + io_loop=self.scheduler.loop, ) self._pc = pc self.scheduler.periodic_callbacks["stealing"] = pc @@ -121,11 +129,6 @@ For example a result of zero implies a task without dependencies. level: The location within a stealable list to place this value """ - if not ts.loose_restrictions and ( - ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions - ): - return None, None # don't steal - if not ts.dependencies: # no dependencies fast path return 0, 0 @@ -251,7 +254,7 @@ self.scheduler.check_idle_saturated(victim) # Victim was waiting, has given up task, enact steal - elif state in ("waiting", "ready"): + elif state in ("waiting", "ready", "constrained"): self.remove_key_from_stealable(ts) ts.processing_on = thief duration = victim.processing.pop(ts) @@ -353,14 +356,23 @@ i += 1 if not idle: break - idl = idle[i % len(idle)] + + if _has_restrictions(ts): + thieves = [ws for ws in idle if _can_steal(ws, ts, sat)] + else: + thieves = idle + if not thieves: + break + thief = thieves[i % len(thieves)] duration = sat.processing.get(ts) if duration is None: stealable.discard(ts) continue - maybe_move_task(level, ts, sat, idl, duration, cost_multiplier) + maybe_move_task( + level, ts, sat, thief, duration, cost_multiplier + ) if self.cost_multipliers[level] < 20: # don't steal from public at cost stealable = self.stealable_all[level] @@ -381,10 +393,18 @@ continue i += 1 - idl = idle[i % len(idle)] + if _has_restrictions(ts): + thieves = [ws for ws in 
idle if _can_steal(ws, ts, sat)] + else: + thieves = idle + if not thieves: + continue + thief = thieves[i % len(thieves)] duration = sat.processing[ts] - maybe_move_task(level, ts, sat, idl, duration, cost_multiplier) + maybe_move_task( + level, ts, sat, thief, duration, cost_multiplier + ) if log: self.log.append(log) @@ -415,4 +435,41 @@ return out +def _has_restrictions(ts): + """Determine whether the given task has restrictions and whether these + restrictions are strict. + """ + return not ts.loose_restrictions and ( + ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions + ) + + +def _can_steal(thief, ts, victim): + """Determine whether worker ``thief`` can steal task ``ts`` from worker + ``victim``. + + Assumes that `ts` has some restrictions. + """ + if ( + ts.host_restrictions + and get_address_host(thief.address) not in ts.host_restrictions + ): + return False + elif ts.worker_restrictions and thief.address not in ts.worker_restrictions: + return False + + if victim.resources is None: + return True + + for resource, value in victim.resources.items(): + try: + supplied = thief.resources[resource] + except KeyError: + return False + else: + if supplied < value: + return False + return True + + fast_tasks = {"shuffle-split"} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_nanny.py new/distributed-2.12.0/distributed/tests/test_nanny.py --- old/distributed-2.11.0/distributed/tests/test_nanny.py 2020-02-12 03:23:12.000000000 +0100 +++ new/distributed-2.12.0/distributed/tests/test_nanny.py 2020-03-04 15:31:16.000000000 +0100 @@ -402,6 +402,16 @@ yield w.close() +@gen_cluster(nthreads=[]) +def test_local_directory(s): + with tmpfile() as fn: + with dask.config.set(temporary_directory=fn): + w = yield Nanny(s.address) + assert w.local_directory.startswith(fn) + assert "dask-worker-space" in w.local_directory + yield w.close() + + def _noop(x): """Define 
here because closures aren't pickleable.""" pass diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_scheduler.py new/distributed-2.12.0/distributed/tests/test_scheduler.py --- old/distributed-2.11.0/distributed/tests/test_scheduler.py 2020-02-12 03:23:12.000000000 +0100 +++ new/distributed-2.12.0/distributed/tests/test_scheduler.py 2020-03-06 21:01:52.000000000 +0100 @@ -1837,6 +1837,17 @@ assert s.task_prefixes["sum"].states["memory"] == 2 +@gen_cluster(client=True) +async def test_task_group_on_fire_and_forget(c, s, a, b): + # Regression test for https://github.com/dask/distributed/issues/3465 + with captured_logger("distributed.scheduler") as logs: + x = await c.scatter(list(range(10))) + fire_and_forget([c.submit(slowadd, i, x[i]) for i in range(len(x))]) + await asyncio.sleep(1) + + assert "Error transitioning" not in logs.getvalue() + + class BrokenComm(Comm): peer_address = None local_address = None diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_steal.py new/distributed-2.12.0/distributed/tests/test_steal.py --- old/distributed-2.11.0/distributed/tests/test_steal.py 2020-02-14 16:29:09.000000000 +0100 +++ new/distributed-2.12.0/distributed/tests/test_steal.py 2020-03-04 22:11:23.000000000 +0100 @@ -9,6 +9,7 @@ from toolz import sliding_window, concat from tornado import gen +import dask from distributed import Nanny, Worker, wait, worker_client from distributed.config import config from distributed.metrics import time @@ -223,6 +224,32 @@ assert len(b.task_state) == 0 +@gen_cluster( + client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2), ("127.0.0.1", 2)] +) +def test_steal_worker_restrictions(c, s, wa, wb, wc): + future = c.submit(slowinc, 1, delay=0.1, workers={wa.address, wb.address}) + yield future + + ntasks = 100 + futures = c.map(slowinc, range(ntasks), delay=0.1, 
workers={wa.address, wb.address}) + + while sum(len(w.task_state) for w in [wa, wb, wc]) < ntasks: + yield gen.sleep(0.01) + + assert 0 < len(wa.task_state) < ntasks + assert 0 < len(wb.task_state) < ntasks + assert len(wc.task_state) == 0 + + s.extensions["stealing"].balance() + + yield gen.sleep(0.1) + + assert 0 < len(wa.task_state) < ntasks + assert 0 < len(wb.task_state) < ntasks + assert len(wc.task_state) == 0 + + @pytest.mark.skipif( not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" ) @@ -244,6 +271,34 @@ assert len(b.task_state) == 0 +@pytest.mark.skipif( + not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost" +) +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 2)]) +def test_steal_host_restrictions(c, s, wa, wb): + future = c.submit(slowinc, 1, delay=0.10, workers=wa.address) + yield future + + ntasks = 100 + futures = c.map(slowinc, range(ntasks), delay=0.1, workers="127.0.0.1") + while len(wa.task_state) < ntasks: + yield gen.sleep(0.01) + assert len(wa.task_state) == ntasks + assert len(wb.task_state) == 0 + + wc = yield Worker(s.address, ncores=1) + + start = time() + while not wc.task_state or len(wa.task_state) == ntasks: + yield gen.sleep(0.01) + assert time() < start + 3 + + yield gen.sleep(0.1) + assert 0 < len(wa.task_state) < ntasks + assert len(wb.task_state) == 0 + assert 0 < len(wc.task_state) < ntasks + + @gen_cluster( client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}}), ("127.0.0.1", 1)] ) @@ -264,7 +319,6 @@ assert len(b.task_state) == 0 -@pytest.mark.skip(reason="no stealing of resources") @gen_cluster( client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})], timeout=3 ) @@ -676,3 +730,20 @@ out = log.getvalue() assert "Error" not in out + + +@gen_cluster(client=True) +def test_worker_stealing_interval(c, s, a, b): + from distributed.scheduler import WorkStealing + + ws = WorkStealing(s) + assert ws._pc.callback_time == 100 + + with 
dask.config.set({"distributed.scheduler.work-stealing-interval": "500ms"}): + ws = WorkStealing(s) + assert ws._pc.callback_time == 500 + + # Default unit is `ms` + with dask.config.set({"distributed.scheduler.work-stealing-interval": 2}): + ws = WorkStealing(s) + assert ws._pc.callback_time == 2 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_worker.py new/distributed-2.12.0/distributed/tests/test_worker.py --- old/distributed-2.11.0/distributed/tests/test_worker.py 2020-02-14 16:29:09.000000000 +0100 +++ new/distributed-2.12.0/distributed/tests/test_worker.py 2020-03-04 22:11:23.000000000 +0100 @@ -31,7 +31,7 @@ wait, ) from distributed.compatibility import WINDOWS -from distributed.core import rpc +from distributed.core import rpc, CommClosedError from distributed.scheduler import Scheduler from distributed.metrics import time from distributed.worker import Worker, error_message, logger, parse_memory_limit @@ -1629,3 +1629,26 @@ if w.digests is not None: assert w.digests["latency"].size() > 0 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("reconnect", [True, False]) +async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect): + with captured_logger("distributed.worker", level=logging.WARNING) as logger: + async with await Scheduler() as s: + + def bad_heartbeat_worker(*args, **kwargs): + raise CommClosedError() + + async with await Worker(s.address, reconnect=reconnect) as w: + # Trigger CommClosedError during worker heartbeat + monkeypatch.setattr( + w.scheduler, "heartbeat_worker", bad_heartbeat_worker + ) + + await w.heartbeat() + if reconnect: + assert w.status == "running" + else: + assert w.status == "closed" + assert "Heartbeat to scheduler failed" in logger.getvalue() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/worker.py new/distributed-2.12.0/distributed/worker.py --- 
old/distributed-2.11.0/distributed/worker.py 2020-02-18 16:44:44.000000000 +0100 +++ new/distributed-2.12.0/distributed/worker.py 2020-03-04 22:11:23.000000000 +0100 @@ -382,7 +382,6 @@ self.executed_count = 0 self.long_running = set() - self.batched_stream = None self.recent_messages_log = deque( maxlen=dask.config.get("distributed.comm.recent-messages-log-length") ) @@ -493,7 +492,7 @@ if local_directory is None: local_directory = dask.config.get("temporary-directory") or os.getcwd() if not os.path.exists(local_directory): - os.mkdir(local_directory) + os.makedirs(local_directory) local_directory = os.path.join(local_directory, "dask-worker-space") with warn_on_duration( @@ -573,6 +572,7 @@ self.actor_executor = ThreadPoolExecutor( 1, thread_name_prefix="Dask-Actor-Threads" ) + self.batched_stream = BatchedSend(interval="2ms", loop=self.loop) self.name = name self.scheduler_delay = 0 self.stream_comms = dict() @@ -650,6 +650,13 @@ pc = PeriodicCallback(self.heartbeat, 1000, io_loop=self.io_loop) self.periodic_callbacks["heartbeat"] = pc + pc = PeriodicCallback( + lambda: self.batched_stream.send({"op": "keep-alive"}), + 60000, + io_loop=self.io_loop, + ) + self.periodic_callbacks["keep-alive"] = pc + self._address = contact_address if self.memory_limit: @@ -797,6 +804,7 @@ ##################### async def _register_with_scheduler(self): + self.periodic_callbacks["keep-alive"].stop() self.periodic_callbacks["heartbeat"].stop() start = time() if self.contact_address is None: @@ -863,15 +871,8 @@ logger.info(" Registered to: %26s", self.scheduler.address) logger.info("-" * 49) - self.batched_stream = BatchedSend(interval="2ms", loop=self.loop) self.batched_stream.start(comm) - pc = PeriodicCallback( - lambda: self.batched_stream.send({"op": "keep-alive"}), - 60000, - io_loop=self.io_loop, - ) - self.periodic_callbacks["keep-alive"] = pc - pc.start() + self.periodic_callbacks["keep-alive"].start() self.periodic_callbacks["heartbeat"].start() 
self.loop.add_callback(self.handle_scheduler, comm) @@ -912,14 +913,16 @@ ) self.bandwidth_workers.clear() self.bandwidth_types.clear() + except CommClosedError: + logger.warning("Heartbeat to scheduler failed") + if not self.reconnect: + await self.close(report=False) except IOError as e: # Scheduler is gone. Respect distributed.comm.timeouts.connect if "Timed out trying to connect" in str(e): await self.close(report=False) else: raise e - except CommClosedError: - logger.warning("Heartbeat to scheduler failed") finally: self.heartbeat_active = False else: @@ -1112,7 +1115,11 @@ for k, v in self.services.items(): v.stop() - if self.batched_stream and not self.batched_stream.comm.closed(): + if ( + self.batched_stream + and self.batched_stream.comm + and not self.batched_stream.comm.closed() + ): self.batched_stream.send({"op": "close-stream"}) if self.batched_stream: @@ -2143,7 +2150,7 @@ response = {"op": "steal-response", "key": key, "state": state} self.batched_stream.send(response) - if state in ("ready", "waiting"): + if state in ("ready", "waiting", "constrained"): self.release_key(key) def release_key(self, key, cause=None, reason=None, report=True): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed.egg-info/PKG-INFO new/distributed-2.12.0/distributed.egg-info/PKG-INFO --- old/distributed-2.11.0/distributed.egg-info/PKG-INFO 2020-02-19 18:46:22.000000000 +0100 +++ new/distributed-2.12.0/distributed.egg-info/PKG-INFO 2020-03-06 21:17:16.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.11.0 +Version: 2.12.0 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed.egg-info/SOURCES.txt new/distributed-2.12.0/distributed.egg-info/SOURCES.txt --- 
old/distributed-2.11.0/distributed.egg-info/SOURCES.txt 2020-02-19 18:46:22.000000000 +0100 +++ new/distributed-2.12.0/distributed.egg-info/SOURCES.txt 2020-03-06 21:17:16.000000000 +0100 @@ -79,6 +79,7 @@ distributed/comm/tests/__init__.py distributed/comm/tests/test_comms.py distributed/comm/tests/test_ucx.py +distributed/comm/tests/test_ucx_config.py distributed/dashboard/__init__.py distributed/dashboard/core.py distributed/dashboard/export_tool.coffee @@ -171,6 +172,7 @@ distributed/protocol/numpy.py distributed/protocol/pickle.py distributed/protocol/rmm.py +distributed/protocol/scipy.py distributed/protocol/serialize.py distributed/protocol/sparse.py distributed/protocol/torch.py @@ -189,6 +191,7 @@ distributed/protocol/tests/test_protocol.py distributed/protocol/tests/test_protocol_utils.py distributed/protocol/tests/test_rmm.py +distributed/protocol/tests/test_scipy.py distributed/protocol/tests/test_serialize.py distributed/protocol/tests/test_sklearn.py distributed/protocol/tests/test_sparse.py diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/docs/source/api.rst new/distributed-2.12.0/docs/source/api.rst --- old/distributed-2.11.0/docs/source/api.rst 2020-01-10 22:26:26.000000000 +0100 +++ new/distributed-2.12.0/docs/source/api.rst 2020-03-04 22:11:23.000000000 +0100 @@ -148,6 +148,28 @@ .. autoclass:: Future :members: +Cluster +------- + +Classes relevant for cluster creation and management. Other libraries +(like `dask-jobqueue`_, `dask-gateway`_, `dask-kubernetes`_, `dask-yarn`_ etc.) +provide additional cluster objects. + +.. _dask-jobqueue: https://jobqueue.dask.org/ +.. _dask-gateway: https://gateway.dask.org/ +.. _dask-kubernetes: https://kubernetes.dask.org/ +.. _dask-yarn: https://yarn.dask.org/en/latest/ + +.. autosummary:: + LocalCluster + SpecCluster + +.. autoclass:: LocalCluster + :members: + +.. 
autoclass:: SpecCluster + :members: + Other ----- diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/docs/source/changelog.rst new/distributed-2.12.0/docs/source/changelog.rst --- old/distributed-2.11.0/docs/source/changelog.rst 2020-02-19 18:44:47.000000000 +0100 +++ new/distributed-2.12.0/docs/source/changelog.rst 2020-03-06 21:14:55.000000000 +0100 @@ -1,6 +1,34 @@ Changelog ========= +2.12.0 - 2020-03-06 +------------------- + +- Update ``TaskGroup`` remove logic (:pr:`3557`) `James Bourbeau`_ +- Fix-up CuPy sparse serialization (:pr:`3556`) `John Kirkham`_ +- API docs for ``LocalCluster`` and ``SpecCluster`` (:pr:`3548`) `Tom Augspurger`_ +- Serialize sparse arrays (:pr:`3545`) `John Kirkham`_ +- Allow tasks with restrictions to be stolen (:pr:`3069`) `Stan Seibert`_ +- Use UCX default configuration instead of raising (:pr:`3544`) `Peter Andreas Entschev`_ +- Support using other serializers with ``register_generic`` (:pr:`3536`) `John Kirkham`_ +- DOC: update to async await (:pr:`3543`) `Tom Augspurger`_ +- Use ``pytest.raises`` in ``test_ucx_config.py`` (:pr:`3541`) `John Kirkham`_ +- Fix/more ucx config options (:pr:`3539`) `Benjamin Zaitlen`_ +- Update heartbeat ``CommClosedError`` error handling (:pr:`3529`) `James Bourbeau`_ +- Use ``makedirs`` when constructing ``local_directory`` (:pr:`3538`) `John Kirkham`_ +- Mark ``None`` as MessagePack serializable (:pr:`3537`) `John Kirkham`_ +- Mark ``bool`` as MessagePack serializable (:pr:`3535`) `John Kirkham`_ +- Use 'temporary-directory' from ``dask.config`` for Nanny's directory (:pr:`3531`) `John Kirkham`_ +- Add try-except around getting source code in performance report (:pr:`3505`) `Matthew Rocklin`_ +- Fix typo in docstring (:pr:`3528`) `Davis Bennett`_ +- Make work stealing callback time configurable (:pr:`3523`) `Lucas Rademaker`_ +- RMM/UCX Config Flags (:pr:`3515`) `Benjamin Zaitlen`_ +- Revise develop-docs: conda env example (:pr:`3406`) 
`Darren Weber`_ +- Remove ``import ucp`` from the top of ``ucx.py`` (:pr:`3510`) `Peter Andreas Entschev`_ +- Rename ``logs`` to ``get_logs`` (:pr:`3473`) `Jacob Tomlinson`_ +- Stop keep alives when worker reconnecting to the scheduler (:pr:`3493`) `Jacob Tomlinson`_ + + 2.11.0 - 2020-02-19 ------------------- @@ -1570,3 +1598,7 @@ .. _`Cyril Shcherbin`: https://github.com/shcherbin .. _`Søren Fuglede Jørgensen`: https://github.com/fuglede .. _`Igor Gotlibovych`: https://github.com/ig248 +.. _`Stan Seibert`: https://github.com/seibert +.. _`Davis Bennett`: https://github.com/d-v-b +.. _`Lucas Rademaker`: https://github.com/lr4d +.. _`Darren Weber`: https://github.com/dazza-codes diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/docs/source/develop.rst new/distributed-2.12.0/docs/source/develop.rst --- old/distributed-2.11.0/docs/source/develop.rst 2020-02-12 03:23:12.000000000 +0100 +++ new/distributed-2.12.0/docs/source/develop.rst 2020-03-04 22:11:23.000000000 +0100 @@ -22,6 +22,26 @@ cd distributed python setup.py install +Using conda, for example:: + + git clone git@github.com:{your-fork}/distributed.git + cd distributed + conda create -y -n distributed python=3.6 + conda activate distributed + python -m pip install -U -r requirements.txt + python -m pip install -U -r dev-requirements.txt + python -m pip install -e . 
+ +To keep a fork in sync with the upstream source:: + + cd distributed + git remote add upstream git@github.com:dask/distributed.git + git remote -v + git fetch -a upstream + git checkout master + git pull upstream master + git push origin master + Test ---- @@ -88,20 +108,20 @@ from distributed import Client, Future, Scheduler, Worker @gen_cluster(client=True) - def test_submit(c, s, a, b): + async def test_submit(c, s, a, b): assert isinstance(c, Client) assert isinstance(s, Scheduler) assert isinstance(a, Worker) assert isinstance(b, Worker) - + future = c.submit(inc, 1) assert isinstance(future, Future) assert future.key in c.futures - + # result = future.result() # This synchronous API call would block - result = yield future + result = await future assert result == 2 - + assert future.key in s.tasks assert future.key in a.data or future.key in b.data @@ -111,8 +131,8 @@ the state of every element of the cluster directly. However, you can not use the normal synchronous API (doing so will cause the test to wait forever) and instead you need to use the coroutine API, where all blocking functions are -prepended with an underscore (``_``). Beware, it is a common mistake to use -the blocking interface within these tests. +prepended with an underscore (``_``) and awaited with ``await``. +Beware, it is a common mistake to use the blocking interface within these tests. If you want to test the normal synchronous API you can use the ``client`` pytest fixture style test, which sets up a scheduler and workers for you in @@ -146,7 +166,7 @@ In this style of test you do not have access to the scheduler or workers. The variables ``s, a, b`` are now dictionaries holding a ``multiprocessing.Process`` object and a port integer. However, you can now -use the normal synchronous API (never use yield in this style of test) and you +use the normal synchronous API (never use ``await`` in this style of test) and you can close processes easily by terminating them. 
Typically for most user-facing functions you will find both kinds of tests.
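A note on the work-stealing interval change in the `stealing.py` hunk above: Tornado's `PeriodicCallback.callback_time` is expressed in milliseconds, while `parse_timedelta` returns seconds, which is why the new code multiplies by 1000 and passes `default="ms"` so a bare number in the config is read as milliseconds. A minimal sketch of that conversion, using a simplified stand-in for `dask.utils.parse_timedelta` (the real function accepts many more unit suffixes):

```python
import re

# Simplified stand-in for dask.utils.parse_timedelta; returns seconds.
# The real function supports many more suffixes ("us", "m", "h", ...).
_UNITS = {"ms": 1e-3, "s": 1.0}

def parse_timedelta(value, default="s"):
    # Bare numbers are interpreted in the `default` unit.
    if isinstance(value, (int, float)):
        return value * _UNITS[default]
    number, unit = re.fullmatch(r"([\d.]+)\s*([a-z]+)", value.strip()).groups()
    return float(number) * _UNITS[unit]

# PeriodicCallback.callback_time is in milliseconds, hence the 1000x factor
# when building the stealing callback from the config value.
assert 1000 * parse_timedelta("500ms", default="ms") == 500
assert 1000 * parse_timedelta(2, default="ms") == 2  # bare number -> default unit
```

This mirrors the behaviour exercised by `test_worker_stealing_interval`, where `"500ms"` yields a 500 ms callback time and a bare `2` yields 2 ms.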
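The resource branch of the new `_can_steal` helper in the stealing hunk requires the prospective thief to supply at least as much of every resource the task needs; if any resource is missing or insufficient, the steal is vetoed. A standalone sketch of that coverage check, using plain dicts instead of scheduler `WorkerState` objects (the name `can_supply` is illustrative, not the actual helper):

```python
def can_supply(thief_resources, required):
    """Return True if `thief_resources` covers every resource in `required`.

    `required` maps resource name -> amount the task consumes; the thief can
    only take the task if it offers at least that amount of each resource.
    """
    if required is None:  # task has no resource restrictions
        return True
    for resource, value in required.items():
        supplied = thief_resources.get(resource)
        if supplied is None or supplied < value:
            return False
    return True

assert can_supply({"A": 2, "GPU": 1}, {"A": 2})   # enough "A" available
assert not can_supply({"A": 1}, {"A": 2})         # too little "A"
assert not can_supply({}, {"GPU": 1})             # resource not offered at all
assert can_supply({}, None)                       # unrestricted task
```

The host and worker restriction checks in `_can_steal` work the same way: any restriction the thief fails to satisfy short-circuits to "don't steal".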