Hello community, here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2020-10-25 18:06:20 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-distributed (Old) and /work/SRC/openSUSE:Factory/.python-distributed.new.3463 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Sun Oct 25 18:06:20 2020 rev:37 rq:841145 version:2.30.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2020-10-07 14:17:37.957444944 +0200 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.3463/python-distributed.changes 2020-10-25 18:06:35.759343920 +0100 @@ -1,0 +2,11 @@ +Sat Oct 10 19:04:32 UTC 2020 - Arun Persaud <a...@gmx.de> + +- update to version 2.30.0: + * Support SubgraphCallable in str_graph() (GH#4148) Mads + R. B. Kristensen + * Handle exceptions in BatchedSend (GH#4135) Tom Augspurger + * Fix for missing : in autosummary docs (GH#4143) Gil Forsyth + * Limit GPU metrics to visible devices only (GH#3810) Jacob + Tomlinson + +------------------------------------------------------------------- Old: ---- distributed-2.29.0.tar.gz New: ---- distributed-2.30.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.xd9V4k/_old 2020-10-25 18:06:37.715345771 +0100 +++ /var/tmp/diff_new_pack.xd9V4k/_new 2020-10-25 18:06:37.719345775 +0100 @@ -21,7 +21,7 @@ # Test requires network connection %bcond_with test Name: python-distributed -Version: 2.29.0 +Version: 2.30.0 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause ++++++ distributed-2.29.0.tar.gz -> distributed-2.30.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/PKG-INFO new/distributed-2.30.0/PKG-INFO --- old/distributed-2.29.0/PKG-INFO 2020-10-03 01:23:00.334293100 +0200 +++ new/distributed-2.30.0/PKG-INFO 2020-10-07 00:36:08.340993600 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.29.0 +Version: 2.30.0 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' 
'--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/_version.py new/distributed-2.30.0/distributed/_version.py --- old/distributed-2.29.0/distributed/_version.py 2020-10-03 01:23:00.335679500 +0200 +++ new/distributed-2.30.0/distributed/_version.py 2020-10-07 00:36:08.349047400 +0200 @@ -8,11 +8,11 @@ version_json = ''' { - "date": "2020-10-02T18:22:26-0500", + "date": "2020-10-06T17:35:34-0500", "dirty": false, "error": null, - "full-revisionid": "a80b867cf40b05aa423a19aea1a077764ffba0f4", - "version": "2.29.0" + "full-revisionid": "a1dc5f437b39c1b35a9b05cbc048e3a793b89715", + "version": "2.30.0" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/batched.py new/distributed-2.30.0/distributed/batched.py --- old/distributed-2.29.0/distributed/batched.py 2020-09-18 22:52:00.000000000 +0200 +++ new/distributed-2.30.0/distributed/batched.py 2020-10-07 00:31:11.000000000 +0200 @@ -54,6 +54,7 @@ maxlen=dask.config.get("distributed.comm.recent-messages-log-length") ) self.serializers = serializers + self._consecutive_failures = 0 def start(self, comm): self.comm = comm @@ -98,15 +99,43 @@ self.recent_message_log.append("large-message") self.byte_count += nbytes except CommClosedError as e: + # If the comm is known to be closed, we'll immediately + # give up. logger.info("Batched Comm Closed: %s", e) break except Exception: - logger.exception("Error in batched write") - break + # In other cases we'll retry a few times. + # https://github.com/pangeo-data/pangeo/issues/788 + if self._consecutive_failures <= 5: + logger.warning("Error in batched write, retrying") + yield gen.sleep(0.100 * 1.5 ** self._consecutive_failures) + self._consecutive_failures += 1 + # Exponential backoff for retries. + # Ensure we don't drop any messages. + if self.buffer: + # Someone could call send while we yielded above? 
+ self.buffer = payload + self.buffer + else: + self.buffer = payload + continue + else: + logger.exception("Error in batched write") + break finally: payload = None # lose ref - + else: + # nobreak. We've been gracefully closed. + self.stopped.set() + return + + # If we've reached here, it means our comm is known to be closed or + # we've repeatedly failed to send a message. We can't close gracefully + # via `.close()` since we can't send messages. So we just abort. + # This means that any messages in our buffer are lost. + # To propagate exceptions, we rely on subsequent `BatchedSend.send` + # calls to raise CommClosedErrors. self.stopped.set() + self.abort() def send(self, msg): """Schedule a message for sending to the other side diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/compatibility.py new/distributed-2.30.0/distributed/compatibility.py --- old/distributed-2.29.0/distributed/compatibility.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.30.0/distributed/compatibility.py 2020-10-07 00:31:11.000000000 +0200 @@ -2,11 +2,15 @@ import platform import sys +import tornado + logging_names = logging._levelToName.copy() logging_names.update(logging._nameToLevel) PYPY = platform.python_implementation().lower() == "pypy" WINDOWS = sys.platform.startswith("win") +TORNADO6 = tornado.version_info[0] >= 6 +PY37 = sys.version_info[:2] >= (3, 7) if sys.version_info[:2] >= (3, 7): from asyncio import get_running_loop diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/dashboard/components/nvml.py new/distributed-2.30.0/distributed/dashboard/components/nvml.py --- old/distributed-2.29.0/distributed/dashboard/components/nvml.py 2020-09-16 04:37:04.000000000 +0200 +++ new/distributed-2.30.0/distributed/dashboard/components/nvml.py 2020-10-07 00:31:11.000000000 +0200 @@ -131,29 +131,23 @@ memory_total = 0 memory_max = 0 
worker = [] - i = 0 - for ws in workers: + for idx, ws in enumerate(workers): try: info = ws.extra["gpu"] except KeyError: continue metrics = ws.metrics["gpu"] - for j, (u, mem_used, mem_total) in enumerate( - zip( - metrics["utilization"], - metrics["memory-used"], - info["memory-total"], - ) - ): - memory_max = max(memory_max, mem_total) - memory_total += mem_total - utilization.append(int(u)) - memory.append(mem_used) - worker.append(ws.address) - gpu_index.append(j) - y.append(i) - i += 1 + u = metrics["utilization"] + mem_used = metrics["memory-used"] + mem_total = info["memory-total"] + memory_max = max(memory_max, mem_total) + memory_total += mem_total + utilization.append(int(u)) + memory.append(mem_used) + worker.append(ws.address) + gpu_index.append(idx) + y.append(idx) memory_text = [format_bytes(m) for m in memory] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/diagnostics/nvml.py new/distributed-2.30.0/distributed/diagnostics/nvml.py --- old/distributed-2.29.0/distributed/diagnostics/nvml.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.30.0/distributed/diagnostics/nvml.py 2020-10-07 00:31:11.000000000 +0200 @@ -1,28 +1,48 @@ +import os import pynvml -handles = None +nvmlInit = None + + +def init_once(): + global nvmlInit + if nvmlInit is not None: + return + + from pynvml import nvmlInit as _nvmlInit + + nvmlInit = _nvmlInit + nvmlInit() def _pynvml_handles(): - global handles - if handles is None: - pynvml.nvmlInit() - count = pynvml.nvmlDeviceGetCount() - handles = [pynvml.nvmlDeviceGetHandleByIndex(i) for i in range(count)] - return handles + count = pynvml.nvmlDeviceGetCount() + try: + cuda_visible_devices = [ + int(idx) for idx in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") + ] + except ValueError: + # CUDA_VISIBLE_DEVICES is not set + cuda_visible_devices = False + if not cuda_visible_devices: + cuda_visible_devices = list(range(count)) + gpu_idx = 
cuda_visible_devices[0] + return pynvml.nvmlDeviceGetHandleByIndex(gpu_idx) def real_time(): - handles = _pynvml_handles() + init_once() + h = _pynvml_handles() return { - "utilization": [pynvml.nvmlDeviceGetUtilizationRates(h).gpu for h in handles], - "memory-used": [pynvml.nvmlDeviceGetMemoryInfo(h).used for h in handles], + "utilization": pynvml.nvmlDeviceGetUtilizationRates(h).gpu, + "memory-used": pynvml.nvmlDeviceGetMemoryInfo(h).used, } def one_time(): - handles = _pynvml_handles() + init_once() + h = _pynvml_handles() return { - "memory-total": [pynvml.nvmlDeviceGetMemoryInfo(h).total for h in handles], - "name": [pynvml.nvmlDeviceGetName(h).decode() for h in handles], + "memory-total": pynvml.nvmlDeviceGetMemoryInfo(h).total, + "name": pynvml.nvmlDeviceGetName(h).decode(), } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/diagnostics/tests/test_nvml.py new/distributed-2.30.0/distributed/diagnostics/tests/test_nvml.py --- old/distributed-2.29.0/distributed/diagnostics/tests/test_nvml.py 1970-01-01 01:00:00.000000000 +0100 +++ new/distributed-2.30.0/distributed/diagnostics/tests/test_nvml.py 2020-10-07 00:31:11.000000000 +0200 @@ -0,0 +1,34 @@ +import pytest +import os + +pynvml = pytest.importorskip("pynvml") + +from distributed.diagnostics import nvml + + +def test_one_time(): + output = nvml.one_time() + assert "memory-total" in output + assert "name" in output + + assert len(output["name"]) > 0 + + +def test_1_visible_devices(): + os.environ["CUDA_VISIBLE_DEVICES"] = "0" + output = nvml.one_time() + assert len(output["memory-total"]) == 1 + + +@pytest.mark.parametrize("CVD", ["1,0", "0,1"]) +def test_2_visible_devices(CVD): + os.environ["CUDA_VISIBLE_DEVICES"] = CVD + idx = int(CVD.split(",")[0]) + + h = nvml._pynvml_handles() + h2 = pynvml.nvmlDeviceGetHandleByIndex(idx) + + s = pynvml.nvmlDeviceGetSerial(h) + s2 = pynvml.nvmlDeviceGetSerial(h2) + + assert s == s2 diff -urN 
'--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/tests/test_batched.py new/distributed-2.30.0/distributed/tests/test_batched.py --- old/distributed-2.29.0/distributed/tests/test_batched.py 2020-08-25 19:13:46.000000000 +0200 +++ new/distributed-2.30.0/distributed/tests/test_batched.py 2020-10-07 00:31:11.000000000 +0200 @@ -1,5 +1,6 @@ import asyncio import random +from unittest import mock import pytest from tlz import assoc @@ -10,6 +11,7 @@ from distributed.utils import All, TimeoutError from distributed.utils_test import captured_logger from distributed.protocol import to_serialize +from distributed.compatibility import WINDOWS, PY37, TORNADO6 class EchoServer: @@ -253,3 +255,55 @@ with pytest.raises(TimeoutError): msg = await asyncio.wait_for(comm.read(), 0.1) + + +@pytest.mark.asyncio +@pytest.mark.skipif( + WINDOWS and not PY37 and not TORNADO6, reason="failing on windows, py36, tornado 5." +) +async def test_handles_exceptions(): + # Ensure that we properly handle exceptions in BatchedSend. + # https://github.com/pangeo-data/pangeo/issues/788 + # mentioned in https://github.com/dask/distributed/issues/4080, but + # possibly distinct. + # + # The reported issues (https://github.com/tornadoweb/tornado/pull/2008) + # claim that the BufferError *should* only happen when the application + # is incorrectly using threads. I haven't been able to construct an + # actual example, so we mock IOStream.write to raise and ensure that + # BatchedSend handles things correctly. We don't (yet) test that + # any *users* of BatchedSend correctly handle BatchedSend dropping + # messages. 
+ async with EchoServer() as e: + comm = await connect(e.address) + b = BatchedSend(interval=10) + b.start(comm) + await asyncio.sleep(0.020) + orig = comm.stream.write + + n = 0 + + def raise_buffererror(*args, **kwargs): + nonlocal n + n += 1 + + if n == 1: + raise BufferError("bad!") + elif n == 2: + orig(*args, **kwargs) + else: + raise CommClosedError + + with mock.patch.object(comm.stream, "write", wraps=raise_buffererror): + b.send("hello") + b.send("hello") + b.send("world") + await asyncio.sleep(0.020) + result = await comm.read() + assert result == ("hello", "hello", "world") + + b.send("raises when flushed") + await asyncio.sleep(0.020) # CommClosedError hit in callback + + with pytest.raises(CommClosedError): + b.send("raises when sent") diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/tests/test_client.py new/distributed-2.30.0/distributed/tests/test_client.py --- old/distributed-2.29.0/distributed/tests/test_client.py 2020-10-03 01:12:36.000000000 +0200 +++ new/distributed-2.30.0/distributed/tests/test_client.py 2020-10-07 00:31:11.000000000 +0200 @@ -6165,3 +6165,28 @@ x = da.ones((10000, 10000)) y = x + x.T await c.compute(y.sum()) + + +@gen_cluster(client=True) +async def test_futures_in_subgraphs(c, s, a, b): + """Regression test of <https://github.com/dask/distributed/issues/4145>""" + + dd = pytest.importorskip("dask.dataframe") + import pandas as pd + + ddf = dd.from_pandas( + pd.DataFrame( + dict( + uid=range(50), + enter_time=pd.date_range( + start="2020-01-01", end="2020-09-01", periods=50, tz="UTC" + ), + ) + ), + npartitions=5, + ) + + ddf = ddf[ddf.uid.isin(range(29))].persist() + ddf["local_time"] = ddf.enter_time.dt.tz_convert("US/Central") + ddf["day"] = ddf.enter_time.dt.day_name() + ddf = await c.submit(dd.categorical.categorize, ddf, columns=["day"], index=False) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' 
old/distributed-2.29.0/distributed/tests/test_utils.py new/distributed-2.30.0/distributed/tests/test_utils.py --- old/distributed-2.29.0/distributed/tests/test_utils.py 2020-09-19 04:28:28.000000000 +0200 +++ new/distributed-2.30.0/distributed/tests/test_utils.py 2020-10-07 00:31:11.000000000 +0200 @@ -49,6 +49,7 @@ ) from distributed.utils_test import loop, loop_in_thread # noqa: F401 from distributed.utils_test import div, has_ipv6, inc, throws, gen_test, captured_logger +from dask.optimization import SubgraphCallable def test_All(loop): @@ -217,6 +218,11 @@ assert all(isinstance(k, str) for k in sdsk) assert dask.get(dsk, keys) == dask.get(sdsk, skeys) + dsk = {("y", 1): (SubgraphCallable({"x": ("y", 1)}, "x", (("y", 1),)), (("z", 1),))} + dsk = str_graph(dsk, extra_values=(("z", 1),)) + assert dsk["('y', 1)"][0].dsk["x"] == "('y', 1)" + assert dsk["('y', 1)"][1][0] == "('z', 1)" + def test_maybe_complex(): assert not _maybe_complex(1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed/utils.py new/distributed-2.30.0/distributed/utils.py --- old/distributed-2.29.0/distributed/utils.py 2020-10-03 01:12:36.000000000 +0200 +++ new/distributed-2.30.0/distributed/utils.py 2020-10-07 00:31:11.000000000 +0200 @@ -36,6 +36,7 @@ import dask from dask import istask +from dask.optimization import SubgraphCallable # provide format_bytes here for backwards compatibility from dask.utils import ( # noqa @@ -777,17 +778,31 @@ def convert(task, dsk, extra_values): - if type(task) is list: + typ = type(task) + if typ is tuple and task: + if type(task[0]) is SubgraphCallable: + sc = task[0] + return ( + SubgraphCallable( + convert(sc.dsk, dsk, extra_values), + sc.outkey, + convert(sc.inkeys, dsk, extra_values), + sc.name, + ), + ) + tuple(convert(x, dsk, extra_values) for x in task[1:]) + elif callable(task[0]): + return (task[0],) + tuple(convert(x, dsk, extra_values) for x in task[1:]) + if typ is list: 
return [convert(v, dsk, extra_values) for v in task] - if type(task) is dict: + if typ is dict: return {k: convert(v, dsk, extra_values) for k, v in task.items()} - if istask(task): - return (task[0],) + tuple(convert(x, dsk, extra_values) for x in task[1:]) try: if task in dsk or task in extra_values: return tokey(task) except TypeError: pass + if typ is tuple: # If the tuple itself isn't a key, check its elements + return tuple(convert(v, dsk, extra_values) for v in task) return task diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed.egg-info/PKG-INFO new/distributed-2.30.0/distributed.egg-info/PKG-INFO --- old/distributed-2.29.0/distributed.egg-info/PKG-INFO 2020-10-03 01:22:59.000000000 +0200 +++ new/distributed-2.30.0/distributed.egg-info/PKG-INFO 2020-10-07 00:36:07.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.29.0 +Version: 2.30.0 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/distributed.egg-info/SOURCES.txt new/distributed-2.30.0/distributed.egg-info/SOURCES.txt --- old/distributed-2.29.0/distributed.egg-info/SOURCES.txt 2020-10-03 01:22:59.000000000 +0200 +++ new/distributed-2.30.0/distributed.egg-info/SOURCES.txt 2020-10-07 00:36:07.000000000 +0200 @@ -130,6 +130,7 @@ distributed/diagnostics/websocket.py distributed/diagnostics/tests/test_eventstream.py distributed/diagnostics/tests/test_graph_layout.py +distributed/diagnostics/tests/test_nvml.py distributed/diagnostics/tests/test_progress.py distributed/diagnostics/tests/test_progress_stream.py distributed/diagnostics/tests/test_progressbar.py diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.29.0/docs/source/changelog.rst 
new/distributed-2.30.0/docs/source/changelog.rst --- old/distributed-2.29.0/docs/source/changelog.rst 2020-10-03 01:21:56.000000000 +0200 +++ new/distributed-2.30.0/docs/source/changelog.rst 2020-10-07 00:34:03.000000000 +0200 @@ -1,6 +1,15 @@ Changelog ========= +2.30.0 - 2020-10-06 +------------------- + +- Support ``SubgraphCallable`` in ``str_graph()`` (:pr:`4148`) `Mads R. B. Kristensen`_ +- Handle exceptions in ``BatchedSend`` (:pr:`4135`) `Tom Augspurger`_ +- Fix for missing ``:`` in autosummary docs (:pr:`4143`) `Gil Forsyth`_ +- Limit GPU metrics to visible devices only (:pr:`3810`) `Jacob Tomlinson`_ + + 2.29.0 - 2020-10-02 -------------------