Hello community,

here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2020-10-25 18:06:20
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.3463 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Sun Oct 25 18:06:20 2020 rev:37 rq:841145 version:2.30.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    
2020-10-07 14:17:37.957444944 +0200
+++ 
/work/SRC/openSUSE:Factory/.python-distributed.new.3463/python-distributed.changes
  2020-10-25 18:06:35.759343920 +0100
@@ -1,0 +2,11 @@
+Sat Oct 10 19:04:32 UTC 2020 - Arun Persaud <a...@gmx.de>
+
+- update to version 2.30.0:
+  * Support SubgraphCallable in str_graph() (GH#4148) Mads
+    R. B. Kristensen
+  * Handle exceptions in BatchedSend (GH#4135) Tom Augspurger
+  * Fix for missing : in autosummary docs (GH#4143) Gil Forsyth
+  * Limit GPU metrics to visible devices only (GH#3810) Jacob
+    Tomlinson
+
+-------------------------------------------------------------------

Old:
----
  distributed-2.29.0.tar.gz

New:
----
  distributed-2.30.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.xd9V4k/_old  2020-10-25 18:06:37.715345771 +0100
+++ /var/tmp/diff_new_pack.xd9V4k/_new  2020-10-25 18:06:37.719345775 +0100
@@ -21,7 +21,7 @@
 # Test requires network connection
 %bcond_with     test
 Name:           python-distributed
-Version:        2.29.0
+Version:        2.30.0
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-2.29.0.tar.gz -> distributed-2.30.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/PKG-INFO 
new/distributed-2.30.0/PKG-INFO
--- old/distributed-2.29.0/PKG-INFO     2020-10-03 01:23:00.334293100 +0200
+++ new/distributed-2.30.0/PKG-INFO     2020-10-07 00:36:08.340993600 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.29.0
+Version: 2.30.0
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/_version.py 
new/distributed-2.30.0/distributed/_version.py
--- old/distributed-2.29.0/distributed/_version.py      2020-10-03 
01:23:00.335679500 +0200
+++ new/distributed-2.30.0/distributed/_version.py      2020-10-07 
00:36:08.349047400 +0200
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2020-10-02T18:22:26-0500",
+ "date": "2020-10-06T17:35:34-0500",
  "dirty": false,
  "error": null,
- "full-revisionid": "a80b867cf40b05aa423a19aea1a077764ffba0f4",
- "version": "2.29.0"
+ "full-revisionid": "a1dc5f437b39c1b35a9b05cbc048e3a793b89715",
+ "version": "2.30.0"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/batched.py 
new/distributed-2.30.0/distributed/batched.py
--- old/distributed-2.29.0/distributed/batched.py       2020-09-18 
22:52:00.000000000 +0200
+++ new/distributed-2.30.0/distributed/batched.py       2020-10-07 
00:31:11.000000000 +0200
@@ -54,6 +54,7 @@
             
maxlen=dask.config.get("distributed.comm.recent-messages-log-length")
         )
         self.serializers = serializers
+        self._consecutive_failures = 0
 
     def start(self, comm):
         self.comm = comm
@@ -98,15 +99,43 @@
                     self.recent_message_log.append("large-message")
                 self.byte_count += nbytes
             except CommClosedError as e:
+                # If the comm is known to be closed, we'll immediately
+                # give up.
                 logger.info("Batched Comm Closed: %s", e)
                 break
             except Exception:
-                logger.exception("Error in batched write")
-                break
+                # In other cases we'll retry a few times.
+                # https://github.com/pangeo-data/pangeo/issues/788
+                if self._consecutive_failures <= 5:
+                    logger.warning("Error in batched write, retrying")
+                    yield gen.sleep(0.100 * 1.5 ** self._consecutive_failures)
+                    self._consecutive_failures += 1
+                    # Exponential backoff for retries.
+                    # Ensure we don't drop any messages.
+                    if self.buffer:
+                        # Someone could call send while we yielded above?
+                        self.buffer = payload + self.buffer
+                    else:
+                        self.buffer = payload
+                    continue
+                else:
+                    logger.exception("Error in batched write")
+                    break
             finally:
                 payload = None  # lose ref
-
+        else:
+            # nobreak. We've been gracefully closed.
+            self.stopped.set()
+            return
+
+        # If we've reached here, it means our comm is known to be closed or
+        # we've repeatedly failed to send a message. We can't close gracefully
+        # via `.close()` since we can't send messages. So we just abort.
+        # This means that any messages in our buffer our lost.
+        # To propagate exceptions, we rely on subsequent `BatchedSend.send`
+        # calls to raise CommClosedErrors.
         self.stopped.set()
+        self.abort()
 
     def send(self, msg):
         """Schedule a message for sending to the other side
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/compatibility.py 
new/distributed-2.30.0/distributed/compatibility.py
--- old/distributed-2.29.0/distributed/compatibility.py 2020-06-30 
17:19:56.000000000 +0200
+++ new/distributed-2.30.0/distributed/compatibility.py 2020-10-07 
00:31:11.000000000 +0200
@@ -2,11 +2,15 @@
 import platform
 import sys
 
+import tornado
+
 logging_names = logging._levelToName.copy()
 logging_names.update(logging._nameToLevel)
 
 PYPY = platform.python_implementation().lower() == "pypy"
 WINDOWS = sys.platform.startswith("win")
+TORNADO6 = tornado.version_info[0] >= 6
+PY37 = sys.version_info[:2] >= (3, 7)
 
 if sys.version_info[:2] >= (3, 7):
     from asyncio import get_running_loop
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2.29.0/distributed/dashboard/components/nvml.py 
new/distributed-2.30.0/distributed/dashboard/components/nvml.py
--- old/distributed-2.29.0/distributed/dashboard/components/nvml.py     
2020-09-16 04:37:04.000000000 +0200
+++ new/distributed-2.30.0/distributed/dashboard/components/nvml.py     
2020-10-07 00:31:11.000000000 +0200
@@ -131,29 +131,23 @@
             memory_total = 0
             memory_max = 0
             worker = []
-            i = 0
 
-            for ws in workers:
+            for idx, ws in enumerate(workers):
                 try:
                     info = ws.extra["gpu"]
                 except KeyError:
                     continue
                 metrics = ws.metrics["gpu"]
-                for j, (u, mem_used, mem_total) in enumerate(
-                    zip(
-                        metrics["utilization"],
-                        metrics["memory-used"],
-                        info["memory-total"],
-                    )
-                ):
-                    memory_max = max(memory_max, mem_total)
-                    memory_total += mem_total
-                    utilization.append(int(u))
-                    memory.append(mem_used)
-                    worker.append(ws.address)
-                    gpu_index.append(j)
-                    y.append(i)
-                    i += 1
+                u = metrics["utilization"]
+                mem_used = metrics["memory-used"]
+                mem_total = info["memory-total"]
+                memory_max = max(memory_max, mem_total)
+                memory_total += mem_total
+                utilization.append(int(u))
+                memory.append(mem_used)
+                worker.append(ws.address)
+                gpu_index.append(idx)
+                y.append(idx)
 
             memory_text = [format_bytes(m) for m in memory]
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/diagnostics/nvml.py 
new/distributed-2.30.0/distributed/diagnostics/nvml.py
--- old/distributed-2.29.0/distributed/diagnostics/nvml.py      2020-06-30 
17:19:56.000000000 +0200
+++ new/distributed-2.30.0/distributed/diagnostics/nvml.py      2020-10-07 
00:31:11.000000000 +0200
@@ -1,28 +1,48 @@
+import os
 import pynvml
 
-handles = None
+nvmlInit = None
+
+
+def init_once():
+    global nvmlInit
+    if nvmlInit is not None:
+        return
+
+    from pynvml import nvmlInit as _nvmlInit
+
+    nvmlInit = _nvmlInit
+    nvmlInit()
 
 
 def _pynvml_handles():
-    global handles
-    if handles is None:
-        pynvml.nvmlInit()
-        count = pynvml.nvmlDeviceGetCount()
-        handles = [pynvml.nvmlDeviceGetHandleByIndex(i) for i in range(count)]
-    return handles
+    count = pynvml.nvmlDeviceGetCount()
+    try:
+        cuda_visible_devices = [
+            int(idx) for idx in os.environ.get("CUDA_VISIBLE_DEVICES", 
"").split(",")
+        ]
+    except ValueError:
+        # CUDA_VISIBLE_DEVICES is not set
+        cuda_visible_devices = False
+    if not cuda_visible_devices:
+        cuda_visible_devices = list(range(count))
+    gpu_idx = cuda_visible_devices[0]
+    return pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
 
 
 def real_time():
-    handles = _pynvml_handles()
+    init_once()
+    h = _pynvml_handles()
     return {
-        "utilization": [pynvml.nvmlDeviceGetUtilizationRates(h).gpu for h in 
handles],
-        "memory-used": [pynvml.nvmlDeviceGetMemoryInfo(h).used for h in 
handles],
+        "utilization": pynvml.nvmlDeviceGetUtilizationRates(h).gpu,
+        "memory-used": pynvml.nvmlDeviceGetMemoryInfo(h).used,
     }
 
 
 def one_time():
-    handles = _pynvml_handles()
+    init_once()
+    h = _pynvml_handles()
     return {
-        "memory-total": [pynvml.nvmlDeviceGetMemoryInfo(h).total for h in 
handles],
-        "name": [pynvml.nvmlDeviceGetName(h).decode() for h in handles],
+        "memory-total": pynvml.nvmlDeviceGetMemoryInfo(h).total,
+        "name": pynvml.nvmlDeviceGetName(h).decode(),
     }
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-2.29.0/distributed/diagnostics/tests/test_nvml.py 
new/distributed-2.30.0/distributed/diagnostics/tests/test_nvml.py
--- old/distributed-2.29.0/distributed/diagnostics/tests/test_nvml.py   
1970-01-01 01:00:00.000000000 +0100
+++ new/distributed-2.30.0/distributed/diagnostics/tests/test_nvml.py   
2020-10-07 00:31:11.000000000 +0200
@@ -0,0 +1,34 @@
+import pytest
+import os
+
+pynvml = pytest.importorskip("pynvml")
+
+from distributed.diagnostics import nvml
+
+
+def test_one_time():
+    output = nvml.one_time()
+    assert "memory-total" in output
+    assert "name" in output
+
+    assert len(output["name"]) > 0
+
+
+def test_1_visible_devices():
+    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
+    output = nvml.one_time()
+    assert len(output["memory-total"]) == 1
+
+
+@pytest.mark.parametrize("CVD", ["1,0", "0,1"])
+def test_2_visible_devices(CVD):
+    os.environ["CUDA_VISIBLE_DEVICES"] = CVD
+    idx = int(CVD.split(",")[0])
+
+    h = nvml._pynvml_handles()
+    h2 = pynvml.nvmlDeviceGetHandleByIndex(idx)
+
+    s = pynvml.nvmlDeviceGetSerial(h)
+    s2 = pynvml.nvmlDeviceGetSerial(h2)
+
+    assert s == s2
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/tests/test_batched.py 
new/distributed-2.30.0/distributed/tests/test_batched.py
--- old/distributed-2.29.0/distributed/tests/test_batched.py    2020-08-25 
19:13:46.000000000 +0200
+++ new/distributed-2.30.0/distributed/tests/test_batched.py    2020-10-07 
00:31:11.000000000 +0200
@@ -1,5 +1,6 @@
 import asyncio
 import random
+from unittest import mock
 
 import pytest
 from tlz import assoc
@@ -10,6 +11,7 @@
 from distributed.utils import All, TimeoutError
 from distributed.utils_test import captured_logger
 from distributed.protocol import to_serialize
+from distributed.compatibility import WINDOWS, PY37, TORNADO6
 
 
 class EchoServer:
@@ -253,3 +255,55 @@
 
         with pytest.raises(TimeoutError):
             msg = await asyncio.wait_for(comm.read(), 0.1)
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(
+    WINDOWS and not PY37 and not TORNADO6, reason="failing on windows, py36, 
tornado 5."
+)
+async def test_handles_exceptions():
+    # Ensure that we properly handle exceptions in BatchedSend.
+    # https://github.com/pangeo-data/pangeo/issues/788
+    # mentioned in https://github.com/dask/distributed/issues/4080, but
+    # possibly distinct.
+    #
+    # The reported issues (https://github.com/tornadoweb/tornado/pull/2008)
+    # claim that the BufferError *should* only happen when the application
+    # is incorrectly using threads. I haven't been able to construct an
+    # actual example, so we mock IOStream.write to raise and ensure that
+    # BufferedSend handles things correctly. We don't (yet) test that
+    # any *users* of BatchedSend correctly handle BatchedSend dropping
+    # messages.
+    async with EchoServer() as e:
+        comm = await connect(e.address)
+        b = BatchedSend(interval=10)
+        b.start(comm)
+        await asyncio.sleep(0.020)
+        orig = comm.stream.write
+
+        n = 0
+
+        def raise_buffererror(*args, **kwargs):
+            nonlocal n
+            n += 1
+
+            if n == 1:
+                raise BufferError("bad!")
+            elif n == 2:
+                orig(*args, **kwargs)
+            else:
+                raise CommClosedError
+
+        with mock.patch.object(comm.stream, "write", wraps=raise_buffererror):
+            b.send("hello")
+            b.send("hello")
+            b.send("world")
+            await asyncio.sleep(0.020)
+            result = await comm.read()
+            assert result == ("hello", "hello", "world")
+
+            b.send("raises when flushed")
+            await asyncio.sleep(0.020)  # CommClosedError hit in callback
+
+            with pytest.raises(CommClosedError):
+                b.send("raises when sent")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/tests/test_client.py 
new/distributed-2.30.0/distributed/tests/test_client.py
--- old/distributed-2.29.0/distributed/tests/test_client.py     2020-10-03 
01:12:36.000000000 +0200
+++ new/distributed-2.30.0/distributed/tests/test_client.py     2020-10-07 
00:31:11.000000000 +0200
@@ -6165,3 +6165,28 @@
                     x = da.ones((10000, 10000))
                     y = x + x.T
                     await c.compute(y.sum())
+
+
+@gen_cluster(client=True)
+async def test_futures_in_subgraphs(c, s, a, b):
+    """Regression test of <https://github.com/dask/distributed/issues/4145>"""
+
+    dd = pytest.importorskip("dask.dataframe")
+    import pandas as pd
+
+    ddf = dd.from_pandas(
+        pd.DataFrame(
+            dict(
+                uid=range(50),
+                enter_time=pd.date_range(
+                    start="2020-01-01", end="2020-09-01", periods=50, tz="UTC"
+                ),
+            )
+        ),
+        npartitions=5,
+    )
+
+    ddf = ddf[ddf.uid.isin(range(29))].persist()
+    ddf["local_time"] = ddf.enter_time.dt.tz_convert("US/Central")
+    ddf["day"] = ddf.enter_time.dt.day_name()
+    ddf = await c.submit(dd.categorical.categorize, ddf, columns=["day"], 
index=False)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/tests/test_utils.py 
new/distributed-2.30.0/distributed/tests/test_utils.py
--- old/distributed-2.29.0/distributed/tests/test_utils.py      2020-09-19 
04:28:28.000000000 +0200
+++ new/distributed-2.30.0/distributed/tests/test_utils.py      2020-10-07 
00:31:11.000000000 +0200
@@ -49,6 +49,7 @@
 )
 from distributed.utils_test import loop, loop_in_thread  # noqa: F401
 from distributed.utils_test import div, has_ipv6, inc, throws, gen_test, 
captured_logger
+from dask.optimization import SubgraphCallable
 
 
 def test_All(loop):
@@ -217,6 +218,11 @@
         assert all(isinstance(k, str) for k in sdsk)
         assert dask.get(dsk, keys) == dask.get(sdsk, skeys)
 
+    dsk = {("y", 1): (SubgraphCallable({"x": ("y", 1)}, "x", (("y", 1),)), 
(("z", 1),))}
+    dsk = str_graph(dsk, extra_values=(("z", 1),))
+    assert dsk["('y', 1)"][0].dsk["x"] == "('y', 1)"
+    assert dsk["('y', 1)"][1][0] == "('z', 1)"
+
 
 def test_maybe_complex():
     assert not _maybe_complex(1)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed/utils.py 
new/distributed-2.30.0/distributed/utils.py
--- old/distributed-2.29.0/distributed/utils.py 2020-10-03 01:12:36.000000000 
+0200
+++ new/distributed-2.30.0/distributed/utils.py 2020-10-07 00:31:11.000000000 
+0200
@@ -36,6 +36,7 @@
 
 import dask
 from dask import istask
+from dask.optimization import SubgraphCallable
 
 # provide format_bytes here for backwards compatibility
 from dask.utils import (  # noqa
@@ -777,17 +778,31 @@
 
 
 def convert(task, dsk, extra_values):
-    if type(task) is list:
+    typ = type(task)
+    if typ is tuple and task:
+        if type(task[0]) is SubgraphCallable:
+            sc = task[0]
+            return (
+                SubgraphCallable(
+                    convert(sc.dsk, dsk, extra_values),
+                    sc.outkey,
+                    convert(sc.inkeys, dsk, extra_values),
+                    sc.name,
+                ),
+            ) + tuple(convert(x, dsk, extra_values) for x in task[1:])
+        elif callable(task[0]):
+            return (task[0],) + tuple(convert(x, dsk, extra_values) for x in 
task[1:])
+    if typ is list:
         return [convert(v, dsk, extra_values) for v in task]
-    if type(task) is dict:
+    if typ is dict:
         return {k: convert(v, dsk, extra_values) for k, v in task.items()}
-    if istask(task):
-        return (task[0],) + tuple(convert(x, dsk, extra_values) for x in 
task[1:])
     try:
         if task in dsk or task in extra_values:
             return tokey(task)
     except TypeError:
         pass
+    if typ is tuple:  # If the tuple itself isn't a key, check its elements
+        return tuple(convert(v, dsk, extra_values) for v in task)
     return task
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed.egg-info/PKG-INFO 
new/distributed-2.30.0/distributed.egg-info/PKG-INFO
--- old/distributed-2.29.0/distributed.egg-info/PKG-INFO        2020-10-03 
01:22:59.000000000 +0200
+++ new/distributed-2.30.0/distributed.egg-info/PKG-INFO        2020-10-07 
00:36:07.000000000 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.29.0
+Version: 2.30.0
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/distributed.egg-info/SOURCES.txt 
new/distributed-2.30.0/distributed.egg-info/SOURCES.txt
--- old/distributed-2.29.0/distributed.egg-info/SOURCES.txt     2020-10-03 
01:22:59.000000000 +0200
+++ new/distributed-2.30.0/distributed.egg-info/SOURCES.txt     2020-10-07 
00:36:07.000000000 +0200
@@ -130,6 +130,7 @@
 distributed/diagnostics/websocket.py
 distributed/diagnostics/tests/test_eventstream.py
 distributed/diagnostics/tests/test_graph_layout.py
+distributed/diagnostics/tests/test_nvml.py
 distributed/diagnostics/tests/test_progress.py
 distributed/diagnostics/tests/test_progress_stream.py
 distributed/diagnostics/tests/test_progressbar.py
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-2.29.0/docs/source/changelog.rst 
new/distributed-2.30.0/docs/source/changelog.rst
--- old/distributed-2.29.0/docs/source/changelog.rst    2020-10-03 
01:21:56.000000000 +0200
+++ new/distributed-2.30.0/docs/source/changelog.rst    2020-10-07 
00:34:03.000000000 +0200
@@ -1,6 +1,15 @@
 Changelog
 =========
 
+2.30.0 - 2020-10-06
+-------------------
+
+- Support ``SubgraphCallable`` in ``str_graph()`` (:pr:`4148`) `Mads R. B. 
Kristensen`_
+- Handle exceptions in ``BatchedSend`` (:pr:`4135`) `Tom Augspurger`_
+- Fix for missing ``:`` in autosummary docs (:pr:`4143`) `Gil Forsyth`_
+- Limit GPU metrics to visible devices only (:pr:`3810`) `Jacob Tomlinson`_
+
+
 2.29.0 - 2020-10-02
 -------------------
 


Reply via email to