Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2020-03-12 23:05:52
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.3160 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Thu Mar 12 23:05:52 2020 rev:26 rq:782795 version:2.12.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    2020-02-24 15:53:30.439618955 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.3160/python-distributed.changes  2020-03-12 23:11:17.127299746 +0100
@@ -1,0 +2,37 @@
+Sun Mar  8 19:04:45 UTC 2020 - Arun Persaud <a...@gmx.de>
+
+- update to version 2.12.0:
+  * Update TaskGroup remove logic (GH#3557) James Bourbeau
+  * Fix-up CuPy sparse serialization (GH#3556) John Kirkham
+  * API docs for LocalCluster and SpecCluster (GH#3548) Tom Augspurger
+  * Serialize sparse arrays (GH#3545) John Kirkham
+  * Allow tasks with restrictions to be stolen (GH#3069) Stan Seibert
+  * Use UCX default configuration instead of raising (GH#3544) Peter
+    Andreas Entschev
+  * Support using other serializers with register_generic (GH#3536)
+    John Kirkham
+  * DOC: update to async await (GH#3543) Tom Augspurger
+  * Use pytest.raises in test_ucx_config.py (GH#3541) John Kirkham
+  * Fix/more ucx config options (GH#3539) Benjamin Zaitlen
+  * Update heartbeat CommClosedError error handling (GH#3529) James
+    Bourbeau
+  * Use makedirs when constructing local_directory (GH#3538) John
+    Kirkham
+  * Mark None as MessagePack serializable (GH#3537) John Kirkham
+  * Mark bool as MessagePack serializable (GH#3535) John Kirkham
+  * Use ‘temporary-directory’ from dask.config for Nanny’s directory
+    (GH#3531) John Kirkham
+  * Add try-except around getting source code in performance report
+    (GH#3505) Matthew Rocklin
+  * Fix typo in docstring (GH#3528) Davis Bennett
+  * Make work stealing callback time configurable (GH#3523) Lucas
+    Rademaker
+  * RMM/UCX Config Flags (GH#3515) Benjamin Zaitlen
+  * Revise develop-docs: conda env example (GH#3406) Darren Weber
+  * Remove import ucp from the top of ucx.py (GH#3510) Peter Andreas
+    Entschev
+  * Rename logs to get_logs (GH#3473) Jacob Tomlinson
+  * Stop keep alives when worker reconnecting to the scheduler
+    (GH#3493) Jacob Tomlinson
+
+-------------------------------------------------------------------

Old:
----
  distributed-2.11.0.tar.gz

New:
----
  distributed-2.12.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.ipN7Jq/_old  2020-03-12 23:11:18.459300271 +0100
+++ /var/tmp/diff_new_pack.ipN7Jq/_new  2020-03-12 23:11:18.463300273 +0100
@@ -21,7 +21,7 @@
 # Test requires network connection
 %bcond_with     test
 Name:           python-distributed
-Version:        2.11.0
+Version:        2.12.0
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-2.11.0.tar.gz -> distributed-2.12.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/PKG-INFO new/distributed-2.12.0/PKG-INFO
--- old/distributed-2.11.0/PKG-INFO     2020-02-19 18:46:22.634171200 +0100
+++ new/distributed-2.12.0/PKG-INFO     2020-03-06 21:17:16.877646200 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.11.0
+Version: 2.12.0
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/_version.py new/distributed-2.12.0/distributed/_version.py
--- old/distributed-2.11.0/distributed/_version.py      2020-02-19 18:46:22.636481300 +0100
+++ new/distributed-2.12.0/distributed/_version.py      2020-03-06 21:17:16.880402800 +0100
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2020-02-19T11:45:15-0600",
+ "date": "2020-03-06T14:16:22-0600",
  "dirty": false,
  "error": null,
- "full-revisionid": "2a05299a934a2557b985dee93da1e0eff8689178",
- "version": "2.11.0"
+ "full-revisionid": "15591929f3a6b7b390a7a5394e1f53fe6a6c16f4",
+ "version": "2.12.0"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/client.py new/distributed-2.12.0/distributed/client.py
--- old/distributed-2.11.0/distributed/client.py        2020-02-17 23:21:38.000000000 +0100
+++ new/distributed-2.12.0/distributed/client.py        2020-03-05 18:15:15.000000000 +0100
@@ -502,7 +502,7 @@
 
     It is also common to create a Client without specifying the scheduler
     address , like ``Client()``.  In this case the Client creates a
-    ``LocalCluster`` in the background and connects to that.  Any extra
+    :class:`LocalCluster` in the background and connects to that.  Any extra
     keywords are passed from Client to LocalCluster in this case.  See the
     LocalCluster documentation for more information.
 
@@ -569,7 +569,7 @@
     See Also
     --------
     distributed.scheduler.Scheduler: Internal scheduler
-    distributed.deploy.local.LocalCluster:
+    distributed.LocalCluster:
     """
 
     _instances = weakref.WeakSet()
@@ -4597,8 +4597,11 @@
 
     async def __aexit__(self, typ, value, traceback, code=None):
         if not code:
-            frame = inspect.currentframe().f_back
-            code = inspect.getsource(frame)
+            try:
+                frame = inspect.currentframe().f_back
+                code = inspect.getsource(frame)
+            except Exception:
+                code = ""
         data = await get_client().scheduler.performance_report(
             start=self.start, code=code
         )
@@ -4609,8 +4612,11 @@
         get_client().sync(self.__aenter__)
 
     def __exit__(self, typ, value, traceback):
-        frame = inspect.currentframe().f_back
-        code = inspect.getsource(frame)
+        try:
+            frame = inspect.currentframe().f_back
+            code = inspect.getsource(frame)
+        except Exception:
+            code = ""
         get_client().sync(self.__aexit__, type, value, traceback, code=code)
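
Both hunks above wrap inspect.getsource in try/except because it raises OSError whenever the caller's source file cannot be located (interactive shells, exec()'d code, frozen binaries). A minimal standalone sketch of the guard; the helper name is hypothetical:

    import inspect

    def calling_source():
        # Return the caller's source, or "" when inspect cannot find it
        # (e.g. OSError("could not get source code") in a REPL).
        frame = inspect.currentframe().f_back
        try:
            return inspect.getsource(frame)
        except Exception:
            return ""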
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/comm/tests/test_ucx.py new/distributed-2.12.0/distributed/comm/tests/test_ucx.py
--- old/distributed-2.11.0/distributed/comm/tests/test_ucx.py   2020-02-12 03:23:12.000000000 +0100
+++ new/distributed-2.12.0/distributed/comm/tests/test_ucx.py   2020-03-01 17:01:14.000000000 +0100
@@ -10,7 +10,7 @@
 from distributed.protocol import to_serialize
 from distributed.deploy.local import LocalCluster
 from dask.dataframe.utils import assert_eq
-from distributed.utils_test import gen_test, loop, inc, cleanup  # noqa: 401
+from distributed.utils_test import gen_test, loop, inc, cleanup, popen  # noqa: 401
 
 from .test_comms import check_deserialize
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/comm/tests/test_ucx_config.py new/distributed-2.12.0/distributed/comm/tests/test_ucx_config.py
--- old/distributed-2.11.0/distributed/comm/tests/test_ucx_config.py    1970-01-01 01:00:00.000000000 +0100
+++ new/distributed-2.12.0/distributed/comm/tests/test_ucx_config.py    2020-03-04 22:11:23.000000000 +0100
@@ -0,0 +1,98 @@
+import pytest
+from time import sleep
+
+import dask
+from dask.utils import format_bytes
+from distributed import Client
+from distributed.utils_test import gen_test, loop, inc, cleanup, popen  # noqa: 401
+from distributed.utils import get_ip
+from distributed.comm.ucx import _scrub_ucx_config
+
+try:
+    HOST = get_ip()
+except Exception:
+    HOST = "127.0.0.1"
+
+ucp = pytest.importorskip("ucp")
+rmm = pytest.importorskip("rmm")
+
+
+@pytest.mark.asyncio
+async def test_ucx_config(cleanup):
+
+    ucx = {
+        "nvlink": True,
+        "infiniband": True,
+        "net-devices": "",
+        "tcp": True,
+        "cuda_copy": True,
+    }
+
+    with dask.config.set(ucx=ucx):
+        ucx_config = _scrub_ucx_config()
+        assert ucx_config.get("TLS") == "rc,tcp,sockcm,cuda_copy,cuda_ipc"
+        assert ucx_config.get("NET_DEVICES") is None
+
+    ucx = {
+        "nvlink": False,
+        "infiniband": True,
+        "net-devices": "mlx5_0:1",
+        "tcp": True,
+        "cuda_copy": False,
+    }
+
+    with dask.config.set(ucx=ucx):
+        ucx_config = _scrub_ucx_config()
+        assert ucx_config.get("TLS") == "rc,tcp,sockcm"
+        assert ucx_config.get("NET_DEVICES") == "mlx5_0:1"
+
+    ucx = {
+        "nvlink": False,
+        "infiniband": True,
+        "net-devices": "all",
+        "MEMTYPE_CACHE": "y",
+        "tcp": True,
+        "cuda_copy": True,
+    }
+
+    with dask.config.set(ucx=ucx):
+        ucx_config = _scrub_ucx_config()
+        assert ucx_config.get("TLS") == "rc,tcp,sockcm,cuda_copy"
+        assert ucx_config.get("MEMTYPE_CACHE") == "y"
+
+
+def test_ucx_config_w_env_var(cleanup, loop, monkeypatch):
+    size = "1000.00 MB"
+    monkeypatch.setenv("DASK_RMM__POOL_SIZE", size)
+
+    dask.config.refresh()
+
+    port = "13339"
+    sched_addr = "ucx://%s:%s" % (HOST, port)
+
+    with popen(
+        ["dask-scheduler", "--no-dashboard", "--protocol", "ucx", "--port", 
port]
+    ) as sched:
+        with popen(
+            [
+                "dask-worker",
+                sched_addr,
+                "--no-dashboard",
+                "--protocol",
+                "ucx",
+                "--no-nanny",
+            ]
+        ) as w:
+            with Client(sched_addr, loop=loop, timeout=10) as c:
+                while not c.scheduler_info()["workers"]:
+                    sleep(0.1)
+
+                # configured with 1G pool
+                rmm_usage = c.run_on_scheduler(rmm.get_info)
+                assert size == format_bytes(rmm_usage.free)
+
+                # configured with 1G pool
+                worker_addr = list(c.scheduler_info()["workers"])[0]
+                worker_rmm_usage = c.run(rmm.get_info)
+                rmm_usage = worker_rmm_usage[worker_addr]
+                assert size == format_bytes(rmm_usage.free)
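
The monkeypatched environment variable in the test above works because dask.config maps DASK_RMM__POOL_SIZE to the rmm.pool-size key when refreshed: a double underscore nests one level, and underscores/hyphens in key names are treated interchangeably. A small sketch of that mapping, independent of UCX:

    import os
    import dask

    # DASK_RMM__POOL_SIZE -> config key "rmm.pool-size" ("__" nests a mapping)
    os.environ["DASK_RMM__POOL_SIZE"] = "1000.00 MB"
    dask.config.refresh()  # re-read environment variables into dask.config
    assert dask.config.get("rmm.pool-size") == "1000.00 MB"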
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/comm/ucx.py new/distributed-2.12.0/distributed/comm/ucx.py
--- old/distributed-2.11.0/distributed/comm/ucx.py      2020-02-14 16:29:09.000000000 +0100
+++ new/distributed-2.12.0/distributed/comm/ucx.py      2020-03-04 22:11:23.000000000 +0100
@@ -5,8 +5,6 @@
 
 .. _UCX: https://github.com/openucx/ucx
 """
-import ucp
-
 import logging
 
 import dask
@@ -16,17 +14,23 @@
 from .core import Comm, Connector, Listener, CommClosedError
 from .registry import Backend, backends
 from .utils import ensure_concrete_host, to_frames, from_frames
-from ..utils import ensure_ip, get_ip, get_ipv6, nbytes, log_errors, CancelledError
-
-import dask
-import numpy as np
-
+from ..utils import (
+    ensure_ip,
+    get_ip,
+    get_ipv6,
+    nbytes,
+    log_errors,
+    CancelledError,
+    parse_bytes,
+)
 
 logger = logging.getLogger(__name__)
 
 
 # In order to avoid double init when forking/spawning new processes (multiprocess),
-# we make sure only to import and initialize UCX once at first use.
+# we make sure only to import and initialize UCX once at first use. This is also
+# required to ensure Dask configuration gets propagated to UCX, which needs
+# variables to be set before being imported.
 ucp = None
 cuda_array = None
 
@@ -39,7 +43,11 @@
     import ucp as _ucp
 
     ucp = _ucp
-    ucp.init(options=dask.config.get("ucx"), env_takes_precedence=True)
+
+    # remove/process dask.ucx flags for valid ucx options
+    ucx_config = _scrub_ucx_config()
+
+    ucp.init(options=ucx_config, env_takes_precedence=True)
 
     # Find the function, `cuda_array()`, to use when allocating new CUDA arrays
     try:
@@ -61,6 +69,13 @@
                     "In order to send/recv CUDA arrays, Numba or RMM is 
required"
                 )
 
+    pool_size_str = dask.config.get("rmm.pool-size")
+    if pool_size_str is not None:
+        pool_size = parse_bytes(pool_size_str)
+        rmm.reinitialize(
+            pool_allocator=True, managed_memory=False, initial_pool_size=pool_size
+        )
+
 
 class UCX(Comm):
     """Comm object using UCP.
@@ -328,3 +343,58 @@
 
 
 backends["ucx"] = UCXBackend()
+
+
+def _scrub_ucx_config():
+    """Function to scrub dask config options for valid UCX config options"""
+
+    # configuration of UCX can happen in two ways:
+    # 1) high level on/off flags which correspond to UCX configuration
+    # 2) explicitly defined UCX configuration flags
+
+    # import does not initialize ucp -- this will occur outside this function
+    from ucp import get_config
+
+    options = {}
+
+    # if any of the high level flags are set, as long as they are not Null/None,
+    # we assume we should configure basic TLS settings for UCX, otherwise we
+    # leave UCX to its default configuration
+    if any(
+        [
+            dask.config.get("ucx.tcp"),
+            dask.config.get("ucx.nvlink"),
+            dask.config.get("ucx.infiniband"),
+        ]
+    ):
+        tls = "tcp,sockcm"
+        tls_priority = "sockcm"
+
+        # CUDA COPY can optionally be used with ucx -- we rely on the user
+        # to define when messages will include CUDA objects.  Note:
+        # defining only the Infiniband flag will not enable cuda_copy
+        if any([dask.config.get("ucx.nvlink"), 
dask.config.get("ucx.cuda_copy")]):
+            tls = tls + ",cuda_copy"
+
+        if dask.config.get("ucx.infiniband"):
+            tls = "rc," + tls
+        if dask.config.get("ucx.nvlink"):
+            tls = tls + ",cuda_ipc"
+
+        options = {"TLS": tls, "SOCKADDR_TLS_PRIORITY": tls_priority}
+
+        net_devices = dask.config.get("ucx.net-devices")
+        if net_devices is not None and net_devices != "":
+            options["NET_DEVICES"] = net_devices
+
+    # ANY UCX options defined in config will overwrite high level dask.ucx flags
+    valid_ucx_keys = list(get_config().keys())
+    for k, v in dask.config.get("ucx").items():
+        if k in valid_ucx_keys:
+            options[k] = v
+        else:
+            logger.debug(
+                "Key: %s with value: %s not a valid UCX configuration option" 
% (k, v)
+            )
+
+    return options
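
Tracing _scrub_ucx_config with the first configuration from the new test file shows how the high-level flags expand: infiniband prepends "rc", nvlink or cuda_copy appends "cuda_copy", and nvlink additionally appends "cuda_ipc". A sketch mirroring that test case (assumes ucp is importable, since the function reads ucp.get_config()):

    import dask
    from distributed.comm.ucx import _scrub_ucx_config

    ucx = {"tcp": True, "nvlink": True, "infiniband": True,
           "net-devices": "", "cuda_copy": True}
    with dask.config.set(ucx=ucx):
        options = _scrub_ucx_config()
    assert options["TLS"] == "rc,tcp,sockcm,cuda_copy,cuda_ipc"
    assert "NET_DEVICES" not in options  # an empty net-devices is dropped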
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/deploy/adaptive_core.py new/distributed-2.12.0/distributed/deploy/adaptive_core.py
--- old/distributed-2.11.0/distributed/deploy/adaptive_core.py  2019-11-11 22:08:57.000000000 +0100
+++ new/distributed-2.12.0/distributed/deploy/adaptive_core.py  2020-03-02 03:40:28.000000000 +0100
@@ -13,7 +13,7 @@
     The core logic for adaptive deployments, with none of the cluster details
 
     This class controls our adaptive scaling behavior.  It is intended to be
-    sued as a super-class or mixin.  It expects the following state and methods:
+    used as a super-class or mixin.  It expects the following state and methods:
 
     **State**
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/deploy/cluster.py new/distributed-2.12.0/distributed/deploy/cluster.py
--- old/distributed-2.11.0/distributed/deploy/cluster.py        2020-02-14 16:29:06.000000000 +0100
+++ new/distributed-2.12.0/distributed/deploy/cluster.py        2020-03-01 17:01:14.000000000 +0100
@@ -1,6 +1,7 @@
 import asyncio
 import logging
 import threading
+import warnings
 
 from dask.utils import format_bytes
 
@@ -159,11 +160,11 @@
         else:
             return sync(self.loop, func, *args, **kwargs)
 
-    async def _logs(self, scheduler=True, workers=True):
+    async def _get_logs(self, scheduler=True, workers=True):
         logs = Logs()
 
         if scheduler:
-            L = await self.scheduler_comm.logs()
+            L = await self.scheduler_comm.get_logs()
             logs["Scheduler"] = Log("\n".join(line for level, line in L))
 
         if workers:
@@ -173,7 +174,7 @@
 
         return logs
 
-    def logs(self, scheduler=True, workers=True):
+    def get_logs(self, scheduler=True, workers=True):
         """ Return logs for the scheduler and workers
 
         Parameters
@@ -190,7 +191,11 @@
             A dictionary of logs, with one item for the scheduler and one for
             each worker
         """
-        return self.sync(self._logs, scheduler=scheduler, workers=workers)
+        return self.sync(self._get_logs, scheduler=scheduler, workers=workers)
+
+    def logs(self, *args, **kwargs):
+        warnings.warn("logs is deprecated, use get_logs instead", 
DeprecationWarning)
+        return self.get_logs(*args, **kwargs)
 
     @property
     def dashboard_link(self):
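
After the rename, cluster.logs() keeps working as a thin deprecated alias for get_logs(). A short usage sketch against a LocalCluster:

    import warnings
    from distributed import LocalCluster

    cluster = LocalCluster(n_workers=1, processes=False)
    logs = cluster.get_logs()  # new spelling
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        cluster.logs()  # old spelling still works but warns
    assert any(issubclass(i.category, DeprecationWarning) for i in w)
    cluster.close()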
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/distributed.yaml new/distributed-2.12.0/distributed/distributed.yaml
--- old/distributed-2.11.0/distributed/distributed.yaml 2020-02-14 16:29:06.000000000 +0100
+++ new/distributed-2.12.0/distributed/distributed.yaml 2020-03-04 22:11:23.000000000 +0100
@@ -19,6 +19,7 @@
     idle-timeout: null      # Shut down after this duration, like "1h" or "30 minutes"
     transition-log-length: 100000
     work-stealing: True     # workers should steal tasks from each other
+    work-stealing-interval: 100ms  # Callback time for work stealing
     worker-ttl: null        # like '60s'. Time to live for workers.  They must heartbeat faster than this
     pickle: True            # Is the scheduler allowed to deserialize arbitrary bytestrings
     preload: []
@@ -132,5 +133,12 @@
     log-length: 10000  # default length of logs to keep in memory
     log-format: '%(name)s - %(levelname)s - %(message)s'
     pdb-on-err: False       # enter debug mode on scheduling error
+rmm:
+  pool-size: null
+ucx:
+  tcp: null  # enable tcp
+  nvlink: null  # enable cuda_ipc
+  infiniband: null # enable Infiniband
+  cuda_copy: null  # enable cuda-copy
+  net-devices: null  # define which Infiniband device to use
 
-ucx: {}
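
The new work-stealing-interval key feeds the WorkStealing periodic callback through parse_timedelta with a default unit of milliseconds, so both a timedelta string and a bare number work:

    from distributed.utils import parse_timedelta

    # WorkStealing computes: callback_time = 1000 * parse_timedelta(value, default="ms")
    assert 1000 * parse_timedelta("500ms", default="ms") == 500
    assert 1000 * parse_timedelta(2, default="ms") == 2  # bare numbers are ms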
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/nanny.py new/distributed-2.12.0/distributed/nanny.py
--- old/distributed-2.11.0/distributed/nanny.py 2020-02-14 16:29:06.000000000 +0100
+++ new/distributed-2.12.0/distributed/nanny.py 2020-03-04 22:11:23.000000000 +0100
@@ -66,7 +66,7 @@
         ncores=None,
         loop=None,
         local_dir=None,
-        local_directory="dask-worker-space",
+        local_directory=None,
         services=None,
         name=None,
         memory_limit="auto",
@@ -150,6 +150,12 @@
             warnings.warn("The local_dir keyword has moved to local_directory")
             local_directory = local_dir
 
+        if local_directory is None:
+            local_directory = dask.config.get("temporary-directory") or os.getcwd()
+            if not os.path.exists(local_directory):
+                os.makedirs(local_directory)
+            local_directory = os.path.join(local_directory, "dask-worker-space")
+
         self.local_directory = local_directory
 
         self.services = services
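
With this hunk the Nanny resolves local_directory the same way the Worker does: dask.config's temporary-directory if set, otherwise the current working directory, with a dask-worker-space subdirectory appended (test_local_directory further down exercises this). A condensed sketch of just the resolution order; the helper name is hypothetical and the real code also creates the parent directory with os.makedirs:

    import os
    import dask

    def nanny_space(local_directory=None):
        # Mirrors the fallback chain added to Nanny.__init__ above
        if local_directory is None:
            local_directory = dask.config.get("temporary-directory") or os.getcwd()
            local_directory = os.path.join(local_directory, "dask-worker-space")
        return local_directory

    with dask.config.set(temporary_directory="/tmp/scratch"):
        assert nanny_space() == "/tmp/scratch/dask-worker-space"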
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/__init__.py new/distributed-2.12.0/distributed/protocol/__init__.py
--- old/distributed-2.11.0/distributed/protocol/__init__.py     2020-02-19 18:21:14.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/__init__.py     2020-03-06 21:01:52.000000000 +0100
@@ -30,6 +30,12 @@
     from . import numpy
 
 
+@dask_serialize.register_lazy("scipy")
+@dask_deserialize.register_lazy("scipy")
+def _register_scipy():
+    from . import scipy
+
+
 @dask_serialize.register_lazy("h5py")
 @dask_deserialize.register_lazy("h5py")
 def _register_h5py():
@@ -72,6 +78,10 @@
 @cuda_deserialize.register_lazy("cupy")
 @dask_serialize.register_lazy("cupy")
 @dask_deserialize.register_lazy("cupy")
+@cuda_serialize.register_lazy("cupyx")
+@cuda_deserialize.register_lazy("cupyx")
+@dask_serialize.register_lazy("cupyx")
+@dask_deserialize.register_lazy("cupyx")
 def _register_cupy():
     from . import cupy
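
register_lazy defers the actual import: the registration function runs only the first time an object whose type lives under the named top-level module (scipy, cupyx) is dispatched, keeping those libraries optional. A rough sketch of the pattern using dask.utils.Dispatch, with hypothetical names:

    from dask.utils import Dispatch

    my_serialize = Dispatch("my_serialize")

    @my_serialize.register_lazy("scipy")
    def _register_scipy():
        # Runs on first dispatch of a scipy.* object; importing here
        # keeps scipy optional at module import time.
        import scipy.sparse

        @my_serialize.register(scipy.sparse.spmatrix)
        def serialize_spmatrix(x):
            return {"shape": x.shape}, []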
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/cuda.py new/distributed-2.12.0/distributed/protocol/cuda.py
--- old/distributed-2.11.0/distributed/protocol/cuda.py 2019-11-07 20:29:36.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/cuda.py 2020-03-04 22:11:23.000000000 +0100
@@ -1,7 +1,7 @@
 import dask
 
 from . import pickle
-from .serialize import register_serialization_family
+from .serialize import ObjectDictSerializer, register_serialization_family
 from dask.utils import typename
 
 cuda_serialize = dask.utils.Dispatch("cuda_serialize")
@@ -29,3 +29,8 @@
 
 
 register_serialization_family("cuda", cuda_dumps, cuda_loads)
+
+
+cuda_object_with_dict_serializer = ObjectDictSerializer("cuda")
+
+cuda_deserialize.register(dict)(cuda_object_with_dict_serializer.deserialize)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/cupy.py new/distributed-2.12.0/distributed/protocol/cupy.py
--- old/distributed-2.11.0/distributed/protocol/cupy.py 2020-02-19 18:21:14.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/cupy.py 2020-03-06 21:01:52.000000000 +0100
@@ -1,10 +1,12 @@
 """
 Efficient serialization GPU arrays.
 """
+import copyreg
+
 import cupy
 
 from .cuda import cuda_deserialize, cuda_serialize
-from .serialize import dask_deserialize, dask_serialize
+from .serialize import dask_deserialize, dask_serialize, register_generic
 
 try:
     from .rmm import dask_deserialize_rmm_device_buffer as dask_deserialize_cuda_buffer
@@ -80,3 +82,40 @@
     frames = [dask_deserialize_cuda_buffer(header, frames)]
     arr = cuda_deserialize_cupy_ndarray(header, frames)
     return arr
+
+
+try:
+    from cupy.cusparse import MatDescriptor
+    from cupyx.scipy.sparse import spmatrix
+except ImportError:
+    MatDescriptor = None
+    spmatrix = None
+
+
+if MatDescriptor is not None:
+
+    def reduce_matdescriptor(other):
+        # Pickling MatDescriptor errors
+        # xref: https://github.com/cupy/cupy/issues/3061
+        return cupy.cusparse.MatDescriptor.create, ()
+
+    copyreg.pickle(MatDescriptor, reduce_matdescriptor)
+
+    @cuda_serialize.register(MatDescriptor)
+    @dask_serialize.register(MatDescriptor)
+    def serialize_cupy_matdescriptor(x):
+        header, frames = {}, []
+        return header, frames
+
+    @cuda_deserialize.register(MatDescriptor)
+    @dask_deserialize.register(MatDescriptor)
+    def deserialize_cupy_matdescriptor(header, frames):
+        return MatDescriptor.create()
+
+
+if spmatrix is not None:
+    for n, s, d in [
+        ("cuda", cuda_serialize, cuda_deserialize),
+        ("dask", dask_serialize, dask_deserialize),
+    ]:
+        register_generic(spmatrix, n, s, d)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/scipy.py new/distributed-2.12.0/distributed/protocol/scipy.py
--- old/distributed-2.11.0/distributed/protocol/scipy.py        1970-01-01 01:00:00.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/scipy.py        2020-03-04 22:11:23.000000000 +0100
@@ -0,0 +1,30 @@
+"""
+Efficient serialization of SciPy sparse matrices.
+"""
+import scipy
+
+from .serialize import dask_deserialize, dask_serialize, register_generic
+
+register_generic(scipy.sparse.spmatrix, "dask", dask_serialize, dask_deserialize)
+
+
+@dask_serialize.register(scipy.sparse.dok.dok_matrix)
+def serialize_scipy_sparse_dok(x):
+    x_coo = x.tocoo()
+    coo_header, coo_frames = dask_serialize(x.tocoo())
+
+    header = {"coo_header": coo_header}
+    frames = coo_frames
+
+    return header, frames
+
+
+@dask_deserialize.register(scipy.sparse.dok.dok_matrix)
+def deserialize_scipy_sparse_dok(header, frames):
+    coo_header = header["coo_header"]
+    coo_frames = frames
+    x_coo = dask_deserialize(coo_header, coo_frames)
+
+    x = x_coo.todok()
+
+    return x
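
With this new file registered lazily (see the protocol/__init__.py hunk above), SciPy sparse matrices round-trip through the "dask" serializer family; dok_matrix detours through COO because a dict-of-keys layout has no contiguous buffers to ship as frames. A usage sketch:

    import numpy
    import scipy.sparse
    from distributed.protocol import deserialize, serialize

    a = scipy.sparse.dok_matrix(
        numpy.array([[0, 1, 0], [2, 0, 3], [0, 4, 0]], dtype="f8")
    )
    header, frames = serialize(a, serializers=["dask"])
    b = deserialize(header, frames)
    assert (a.todense() == b.todense()).all()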
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/serialize.py new/distributed-2.12.0/distributed/protocol/serialize.py
--- old/distributed-2.11.0/distributed/protocol/serialize.py    2020-02-17 23:21:38.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/serialize.py    2020-03-04 22:11:23.000000000 +0100
@@ -547,7 +547,9 @@
 def _is_msgpack_serializable(v):
     typ = type(v)
     return (
-        typ is str
+        v is None
+        or typ is str
+        or typ is bool
         or typ is int
         or typ is float
         or isinstance(v, dict)
@@ -558,59 +560,69 @@
     )
 
 
-def serialize_object_with_dict(est):
-    header = {
-        "serializer": "dask",
-        "type-serialized": pickle.dumps(type(est)),
-        "simple": {},
-        "complex": {},
-    }
-    frames = []
-
-    if isinstance(est, dict):
-        d = est
-    else:
-        d = est.__dict__
-
-    for k, v in d.items():
-        if _is_msgpack_serializable(v):
-            header["simple"][k] = v
-        else:
-            if isinstance(v, dict):
-                h, f = serialize_object_with_dict(v)
-            else:
-                h, f = serialize(v)
-            header["complex"][k] = {
-                "header": h,
-                "start": len(frames),
-                "stop": len(frames) + len(f),
-            }
-            frames += f
-    return header, frames
-
-
-def deserialize_object_with_dict(header, frames):
-    cls = pickle.loads(header["type-serialized"])
-    if issubclass(cls, dict):
-        dd = obj = {}
-    else:
-        obj = object.__new__(cls)
-        dd = obj.__dict__
-    dd.update(header["simple"])
-    for k, d in header["complex"].items():
-        h = d["header"]
-        f = frames[d["start"] : d["stop"]]
-        v = deserialize(h, f)
-        dd[k] = v
-
-    return obj
-
-
-dask_deserialize.register(dict)(deserialize_object_with_dict)
+class ObjectDictSerializer:
+    def __init__(self, serializer):
+        self.serializer = serializer
+
+    def serialize(self, est):
+        header = {
+            "serializer": self.serializer,
+            "type-serialized": pickle.dumps(type(est)),
+            "simple": {},
+            "complex": {},
+        }
+        frames = []
 
+        if isinstance(est, dict):
+            d = est
+        else:
+            d = est.__dict__
 
-def register_generic(cls):
-    """ Register dask_(de)serialize to traverse through __dict__
+        for k, v in d.items():
+            if _is_msgpack_serializable(v):
+                header["simple"][k] = v
+            else:
+                if isinstance(v, dict):
+                    h, f = self.serialize(v)
+                else:
+                    h, f = serialize(v, serializers=(self.serializer, "pickle"))
+                header["complex"][k] = {
+                    "header": h,
+                    "start": len(frames),
+                    "stop": len(frames) + len(f),
+                }
+                frames += f
+        return header, frames
+
+    def deserialize(self, header, frames):
+        cls = pickle.loads(header["type-serialized"])
+        if issubclass(cls, dict):
+            dd = obj = {}
+        else:
+            obj = object.__new__(cls)
+            dd = obj.__dict__
+        dd.update(header["simple"])
+        for k, d in header["complex"].items():
+            h = d["header"]
+            f = frames[d["start"] : d["stop"]]
+            v = deserialize(h, f)
+            dd[k] = v
+
+        return obj
+
+
+dask_object_with_dict_serializer = ObjectDictSerializer("dask")
+
+dask_deserialize.register(dict)(dask_object_with_dict_serializer.deserialize)
+
+
+def register_generic(
+    cls,
+    serializer_name="dask",
+    serialize_func=dask_serialize,
+    deserialize_func=dask_deserialize,
+):
+    """ Register (de)serialize to traverse through __dict__
 
     Normally when registering new classes for Dask's custom serialization you
     need to manage headers and frames, which can be tedious.  If all you want
@@ -641,5 +653,6 @@
     dask_serialize
     dask_deserialize
     """
-    dask_serialize.register(cls)(serialize_object_with_dict)
-    dask_deserialize.register(cls)(deserialize_object_with_dict)
+    object_with_dict_serializer = ObjectDictSerializer(serializer_name)
+    serialize_func.register(cls)(object_with_dict_serializer.serialize)
+    deserialize_func.register(cls)(object_with_dict_serializer.deserialize)
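
The generalized register_generic means the __dict__-walking serializer can be attached to any family, not just "dask" (the CuPy sparse hunk above uses it with "cuda"). A sketch with the default family and a hypothetical class; note that non-msgpack attributes fall back through the ("dask", "pickle") serializer chain:

    from distributed.protocol import deserialize, serialize
    from distributed.protocol.serialize import register_generic

    class Model:
        def __init__(self):
            self.name = "m"              # msgpack-friendly -> header["simple"]
            self.payload = bytearray(8)  # anything else -> header["complex"]

    register_generic(Model)  # defaults: "dask", dask_serialize, dask_deserialize

    header, frames = serialize(Model(), serializers=["dask"])
    restored = deserialize(header, frames)
    assert restored.name == "m"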
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/tests/test_cupy.py new/distributed-2.12.0/distributed/protocol/tests/test_cupy.py
--- old/distributed-2.11.0/distributed/protocol/tests/test_cupy.py      2020-02-19 18:21:14.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/tests/test_cupy.py      2020-03-06 21:01:52.000000000 +0100
@@ -1,6 +1,7 @@
-from distributed.protocol import serialize, deserialize
 import pickle
+
 import pytest
+from distributed.protocol import deserialize, serialize
 
 cupy = pytest.importorskip("cupy")
 numpy = pytest.importorskip("numpy")
@@ -61,3 +62,40 @@
     y = deserialize(header, frames, deserializers=("cuda", "dask", "pickle", "error"))
 
     assert (x_np == cupy.asnumpy(y)).all()
+
+
+@pytest.mark.parametrize(
+    "sparse_name", ["coo_matrix", "csc_matrix", "csr_matrix", "dia_matrix",],
+)
+@pytest.mark.parametrize(
+    "dtype",
+    [numpy.dtype("<f4"), numpy.dtype(">f4"), numpy.dtype("<f8"), 
numpy.dtype(">f8"),],
+)
+@pytest.mark.parametrize("serializer", ["cuda", "dask", "pickle"])
+def test_serialize_cupy_sparse(sparse_name, dtype, serializer):
+    scipy_sparse = pytest.importorskip("scipy.sparse")
+    cupy_sparse = pytest.importorskip("cupyx.scipy.sparse")
+
+    scipy_sparse_type = getattr(scipy_sparse, sparse_name)
+    cupy_sparse_type = getattr(cupy_sparse, sparse_name)
+
+    a_host = numpy.array([[0, 1, 0], [2, 0, 3], [0, 4, 0]], dtype=dtype)
+    asp_host = scipy_sparse_type(a_host)
+    if sparse_name == "dia_matrix":
+        # CuPy `dia_matrix` cannot be created from SciPy one
+        # xref: https://github.com/cupy/cupy/issues/3158
+        asp_dev = cupy_sparse_type(
+            (asp_host.data, asp_host.offsets),
+            shape=asp_host.shape,
+            dtype=asp_host.dtype,
+        )
+    else:
+        asp_dev = cupy_sparse_type(asp_host)
+
+    header, frames = serialize(asp_dev, serializers=[serializer])
+    a2sp_dev = deserialize(header, frames)
+
+    a2sp_host = a2sp_dev.get()
+    a2_host = a2sp_host.todense()
+
+    assert (a_host == a2_host).all()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/protocol/tests/test_scipy.py new/distributed-2.12.0/distributed/protocol/tests/test_scipy.py
--- old/distributed-2.11.0/distributed/protocol/tests/test_scipy.py     1970-01-01 01:00:00.000000000 +0100
+++ new/distributed-2.12.0/distributed/protocol/tests/test_scipy.py     2020-03-04 22:11:23.000000000 +0100
@@ -0,0 +1,37 @@
+import pytest
+from distributed.protocol import deserialize, serialize
+
+numpy = pytest.importorskip("numpy")
+scipy = pytest.importorskip("scipy")
+scipy_sparse = pytest.importorskip("scipy.sparse")
+
+
+@pytest.mark.parametrize(
+    "sparse_type",
+    [
+        scipy_sparse.bsr_matrix,
+        scipy_sparse.coo_matrix,
+        scipy_sparse.csc_matrix,
+        scipy_sparse.csr_matrix,
+        scipy_sparse.dia_matrix,
+        scipy_sparse.dok_matrix,
+        scipy_sparse.lil_matrix,
+    ],
+)
+@pytest.mark.parametrize(
+    "dtype",
+    [numpy.dtype("<f4"), numpy.dtype(">f4"), numpy.dtype("<f8"), 
numpy.dtype(">f8"),],
+)
+def test_serialize_scipy_sparse(sparse_type, dtype):
+    a = numpy.array([[0, 1, 0], [2, 0, 3], [0, 4, 0]], dtype=dtype)
+
+    anz = a.nonzero()
+    acoo = scipy_sparse.coo_matrix((a[anz], anz))
+    asp = sparse_type(acoo)
+
+    header, frames = serialize(asp, serializers=["dask"])
+    asp2 = deserialize(header, frames)
+
+    a2 = asp2.todense()
+
+    assert (a == a2).all()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/scheduler.py new/distributed-2.12.0/distributed/scheduler.py
--- old/distributed-2.11.0/distributed/scheduler.py     2020-02-19 18:21:08.000000000 +0100
+++ new/distributed-2.12.0/distributed/scheduler.py     2020-03-06 21:01:52.000000000 +0100
@@ -1267,6 +1267,7 @@
             "call_stack": self.get_call_stack,
             "profile": self.get_profile,
             "performance_report": self.performance_report,
+            "get_logs": self.get_logs,
             "logs": self.get_logs,
             "worker_logs": self.get_worker_logs,
             "nbytes": self.get_nbytes,
@@ -4648,7 +4649,7 @@
                 if ts.state == "forgotten":
                     del self.tasks[ts.key]
 
-            if ts.state == "forgotten":
+            if ts.state == "forgotten" and ts.group.name in self.task_groups:
                 # Remove TaskGroup if all tasks are in the forgotten state
                 tg = ts.group
                 if not any(tg.states.get(s) for s in ALL_TASK_STATES):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/stealing.py new/distributed-2.12.0/distributed/stealing.py
--- old/distributed-2.11.0/distributed/stealing.py      2019-11-07 20:29:36.000000000 +0100
+++ new/distributed-2.12.0/distributed/stealing.py      2020-03-04 22:11:23.000000000 +0100
@@ -4,9 +4,10 @@
 from time import time
 
 import dask
+from .comm.addressing import get_address_host
 from .core import CommClosedError
 from .diagnostics.plugin import SchedulerPlugin
-from .utils import log_errors, PeriodicCallback
+from .utils import log_errors, parse_timedelta, PeriodicCallback
 
 try:
     from cytoolz import topk
@@ -40,8 +41,15 @@
         for worker in scheduler.workers:
             self.add_worker(worker=worker)
 
+        # `callback_time` is in milliseconds
+        callback_time = 1000 * parse_timedelta(
+            dask.config.get("distributed.scheduler.work-stealing-interval"),
+            default="ms",
+        )
         pc = PeriodicCallback(
-            callback=self.balance, callback_time=100, io_loop=self.scheduler.loop
+            callback=self.balance,
+            callback_time=callback_time,
+            io_loop=self.scheduler.loop,
         )
         self._pc = pc
         self.scheduler.periodic_callbacks["stealing"] = pc
@@ -121,11 +129,6 @@
         For example a result of zero implies a task without dependencies.
         level: The location within a stealable list to place this value
         """
-        if not ts.loose_restrictions and (
-            ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
-        ):
-            return None, None  # don't steal
-
         if not ts.dependencies:  # no dependencies fast path
             return 0, 0
 
@@ -251,7 +254,7 @@
                 self.scheduler.check_idle_saturated(victim)
 
             # Victim was waiting, has given up task, enact steal
-            elif state in ("waiting", "ready"):
+            elif state in ("waiting", "ready", "constrained"):
                 self.remove_key_from_stealable(ts)
                 ts.processing_on = thief
                 duration = victim.processing.pop(ts)
@@ -353,14 +356,23 @@
                         i += 1
                         if not idle:
                             break
-                        idl = idle[i % len(idle)]
+
+                        if _has_restrictions(ts):
+                            thieves = [ws for ws in idle if _can_steal(ws, ts, sat)]
+                        else:
+                            thieves = idle
+                        if not thieves:
+                            break
+                        thief = thieves[i % len(thieves)]
 
                         duration = sat.processing.get(ts)
                         if duration is None:
                             stealable.discard(ts)
                             continue
 
-                        maybe_move_task(level, ts, sat, idl, duration, cost_multiplier)
+                        maybe_move_task(
+                            level, ts, sat, thief, duration, cost_multiplier
+                        )
 
                 if self.cost_multipliers[level] < 20:  # don't steal from public at cost
                     stealable = self.stealable_all[level]
@@ -381,10 +393,18 @@
                             continue
 
                         i += 1
-                        idl = idle[i % len(idle)]
+                        if _has_restrictions(ts):
+                            thieves = [ws for ws in idle if _can_steal(ws, ts, sat)]
+                        else:
+                            thieves = idle
+                        if not thieves:
+                            continue
+                        thief = thieves[i % len(thieves)]
                         duration = sat.processing[ts]
 
-                        maybe_move_task(level, ts, sat, idl, duration, cost_multiplier)
+                        maybe_move_task(
+                            level, ts, sat, thief, duration, cost_multiplier
+                        )
 
             if log:
                 self.log.append(log)
@@ -415,4 +435,41 @@
         return out
 
 
+def _has_restrictions(ts):
+    """Determine whether the given task has restrictions and whether these
+    restrictions are strict.
+    """
+    return not ts.loose_restrictions and (
+        ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
+    )
+
+
+def _can_steal(thief, ts, victim):
+    """Determine whether worker ``thief`` can steal task ``ts`` from worker
+    ``victim``.
+
+    Assumes that `ts` has some restrictions.
+    """
+    if (
+        ts.host_restrictions
+        and get_address_host(thief.address) not in ts.host_restrictions
+    ):
+        return False
+    elif ts.worker_restrictions and thief.address not in ts.worker_restrictions:
+        return False
+
+    if victim.resources is None:
+        return True
+
+    for resource, value in victim.resources.items():
+        try:
+            supplied = thief.resources[resource]
+        except KeyError:
+            return False
+        else:
+            if supplied < value:
+                return False
+    return True
+
+
 fast_tasks = {"shuffle-split"}
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_nanny.py new/distributed-2.12.0/distributed/tests/test_nanny.py
--- old/distributed-2.11.0/distributed/tests/test_nanny.py      2020-02-12 03:23:12.000000000 +0100
+++ new/distributed-2.12.0/distributed/tests/test_nanny.py      2020-03-04 15:31:16.000000000 +0100
@@ -402,6 +402,16 @@
     yield w.close()
 
 
+@gen_cluster(nthreads=[])
+def test_local_directory(s):
+    with tmpfile() as fn:
+        with dask.config.set(temporary_directory=fn):
+            w = yield Nanny(s.address)
+            assert w.local_directory.startswith(fn)
+            assert "dask-worker-space" in w.local_directory
+            yield w.close()
+
+
 def _noop(x):
     """Define here because closures aren't pickleable."""
     pass
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_scheduler.py new/distributed-2.12.0/distributed/tests/test_scheduler.py
--- old/distributed-2.11.0/distributed/tests/test_scheduler.py  2020-02-12 03:23:12.000000000 +0100
+++ new/distributed-2.12.0/distributed/tests/test_scheduler.py  2020-03-06 21:01:52.000000000 +0100
@@ -1837,6 +1837,17 @@
     assert s.task_prefixes["sum"].states["memory"] == 2
 
 
+@gen_cluster(client=True)
+async def test_task_group_on_fire_and_forget(c, s, a, b):
+    # Regression test for https://github.com/dask/distributed/issues/3465
+    with captured_logger("distributed.scheduler") as logs:
+        x = await c.scatter(list(range(10)))
+        fire_and_forget([c.submit(slowadd, i, x[i]) for i in range(len(x))])
+        await asyncio.sleep(1)
+
+    assert "Error transitioning" not in logs.getvalue()
+
+
 class BrokenComm(Comm):
     peer_address = None
     local_address = None
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_steal.py new/distributed-2.12.0/distributed/tests/test_steal.py
--- old/distributed-2.11.0/distributed/tests/test_steal.py      2020-02-14 16:29:09.000000000 +0100
+++ new/distributed-2.12.0/distributed/tests/test_steal.py      2020-03-04 22:11:23.000000000 +0100
@@ -9,6 +9,7 @@
 from toolz import sliding_window, concat
 from tornado import gen
 
+import dask
 from distributed import Nanny, Worker, wait, worker_client
 from distributed.config import config
 from distributed.metrics import time
@@ -223,6 +224,32 @@
     assert len(b.task_state) == 0
 
 
+@gen_cluster(
+    client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2), ("127.0.0.1", 2)]
+)
+def test_steal_worker_restrictions(c, s, wa, wb, wc):
+    future = c.submit(slowinc, 1, delay=0.1, workers={wa.address, wb.address})
+    yield future
+
+    ntasks = 100
+    futures = c.map(slowinc, range(ntasks), delay=0.1, workers={wa.address, wb.address})
+
+    while sum(len(w.task_state) for w in [wa, wb, wc]) < ntasks:
+        yield gen.sleep(0.01)
+
+    assert 0 < len(wa.task_state) < ntasks
+    assert 0 < len(wb.task_state) < ntasks
+    assert len(wc.task_state) == 0
+
+    s.extensions["stealing"].balance()
+
+    yield gen.sleep(0.1)
+
+    assert 0 < len(wa.task_state) < ntasks
+    assert 0 < len(wb.task_state) < ntasks
+    assert len(wc.task_state) == 0
+
+
 @pytest.mark.skipif(
     not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
 )
@@ -244,6 +271,34 @@
     assert len(b.task_state) == 0
 
 
+@pytest.mark.skipif(
+    not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
+)
+@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.2", 2)])
+def test_steal_host_restrictions(c, s, wa, wb):
+    future = c.submit(slowinc, 1, delay=0.10, workers=wa.address)
+    yield future
+
+    ntasks = 100
+    futures = c.map(slowinc, range(ntasks), delay=0.1, workers="127.0.0.1")
+    while len(wa.task_state) < ntasks:
+        yield gen.sleep(0.01)
+    assert len(wa.task_state) == ntasks
+    assert len(wb.task_state) == 0
+
+    wc = yield Worker(s.address, ncores=1)
+
+    start = time()
+    while not wc.task_state or len(wa.task_state) == ntasks:
+        yield gen.sleep(0.01)
+        assert time() < start + 3
+
+    yield gen.sleep(0.1)
+    assert 0 < len(wa.task_state) < ntasks
+    assert len(wb.task_state) == 0
+    assert 0 < len(wc.task_state) < ntasks
+
+
 @gen_cluster(
     client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}}), ("127.0.0.1", 1)]
 )
@@ -264,7 +319,6 @@
     assert len(b.task_state) == 0
 
 
-@pytest.mark.skip(reason="no stealing of resources")
 @gen_cluster(
     client=True, nthreads=[("127.0.0.1", 1, {"resources": {"A": 2}})], timeout=3
 )
@@ -676,3 +730,20 @@
 
     out = log.getvalue()
     assert "Error" not in out
+
+
+@gen_cluster(client=True)
+def test_worker_stealing_interval(c, s, a, b):
+    from distributed.scheduler import WorkStealing
+
+    ws = WorkStealing(s)
+    assert ws._pc.callback_time == 100
+
+    with dask.config.set({"distributed.scheduler.work-stealing-interval": "500ms"}):
+        ws = WorkStealing(s)
+    assert ws._pc.callback_time == 500
+
+    # Default unit is `ms`
+    with dask.config.set({"distributed.scheduler.work-stealing-interval": 2}):
+        ws = WorkStealing(s)
+    assert ws._pc.callback_time == 2
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/tests/test_worker.py new/distributed-2.12.0/distributed/tests/test_worker.py
--- old/distributed-2.11.0/distributed/tests/test_worker.py     2020-02-14 16:29:09.000000000 +0100
+++ new/distributed-2.12.0/distributed/tests/test_worker.py     2020-03-04 22:11:23.000000000 +0100
@@ -31,7 +31,7 @@
     wait,
 )
 from distributed.compatibility import WINDOWS
-from distributed.core import rpc
+from distributed.core import rpc, CommClosedError
 from distributed.scheduler import Scheduler
 from distributed.metrics import time
 from distributed.worker import Worker, error_message, logger, parse_memory_limit
@@ -1629,3 +1629,26 @@
 
             if w.digests is not None:
                 assert w.digests["latency"].size() > 0
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("reconnect", [True, False])
+async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect):
+    with captured_logger("distributed.worker", level=logging.WARNING) as logger:
+        async with await Scheduler() as s:
+
+            def bad_heartbeat_worker(*args, **kwargs):
+                raise CommClosedError()
+
+            async with await Worker(s.address, reconnect=reconnect) as w:
+                # Trigger CommClosedError during worker heartbeat
+                monkeypatch.setattr(
+                    w.scheduler, "heartbeat_worker", bad_heartbeat_worker
+                )
+
+                await w.heartbeat()
+                if reconnect:
+                    assert w.status == "running"
+                else:
+                    assert w.status == "closed"
+    assert "Heartbeat to scheduler failed" in logger.getvalue()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed/worker.py new/distributed-2.12.0/distributed/worker.py
--- old/distributed-2.11.0/distributed/worker.py        2020-02-18 16:44:44.000000000 +0100
+++ new/distributed-2.12.0/distributed/worker.py        2020-03-04 22:11:23.000000000 +0100
@@ -382,7 +382,6 @@
         self.executed_count = 0
         self.long_running = set()
 
-        self.batched_stream = None
         self.recent_messages_log = deque(
             maxlen=dask.config.get("distributed.comm.recent-messages-log-length")
         )
@@ -493,7 +492,7 @@
         if local_directory is None:
             local_directory = dask.config.get("temporary-directory") or os.getcwd()
             if not os.path.exists(local_directory):
-                os.mkdir(local_directory)
+                os.makedirs(local_directory)
             local_directory = os.path.join(local_directory, "dask-worker-space")
 
         with warn_on_duration(
@@ -573,6 +572,7 @@
         self.actor_executor = ThreadPoolExecutor(
             1, thread_name_prefix="Dask-Actor-Threads"
         )
+        self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
         self.name = name
         self.scheduler_delay = 0
         self.stream_comms = dict()
@@ -650,6 +650,13 @@
 
         pc = PeriodicCallback(self.heartbeat, 1000, io_loop=self.io_loop)
         self.periodic_callbacks["heartbeat"] = pc
+        pc = PeriodicCallback(
+            lambda: self.batched_stream.send({"op": "keep-alive"}),
+            60000,
+            io_loop=self.io_loop,
+        )
+        self.periodic_callbacks["keep-alive"] = pc
+
         self._address = contact_address
 
         if self.memory_limit:
@@ -797,6 +804,7 @@
     #####################
 
     async def _register_with_scheduler(self):
+        self.periodic_callbacks["keep-alive"].stop()
         self.periodic_callbacks["heartbeat"].stop()
         start = time()
         if self.contact_address is None:
@@ -863,15 +871,8 @@
             logger.info("        Registered to: %26s", self.scheduler.address)
             logger.info("-" * 49)
 
-        self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
         self.batched_stream.start(comm)
-        pc = PeriodicCallback(
-            lambda: self.batched_stream.send({"op": "keep-alive"}),
-            60000,
-            io_loop=self.io_loop,
-        )
-        self.periodic_callbacks["keep-alive"] = pc
-        pc.start()
+        self.periodic_callbacks["keep-alive"].start()
         self.periodic_callbacks["heartbeat"].start()
         self.loop.add_callback(self.handle_scheduler, comm)
 
@@ -912,14 +913,16 @@
                 )
                 self.bandwidth_workers.clear()
                 self.bandwidth_types.clear()
+            except CommClosedError:
+                logger.warning("Heartbeat to scheduler failed")
+                if not self.reconnect:
+                    await self.close(report=False)
             except IOError as e:
                 # Scheduler is gone. Respect distributed.comm.timeouts.connect
                 if "Timed out trying to connect" in str(e):
                     await self.close(report=False)
                 else:
                     raise e
-            except CommClosedError:
-                logger.warning("Heartbeat to scheduler failed")
             finally:
                 self.heartbeat_active = False
         else:
@@ -1112,7 +1115,11 @@
             for k, v in self.services.items():
                 v.stop()
 
-            if self.batched_stream and not self.batched_stream.comm.closed():
+            if (
+                self.batched_stream
+                and self.batched_stream.comm
+                and not self.batched_stream.comm.closed()
+            ):
                 self.batched_stream.send({"op": "close-stream"})
 
             if self.batched_stream:
@@ -2143,7 +2150,7 @@
         response = {"op": "steal-response", "key": key, "state": state}
         self.batched_stream.send(response)
 
-        if state in ("ready", "waiting"):
+        if state in ("ready", "waiting", "constrained"):
             self.release_key(key)
 
     def release_key(self, key, cause=None, reason=None, report=True):
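
The worker change above creates the keep-alive PeriodicCallback once in __init__ and only starts/stops it around (re)registration, so a reconnecting worker no longer stacks a fresh callback on every reconnect. A stripped-down sketch of that lifecycle with Tornado, using hypothetical names:

    from tornado.ioloop import PeriodicCallback

    class KeepAlive:
        def __init__(self):
            # Created exactly once, never re-created on reconnect
            self._pc = PeriodicCallback(self._beat, 60000)  # every 60 s

        def _beat(self):
            print("keep-alive")

        def register(self):
            self._pc.stop()   # mirrors _register_with_scheduler() above
            # ... handshake with the scheduler ...
            self._pc.start()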
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed.egg-info/PKG-INFO new/distributed-2.12.0/distributed.egg-info/PKG-INFO
--- old/distributed-2.11.0/distributed.egg-info/PKG-INFO        2020-02-19 18:46:22.000000000 +0100
+++ new/distributed-2.12.0/distributed.egg-info/PKG-INFO        2020-03-06 21:17:16.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.11.0
+Version: 2.12.0
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/distributed.egg-info/SOURCES.txt new/distributed-2.12.0/distributed.egg-info/SOURCES.txt
--- old/distributed-2.11.0/distributed.egg-info/SOURCES.txt     2020-02-19 18:46:22.000000000 +0100
+++ new/distributed-2.12.0/distributed.egg-info/SOURCES.txt     2020-03-06 21:17:16.000000000 +0100
@@ -79,6 +79,7 @@
 distributed/comm/tests/__init__.py
 distributed/comm/tests/test_comms.py
 distributed/comm/tests/test_ucx.py
+distributed/comm/tests/test_ucx_config.py
 distributed/dashboard/__init__.py
 distributed/dashboard/core.py
 distributed/dashboard/export_tool.coffee
@@ -171,6 +172,7 @@
 distributed/protocol/numpy.py
 distributed/protocol/pickle.py
 distributed/protocol/rmm.py
+distributed/protocol/scipy.py
 distributed/protocol/serialize.py
 distributed/protocol/sparse.py
 distributed/protocol/torch.py
@@ -189,6 +191,7 @@
 distributed/protocol/tests/test_protocol.py
 distributed/protocol/tests/test_protocol_utils.py
 distributed/protocol/tests/test_rmm.py
+distributed/protocol/tests/test_scipy.py
 distributed/protocol/tests/test_serialize.py
 distributed/protocol/tests/test_sklearn.py
 distributed/protocol/tests/test_sparse.py
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/docs/source/api.rst new/distributed-2.12.0/docs/source/api.rst
--- old/distributed-2.11.0/docs/source/api.rst  2020-01-10 22:26:26.000000000 +0100
+++ new/distributed-2.12.0/docs/source/api.rst  2020-03-04 22:11:23.000000000 +0100
@@ -148,6 +148,28 @@
 .. autoclass:: Future
    :members:
 
+Cluster
+-------
+
+Classes relevant for cluster creation and management. Other libraries
+(like `dask-jobqueue`_, `dask-gateway`_, `dask-kubernetes`_, `dask-yarn`_ etc.)
+provide additional cluster objects.
+
+.. _dask-jobqueue: https://jobqueue.dask.org/
+.. _dask-gateway: https://gateway.dask.org/
+.. _dask-kubernetes: https://kubernetes.dask.org/
+.. _dask-yarn: https://yarn.dask.org/en/latest/
+
+.. autosummary::
+   LocalCluster
+   SpecCluster
+
+.. autoclass:: LocalCluster
+   :members:
+
+.. autoclass:: SpecCluster
+   :members:
+
 
 Other
 -----
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/docs/source/changelog.rst new/distributed-2.12.0/docs/source/changelog.rst
--- old/distributed-2.11.0/docs/source/changelog.rst    2020-02-19 18:44:47.000000000 +0100
+++ new/distributed-2.12.0/docs/source/changelog.rst    2020-03-06 21:14:55.000000000 +0100
@@ -1,6 +1,34 @@
 Changelog
 =========
 
+2.12.0 - 2020-03-06
+-------------------
+
+- Update ``TaskGroup`` remove logic (:pr:`3557`) `James Bourbeau`_
+- Fix-up CuPy sparse serialization (:pr:`3556`) `John Kirkham`_
+- API docs for ``LocalCluster`` and ``SpecCluster`` (:pr:`3548`) `Tom Augspurger`_
+- Serialize sparse arrays (:pr:`3545`) `John Kirkham`_
+- Allow tasks with restrictions to be stolen (:pr:`3069`) `Stan Seibert`_
+- Use UCX default configuration instead of raising (:pr:`3544`) `Peter Andreas Entschev`_
+- Support using other serializers with ``register_generic`` (:pr:`3536`) `John Kirkham`_
+- DOC: update to async await (:pr:`3543`) `Tom Augspurger`_
+- Use ``pytest.raises`` in ``test_ucx_config.py`` (:pr:`3541`) `John Kirkham`_
+- Fix/more ucx config options (:pr:`3539`) `Benjamin Zaitlen`_
+- Update heartbeat ``CommClosedError`` error handling (:pr:`3529`) `James Bourbeau`_
+- Use ``makedirs`` when constructing ``local_directory`` (:pr:`3538`) `John Kirkham`_
+- Mark ``None`` as MessagePack serializable (:pr:`3537`) `John Kirkham`_
+- Mark ``bool`` as MessagePack serializable (:pr:`3535`) `John Kirkham`_
+- Use 'temporary-directory' from ``dask.config`` for Nanny's directory (:pr:`3531`) `John Kirkham`_
+- Add try-except around getting source code in performance report (:pr:`3505`) `Matthew Rocklin`_
+- Fix typo in docstring (:pr:`3528`) `Davis Bennett`_
+- Make work stealing callback time configurable (:pr:`3523`) `Lucas Rademaker`_
+- RMM/UCX Config Flags (:pr:`3515`) `Benjamin Zaitlen`_
+- Revise develop-docs: conda env example (:pr:`3406`) `Darren Weber`_
+- Remove ``import ucp`` from the top of ``ucx.py`` (:pr:`3510`) `Peter Andreas Entschev`_
+- Rename ``logs`` to ``get_logs`` (:pr:`3473`) `Jacob Tomlinson`_
+- Stop keep alives when worker reconnecting to the scheduler (:pr:`3493`) `Jacob Tomlinson`_
+
+
 2.11.0 - 2020-02-19
 -------------------
 
@@ -1570,3 +1598,7 @@
 .. _`Cyril Shcherbin`: https://github.com/shcherbin
 .. _`Søren Fuglede Jørgensen`: https://github.com/fuglede
 .. _`Igor Gotlibovych`: https://github.com/ig248
+.. _`Stan Seibert`: https://github.com/seibert
+.. _`Davis Bennett`: https://github.com/d-v-b
+.. _`Lucas Rademaker`: https://github.com/lr4d
+.. _`Darren Weber`: https://github.com/dazza-codes
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.11.0/docs/source/develop.rst new/distributed-2.12.0/docs/source/develop.rst
--- old/distributed-2.11.0/docs/source/develop.rst      2020-02-12 03:23:12.000000000 +0100
+++ new/distributed-2.12.0/docs/source/develop.rst      2020-03-04 22:11:23.000000000 +0100
@@ -22,6 +22,26 @@
    cd distributed
    python setup.py install
 
+Using conda, for example::
+
+   git clone git@github.com:{your-fork}/distributed.git
+   cd distributed
+   conda create -y -n distributed python=3.6
+   conda activate distributed
+   python -m pip install -U -r requirements.txt
+   python -m pip install -U -r dev-requirements.txt
+   python -m pip install -e .
+
+To keep a fork in sync with the upstream source::
+
+   cd distributed
+   git remote add upstream git@github.com:dask/distributed.git
+   git remote -v
+   git fetch -a upstream
+   git checkout master
+   git pull upstream master
+   git push origin master
+
 Test
 ----
 
@@ -88,20 +108,20 @@
     from distributed import Client, Future, Scheduler, Worker
 
     @gen_cluster(client=True)
-    def test_submit(c, s, a, b):
+    async def test_submit(c, s, a, b):
         assert isinstance(c, Client)
         assert isinstance(s, Scheduler)
         assert isinstance(a, Worker)
         assert isinstance(b, Worker)
-
+    
         future = c.submit(inc, 1)
         assert isinstance(future, Future)
         assert future.key in c.futures
-
+    
         # result = future.result()  # This synchronous API call would block
-        result = yield future
+        result = await future
         assert result == 2
-
+    
         assert future.key in s.tasks
         assert future.key in a.data or future.key in b.data
 
@@ -111,8 +131,8 @@
 the state of every element of the cluster directly.  However, you can not use
 the normal synchronous API (doing so will cause the test to wait forever) and
 instead you need to use the coroutine API, where all blocking functions are
-prepended with an underscore (``_``).  Beware, it is a common mistake to use
-the blocking interface within these tests.
+prepended with an underscore (``_``) and awaited with ``await``.
+Beware, it is a common mistake to use the blocking interface within these tests.
 
 If you want to test the normal synchronous API you can use the ``client``
 pytest fixture style test, which sets up a scheduler and workers for you in
@@ -146,7 +166,7 @@
 In this style of test you do not have access to the scheduler or workers.  The
 variables ``s, a, b`` are now dictionaries holding a
 ``multiprocessing.Process`` object and a port integer.  However, you can now
-use the normal synchronous API (never use yield in this style of test) and you
+use the normal synchronous API (never use ``await`` in this style of test) and you
 can close processes easily by terminating them.
 
 Typically for most user-facing functions you will find both kinds of tests.

