Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2019-11-26 17:02:21

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.26869 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Tue Nov 26 17:02:21 2019 rev:20 rq:750861 version:2.8.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2019-11-17 19:23:27.694858205 +0100 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.26869/python-distributed.changes 2019-11-26 17:02:55.692036843 +0100 @@ -1,0 +2,18 @@ +Sun Nov 24 17:36:02 UTC 2019 - Arun Persaud <[email protected]> + +- update to version 2.8.1: + * Fix hanging worker when the scheduler leaves (GH#3250) Tom + Augspurger + * Fix NumPy writeable serialization bug (GH#3253) James Bourbeau + * Skip numba.cuda tests if CUDA is not available (GH#3255) Peter + Andreas Entschev + * Add new dashboard plot for memory use by key (GH#3243) Matthew + Rocklin + * Fix array.shape() -> array.shape (GH#3247) Jed Brown + * Fixed typos in pubsub.py (GH#3244) He Jia + * Fixed cupy array going out of scope (GH#3240) Mads + R. B. Kristensen + * Remove gen.coroutine usage in scheduler (GH#3242) Jim Crist-Harif + * Use inspect.isawaitable where relevant (GH#3241) Jim Crist-Harif + +------------------------------------------------------------------- Old: ---- distributed-2.8.0.tar.gz New: ---- distributed-2.8.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.o1ESYF/_old 2019-11-26 17:02:57.904036079 +0100 +++ /var/tmp/diff_new_pack.o1ESYF/_new 2019-11-26 17:02:57.932036068 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-distributed # -# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany. +# Copyright (c) 2019 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -21,7 +21,7 @@ # Test requires network connection %bcond_with test Name: python-distributed -Version: 2.8.0 +Version: 2.8.1 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause ++++++ distributed-2.8.0.tar.gz -> distributed-2.8.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/PKG-INFO new/distributed-2.8.1/PKG-INFO --- old/distributed-2.8.0/PKG-INFO 2019-11-14 23:59:02.000000000 +0100 +++ new/distributed-2.8.1/PKG-INFO 2019-11-23 05:48:31.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.8.0 +Version: 2.8.1 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/_version.py new/distributed-2.8.1/distributed/_version.py --- old/distributed-2.8.0/distributed/_version.py 2019-11-14 23:59:02.000000000 +0100 +++ new/distributed-2.8.1/distributed/_version.py 2019-11-23 05:48:31.000000000 +0100 @@ -8,11 +8,11 @@ version_json = ''' { - "date": "2019-11-14T14:58:28-0800", + "date": "2019-11-22T22:46:55-0600", "dirty": false, "error": null, - "full-revisionid": "4d0d58aade4460fab6e7e85a3548353671036d2c", - "version": "2.8.0" + "full-revisionid": "507659d79434845e50d48c247ff42d5efd336686", + "version": "2.8.1" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/core.py new/distributed-2.8.1/distributed/core.py --- old/distributed-2.8.0/distributed/core.py 2019-11-14 19:11:20.000000000 +0100 +++ 
new/distributed-2.8.1/distributed/core.py 2019-11-19 17:18:47.000000000 +0100 @@ -2,6 +2,7 @@ from collections import defaultdict, deque from concurrent.futures import CancelledError from functools import partial +from inspect import isawaitable import logging import threading import traceback @@ -397,7 +398,7 @@ logger.debug("Calling into handler %s", handler.__name__) try: result = handler(comm, **msg) - if hasattr(result, "__await__"): + if isawaitable(result): result = asyncio.ensure_future(result) self._ongoing_coroutines.add(result) result = await result diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/dashboard/components/scheduler.py new/distributed-2.8.1/distributed/dashboard/components/scheduler.py --- old/distributed-2.8.0/distributed/dashboard/components/scheduler.py 2019-11-14 23:49:45.000000000 +0100 +++ new/distributed-2.8.1/distributed/dashboard/components/scheduler.py 2019-11-20 20:03:05.000000000 +0100 @@ -1,3 +1,4 @@ +from collections import defaultdict import logging import math from numbers import Number @@ -36,7 +37,7 @@ from bokeh.transform import factor_cmap, linear_cmap from bokeh.io import curdoc import dask -from dask.utils import format_bytes +from dask.utils import format_bytes, key_split from toolz import pipe from tornado import escape @@ -428,6 +429,82 @@ update(self.source, result) +class MemoryByKey(DashboardComponent): + """ Bar chart showing memory use by key prefix""" + + def __init__(self, scheduler, **kwargs): + with log_errors(): + self.last = 0 + self.scheduler = scheduler + self.source = ColumnDataSource( + { + "name": ["a", "b"], + "nbytes": [100, 1000], + "count": [1, 2], + "color": ["blue", "blue"], + } + ) + + fig = figure( + title="Memory Use", + tools="", + id="bk-memory-by-key-plot", + name="memory_by_key", + x_range=["a", "b"], + **kwargs, + ) + rect = fig.vbar( + source=self.source, x="name", top="nbytes", width=0.9, color="color" + ) + fig.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") + fig.xaxis.major_label_orientation = -math.pi / 12 + rect.nonselection_glyph = None + + fig.xaxis.minor_tick_line_alpha = 0 + fig.ygrid.visible = False + + fig.toolbar.logo = None + fig.toolbar_location = None + + hover = HoverTool() + hover.tooltips = "@name: @nbytes_text" + hover.tooltips = """ + <div> + <p><b>Name:</b> @name</p> + <p><b>Bytes:</b> @nbytes_text </p> + <p><b>Count:</b> @count objects </p> + </div> + """ + hover.point_policy = "follow_mouse" + fig.add_tools(hover) + + self.fig = fig + + @without_property_validation + def update(self): + with log_errors(): + counts = defaultdict(int) + nbytes = defaultdict(int) + for ws in self.scheduler.workers.values(): + for ts in ws.has_what: + ks = key_split(ts.key) + counts[ks] += 1 + nbytes[ks] += ts.nbytes + + names = list(sorted(counts)) + self.fig.x_range.factors = names + result = { + "name": names, + "count": [counts[name] for name in names], + "nbytes": [nbytes[name] for name in names], + "nbytes_text": [format_bytes(nbytes[name]) for name in names], + "color": [color_of(name) for name in names], + } + self.fig.title.text = "Total Use: " + format_bytes(sum(nbytes.values())) + + update(self.source, result) + + class CurrentLoad(DashboardComponent): """ How many tasks are on each worker """ @@ -1865,6 +1942,15 @@ doc.theme = BOKEH_THEME +def individual_memory_by_key_doc(scheduler, extra, doc): + with log_errors(): + component = MemoryByKey(scheduler, sizing_mode="stretch_both") + component.update() + 
add_periodic_callback(doc, component, 500) + doc.add_root(component.fig) + doc.theme = BOKEH_THEME + + def profile_doc(scheduler, extra, doc): with log_errors(): doc.title = "Dask: Profile" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/dashboard/scheduler.py new/distributed-2.8.1/distributed/dashboard/scheduler.py --- old/distributed-2.8.0/distributed/dashboard/scheduler.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/dashboard/scheduler.py 2019-11-20 20:03:05.000000000 +0100 @@ -36,6 +36,7 @@ individual_workers_doc, individual_bandwidth_types_doc, individual_bandwidth_workers_doc, + individual_memory_by_key_doc, ) from .core import BokehServer from .worker import counters_doc @@ -408,6 +409,7 @@ "/individual-workers": individual_workers_doc, "/individual-bandwidth-types": individual_bandwidth_types_doc, "/individual-bandwidth-workers": individual_bandwidth_workers_doc, + "/individual-memory-by-key": individual_memory_by_key_doc, } try: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/dashboard/tests/test_scheduler_bokeh.py new/distributed-2.8.1/distributed/dashboard/tests/test_scheduler_bokeh.py --- old/distributed-2.8.0/distributed/dashboard/tests/test_scheduler_bokeh.py 2019-11-12 21:02:54.000000000 +0100 +++ new/distributed-2.8.1/distributed/dashboard/tests/test_scheduler_bokeh.py 2019-11-20 20:03:05.000000000 +0100 @@ -11,6 +11,7 @@ from tornado import gen from tornado.httpclient import AsyncHTTPClient, HTTPRequest +import dask from dask.core import flatten from distributed.utils import tokey, format_dashboard_link from distributed.client import wait @@ -34,6 +35,7 @@ WorkerTable, TaskGraph, ProfileServer, + MemoryByKey, ) from distributed.dashboard import scheduler @@ -690,3 +692,20 @@ body = response.body.decode() assert "bokeh" in body.lower() assert not re.search("href=./", body) # no absolute links + + +@gen_cluster( + client=True, scheduler_kwargs={"services": {("dashboard", 0): BokehScheduler}} +) +async def test_memory_by_key(c, s, a, b): + mbk = MemoryByKey(s) + + da = pytest.importorskip("dask.array") + x = (da.random.random((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False) + await x + + y = await dask.delayed(inc)(1).persist() + + mbk.update() + assert mbk.source.data["name"] == ["add", "inc"] + assert mbk.source.data["nbytes"] == [x.nbytes, sys.getsizeof(1)] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/deploy/adaptive.py new/distributed-2.8.1/distributed/deploy/adaptive.py --- old/distributed-2.8.0/distributed/deploy/adaptive.py 2019-11-12 21:02:54.000000000 +0100 +++ new/distributed-2.8.1/distributed/deploy/adaptive.py 2019-11-19 17:18:47.000000000 +0100 @@ -1,3 +1,4 @@ +from inspect import isawaitable import logging import math @@ -158,7 +159,7 @@ # close workers more forcefully logger.info("Retiring workers %s", workers) f = self.cluster.scale_down(workers) - if hasattr(f, "__await__"): + if isawaitable(f): await f async def scale_up(self, n): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/cupy.py new/distributed-2.8.1/distributed/protocol/cupy.py --- old/distributed-2.8.0/distributed/protocol/cupy.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/protocol/cupy.py 2019-11-19 17:18:47.000000000 
+0100 @@ -6,14 +6,34 @@ class PatchedCudaArrayInterface(object): - # TODO: This class wont be necessary - # once Cupy<7.0 is no longer supported + """This class do two things: + 1) Makes sure that __cuda_array_interface__['strides'] + behaves as specified in the protocol. + 2) Makes sure that the cuda context is active + when deallocating the base cuda array. + Notice, this is only needed when the array to deserialize + isn't a native cupy array. + """ + def __init__(self, ary): cai = ary.__cuda_array_interface__ cai_cupy_vsn = cupy.ndarray(0).__cuda_array_interface__["version"] if cai.get("strides") is None and cai_cupy_vsn < 2: cai.pop("strides", None) self.__cuda_array_interface__ = cai + # Save a ref to ary so it won't go out of scope + self.base = ary + + def __del__(self): + # Making sure that the cuda context is active + # when deallocating the base cuda array + try: + import numba.cuda + + numba.cuda.current_context() + except ImportError: + pass + del self.base @cuda_serialize.register(cupy.ndarray) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/numpy.py new/distributed-2.8.1/distributed/protocol/numpy.py --- old/distributed-2.8.0/distributed/protocol/numpy.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/protocol/numpy.py 2019-11-23 05:33:46.000000000 +0100 @@ -46,7 +46,7 @@ # Only serialize non-broadcasted data for arrays with zero strided axes if 0 in x.strides: - broadcast_to = (x.shape, x.flags.writeable) + broadcast_to = x.shape x = x[tuple(slice(None) if s != 0 else slice(1) for s in x.strides)] else: broadcast_to = None @@ -103,14 +103,12 @@ else: dt = np.dtype(dt) - x = np.ndarray( - header["shape"], dtype=dt, buffer=frames[0], strides=header["strides"] - ) - if header.get("broadcast_to"): - shape, writeable = header["broadcast_to"] - x = np.broadcast_to(x, shape) - x.setflags(write=writeable) + shape = header["broadcast_to"] + else: + shape = header["shape"] + + x = np.ndarray(shape, dtype=dt, buffer=frames[0], strides=header["strides"]) return x diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/tests/test_numba.py new/distributed-2.8.1/distributed/protocol/tests/test_numba.py --- old/distributed-2.8.0/distributed/protocol/tests/test_numba.py 2019-11-12 21:02:54.000000000 +0100 +++ new/distributed-2.8.1/distributed/protocol/tests/test_numba.py 2019-11-21 19:57:09.000000000 +0100 @@ -7,6 +7,9 @@ @pytest.mark.parametrize("dtype", ["u1", "u4", "u8", "f4"]) def test_serialize_cupy(dtype): + if not cuda.is_available(): + pytest.skip("CUDA is not available") + ary = np.arange(100, dtype=dtype) x = cuda.to_device(ary) header, frames = serialize(x, serializers=("cuda", "dask", "pickle")) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/tests/test_numpy.py new/distributed-2.8.1/distributed/protocol/tests/test_numpy.py --- old/distributed-2.8.0/distributed/protocol/tests/test_numpy.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/protocol/tests/test_numpy.py 2019-11-23 05:33:46.000000000 +0100 @@ -288,3 +288,22 @@ header, frames = serialize(x) assert "broadcast_to" not in header assert sum(map(nbytes, frames)) == x.nbytes + + +def test_serialize_writeable_array_readonly_base_object(): + # Regression test for https://github.com/dask/distributed/issues/3252 + + x = np.arange(3) + # 
Create array which doesn't own it's own memory + y = np.broadcast_to(x, (3, 3)) + + # Make y writeable and it's base object (x) read-only + y.setflags(write=True) + x.setflags(write=False) + + # Serialize / deserialize y + z = deserialize(*serialize(y)) + np.testing.assert_equal(z, y) + + # Ensure z and y have the same flags (including WRITEABLE) + assert z.flags == y.flags diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/pubsub.py new/distributed-2.8.1/distributed/pubsub.py --- old/distributed-2.8.0/distributed/pubsub.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/pubsub.py 2019-11-19 17:18:47.000000000 +0100 @@ -74,7 +74,7 @@ def remove_subscriber(self, comm=None, name=None, worker=None, client=None): if worker: - logger.debug("Add worker subscriber: %s %s", name, worker) + logger.debug("Remove worker subscriber: %s %s", name, worker) self.subscribers[name].remove(worker) for pub in self.publishers[name]: self.scheduler.worker_send( @@ -82,7 +82,7 @@ {"op": "pubsub-remove-subscriber", "address": worker, "name": name}, ) elif client: - logger.debug("Add client subscriber: %s %s", name, client) + logger.debug("Remove client subscriber: %s %s", name, client) self.client_subscribers[name].remove(client) if not self.client_subscribers[name]: del self.client_subscribers[name] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/scheduler.py new/distributed-2.8.1/distributed/scheduler.py --- old/distributed-2.8.0/distributed/scheduler.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/scheduler.py 2019-11-19 17:18:47.000000000 +0100 @@ -3,6 +3,7 @@ from collections.abc import Mapping, Set from datetime import timedelta from functools import partial +from inspect import isawaitable import itertools import json import logging @@ -24,7 +25,6 @@ from toolz import frequencies, merge, pluck, merge_sorted, first from toolz import valmap, second, compose, groupby from tornado import gen -from tornado.gen import Return from tornado.ioloop import IOLoop import dask @@ -2007,8 +2007,8 @@ return if ts is None or not ts.who_wants: # no key yet, lets try again in a moment if retries: - self.loop.add_future( - gen.sleep(0.2), lambda _: self.cancel_key(key, client, retries - 1) + self.loop.call_later( + 0.2, lambda: self.cancel_key(key, client, retries - 1) ) return if force or ts.who_wants == {cs}: # no one else wants this key @@ -2700,8 +2700,7 @@ ) return d[worker] - @gen.coroutine - def rebalance(self, comm=None, keys=None, workers=None): + async def rebalance(self, comm=None, keys=None, workers=None): """ Rebalance keys so that each worker stores roughly equal bytes **Policy** @@ -2777,9 +2776,9 @@ to_recipients[recipient.address][ts.key].append(sender.address) to_senders[sender.address].append(ts.key) - result = yield { - r: self.rpc(addr=r).gather(who_has=v) for r, v in to_recipients.items() - } + result = await asyncio.gather( + *(self.rpc(addr=r).gather(who_has=v) for r, v in to_recipients.items()) + ) for r, v in to_recipients.items(): self.log_event(r, {"action": "rebalance", "who_has": v}) @@ -2794,13 +2793,11 @@ }, ) - if not all(r["status"] == "OK" for r in result.values()): - raise Return( - { - "status": "missing-data", - "keys": sum([r["keys"] for r in result if "keys" in r], []), - } - ) + if not all(r["status"] == "OK" for r in result): + return { + "status": "missing-data", + "keys": 
sum([r["keys"] for r in result if "keys" in r], []), + } for sender, recipient, ts in msgs: assert ts.state == "memory" @@ -2811,20 +2808,21 @@ ("rebalance", ts.key, time(), sender.address, recipient.address) ) - result = yield { - r: self.rpc(addr=r).delete_data(keys=v, report=False) - for r, v in to_senders.items() - } + await asyncio.gather( + *( + self.rpc(addr=r).delete_data(keys=v, report=False) + for r, v in to_senders.items() + ) + ) for sender, recipient, ts in msgs: ts.who_has.remove(sender) sender.has_what.remove(ts) sender.nbytes -= ts.get_nbytes() - raise Return({"status": "OK"}) + return {"status": "OK"} - @gen.coroutine - def replicate( + async def replicate( self, comm=None, keys=None, @@ -2867,7 +2865,7 @@ tasks = {self.tasks[k] for k in keys} missing_data = [ts.key for ts in tasks if not ts.who_has] if missing_data: - raise Return({"status": "missing-data", "keys": missing_data}) + return {"status": "missing-data", "keys": missing_data} # Delete extraneous data if delete: @@ -2878,12 +2876,14 @@ for ws in random.sample(del_candidates, len(del_candidates) - n): del_worker_tasks[ws].add(ts) - yield [ - self.rpc(addr=ws.address).delete_data( - keys=[ts.key for ts in tasks], report=False + await asyncio.gather( + *( + self.rpc(addr=ws.address).delete_data( + keys=[ts.key for ts in tasks], report=False + ) + for ws, tasks in del_worker_tasks.items() ) - for ws, tasks in del_worker_tasks.items() - ] + ) for ws, tasks in del_worker_tasks.items(): ws.has_what -= tasks @@ -2911,11 +2911,13 @@ for ws in random.sample(workers - ts.who_has, count): gathers[ws.address][ts.key] = [wws.address for wws in ts.who_has] - results = yield { - w: self.rpc(addr=w).gather(who_has=who_has) - for w, who_has in gathers.items() - } - for w, v in results.items(): + results = await asyncio.gather( + *( + self.rpc(addr=w).gather(who_has=who_has) + for w, who_has in gathers.items() + ) + ) + for w, v in zip(gathers, results): if v["status"] == "OK": self.add_keys(worker=w, keys=list(gathers[w])) else: @@ -3283,7 +3285,7 @@ if teardown: teardown = pickle.loads(teardown) state = setup(self) if setup else None - if hasattr(state, "__await__"): + if isawaitable(state): state = await state try: while self.status == "running": @@ -3348,8 +3350,7 @@ else: return {w: ws.nthreads for w, ws in self.workers.items()} - @gen.coroutine - def get_call_stack(self, comm=None, keys=None): + async def get_call_stack(self, comm=None, keys=None): if keys is not None: stack = list(keys) processing = set() @@ -3369,14 +3370,13 @@ workers = {w: None for w in self.workers} if not workers: - raise gen.Return({}) + return {} - else: - response = yield { - w: self.rpc(w).call_stack(keys=v) for w, v in workers.items() - } - response = {k: v for k, v in response.items() if v} - raise gen.Return(response) + results = await asyncio.gather( + *(self.rpc(w).call_stack(keys=v) for w, v in workers.items()) + ) + response = {w: r for w, r in zip(workers, results) if r} + return response def get_nbytes(self, comm=None, keys=None, summary=True): with log_errors(): @@ -4613,8 +4613,7 @@ else: return (start_time, ws.nbytes) - @gen.coroutine - def get_profile( + async def get_profile( self, comm=None, workers=None, @@ -4627,15 +4626,17 @@ workers = self.workers else: workers = set(self.workers) & set(workers) - result = yield { - w: self.rpc(w).profile(start=start, stop=stop, key=key) for w in workers - } + results = await asyncio.gather( + *(self.rpc(w).profile(start=start, stop=stop, key=key) for w in workers) + ) + if merge_workers: - 
result = profile.merge(*result.values()) - raise gen.Return(result) + response = profile.merge(*results) + else: + response = dict(zip(workers, results)) + return response - @gen.coroutine - def get_profile_metadata( + async def get_profile_metadata( self, comm=None, workers=None, @@ -4653,22 +4654,22 @@ workers = self.workers else: workers = set(self.workers) & set(workers) - result = yield { - w: self.rpc(w).profile_metadata(start=start, stop=stop) for w in workers - } + results = await asyncio.gather( + *(self.rpc(w).profile_metadata(start=start, stop=stop) for w in workers) + ) - counts = [v["counts"] for v in result.values()] + counts = [v["counts"] for v in results] counts = itertools.groupby(merge_sorted(*counts), lambda t: t[0] // dt * dt) counts = [(time, sum(pluck(1, group))) for time, group in counts] keys = set() - for v in result.values(): + for v in results: for t, d in v["keys"]: for k in d: keys.add(k) keys = {k: [] for k in keys} - groups1 = [v["keys"] for v in result.values()] + groups1 = [v["keys"] for v in results] groups2 = list(merge_sorted(*groups1, key=first)) last = 0 @@ -4681,7 +4682,7 @@ for k, v in d.items(): keys[k][-1][1] += v - raise gen.Return({"counts": counts, "keys": keys}) + return {"counts": counts, "keys": keys} async def get_worker_logs(self, comm=None, n=None, workers=None, nanny=False): results = await self.broadcast( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/tests/test_nanny.py new/distributed-2.8.1/distributed/tests/test_nanny.py --- old/distributed-2.8.0/distributed/tests/test_nanny.py 2019-11-12 21:02:54.000000000 +0100 +++ new/distributed-2.8.1/distributed/tests/test_nanny.py 2019-11-23 05:33:46.000000000 +0100 @@ -1,3 +1,4 @@ +import asyncio import gc import logging import os @@ -130,6 +131,20 @@ @pytest.mark.slow +@gen_cluster(config={"distributed.comm.timeouts.connect": "1s"}) +async def test_no_hang_when_scheduler_closes(s, a, b): + # https://github.com/dask/distributed/issues/2880 + with captured_logger("tornado.application", logging.ERROR) as logger: + await s.close() + await asyncio.sleep(1.2) + assert a.status == "closed" + assert b.status == "closed" + + out = logger.getvalue() + assert "Timed out trying to connect" not in out + + [email protected] @gen_cluster( Worker=Nanny, nthreads=[("127.0.0.1", 1)], worker_kwargs={"reconnect": False} ) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/worker.py new/distributed-2.8.1/distributed/worker.py --- old/distributed-2.8.0/distributed/worker.py 2019-11-14 19:11:20.000000000 +0100 +++ new/distributed-2.8.1/distributed/worker.py 2019-11-23 05:33:46.000000000 +0100 @@ -4,6 +4,7 @@ from collections.abc import MutableMapping from datetime import timedelta import heapq +from inspect import isawaitable import logging import os from pickle import PicklingError @@ -748,7 +749,7 @@ for k, metric in self.metrics.items(): try: result = metric(self) - if hasattr(result, "__await__"): + if isawaitable(result): result = await result custom[k] = result except Exception: # TODO: log error once @@ -761,7 +762,7 @@ for k, f in self.startup_information.items(): try: v = f(self) - if hasattr(v, "__await__"): + if isawaitable(v): v = await v result[k] = v except Exception: # TODO: log error once @@ -881,6 +882,12 @@ ) self.bandwidth_workers.clear() self.bandwidth_types.clear() + except IOError as e: + # Scheduler is gone. 
Respect distributed.comm.timeouts.connect + if "Timed out trying to connect" in str(e): + await self.close(report=False) + else: + raise e except CommClosedError: logger.warning("Heartbeat to scheduler failed") finally: @@ -1057,7 +1064,7 @@ if hasattr(plugin, "teardown") ] - await asyncio.gather(*[td for td in teardowns if hasattr(td, "__await__")]) + await asyncio.gather(*[td for td in teardowns if isawaitable(td)]) for pc in self.periodic_callbacks.values(): pc.stop() @@ -2301,7 +2308,7 @@ if hasattr(plugin, "setup"): try: result = plugin.setup(worker=self) - if hasattr(result, "__await__"): + if isawaitable(result): result = await result except Exception as e: msg = error_message(e) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed.egg-info/PKG-INFO new/distributed-2.8.1/distributed.egg-info/PKG-INFO --- old/distributed-2.8.0/distributed.egg-info/PKG-INFO 2019-11-14 23:59:01.000000000 +0100 +++ new/distributed-2.8.1/distributed.egg-info/PKG-INFO 2019-11-23 05:48:30.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.8.0 +Version: 2.8.1 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/docs/source/changelog.rst new/distributed-2.8.1/docs/source/changelog.rst --- old/distributed-2.8.0/docs/source/changelog.rst 2019-11-14 23:51:50.000000000 +0100 +++ new/distributed-2.8.1/docs/source/changelog.rst 2019-11-23 05:44:44.000000000 +0100 @@ -1,6 +1,20 @@ Changelog ========= +2.8.1 - 2019-11-22 +------------------ + +- Fix hanging worker when the scheduler leaves (:pr:`3250`) `Tom Augspurger`_ +- Fix NumPy writeable serialization bug (:pr:`3253`) `James Bourbeau`_ +- Skip ``numba.cuda`` tests if CUDA is not available (:pr:`3255`) `Peter Andreas Entschev`_ +- Add new dashboard plot for memory use by key (:pr:`3243`) `Matthew Rocklin`_ +- Fix ``array.shape()`` -> ``array.shape`` (:pr:`3247`) `Jed Brown`_ +- Fixed typos in ``pubsub.py`` (:pr:`3244`) `He Jia`_ +- Fixed cupy array going out of scope (:pr:`3240`) `Mads R. B. Kristensen`_ +- Remove ``gen.coroutine`` usage in scheduler (:pr:`3242`) `Jim Crist-Harif`_ +- Use ``inspect.isawaitable`` where relevant (:pr:`3241`) `Jim Crist-Harif`_ + + 2.8.0 - 2019-11-14 ------------------ @@ -1391,3 +1405,6 @@ .. _`IPetrik`: https://github.com/IPetrik .. _`Simon Boothroyd`: https://github.com/SimonBoothroyd .. _`rockwellw`: https://github.com/rockwellw +.. _`Jed Brown`: https://github.com/jedbrown +.. _`He Jia`: https://github.com/HerculesJack +.. _`Jim Crist-Harif`: https://github.com/jcrist diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/docs/source/efficiency.rst new/distributed-2.8.1/docs/source/efficiency.rst --- old/distributed-2.8.0/docs/source/efficiency.rst 2019-05-29 19:29:52.000000000 +0200 +++ new/distributed-2.8.1/docs/source/efficiency.rst 2019-11-19 17:18:47.000000000 +0100 @@ -31,7 +31,7 @@ .. code-block:: python - >>> x.result().shape() # Slow from lots of data transfer + >>> x.result().shape # Slow from lots of data transfer (1000, 1000) **Fast**
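
Two patterns recur throughout the 2.8.1 diff above. The sketches below are minimal, standalone illustrations of those patterns only; the handler and RPC helper names are made up for the example and do not come from the distributed codebase.

First, the replacement of hasattr(result, "__await__") checks with inspect.isawaitable (core.py, adaptive.py, scheduler.py, worker.py), which lets the same dispatch code accept handlers that return either a plain value or an awaitable:

    import asyncio
    from inspect import isawaitable

    # Hypothetical handlers: one synchronous, one asynchronous.
    def sync_handler(x):
        return x + 1

    async def async_handler(x):
        await asyncio.sleep(0)
        return x + 1

    async def dispatch(handler, x):
        result = handler(x)
        if isawaitable(result):   # True for coroutines, Tasks, Futures, ...
            result = await result
        return result

    async def main():
        print(await dispatch(sync_handler, 1))   # 2
        print(await dispatch(async_handler, 1))  # 2

    asyncio.run(main())

Second, the conversion of tornado-style "result = yield {key: future, ...}" calls in the scheduler to asyncio.gather; gather preserves argument order, so the results are zipped back onto the keys:

    import asyncio

    # Stand-in for an RPC call; the real scheduler awaits self.rpc(w).profile(...).
    async def fake_rpc(worker):
        await asyncio.sleep(0)
        return {"worker": worker, "status": "OK"}

    async def collect(workers):
        # Old: result = yield {w: fake_rpc(w) for w in workers}
        # New: gather the coroutines, then rebuild the mapping by position.
        results = await asyncio.gather(*(fake_rpc(w) for w in workers))
        return dict(zip(workers, results))

    print(asyncio.run(collect(["tcp://a:1234", "tcp://b:1234"])))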
