Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2020-07-20 21:00:19
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.3592 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed"

Mon Jul 20 21:00:19 2020 rev:31 rq:821677 version:2.21.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2020-07-10 14:12:58.947549367 +0200
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.3592/python-distributed.changes 2020-07-20 21:01:40.709111605 +0200
@@ -1,0 +2,25 @@
+Sat Jul 18 18:13:10 UTC 2020 - Arun Persaud <[email protected]>
+
+- update to version 2.21.0:
+ * Fix data replication error (GH#3963) Andrew Fulton
+ * Treat falsey local directory as None (GH#3964) Tom Augspurger
+ * Unpin numpydoc now that 1.1 is released (GH#3957) Gil Forsyth
+ * Error hard when Dask has mismatched versions or lz4 installed
+   (GH#3936) Matthew Rocklin
+ * Skip coercing to bytes in merge_frames (GH#3960) jakirkham
+ * UCX: reuse endpoints in order to fix NVLINK issue (GH#3953) Mads
+   R. B. Kristensen
+ * Optionally use pickle5 (GH#3849) jakirkham
+ * Update time per task chart with filtering and pie (GH#3933)
+   Benjamin Zaitlen
+ * UCX: explicit shutdown message (GH#3950) Mads R. B. Kristensen
+ * Avoid too aggressive retry of connections (GH#3944) Matthias
+   Bussonnier
+ * Parse timeouts in Client.sync (GH#3952) Matthew Rocklin
+ * Synchronize on non-trivial CUDA frame transmission (GH#3949)
+   jakirkham
+ * Serialize memoryview with shape and format (GH#3947) jakirkham
+ * Move scheduler_comm into Cluster.__init__ (GH#3945) Matthew
+   Rocklin
+
+-------------------------------------------------------------------

Old:
----
  distributed-2.20.0.tar.gz

New:
----
  distributed-2.21.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.jHcxYz/_old 2020-07-20 21:01:42.581113502 +0200
+++ /var/tmp/diff_new_pack.jHcxYz/_new 2020-07-20 21:01:42.585113507 +0200
@@ -21,7 +21,7 @@
 # Test requires network connection
 %bcond_with test
 Name: python-distributed
-Version: 2.20.0
+Version: 2.21.0
 Release: 0
 Summary: Library for distributed computing with Python
 License: BSD-3-Clause

++++++ distributed-2.20.0.tar.gz -> distributed-2.21.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/PKG-INFO new/distributed-2.21.0/PKG-INFO
--- old/distributed-2.20.0/PKG-INFO 2020-07-03 06:24:10.787851000 +0200
+++ new/distributed-2.21.0/PKG-INFO 2020-07-18 00:18:01.338119300 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.20.0
+Version: 2.21.0
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/_version.py new/distributed-2.21.0/distributed/_version.py
--- old/distributed-2.20.0/distributed/_version.py 2020-07-03 06:24:10.789626800 +0200
+++ new/distributed-2.21.0/distributed/_version.py 2020-07-18 00:18:01.339028400 +0200
@@ -8,11 +8,11 @@
 version_json = '''
 {
- "date": "2020-07-02T23:23:16-0500",
+ "date": "2020-07-17T17:17:21-0500",
 "dirty": false,
 "error": null,
- "full-revisionid": "08d334e2e18bd977752eeab87e2c09272a2ac829",
- "version": "2.20.0"
+ "full-revisionid": "ef168a471a7b6b4dc4b023196afc229cda109608",
+ "version": "2.21.0"
 }
 ''' # END VERSION_JSON
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/client.py new/distributed-2.21.0/distributed/client.py
---
old/distributed-2.20.0/distributed/client.py 2020-07-03 06:15:41.000000000 +0200 +++ new/distributed-2.21.0/distributed/client.py 2020-07-17 06:40:21.000000000 +0200 @@ -818,6 +818,7 @@ return format_dashboard_link(host, port) def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs): + callback_timeout = parse_timedelta(callback_timeout) if ( asynchronous or self.asynchronous @@ -1043,7 +1044,7 @@ try: await self._ensure_connected(timeout=timeout) - except OSError: + except (OSError, ImportError): await self._close() raise @@ -1076,6 +1077,9 @@ # Wait a bit before retrying await asyncio.sleep(0.1) timeout = deadline - self.loop.time() + except ImportError: + await self._close() + break else: logger.error( "Failed to reconnect to scheduler after %.2f " @@ -1126,6 +1130,8 @@ assert len(msg) == 1 assert msg[0]["op"] == "stream-start" + if msg[0].get("error"): + raise ImportError(msg[0]["error"]) if msg[0].get("warning"): warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"])) @@ -3681,8 +3687,10 @@ if check: msg = version_module.error_message(scheduler, workers, client) - if msg: - raise ValueError(msg) + if msg["warning"]: + warnings.warn(msg["warning"]) + if msg["error"]: + raise ValueError(msg["error"]) return result diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/comm/core.py new/distributed-2.21.0/distributed/comm/core.py --- old/distributed-2.20.0/distributed/comm/core.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/comm/core.py 2020-07-10 15:10:16.000000000 +0200 @@ -3,6 +3,7 @@ from contextlib import suppress import inspect import logging +import random import weakref import dask @@ -218,6 +219,8 @@ if timeout and timeout / 20 < backoff: backoff = timeout / 20 + retry_timeout_backoff = random.randrange(140, 160) / 100 + # This starts a thread while True: try: @@ -227,7 +230,7 @@ ) with suppress(TimeoutError): comm = await asyncio.wait_for( - future, timeout=min(deadline - time(), 1) + future, timeout=min(deadline - time(), retry_timeout_backoff) ) break if not comm: @@ -239,7 +242,8 @@ if time() < deadline: logger.debug("Could not connect, waiting before retrying") await asyncio.sleep(backoff) - backoff *= 1.5 + backoff *= random.randrange(140, 160) / 100 + retry_timeout_backoff *= random.randrange(140, 160) / 100 backoff = min(backoff, 1) # wait at most one second else: _raise(error) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/comm/ucx.py new/distributed-2.21.0/distributed/comm/ucx.py --- old/distributed-2.20.0/distributed/comm/ucx.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/comm/ucx.py 2020-07-17 06:40:21.000000000 +0200 @@ -35,6 +35,8 @@ ucp = None host_array = None device_array = None +ucx_create_endpoint = None +ucx_create_listener = None def synchronize_stream(stream=0): @@ -47,7 +49,7 @@ def init_once(): - global ucp, host_array, device_array + global ucp, host_array, device_array, ucx_create_endpoint, ucx_create_listener if ucp is not None: return @@ -107,6 +109,19 @@ pool_allocator=True, managed_memory=False, initial_pool_size=pool_size ) + try: + from ucp.endpoint_reuse import EndpointReuse + except ImportError: + ucx_create_endpoint = ucp.create_endpoint + ucx_create_listener = ucp.create_listener + else: + if dask.config.get("ucx.reuse-endpoints"): + ucx_create_endpoint = EndpointReuse.create_endpoint + 
ucx_create_listener = EndpointReuse.create_listener + else: + ucx_create_endpoint = ucp.create_endpoint + ucx_create_listener = ucp.create_listener + class UCX(Comm): """Comm object using UCP. @@ -136,7 +151,7 @@ The expected read cycle is - 1. Read the frame describing number of frames + 1. Read the frame describing if connection is closing and number of frames 2. Read the frame describing whether each data frame is gpu-bound 3. Read the frame describing whether each data frame is sized 4. Read all the data frames. @@ -186,16 +201,18 @@ hasattr(f, "__cuda_array_interface__") for f in frames ) sizes = tuple(nbytes(f) for f in frames) - send_frames = [ - each_frame - for each_frame, each_size in zip(frames, sizes) - if each_size - ] + cuda_send_frames, send_frames = zip( + *( + (is_cuda, each_frame) + for is_cuda, each_frame in zip(cuda_frames, frames) + if len(each_frame) > 0 + ) + ) # Send meta data - # Send # of frames (uint64) - await self.ep.send(struct.pack("Q", nframes)) + # Send close flag and number of frames (_Bool, int64) + await self.ep.send(struct.pack("?Q", False, nframes)) # Send which frames are CUDA (bool) and # how large each frame is (uint64) await self.ep.send( @@ -209,7 +226,7 @@ # syncing the default stream will wait for other non-blocking CUDA streams. # Note this is only sufficient if the memory being sent is not currently in use on # non-blocking CUDA streams. - if any(cuda_frames): + if any(cuda_send_frames): synchronize_stream(0) for each_frame in send_frames: @@ -230,11 +247,13 @@ try: # Recv meta data - # Recv # of frames (uint64) - nframes_fmt = "Q" - nframes = host_array(struct.calcsize(nframes_fmt)) - await self.ep.recv(nframes) - (nframes,) = struct.unpack(nframes_fmt, nframes) + # Recv close flag and number of frames (_Bool, int64) + msg = host_array(struct.calcsize("?Q")) + await self.ep.recv(msg) + (shutdown, nframes) = struct.unpack("?Q", msg) + + if shutdown: # The writer is closing the connection + raise CancelledError("Connection closed by writer") # Recv which frames are CUDA (bool) and # how large each frame is (uint64) @@ -252,13 +271,17 @@ device_array(each_size) if is_cuda else host_array(each_size) for is_cuda, each_size in zip(cuda_frames, sizes) ] - recv_frames = [ - each_frame for each_frame in frames if len(each_frame) > 0 - ] + cuda_recv_frames, recv_frames = zip( + *( + (is_cuda, each_frame) + for is_cuda, each_frame in zip(cuda_frames, frames) + if len(each_frame) > 0 + ) + ) # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated - if any(cuda_frames): + if any(cuda_recv_frames): synchronize_stream(0) for each_frame in recv_frames: @@ -273,7 +296,8 @@ async def close(self): if self._ep is not None: - await self._ep.close() + await self.ep.send(struct.pack("?Q", True, 0)) + self.abort() self._ep = None def abort(self): @@ -301,7 +325,7 @@ logger.debug("UCXConnector.connect: %s", address) ip, port = parse_host_port(address) init_once() - ep = await ucp.create_endpoint(ip, port) + ep = await ucx_create_endpoint(ip, port) return self.comm_class( ep, local_addr=None, @@ -354,7 +378,7 @@ await self.comm_handler(ucx) init_once() - self.ucp_server = ucp.create_listener(serve_forever, port=self._input_port) + self.ucp_server = ucx_create_listener(serve_forever, port=self._input_port) def stop(self): self.ucp_server = None @@ -420,7 +444,7 @@ # configuration of UCX can happen in two ways: # 1) high level on/off flags which correspond to UCX 
configuration - # 2) explicity defined UCX configuration flags + # 2) explicitly defined UCX configuration flags # import does not initialize ucp -- this will occur outside this function from ucp import get_config diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/dashboard/components/scheduler.py new/distributed-2.21.0/distributed/dashboard/components/scheduler.py --- old/distributed-2.20.0/distributed/dashboard/components/scheduler.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/dashboard/components/scheduler.py 2020-07-10 15:10:16.000000000 +0200 @@ -26,12 +26,15 @@ BoxSelectTool, GroupFilter, CDSView, + Tabs, + Panel, + Title, ) from bokeh.models.widgets import DataTable, TableColumn from bokeh.plotting import figure from bokeh.palettes import Viridis11 from bokeh.themes import Theme -from bokeh.transform import factor_cmap, linear_cmap +from bokeh.transform import factor_cmap, linear_cmap, cumsum from bokeh.io import curdoc import dask from dask import config @@ -440,7 +443,7 @@ update(self.source, result) -class ComputerPerKey(DashboardComponent): +class ComputePerKey(DashboardComponent): """ Bar chart showing time spend in action by key prefix""" def __init__(self, scheduler, **kwargs): @@ -456,6 +459,8 @@ compute_data = { "times": [0.2, 0.1], + "formatted_time": ["0.2 ms", "2.8 us"], + "angles": [3.14, 0.785], "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], "names": ["sum", "sum_partial"], } @@ -477,14 +482,13 @@ top="times", width=0.7, color="color", - legend_field="names", ) fig.y_range.start = 0 fig.min_border_right = 20 fig.min_border_bottom = 60 fig.yaxis.axis_label = "Time (s)" - fig.yaxis[0].formatter = NumeralTickFormatter(format="0.0s") + fig.yaxis[0].formatter = NumeralTickFormatter(format="0") fig.yaxis.ticker = AdaptiveTicker(**TICKS_1024) fig.xaxis.major_label_orientation = -math.pi / 12 rect.nonselection_glyph = None @@ -499,13 +503,76 @@ hover.tooltips = """ <div> <p><b>Name:</b> @names</p> - <p><b>Time:</b> @times s</p> + <p><b>Time:</b> @formatted_time</p> </div> """ hover.point_policy = "follow_mouse" fig.add_tools(hover) + fig.add_layout( + Title( + text="Note: tasks less than 2% of max are not displayed", + text_font_style="italic", + ), + "below", + ) + self.fig = fig + tab1 = Panel(child=fig, title="Bar Chart") + + compute_wedge_data = { + "times": [0.2, 0.1], + "formatted_time": ["0.2 ms", "2.8 us"], + "angles": [1.4, 0.8], + "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], + "names": ["sum", "sum_partial"], + } + + fig2 = figure( + title="Compute Time Per Task", + tools="", + id="bk-Compute-by-key-pie", + name="compute_time_per_key-pie", + x_range=(-0.5, 1.0), + **kwargs, + ) + + wedge = fig2.wedge( + x=0, + y=1, + radius=0.4, + start_angle=cumsum("angles", include_zero=True), + end_angle=cumsum("angles"), + line_color="white", + fill_color="color", + legend_field="names", + source=self.compute_source, + ) + + fig2.axis.axis_label = None + fig2.axis.visible = False + fig2.grid.grid_line_color = None + fig2.add_layout( + Title( + text="Note: tasks less than 2% of max are not displayed", + text_font_style="italic", + ), + "below", + ) + + hover = HoverTool() + hover.tooltips = """ + <div> + <p><b>Name:</b> @names</p> + <p><b>Time:</b> @formatted_time</p> + </div> + """ + hover.point_policy = "follow_mouse" + fig2.add_tools(hover) + self.wedge_fig = fig2 + tab2 = Panel(child=fig2, title="Pie Chart") + + self.tabs = 
Tabs(tabs=[tab1, tab2]) @without_property_validation def update(self): @@ -523,22 +590,33 @@ compute_times.items(), key=lambda x: x[1], reverse=True ) - compute_colors = list() - compute_names = list() - compute_time = list() - for name, t in compute_times: - compute_names.append(name) - compute_colors.append(ts_color_of(name)) - compute_time.append(t) - - self.fig.x_range.factors = compute_names - self.fig.title.text = "Compute Time Per Task" - - compute_result = dict( - times=compute_time, color=compute_colors, names=compute_names - ) + # keep only time which are 2% of max or greater + if compute_times: + max_time = compute_times[0][1] * 0.02 + compute_times = [(n, t) for n, t in compute_times if t > max_time] + compute_colors = list() + compute_names = list() + compute_time = list() + total_time = 0 + for name, t in compute_times: + compute_names.append(name) + compute_colors.append(ts_color_of(name)) + compute_time.append(t) + total_time += t + + angles = [t / total_time * 2 * math.pi for t in compute_time] + + self.fig.x_range.factors = compute_names + + compute_result = dict( + angles=angles, + times=compute_time, + color=compute_colors, + names=compute_names, + formatted_time=[format_time(t) for t in compute_time], + ) - update(self.compute_source, compute_result) + update(self.compute_source, compute_result) class AggregateAction(DashboardComponent): @@ -557,6 +635,7 @@ action_data = { "times": [0.2, 0.1], + "formatted_time": ["0.2 ms", "2.8 us"], "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], "names": ["transfer", "compute"], } @@ -578,13 +657,12 @@ top="times", width=0.7, color="color", - legend_field="names", ) fig.y_range.start = 0 fig.min_border_right = 20 fig.min_border_bottom = 60 - fig.yaxis[0].formatter = NumeralTickFormatter(format="0.0s") + fig.yaxis[0].formatter = NumeralTickFormatter(format="0") fig.yaxis.axis_label = "Time (s)" fig.yaxis.ticker = AdaptiveTicker(**TICKS_1024) fig.xaxis.major_label_orientation = -math.pi / 12 @@ -601,7 +679,7 @@ hover.tooltips = """ <div> <p><b>Name:</b> @names</p> - <p><b>Time:</b> @times s</p> + <p><b>Time:</b> @formatted_time</p> </div> """ hover.point_policy = "follow_mouse" @@ -635,7 +713,12 @@ self.fig.x_range.factors = agg_names self.fig.title.text = "Aggregate Time Per Action" - action_result = dict(times=agg_time, color=agg_colors, names=agg_names) + action_result = dict( + times=agg_time, + color=agg_colors, + names=agg_names, + formatted_time=[format_time(t) for t in agg_time], + ) update(self.action_source, action_result) @@ -2129,10 +2212,10 @@ def individual_compute_time_per_key_doc(scheduler, extra, doc): with log_errors(): - component = ComputerPerKey(scheduler, sizing_mode="stretch_both") + component = ComputePerKey(scheduler, sizing_mode="stretch_both") component.update() add_periodic_callback(doc, component, 500) - doc.add_root(component.fig) + doc.add_root(component.tabs) doc.theme = BOKEH_THEME diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/dashboard/tests/test_scheduler_bokeh.py new/distributed-2.21.0/distributed/dashboard/tests/test_scheduler_bokeh.py --- old/distributed-2.20.0/distributed/dashboard/tests/test_scheduler_bokeh.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/dashboard/tests/test_scheduler_bokeh.py 2020-07-10 15:10:16.000000000 +0200 @@ -36,7 +36,7 @@ ProfileServer, MemoryByKey, AggregateAction, - ComputerPerKey, + ComputePerKey, ) from distributed.dashboard import scheduler 
@@ -741,8 +741,8 @@ @gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) -async def test_computer_per_key(c, s, a, b): - mbk = ComputerPerKey(s) +async def test_compute_per_key(c, s, a, b): + mbk = ComputePerKey(s) da = pytest.importorskip("dask.array") x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False) @@ -759,8 +759,8 @@ ) assert response.code == 200 assert ("sum-aggregate") in mbk.compute_source.data["names"] - assert ("inc") in mbk.compute_source.data["names"] assert ("add") in mbk.compute_source.data["names"] + assert "angles" in mbk.compute_source.data.keys() @gen_cluster(scheduler_kwargs={"http_prefix": "foo-bar", "dashboard": True}) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/deploy/cluster.py new/distributed-2.21.0/distributed/deploy/cluster.py --- old/distributed-2.20.0/distributed/deploy/cluster.py 2020-07-03 06:15:41.000000000 +0200 +++ new/distributed-2.21.0/distributed/deploy/cluster.py 2020-07-10 15:10:16.000000000 +0200 @@ -55,6 +55,7 @@ self._asynchronous = asynchronous self._watch_worker_status_comm = None self._watch_worker_status_task = None + self.scheduler_comm = None self.status = "created" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/deploy/spec.py new/distributed-2.21.0/distributed/deploy/spec.py --- old/distributed-2.20.0/distributed/deploy/spec.py 2020-07-03 06:15:41.000000000 +0200 +++ new/distributed-2.21.0/distributed/deploy/spec.py 2020-07-10 15:10:16.000000000 +0200 @@ -233,7 +233,6 @@ self.workers = {} self._i = 0 self.security = security or Security() - self.scheduler_comm = None self._futures = set() if silence_logs: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/distributed-schema.yaml new/distributed-2.21.0/distributed/distributed-schema.yaml --- old/distributed-2.20.0/distributed/distributed-schema.yaml 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/distributed-schema.yaml 2020-07-17 06:40:21.000000000 +0200 @@ -800,3 +800,6 @@ - string - "null" description: Define which Infiniband device to use + reuse-endpoints: + type: boolean + description: Whether to reuse endpoints or not, default True diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/distributed.yaml new/distributed-2.21.0/distributed/distributed.yaml --- old/distributed-2.20.0/distributed/distributed.yaml 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/distributed.yaml 2020-07-17 06:40:21.000000000 +0200 @@ -179,3 +179,4 @@ rdmacm: null # enable RDMACM cuda_copy: null # enable cuda-copy net-devices: null # define which Infiniband device to use + reuse-endpoints: True # enable endpoint reuse diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/protocol/pickle.py new/distributed-2.21.0/distributed/protocol/pickle.py --- old/distributed-2.20.0/distributed/protocol/pickle.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/protocol/pickle.py 2020-07-17 06:40:21.000000000 +0200 @@ -1,8 +1,16 @@ import logging -import pickle +import sys import cloudpickle +if sys.version_info < (3, 8): + try: + import pickle5 as pickle + except ImportError: + import pickle +else: + import pickle + HIGHEST_PROTOCOL = 
pickle.HIGHEST_PROTOCOL diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/protocol/serialize.py new/distributed-2.21.0/distributed/protocol/serialize.py --- old/distributed-2.20.0/distributed/protocol/serialize.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/protocol/serialize.py 2020-07-10 15:10:16.000000000 +0200 @@ -564,7 +564,7 @@ # Teach serialize how to handle bytestrings -@dask_serialize.register((bytes, bytearray, memoryview)) +@dask_serialize.register((bytes, bytearray)) def _serialize_bytes(obj): header = {} # no special metadata frames = [obj] @@ -576,13 +576,23 @@ return b"".join(frames) +@dask_serialize.register(memoryview) +def _serialize_memoryview(obj): + if obj.format == "O": + raise ValueError("Cannot serialize `memoryview` containing Python objects") + header = {"format": obj.format, "shape": obj.shape} + frames = [obj] + return header, frames + + @dask_deserialize.register(memoryview) -def _serialize_memoryview(header, frames): +def _deserialize_memoryview(header, frames): if len(frames) == 1: - out = frames[0] + out = memoryview(frames[0]).cast("B") else: - out = b"".join(frames) - return memoryview(out) + out = memoryview(b"".join(frames)) + out = out.cast(header["format"], header["shape"]) + return out ######################### diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/protocol/tests/test_pickle.py new/distributed-2.21.0/distributed/protocol/tests/test_pickle.py --- old/distributed-2.20.0/distributed/protocol/tests/test_pickle.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/protocol/tests/test_pickle.py 2020-07-17 06:40:21.000000000 +0200 @@ -1,7 +1,6 @@ from functools import partial import gc from operator import add -import pickle import weakref import sys @@ -10,6 +9,14 @@ from distributed.protocol import deserialize, serialize from distributed.protocol.pickle import HIGHEST_PROTOCOL, dumps, loads +if sys.version_info < (3, 8): + try: + import pickle5 as pickle + except ImportError: + import pickle +else: + import pickle + def test_pickle_data(): data = [1, b"123", "123", [123], {}, set()] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/protocol/tests/test_serialize.py new/distributed-2.21.0/distributed/protocol/tests/test_serialize.py --- old/distributed-2.20.0/distributed/protocol/tests/test_serialize.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/protocol/tests/test_serialize.py 2020-07-10 15:10:16.000000000 +0200 @@ -420,6 +420,7 @@ (tuple([MyObj(None)]), True), ({("x", i): MyObj(5) for i in range(100)}, True), (memoryview(b"hello"), True), + (memoryview(np.random.random((3, 4))), True), ], ) def test_check_dask_serializable(data, is_serializable): @@ -441,10 +442,18 @@ assert data_in == data_out -def test_deser_memoryview(): - data_in = memoryview(b"hello") [email protected]( + "data_in", [memoryview(b"hello"), memoryview(np.random.random((3, 4)))], +) +def test_deser_memoryview(data_in): header, frames = serialize(data_in) assert header["type"] == "builtins.memoryview" assert frames[0] is data_in data_out = deserialize(header, frames) assert data_in == data_out + + +def test_ser_memoryview_object(): + data_in = memoryview(np.array(["hello"], dtype=object)) + with pytest.raises(TypeError): + serialize(data_in, on_error="raise") diff -urN 
'--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/protocol/utils.py new/distributed-2.21.0/distributed/protocol/utils.py --- old/distributed-2.20.0/distributed/protocol/utils.py 2020-06-30 17:19:56.000000000 +0200 +++ new/distributed-2.21.0/distributed/protocol/utils.py 2020-07-17 06:40:21.000000000 +0200 @@ -1,7 +1,7 @@ import struct import msgpack -from ..utils import ensure_bytes, nbytes +from ..utils import nbytes BIG_BYTES_SHARD_SIZE = 2 ** 26 @@ -84,7 +84,7 @@ if len(L) == 1: # no work necessary out.extend(L) else: - out.append(b"".join(map(ensure_bytes, L))) + out.append(b"".join(L)) return out diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/scheduler.py new/distributed-2.21.0/distributed/scheduler.py --- old/distributed-2.20.0/distributed/scheduler.py 2020-07-03 06:15:57.000000000 +0200 +++ new/distributed-2.21.0/distributed/scheduler.py 2020-07-17 06:40:21.000000000 +0200 @@ -12,11 +12,10 @@ from numbers import Number import operator import os -import pickle +import sys import random import warnings import weakref - import psutil import sortedcontainers @@ -86,6 +85,14 @@ from .stealing import WorkStealing from .variable import VariableExtension +if sys.version_info < (3, 8): + try: + import pickle5 as pickle + except ImportError: + import pickle +else: + import pickle + logger = logging.getLogger(__name__) @@ -1804,8 +1811,7 @@ versions, client_name="This Worker", ) - if version_warning: - msg["warning"] = version_warning + msg.update(version_warning) if comm: await comm.write(msg) @@ -2569,8 +2575,7 @@ {w: ws.versions for w, ws in self.workers.items()}, versions, ) - if version_warning: - msg["warning"] = version_warning + msg.update(version_warning) bcomm.send(msg) try: @@ -3260,6 +3265,10 @@ while tasks: gathers = defaultdict(dict) for ts in list(tasks): + if ts.state == "forgotten": + # task is no longer needed by any client or dependant task + tasks.remove(ts) + continue n_missing = n - len(ts.who_has & workers) if n_missing <= 0: # Already replicated enough diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/tests/test_client.py new/distributed-2.21.0/distributed/tests/test_client.py --- old/distributed-2.20.0/distributed/tests/test_client.py 2020-06-30 17:19:57.000000000 +0200 +++ new/distributed-2.21.0/distributed/tests/test_client.py 2020-07-10 15:10:16.000000000 +0200 @@ -512,7 +512,7 @@ x = c.submit(slowinc, 1, delay=0.3) with pytest.raises(TimeoutError): - x.result(timeout=0.01) + x.result(timeout="10 ms") assert x.result() == 2 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/tests/test_scheduler.py new/distributed-2.21.0/distributed/tests/test_scheduler.py --- old/distributed-2.20.0/distributed/tests/test_scheduler.py 2020-07-03 06:15:57.000000000 +0200 +++ new/distributed-2.21.0/distributed/tests/test_scheduler.py 2020-07-17 06:40:21.000000000 +0200 @@ -1,7 +1,6 @@ import asyncio import json import logging -import pickle import operator import re import sys @@ -42,6 +41,14 @@ from distributed.utils_test import loop, nodebug # noqa: F401 from dask.compatibility import apply +if sys.version_info < (3, 8): + try: + import pickle5 as pickle + except ImportError: + import pickle +else: + import pickle + alice = "alice:1234" bob = "bob:1234" @@ -2152,3 +2159,17 @@ @gen_cluster() async def 
test_unknown_task_duration_config(s, a, b): assert s.idle_since == s.time_started + + +@gen_cluster(client=True, timeout=1000) +async def test_retire_state_change(c, s, a, b): + np = pytest.importorskip("numpy") + y = c.map(lambda x: x ** 2, range(10)) + await c.scatter(y) + for x in range(2): + v = c.map(lambda i: i * np.random.randint(1000), y) + k = c.map(lambda i: i * np.random.randint(1000), v) + foo = c.map(lambda j: j * 6, k) + step = c.compute(foo) + c.gather(step) + await c.retire_workers(workers=[a.address]) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/tests/test_versions.py new/distributed-2.21.0/distributed/tests/test_versions.py --- old/distributed-2.20.0/distributed/tests/test_versions.py 2020-07-03 06:15:41.000000000 +0200 +++ new/distributed-2.21.0/distributed/tests/test_versions.py 2020-07-17 06:40:21.000000000 +0200 @@ -2,10 +2,11 @@ import sys import pytest +from toolz import first from distributed.versions import get_versions, error_message -from distributed import Client, Worker -from distributed.utils_test import gen_cluster +from distributed import Client, Worker, LocalCluster +from distributed.utils_test import gen_cluster, loop # noqa: F401 # if one of the nodes reports this version, there's a mismatch @@ -34,7 +35,7 @@ def test_versions_match(kwargs_matching): - assert error_message(**kwargs_matching) == "" + assert error_message(**kwargs_matching)["warning"] == "" @pytest.fixture(params=["client", "scheduler", "worker-1"]) @@ -80,11 +81,13 @@ column_matching = {"client": 1, "scheduler": 2, "workers": 3} msg = error_message(**kwargs_not_matching) i = column_matching.get(node, 3) - assert "Mismatched versions found" in msg - assert "distributed" in msg + assert "Mismatched versions found" in msg["warning"] + assert "distributed" in msg["warning"] assert ( pattern - in re.search(r"distributed\s+(?:(?:\|[^|\r\n]*)+\|(?:\r?\n|\r)?)+", msg) + in re.search( + r"distributed\s+(?:(?:\|[^|\r\n]*)+\|(?:\r?\n|\r)?)+", msg["warning"] + ) .group(0) .split("|")[i] .strip() @@ -96,24 +99,24 @@ kwargs_matching["scheduler"]["packages"]["numpy"] = "0.0.0" assert "numpy" in kwargs_matching["client"]["packages"] - assert error_message(**kwargs_matching) == "" + assert error_message(**kwargs_matching)["warning"] == "" def test_scheduler_additional_irrelevant_package(kwargs_matching): """An irrelevant package on the scheduler does not need to be present elsewhere.""" kwargs_matching["scheduler"]["packages"]["pyspark"] = "0.0.0" - assert error_message(**kwargs_matching) == "" + assert error_message(**kwargs_matching)["warning"] == "" def test_python_mismatch(kwargs_matching): kwargs_matching["client"]["packages"]["python"] = "0.0.0" msg = error_message(**kwargs_matching) - assert "Mismatched versions found" in msg - assert "python" in msg + assert "Mismatched versions found" in msg["warning"] + assert "python" in msg["warning"] assert ( "0.0.0" - in re.search(r"python\s+(?:(?:\|[^|\r\n]*)+\|(?:\r?\n|\r)?)+", msg) + in re.search(r"python\s+(?:(?:\|[^|\r\n]*)+\|(?:\r?\n|\r)?)+", msg["warning"]) .group(0) .split("|")[1] .strip() @@ -142,3 +145,38 @@ required = get_versions()["packages"] assert "python" in required assert required["python"] == ".".join(map(str, sys.version_info)) + + +def test_python_version_error(loop): + + with LocalCluster(1, processes=False, silence_logs=False, loop=loop,) as cluster: + first(cluster.scheduler.workers.values()).versions["packages"][ + "python" + ] = "3.5.1" + with 
pytest.raises(ImportError) as info: + with Client(cluster): + pass + + assert "Python" in str(info.value) + assert "major" in str(info.value).lower() + + +def test_lz4_version_error(loop): + + with LocalCluster( + 1, processes=False, silence_logs=False, dashboard_address=None, loop=loop, + ) as cluster: + try: + import lz4 # noqa: F401 + + first(cluster.scheduler.workers.values()).versions["packages"]["lz4"] = None + except ImportError: + first(cluster.scheduler.workers.values()).versions["packages"][ + "lz4" + ] = "1.0.0" + + with pytest.raises(ImportError) as info: + with Client(cluster): + pass + + assert "lz4" in str(info.value) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/tests/test_worker.py new/distributed-2.21.0/distributed/tests/test_worker.py --- old/distributed-2.20.0/distributed/tests/test_worker.py 2020-07-03 06:15:57.000000000 +0200 +++ new/distributed-2.21.0/distributed/tests/test_worker.py 2020-07-17 06:40:21.000000000 +0200 @@ -865,6 +865,15 @@ test_worker_dir() +@gen_cluster(nthreads=[]) +async def test_false_worker_dir(s): + async with Worker(s.address, local_directory="") as w: + local_directory = w.local_directory + + cwd = os.getcwd() + assert os.path.dirname(local_directory) == os.path.join(cwd, "dask-worker-space") + + @gen_cluster(client=True) async def test_dataframe_attribute_error(c, s, a, b): class BadSize: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/versions.py new/distributed-2.21.0/distributed/versions.py --- old/distributed-2.20.0/distributed/versions.py 2020-07-03 06:15:41.000000000 +0200 +++ new/distributed-2.21.0/distributed/versions.py 2020-07-17 06:40:21.000000000 +0200 @@ -34,8 +34,6 @@ # notes to be displayed for mismatch packages notes_mismatch_package = { "msgpack": "Variation is ok, as long as everything is above 0.6", - "lz4": "Variation is ok, but missing libraries are not", - "python": "Variation is sometimes ok, sometimes not. 
It depends on your workloads", } @@ -157,14 +155,40 @@ if pkg in notes_mismatch_package.keys(): notes.append(f"- {pkg}: {notes_mismatch_package[pkg]}") + out = {"warning": "", "error": ""} + if errs: err_table = asciitable(["Package", client_name, "scheduler", "workers"], errs) err_msg = f"Mismatched versions found\n\n{err_table}" if notes: err_msg += "\nNotes: \n{}".format("\n".join(notes)) - return err_msg - else: - return "" + out["warning"] += err_msg + + for name, c, s, ws in errs: + if not isinstance(ws, set): + ws = {ws} + + if name == "python": + majors = [tuple(version.split(".")[:2]) for version in {c, s} | ws] + if len(set(majors)) != 1: + err_table = asciitable( + ["Package", client_name, "scheduler", "workers"], + [t for t in errs if t[0] == "python"], + ) + out["error"] += f"Python major versions must match\n\n{err_table}\n" + + if name == "lz4": + versions = [version for version in {c, s} | ws] + if any(versions) and not all(versions): + err_table = asciitable( + ["Package", client_name, "scheduler", "workers"], + [t for t in errs if t[0] == "lz4"], + ) + out[ + "error" + ] += f"\nLZ4 must be installed everywhere or nowhere\n\n{err_table}\n" + + return out class VersionMismatchWarning(Warning): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed/worker.py new/distributed-2.21.0/distributed/worker.py --- old/distributed-2.20.0/distributed/worker.py 2020-07-03 06:15:57.000000000 +0200 +++ new/distributed-2.21.0/distributed/worker.py 2020-07-17 06:40:21.000000000 +0200 @@ -482,7 +482,7 @@ warnings.warn("The local_dir keyword has moved to local_directory") local_directory = local_dir - if local_directory is None: + if not local_directory: local_directory = dask.config.get("temporary-directory") or os.getcwd() if not os.path.exists(local_directory): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed.egg-info/PKG-INFO new/distributed-2.21.0/distributed.egg-info/PKG-INFO --- old/distributed-2.20.0/distributed.egg-info/PKG-INFO 2020-07-03 06:24:10.000000000 +0200 +++ new/distributed-2.21.0/distributed.egg-info/PKG-INFO 2020-07-18 00:18:00.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.20.0 +Version: 2.21.0 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/distributed.egg-info/requires.txt new/distributed-2.21.0/distributed.egg-info/requires.txt --- old/distributed-2.20.0/distributed.egg-info/requires.txt 2020-07-03 06:24:10.000000000 +0200 +++ new/distributed-2.21.0/distributed.egg-info/requires.txt 2020-07-18 00:18:00.000000000 +0200 @@ -1,5 +1,5 @@ click>=6.6 -cloudpickle>=1.3.0 +cloudpickle>=1.5.0 dask>=2.9.0 msgpack>=0.6.0 psutil>=5.0 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/docs/source/changelog.rst new/distributed-2.21.0/docs/source/changelog.rst --- old/distributed-2.20.0/docs/source/changelog.rst 2020-07-03 06:21:45.000000000 +0200 +++ new/distributed-2.21.0/docs/source/changelog.rst 2020-07-18 00:16:01.000000000 +0200 @@ -1,6 +1,25 @@ Changelog ========= +2.21.0 - 2020-07-17 +------------------- + +- Fix data replication error (:pr:`3963`) `Andrew Fulton`_ +- Treat falsey local directory as ``None`` (:pr:`3964`) `Tom Augspurger`_ +- Unpin ``numpydoc`` now 
that 1.1 is released (:pr:`3957`) `Gil Forsyth`_ +- Error hard when Dask has mismatched versions or lz4 installed (:pr:`3936`) `Matthew Rocklin`_ +- Skip coercing to ``bytes`` in ``merge_frames`` (:pr:`3960`) `jakirkham`_ +- UCX: reuse endpoints in order to fix NVLINK issue (:pr:`3953`) `Mads R. B. Kristensen`_ +- Optionally use ``pickle5`` (:pr:`3849`) `jakirkham`_ +- Update time per task chart with filtering and pie (:pr:`3933`) `Benjamin Zaitlen`_ +- UCX: explicit shutdown message (:pr:`3950`) `Mads R. B. Kristensen`_ +- Avoid too aggressive retry of connections (:pr:`3944`) `Matthias Bussonnier`_ +- Parse timeouts in ``Client.sync`` (:pr:`3952`) `Matthew Rocklin`_ +- Synchronize on non-trivial CUDA frame transmission (:pr:`3949`) `jakirkham`_ +- Serialize ``memoryview`` with ``shape`` and ``format`` (:pr:`3947`) `jakirkham`_ +- Move ``scheduler_comm`` into ``Cluster.__init__`` (:pr:`3945`) `Matthew Rocklin`_ + + 2.20.0 - 2020-07-02 ------------------- @@ -1844,3 +1863,4 @@ .. _`Julien Jerphanion`: https://github.com/jjerphan .. _`joshreback`: https://github.com/joshreback .. _`Alexander Clausen`: https://github.com/sk1p +.. _`Andrew Fulton`: https://github.com/andrewfulton9 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.20.0/requirements.txt new/distributed-2.21.0/requirements.txt --- old/distributed-2.20.0/requirements.txt 2020-06-30 17:19:57.000000000 +0200 +++ new/distributed-2.21.0/requirements.txt 2020-07-17 06:40:21.000000000 +0200 @@ -1,5 +1,5 @@ click >= 6.6 -cloudpickle >= 1.3.0 +cloudpickle >= 1.5.0 contextvars;python_version<'3.7' dask >= 2.9.0 msgpack >= 0.6.0
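
Below are a few brief usage sketches for changes listed in the 2.21.0 changelog above. They are illustrative only, written against a plain distributed 2.21.0 installation, and are not part of the packaged diff.

"Parse timeouts in Client.sync (GH#3952)" means timeout arguments may now be given as time strings; the test_client.py hunk above switches x.result(timeout=0.01) to x.result(timeout="10 ms"). A minimal sketch (the helper name is made up):

    from dask.utils import parse_timedelta

    # Strings such as "10 ms" or "5s" are converted to seconds before use.
    assert parse_timedelta("10 ms") == 0.01
    assert parse_timedelta("5s") == 5

    def result_with_string_timeout(client, func, *args):
        # client is a dask.distributed.Client; Future.result() accepts the
        # same string form that Client.sync now parses.
        future = client.submit(func, *args)
        return future.result(timeout="500 ms")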
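
"Serialize memoryview with shape and format (GH#3947)" records the view's format and shape in the serialization header, so multi-dimensional, non-byte views round-trip intact instead of coming back as flat bytes. A sketch mirroring the new test in test_serialize.py (numpy assumed available):

    import numpy as np
    from distributed.protocol import serialize, deserialize

    data_in = memoryview(np.random.random((3, 4)))  # format "d", shape (3, 4)
    header, frames = serialize(data_in)
    data_out = deserialize(header, frames)

    # The header now carries format and shape, so the round trip preserves both.
    assert data_out.format == "d"
    assert data_out.shape == (3, 4)
    assert data_in == data_out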
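
"Optionally use pickle5 (GH#3849)" switches several modules to the pickle5 backport on Python < 3.8 when it is installed; the guarded import appears verbatim in the hunks above. With protocol 5, large buffers can travel out-of-band rather than being copied into the pickle stream. A sketch of that pattern, assuming numpy and either Python 3.8+ or the pickle5 backport:

    import sys

    if sys.version_info < (3, 8):
        try:
            import pickle5 as pickle  # backport providing protocol 5
        except ImportError:
            import pickle
    else:
        import pickle

    import numpy as np

    buffers = []
    payload = np.arange(10)
    # buffer_callback collects out-of-band buffers instead of embedding them;
    # loads() receives them back via buffers=.
    blob = pickle.dumps(payload, protocol=5, buffer_callback=buffers.append)
    restored = pickle.loads(blob, buffers=buffers)
    assert (restored == payload).all()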
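
"Avoid too aggressive retry of connections (GH#3944)" replaces the fixed 1.5x backoff multiplier in distributed.comm.core with a randomized factor, as the hunk above shows. The pacing logic boils down to this small sketch:

    import random

    def next_backoff(backoff):
        # Multiply by a random factor in [1.40, 1.60) instead of a fixed 1.5
        # so many clients retrying at once do not stay in lock-step.
        backoff *= random.randrange(140, 160) / 100
        return min(backoff, 1)  # still wait at most one second per retry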
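
"Error hard when Dask has mismatched versions or lz4 installed (GH#3936)" changes distributed.versions.error_message to return a dict with separate "warning" and "error" strings; Python major-version and lz4-presence mismatches go into "error" and make the client raise ImportError, while other mismatches remain warnings. A sketch with matching versions, where both strings come back empty (the worker key is illustrative):

    from distributed.versions import get_versions, error_message

    local = get_versions()
    msg = error_message(scheduler=local, workers={"worker-1": local}, client=local)
    assert msg["warning"] == ""  # no mismatch table to warn about
    assert msg["error"] == ""    # nothing fatal, Client() would connect fine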
