Hello community, here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2019-02-25 17:50:30 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-distributed (Old) and /work/SRC/openSUSE:Factory/.python-distributed.new.28833 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Mon Feb 25 17:50:30 2019 rev:15 rq:670743 version:1.25.3 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2019-01-08 12:31:21.972083379 +0100 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.28833/python-distributed.changes 2019-02-25 17:50:33.850779045 +0100 @@ -1,0 +2,29 @@ +Sat Feb 2 17:10:13 UTC 2019 - Arun Persaud <a...@gmx.de> + +- update to version 1.25.3: + * Fix excess threading on missing connections (:pr:`2403`) Daniel + Farrell + * Fix typo in doc (:pr:`2457`) Loïc Estève + * Start fewer but larger workers with LocalCluster (:pr:`2452`) + Matthew Rocklin + * Check for non-zero length first in read loop (:pr:`2465`) John + Kirkham + * DOC: Use of local cluster in script (:pr:`2462`) Peter Killick + * DOC/API: Signature for base class write / read (:pr:`2472`) Tom + Augspurger + * Support Pytest 4 in Tests (:pr:`2478`) Adam Beberg + * Ensure async behavior in event loop with LocalCluster (:pr:`2484`) + Matthew Rocklin + * Fix spurious CancelledError (:pr:`2485`) Loïc Estève + * Properly reset dask.config scheduler and shuffle when closing the + client (:pr:`2475`) George Sakkis + * Make it more explict that resources are per worker. 
(:pr:`2470`) + Loïc Estève + * Remove references to center (:pr:`2488`) Matthew Rocklin + * Expand client clearing timeout to 10s in testing (:pr:`2493`) + Matthew Rocklin + * Propagate key keyword in progressbar (:pr:`2492`) Matthew Rocklin + * Use provided cluster's IOLoop if present in Client (:pr:`2494`) + Matthew Rocklin + +------------------------------------------------------------------- Old: ---- distributed-1.25.2.tar.gz New: ---- distributed-1.25.3.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.DnnGMM/_old 2019-02-25 17:50:34.358778768 +0100 +++ /var/tmp/diff_new_pack.DnnGMM/_new 2019-02-25 17:50:34.362778766 +0100 @@ -20,7 +20,7 @@ # Test requires network connection %bcond_with test Name: python-distributed -Version: 1.25.2 +Version: 1.25.3 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause ++++++ distributed-1.25.2.tar.gz -> distributed-1.25.3.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/PKG-INFO new/distributed-1.25.3/PKG-INFO --- old/distributed-1.25.2/PKG-INFO 2019-01-04 23:14:54.000000000 +0100 +++ new/distributed-1.25.3/PKG-INFO 2019-01-31 21:17:27.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 1.25.2 +Version: 1.25.3 Summary: Distributed scheduler for Dask Home-page: https://distributed.readthedocs.io/en/latest/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/_version.py new/distributed-1.25.3/distributed/_version.py --- old/distributed-1.25.2/distributed/_version.py 2019-01-04 23:14:54.000000000 +0100 +++ new/distributed-1.25.3/distributed/_version.py 2019-01-31 21:17:27.000000000 +0100 @@ -8,11 +8,11 @@ version_json = ''' { - "date": 
"2019-01-04T14:14:00-0800", + "date": "2019-01-31T12:16:55-0800", "dirty": false, "error": null, - "full-revisionid": "4e38022ed91b7d90ffe54703e9975d94a37fb9c3", - "version": "1.25.2" + "full-revisionid": "f7abbd68b824dc03c8535b57f9e914bddd1d447c", + "version": "1.25.3" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/bokeh/components.py new/distributed-1.25.3/distributed/bokeh/components.py --- old/distributed-1.25.2/distributed/bokeh/components.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/bokeh/components.py 2019-01-31 19:43:38.000000000 +0100 @@ -311,7 +311,7 @@ state = profile.create() data = profile.plot_data(state, profile_interval) self.states = data.pop('states') - self.source = ColumnDataSource(data=data) + self.root, self.source = profile.plot_figure(data, **kwargs) @without_property_validation def cb(attr, old, new): @@ -335,45 +335,6 @@ else: self.source.on_change('selected', cb) - self.root = figure(tools='tap', **kwargs) - self.root.quad('left', 'right', 'top', 'bottom', color='color', - line_color='black', line_width=2, source=self.source) - - hover = HoverTool( - point_policy="follow_mouse", - tooltips=""" - <div> - <span style="font-size: 14px; font-weight: bold;">Name:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Filename:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Line number:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Line:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span> - </div> - <div> - <span style="font-size: 14px; font-weight: 
bold;">Time:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Percentage:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@width</span> - </div> - """ - ) - self.root.add_tools(hover) - - self.root.xaxis.visible = False - self.root.yaxis.visible = False - self.root.grid.visible = False - @without_property_validation def update(self, state): with log_errors(): @@ -412,7 +373,7 @@ self.state = profile.create() data = profile.plot_data(self.state, profile_interval) self.states = data.pop('states') - self.source = ColumnDataSource(data=data) + self.profile_plot, self.source = profile.plot_figure(data, **kwargs) changing = [False] # avoid repeated changes from within callback @@ -445,47 +406,6 @@ else: self.source.on_change('selected', cb) - self.profile_plot = figure(tools='tap', height=400, **kwargs) - r = self.profile_plot.quad('left', 'right', 'top', 'bottom', color='color', - line_color='black', source=self.source) - r.selection_glyph = None - r.nonselection_glyph = None - - hover = HoverTool( - point_policy="follow_mouse", - tooltips=""" - <div> - <span style="font-size: 14px; font-weight: bold;">Name:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Filename:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Line number:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Line:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Time:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span> - 
</div> - <div> - <span style="font-size: 14px; font-weight: bold;">Percentage:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@percentage</span> - </div> - """ - ) - self.profile_plot.add_tools(hover) - - self.profile_plot.xaxis.visible = False - self.profile_plot.yaxis.visible = False - self.profile_plot.grid.visible = False - self.ts_source = ColumnDataSource({'time': [], 'count': []}) self.ts_plot = figure(title='Activity over time', height=100, x_axis_type='datetime', active_drag='xbox_select', @@ -591,7 +511,7 @@ self.state = profile.get_profile(self.log) data = profile.plot_data(self.state, profile_interval) self.states = data.pop('states') - self.source = ColumnDataSource(data=data) + self.profile_plot, self.source = profile.plot_figure(data, **kwargs) changing = [False] # avoid repeated changes from within callback @@ -624,47 +544,6 @@ else: self.source.on_change('selected', cb) - self.profile_plot = figure(tools='tap', height=400, **kwargs) - r = self.profile_plot.quad('left', 'right', 'top', 'bottom', color='color', - line_color='black', source=self.source) - r.selection_glyph = None - r.nonselection_glyph = None - - hover = HoverTool( - point_policy="follow_mouse", - tooltips=""" - <div> - <span style="font-size: 14px; font-weight: bold;">Name:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Filename:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Line number:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Line:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Time:</span> - <span style="font-size: 
10px; font-family: Monaco, monospace;">@time</span> - </div> - <div> - <span style="font-size: 14px; font-weight: bold;">Percentage:</span> - <span style="font-size: 10px; font-family: Monaco, monospace;">@percentage</span> - </div> - """ - ) - self.profile_plot.add_tools(hover) - - self.profile_plot.xaxis.visible = False - self.profile_plot.yaxis.visible = False - self.profile_plot.grid.visible = False - self.ts_source = ColumnDataSource({'time': [], 'count': []}) self.ts_plot = figure(title='Activity over time', height=100, x_axis_type='datetime', active_drag='xbox_select', diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/bokeh/scheduler_html.py new/distributed-1.25.3/distributed/bokeh/scheduler_html.py --- old/distributed-1.25.2/distributed/bokeh/scheduler_html.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/bokeh/scheduler_html.py 2019-01-31 19:43:38.000000000 +0100 @@ -27,6 +27,7 @@ with log_errors(): self.render('workers.html', title='Workers', + scheduler=self.server, **toolz.merge(self.server.__dict__, ns, self.extra)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/bokeh/templates/workers.html new/distributed-1.25.3/distributed/bokeh/templates/workers.html --- old/distributed-1.25.2/distributed/bokeh/templates/workers.html 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/bokeh/templates/workers.html 2019-01-31 19:43:38.000000000 +0100 @@ -1,6 +1,8 @@ {% extends main.html %} {% block content %} + <h1 class="title"> Scheduler {{scheduler.address}} </h1> + <a class="button is-primary" href="logs.html">Logs</a> <a class="button is-primary" href="../../status">Bokeh</a> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/cli/dask_worker.py 
new/distributed-1.25.3/distributed/cli/dask_worker.py --- old/distributed-1.25.2/distributed/cli/dask_worker.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/cli/dask_worker.py 2019-01-30 00:39:16.000000000 +0100 @@ -82,7 +82,9 @@ @click.option('--local-directory', default='', type=str, help="Directory to place worker files") @click.option('--resources', type=str, default='', - help='Resources for task constraints like "GPU=2 MEM=10e9"') + help='Resources for task constraints like "GPU=2 MEM=10e9". ' + 'Resources are applied separately to each worker process ' + "(only relevant when starting multiple worker processes with '--nprocs').") @click.option('--scheduler-file', type=str, default='', help='Filename to JSON encoded scheduler information. ' 'Use with dask-scheduler --scheduler-file') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/client.py new/distributed-1.25.3/distributed/client.py --- old/distributed-1.25.2/distributed/client.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/client.py 2019-01-31 19:43:38.000000000 +0100 @@ -572,6 +572,20 @@ else: self.connection_args = self.security.get_connection_args('client') + if address is None: + address = dask.config.get('scheduler-address', None) + if address: + logger.info("Config value `scheduler-address` found: %s", + address) + + if isinstance(address, (rpc, PooledRPCCall)): + self.scheduler = address + elif hasattr(address, "scheduler_address"): + # It's a LocalCluster or LocalCluster-compatible object + self.cluster = address + with ignoring(AttributeError): + loop = address.loop + self._connecting_to_scheduler = False self._asynchronous = asynchronous self._should_close_loop = not loop @@ -592,25 +606,10 @@ io_loop=self.loop ) - if address is None: - address = dask.config.get('scheduler-address', None) - if address: - logger.info("Config value `scheduler-address` found: %s", - 
address) - - if isinstance(address, (rpc, PooledRPCCall)): - self.scheduler = address - elif hasattr(address, "scheduler_address"): - # It's a LocalCluster or LocalCluster-compatible object - self.cluster = address - self._start_arg = address if set_as_default: - self._previous_scheduler = dask.config.get('scheduler', None) - dask.config.set(scheduler='dask.distributed') - - self._previous_shuffle = dask.config.get('shuffle', None) - dask.config.set(shuffle='tasks') + self._set_config = dask.config.set(scheduler='dask.distributed', + shuffle='tasks') self._stream_handlers = { 'key-in-memory': self._handle_key_in_memory, @@ -1074,9 +1073,9 @@ pc.stop() self._scheduler_identity = {} with ignoring(AttributeError): - dask.config.set(scheduler=self._previous_scheduler) - with ignoring(AttributeError): - dask.config.set(shuffle=self._previous_shuffle) + # clear the dask.config set keys + with self._set_config: + pass if self.get == dask.config.get('get', None): del dask.config.config['get'] if self.status == 'closed': @@ -1090,7 +1089,8 @@ # This makes the shutdown slightly smoother and quieter with ignoring(AttributeError, gen.TimeoutError): yield gen.with_timeout(timedelta(milliseconds=100), - self._handle_scheduler_coroutine) + self._handle_scheduler_coroutine, + quiet_exceptions=(CancelledError,)) if self.scheduler_comm and self.scheduler_comm.comm and not self.scheduler_comm.comm.closed(): yield self.scheduler_comm.close() @@ -1159,13 +1159,6 @@ if self._should_close_loop and not shutting_down(): self._loop_runner.stop() - with ignoring(AttributeError): - dask.config.set(scheduler=self._previous_scheduler) - with ignoring(AttributeError): - dask.config.set(shuffle=self._previous_shuffle) - if self.get == dask.config.get('get', None): - del dask.config.config['get'] - def shutdown(self, *args, **kwargs): """ Deprecated, see close instead @@ -2984,7 +2977,8 @@ keys += list(map(tokey, {f.key for f in futures})) return self.sync(self.scheduler.call_stack, keys=keys or 
None) - def profile(self, key=None, start=None, stop=None, workers=None, merge_workers=True): + def profile(self, key=None, start=None, stop=None, workers=None, + merge_workers=True, plot=False, filename=None): """ Collect statistical profiling information about recent work Parameters @@ -2996,16 +2990,49 @@ stop: time workers: list List of workers to restrict profile information + plot: boolean or string + Whether or not to return a plot object + filename: str + Filename to save the plot Examples -------- >>> client.profile() # call on collections + >>> client.profile(filename='dask-profile.html') # save to html file """ if isinstance(workers, six.string_types + (Number,)): workers = [workers] - return self.sync(self.scheduler.profile, key=key, workers=workers, - merge_workers=merge_workers, start=start, stop=stop) + return self.sync(self._profile, key=key, workers=workers, + merge_workers=merge_workers, start=start, stop=stop, + plot=plot, filename=filename) + + @gen.coroutine + def _profile(self, key=None, start=None, stop=None, workers=None, + merge_workers=True, plot=False, filename=None): + if isinstance(workers, six.string_types + (Number,)): + workers = [workers] + + state = yield self.scheduler.profile(key=key, workers=workers, + merge_workers=merge_workers, start=start, stop=stop) + + if filename: + plot = True + + if plot: + from . 
import profile + data = profile.plot_data(state) + figure, source = profile.plot_figure(data, sizing_mode='stretch_both') + + if plot == 'save' and not filename: + filename = 'dask-profile.html' + + from bokeh.plotting import save + save(figure, title='Dask Profile', filename=filename) + raise gen.Return((state, figure)) + + else: + raise gen.Return(state) def scheduler_info(self, **kwargs): """ Basic information about the workers in the cluster diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/comm/core.py new/distributed-1.25.3/distributed/comm/core.py --- old/distributed-1.25.2/distributed/comm/core.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/comm/core.py 2019-01-28 19:00:36.000000000 +0100 @@ -40,19 +40,32 @@ # XXX add set_close_callback()? @abstractmethod - def read(self): + def read(self, deserializers=None): """ Read and return a message (a Python object). This method is a coroutine. + + Parameters + ---------- + deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]] + An optional dict appropriate for distributed.protocol.deserialize. + See :ref:`serialization` for more. """ @abstractmethod - def write(self, msg): + def write(self, msg, on_error=None): """ Write a message (a Python object). This method is a coroutine. + + Parameters + ---------- + msg : + on_error : Optional[str] + The behavior when serialization fails. See + ``distributed.protocol.core.dumps`` for valid values. 
""" @abstractmethod @@ -181,6 +194,7 @@ % (addr, timeout, error)) raise IOError(msg) + # This starts a thread while True: try: future = connector.connect(loc, deserialize=deserialize, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/comm/tcp.py new/distributed-1.25.3/distributed/comm/tcp.py --- old/distributed-1.25.2/distributed/comm/tcp.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/comm/tcp.py 2019-01-19 18:17:58.000000000 +0100 @@ -11,6 +11,8 @@ except ImportError: ssl = None +from concurrent.futures import ThreadPoolExecutor + import dask import tornado from tornado import gen, netutil @@ -184,16 +186,15 @@ frames = [] for length in lengths: - if PY3 and self._iostream_has_read_into: - frame = bytearray(length) - if length: + if length: + if PY3 and self._iostream_has_read_into: + frame = bytearray(length) n = yield stream.read_into(frame) assert n == length, (n, length) - else: - if length: - frame = yield stream.read_bytes(length) else: - frame = b'' + frame = yield stream.read_bytes(length) + else: + frame = b'' frames.append(frame) except StreamClosedError as e: self.stream = None @@ -320,6 +321,13 @@ class BaseTCPConnector(Connector, RequireEncryptionMixin): + if PY3: # see github PR #2403 discussion for more info + _executor = ThreadPoolExecutor(2) + _resolver = netutil.ExecutorResolver(close_executor=False, + executor=_executor) + else: + _resolver = None + client = TCPClient(resolver=_resolver) @gen.coroutine def connect(self, address, deserialize=True, **connection_args): @@ -327,11 +335,11 @@ ip, port = parse_host_port(address) kwargs = self._get_connect_args(**connection_args) - client = TCPClient() try: - stream = yield client.connect(ip, port, + stream = yield BaseTCPConnector.client.connect(ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs) + # Under certain circumstances tornado will have a closed connnection with an error and not raise # a 
StreamClosedError. # diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/comm/tests/test_comms.py new/distributed-1.25.3/distributed/comm/tests/test_comms.py --- old/distributed-1.25.2/distributed/comm/tests/test_comms.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/comm/tests/test_comms.py 2019-01-09 18:45:51.000000000 +0100 @@ -11,6 +11,7 @@ from tornado import gen, ioloop, locks, queues from tornado.concurrent import Future +from distributed.compatibility import PY3 from distributed.metrics import time from distributed.utils import get_ip, get_ipv6 from distributed.utils_test import (gen_test, requires_ipv6, has_ipv6, @@ -289,6 +290,46 @@ assert set(l) == {1234} | set(range(N)) +@gen_test() +def test_comm_failure_threading(): + """ + When we fail to connect, make sure we don't make a lot + of threads. + + We only assert for PY3, because the thread limit only is + set for python 3. See github PR #2403 discussion for info. 
+ """ + + @gen.coroutine + def sleep_for_60ms(): + max_thread_count = 0 + for x in range(60): + yield gen.sleep(0.001) + thread_count = threading.active_count() + if thread_count > max_thread_count: + max_thread_count = thread_count + raise gen.Return(max_thread_count) + original_thread_count = threading.active_count() + + # tcp.TCPConnector() + sleep_future = sleep_for_60ms() + with pytest.raises(IOError): + yield connect("tcp://localhost:28400", 0.052) + max_thread_count = yield sleep_future + # 2 is the number set by BaseTCPConnector.executor (ThreadPoolExecutor) + if PY3: + assert max_thread_count <= 2 + original_thread_count + + # tcp.TLSConnector() + sleep_future = sleep_for_60ms() + with pytest.raises(IOError): + yield connect("tls://localhost:28400", 0.052, + connection_args={'ssl_context': get_client_ssl_context()}) + max_thread_count = yield sleep_future + if PY3: + assert max_thread_count <= 2 + original_thread_count + + @gen.coroutine def check_inproc_specific(run_client): """ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/deploy/local.py new/distributed-1.25.3/distributed/deploy/local.py --- old/distributed-1.25.2/distributed/deploy/local.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/deploy/local.py 2019-01-31 19:43:36.000000000 +0100 @@ -8,9 +8,11 @@ import weakref import toolz +from dask.utils import factors from tornado import gen from .cluster import Cluster +from ..compatibility import get_thread_identity from ..core import CommClosedError from ..utils import (sync, ignoring, All, silence_logging, LoopRunner, log_errors, thread_state, parse_timedelta) @@ -99,8 +101,7 @@ self._old_logging_level = silence_logging(level=silence_logs) if n_workers is None and threads_per_worker is None: if processes: - n_workers = _ncores - threads_per_worker = 1 + n_workers, threads_per_worker = nprocesses_nthreads(_ncores) else: n_workers = 1 
threads_per_worker = _ncores @@ -153,7 +154,11 @@ @property def asynchronous(self): - return self._asynchronous or getattr(thread_state, 'asynchronous', False) + return ( + self._asynchronous or + getattr(thread_state, 'asynchronous', False) or + hasattr(self.loop, '_thread_identity') and self.loop._thread_identity == get_thread_identity() + ) def sync(self, func, *args, **kwargs): if kwargs.pop('asynchronous', None) or self.asynchronous: @@ -372,6 +377,34 @@ return '<unstarted>' +def nprocesses_nthreads(n): + """ + The default breakdown of processes and threads for a given number of cores + + Parameters + ---------- + n: int + Number of available cores + + Examples + -------- + >>> nprocesses_nthreads(4) + (4, 1) + >>> nprocesses_nthreads(32) + (8, 4) + + Returns + ------- + nprocesses, nthreads + """ + if n <= 4: + processes = n + else: + processes = min(f for f in factors(n) if f >= math.sqrt(n)) + threads = n // processes + return (processes, threads) + + clusters_to_close = weakref.WeakSet() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/deploy/tests/test_local.py new/distributed-1.25.3/distributed/deploy/tests/test_local.py --- old/distributed-1.25.2/distributed/deploy/tests/test_local.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/deploy/tests/test_local.py 2019-01-31 19:43:36.000000000 +0100 @@ -14,7 +14,7 @@ import pytest from distributed import Client, Worker, Nanny -from distributed.deploy.local import LocalCluster +from distributed.deploy.local import LocalCluster, nprocesses_nthreads from distributed.metrics import time from distributed.utils_test import (inc, gen_test, slowinc, assert_cannot_connect, @@ -32,12 +32,14 @@ def test_simple(loop): with LocalCluster(4, scheduler_port=0, processes=False, silence_logs=False, diagnostics_port=None, loop=loop) as c: - with Client(c.scheduler_address, loop=loop) as e: + with Client(c) as e: x = 
e.submit(inc, 1) x.result() assert x.key in c.scheduler.tasks assert any(w.data == {x.key: 2} for w in c.workers) + assert e.loop is c.loop + @pytest.mark.skipif('sys.version_info[0] == 2', reason='fork issues') def test_close_twice(): @@ -131,7 +133,7 @@ def test_Client_with_local(loop): with LocalCluster(1, scheduler_port=0, silence_logs=False, diagnostics_port=None, loop=loop) as c: - with Client(c, loop=loop) as e: + with Client(c) as e: assert len(e.ncores()) == len(c.workers) assert c.scheduler_address in repr(c) @@ -276,7 +278,7 @@ cluster = yield LocalCluster(0, scheduler_port=0, processes=False, silence_logs=False, diagnostics_port=None, loop=loop, asynchronous=True) - c = yield Client(cluster, loop=loop, asynchronous=True) + c = yield Client(cluster, asynchronous=True) assert not cluster.workers @@ -492,7 +494,7 @@ cluster = yield MyCluster(0, scheduler_port=0, processes=False, silence_logs=False, diagnostics_port=None, loop=loop, asynchronous=True) - c = yield Client(cluster, loop=loop, asynchronous=True) + c = yield Client(cluster, asynchronous=True) assert not cluster.workers @@ -529,5 +531,29 @@ assert workers_before != workers_after +def test_default_process_thread_breakdown(): + assert nprocesses_nthreads(1) == (1, 1) + assert nprocesses_nthreads(4) == (4, 1) + assert nprocesses_nthreads(5) == (5, 1) + assert nprocesses_nthreads(8) == (4, 2) + assert nprocesses_nthreads(12) in ((6, 2), (4, 3)) + assert nprocesses_nthreads(20) == (5, 4) + assert nprocesses_nthreads(24) in ((6, 4), (8, 3)) + assert nprocesses_nthreads(32) == (8, 4) + assert nprocesses_nthreads(40) in ((8, 5), (10, 4)) + assert nprocesses_nthreads(80) in ((10, 8), (16, 5)) + + +def test_asynchronous_property(loop): + with LocalCluster(4, scheduler_port=0, processes=False, silence_logs=False, + diagnostics_port=None, loop=loop) as cluster: + + @gen.coroutine + def _(): + assert cluster.asynchronous + + cluster.sync(_) + + if sys.version_info >= (3, 5): from 
distributed.deploy.tests.py3_test_deploy import * # noqa F401 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/diagnostics/progress.py new/distributed-1.25.3/distributed/diagnostics/progress.py --- old/distributed-1.25.2/distributed/diagnostics/progress.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/diagnostics/progress.py 2019-01-31 19:43:36.000000000 +0100 @@ -116,7 +116,7 @@ if key in self.keys and finish == 'forgotten': logger.debug("A task was cancelled (%s), stopping progress", key) - self.stop(exception=True) + self.stop(exception=True, key=key) def restart(self, scheduler): self.stop() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/profile.py new/distributed-1.25.3/distributed/profile.py --- old/distributed-1.25.2/distributed/profile.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/profile.py 2019-01-31 19:43:38.000000000 +0100 @@ -24,6 +24,8 @@ 'children': {...}}} } """ +from __future__ import print_function, division, absolute_import + import bisect from collections import defaultdict, deque import linecache @@ -299,3 +301,57 @@ prof = merge(prof, recent) return prof + + +def plot_figure(data, **kwargs): + from bokeh.plotting import ColumnDataSource, figure + from bokeh.models import HoverTool + + if 'states' in data: + data = toolz.dissoc(data, 'states') + + source = ColumnDataSource(data=data) + + fig = figure(tools='tap', **kwargs) + r = fig.quad('left', 'right', 'top', 'bottom', color='color', + line_color='black', line_width=2, source=source) + + r.selection_glyph = None + r.nonselection_glyph = None + + hover = HoverTool( + point_policy="follow_mouse", + tooltips=""" + <div> + <span style="font-size: 14px; font-weight: bold;">Name:</span> + <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span> + </div> + <div> + <span 
style="font-size: 14px; font-weight: bold;">Filename:</span> + <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span> + </div> + <div> + <span style="font-size: 14px; font-weight: bold;">Line number:</span> + <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span> + </div> + <div> + <span style="font-size: 14px; font-weight: bold;">Line:</span> + <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span> + </div> + <div> + <span style="font-size: 14px; font-weight: bold;">Time:</span> + <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span> + </div> + <div> + <span style="font-size: 14px; font-weight: bold;">Percentage:</span> + <span style="font-size: 10px; font-family: Monaco, monospace;">@width</span> + </div> + """ + ) + fig.add_tools(hover) + + fig.xaxis.visible = False + fig.yaxis.visible = False + fig.grid.visible = False + + return fig, source diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/protocol/serialize.py new/distributed-1.25.3/distributed/protocol/serialize.py --- old/distributed-1.25.2/distributed/protocol/serialize.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/protocol/serialize.py 2019-01-23 01:35:56.000000000 +0100 @@ -171,6 +171,9 @@ ---------- header: dict frames: list of bytes + deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]] + An optional dict mapping a name to a (de)serializer. + See `dask_serialize` and `dask_deserialize` for more. 
See Also -------- diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/scheduler.py new/distributed-1.25.3/distributed/scheduler.py --- old/distributed-1.25.2/distributed/scheduler.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/scheduler.py 2019-01-31 19:43:36.000000000 +0100 @@ -764,7 +764,6 @@ def __init__( self, - center=None, loop=None, delete_interval='500ms', synchronize_worker_interval='60s', diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_client.py new/distributed-1.25.3/distributed/tests/test_client.py --- old/distributed-1.25.2/distributed/tests/test_client.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/tests/test_client.py 2019-01-31 19:43:38.000000000 +0100 @@ -3243,17 +3243,17 @@ def test_default_get(): with cluster() as (s, [a, b]): pre_get = dask.base.get_scheduler() - pre_shuffle = dask.config.get('shuffle', None) + pytest.raises(KeyError, dask.config.get, 'shuffle') with Client(s['address'], set_as_default=True) as c: assert dask.base.get_scheduler() == c.get assert dask.config.get('shuffle') == 'tasks' assert dask.base.get_scheduler() == pre_get - assert dask.config.get('shuffle') == pre_shuffle + pytest.raises(KeyError, dask.config.get, 'shuffle') c = Client(s['address'], set_as_default=False) assert dask.base.get_scheduler() == pre_get - assert dask.config.get('shuffle') == pre_shuffle + pytest.raises(KeyError, dask.config.get, 'shuffle') c.close() c = Client(s['address'], set_as_default=True) @@ -3261,7 +3261,7 @@ assert dask.base.get_scheduler() == c.get c.close() assert dask.base.get_scheduler() == pre_get - assert dask.config.get('shuffle') == pre_shuffle + pytest.raises(KeyError, dask.config.get, 'shuffle') with Client(s['address']) as c: assert dask.base.get_scheduler() == c.get @@ -4579,6 +4579,22 @@ ), line +def 
test_quiet_client_close_when_cluster_is_closed_before_client(loop): + n_attempts = 5 + # Trying a few times to reduce the flakiness of the test. Without the bug + # fix in #2477 and with 5 attempts, this test passes by chance in about 10% + # of the cases. + for _ in range(n_attempts): + with captured_logger(logging.getLogger('tornado.application')) as logger: + cluster = LocalCluster(loop=loop) + client = Client(cluster, loop=loop) + cluster.close() + client.close() + + out = logger.getvalue() + assert 'CancelledError' not in out + + @gen_cluster() def test_close(s, a, b): c = yield Client(s.address, asynchronous=True) @@ -5469,5 +5485,18 @@ assert result == 101 +@gen_cluster(client=True, check_new_threads=False) +def test_profile_bokeh(c, s, a, b): + pytest.importorskip('bokeh.plotting') + from bokeh.model import Model + yield c.map(slowinc, range(10), delay=0.2) + state, figure = yield c.profile(plot=True) + assert isinstance(figure, Model) + + with tmpfile('html') as fn: + yield c.profile(filename=fn) + assert os.path.exists(fn) + + if sys.version_info >= (3, 5): from distributed.tests.py3_test_client import * # noqa F401 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_counter.py new/distributed-1.25.3/distributed/tests/test_counter.py --- old/distributed-1.25.2/distributed/tests/test_counter.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/tests/test_counter.py 2019-01-25 19:01:55.000000000 +0100 @@ -11,9 +11,11 @@ Digest = None -@pytest.mark.parametrize('CD,size', [(Counter, lambda d: sum(d.values())), - pytest.mark.skipif(not Digest, reason="no crick library")( - (Digest, lambda x: x.size()))]) +@pytest.mark.parametrize('CD,size', [ + (Counter, lambda d: sum(d.values())), + pytest.param(Digest, lambda x: x.size(), + marks=pytest.mark.skipif(not Digest, reason="no crick library")) +]) def test_digest(loop, CD, size): c = CD(loop=loop) c.add(1) 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_resources.py new/distributed-1.25.3/distributed/tests/test_resources.py --- old/distributed-1.25.2/distributed/tests/test_resources.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/tests/test_resources.py 2019-01-25 19:01:55.000000000 +0100 @@ -277,8 +277,10 @@ @pytest.mark.parametrize('optimize_graph', [ - pytest.mark.xfail(True, reason="don't track resources through optimization"), - pytest.mark.skipif(WINDOWS, False, reason="intermittent failure"), + pytest.param(True, + marks=pytest.mark.xfail(reason="don't track resources through optimization")), + pytest.param(False, + marks=pytest.mark.skipif(WINDOWS, reason="intermittent failure")) ]) def test_collections_get(client, optimize_graph, s, a, b): da = pytest.importorskip('dask.array') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_steal.py new/distributed-1.25.3/distributed/tests/test_steal.py --- old/distributed-1.25.2/distributed/tests/test_steal.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/tests/test_steal.py 2019-01-25 19:01:55.000000000 +0100 @@ -468,12 +468,9 @@ [1], [1]]), - pytest.mark.xfail(([[1, 1, 1, 1, 1, 1, 1], - [1, 1], [1, 1], [1, 1], - []], - [[1, 1, 1, 1, 1], - [1, 1], [1, 1], [1, 1], - [1, 1]]), reason="Some uncertainty based on executing stolen task") + pytest.param([[1, 1, 1, 1, 1, 1, 1], [1, 1], [1, 1], [1, 1], []], + [[1, 1, 1, 1, 1], [1, 1], [1, 1], [1, 1], [1, 1]], + marks=pytest.mark.xfail(reason="Some uncertainty based on executing stolen task")) ]) def test_balance(inp, expected): test = lambda *args, **kwargs: assert_balanced(inp, expected, *args, **kwargs) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed/utils_test.py 
new/distributed-1.25.3/distributed/utils_test.py --- old/distributed-1.25.2/distributed/utils_test.py 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/distributed/utils_test.py 2019-01-31 19:43:36.000000000 +0100 @@ -136,7 +136,7 @@ start = time() while set(_global_clients): sleep(0.1) - assert time() < start + 5 + assert time() < start + 10 _cleanup_dangling() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/distributed.egg-info/PKG-INFO new/distributed-1.25.3/distributed.egg-info/PKG-INFO --- old/distributed-1.25.2/distributed.egg-info/PKG-INFO 2019-01-04 23:14:53.000000000 +0100 +++ new/distributed-1.25.3/distributed.egg-info/PKG-INFO 2019-01-31 21:17:27.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 1.25.2 +Version: 1.25.3 Summary: Distributed scheduler for Dask Home-page: https://distributed.readthedocs.io/en/latest/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/docs/source/changelog.rst new/distributed-1.25.3/docs/source/changelog.rst --- old/distributed-1.25.2/docs/source/changelog.rst 2019-01-04 23:13:19.000000000 +0100 +++ new/distributed-1.25.3/docs/source/changelog.rst 2019-01-31 21:15:58.000000000 +0100 @@ -1,6 +1,26 @@ Changelog ========= +1.25.3 - 2019-01-31 +------------------- + +- Fix excess threading on missing connections (:pr:`2403`) `Daniel Farrell`_ +- Fix typo in doc (:pr:`2457`) `Loïc Estève`_ +- Start fewer but larger workers with LocalCluster (:pr:`2452`) `Matthew Rocklin`_ +- Check for non-zero ``length`` first in ``read`` loop (:pr:`2465`) `John Kirkham`_ +- DOC: Use of local cluster in script (:pr:`2462`) `Peter Killick`_ +- DOC/API: Signature for base class write / read (:pr:`2472`) `Tom Augspurger`_ +- Support Pytest 4 in Tests (:pr:`2478`) `Adam Beberg`_ +- Ensure async behavior in event loop with LocalCluster (:pr:`2484`) `Matthew 
Rocklin`_ +- Fix spurious CancelledError (:pr:`2485`) `Loïc Estève`_ +- Properly reset dask.config scheduler and shuffle when closing the client (:pr:`2475`) `George Sakkis`_ +- Make it more explicit that resources are per worker. (:pr:`2470`) `Loïc Estève`_ +- Remove references to center (:pr:`2488`) `Matthew Rocklin`_ +- Expand client clearing timeout to 10s in testing (:pr:`2493`) `Matthew Rocklin`_ +- Propagate key keyword in progressbar (:pr:`2492`) `Matthew Rocklin`_ +- Use provided cluster's IOLoop if present in Client (:pr:`2494`) `Matthew Rocklin`_ + + 1.25.2 - 2019-01-04 ------------------- @@ -897,3 +917,6 @@ .. _`Stephan Hoyer`: https://github.com/shoyer .. _`tjb900`: https://github.com/tjb900 .. _`Dirk Petersen`: https://github.com/dirkpetersen +.. _`Daniel Farrell`: https://github.com/danpf +.. _`George Sakkis`: https://github.com/gsakkis +.. _`Adam Beberg`: https://github.com/beberg diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/docs/source/diagnosing-performance.rst new/distributed-1.25.3/docs/source/diagnosing-performance.rst --- old/distributed-1.25.2/docs/source/diagnosing-performance.rst 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/docs/source/diagnosing-performance.rst 2019-01-31 19:43:38.000000000 +0100 @@ -84,7 +84,9 @@ Users can also query this data directly using the :doc:`Client.profile <api>` function. This will deliver the raw data structure used to produce these -plots. +plots. They can also pass a filename to save the plot as an HTML file +directly. Note that this file will have to be served from a webserver like +``python -m http.server`` to be visible. The 10ms and 1s parameters can be controlled by the ``profile-interval`` and ``profile-cycle-interval`` entries in the config.yaml file. 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/docs/source/local-cluster.rst new/distributed-1.25.3/docs/source/local-cluster.rst --- old/distributed-1.25.2/docs/source/local-cluster.rst 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/docs/source/local-cluster.rst 2019-01-19 18:17:58.000000000 +0100 @@ -28,6 +28,18 @@ >>> client <Client: scheduler=127.0.0.1:8786 processes=8 cores=8> +.. note:: + + Within a Python script you need to start a local cluster in the + ``if __name__ == '__main__'`` block: + + .. code-block:: python + + if __name__ == '__main__': + cluster = LocalCluster() + client = Client(cluster) + # Your code follows here + API --- diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/docs/source/resources.rst new/distributed-1.25.3/docs/source/resources.rst --- old/distributed-1.25.2/docs/source/resources.rst 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/docs/source/resources.rst 2019-01-30 00:39:16.000000000 +0100 @@ -48,6 +48,34 @@ final = client.submit(aggregate, processed, resources={'MEMORY': 70e9}) +Resources are applied separately to each worker process +------------------------------------------------------- + +If you are using ``dask-worker --nprocs <nprocs>`` the resource will be applied +separately to each of the ``nprocs`` worker processes. 
Suppose you have 2 GPUs +on your machine, if you want to use two worker processes, you have 1 GPU per +worker process so you need to do something like this:: + + dask-worker scheduler:8786 --nprocs 2 --resources "GPU=1" + +Here is an example that illustrates how to use resources to ensure each task is +run inside a separate process, which is useful to execute non thread-safe tasks +or tasks that use multithreading internally:: + + dask-worker scheduler:8786 --nprocs 3 --nthreads 2 --resources "process=1" + +With the code below, there will be at most 3 tasks running concurrently and +each task will run in a separate process: + +.. code-block:: python + + from distributed import Client + client = Client('scheduler:8786') + + futures = [client.submit(non_thread_safe_function, arg, + resources={'process': 1}) for arg in args] + + Resources are Abstract ---------------------- @@ -71,7 +99,7 @@ x = dd.read_csv(...) y = x.map_partitions(func1) - z = y.map_parititons(func2) + z = y.map_partitions(func2) z.compute(resources={tuple(y.__dask_keys__()): {'GPU': 1}}) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/docs/source/serialization.rst new/distributed-1.25.3/docs/source/serialization.rst --- old/distributed-1.25.2/docs/source/serialization.rst 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/docs/source/serialization.rst 2019-01-23 01:35:56.000000000 +0100 @@ -1,3 +1,5 @@ +.. 
_serialization: + Serialization ============= diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.2/docs/source/setup.rst new/distributed-1.25.3/docs/source/setup.rst --- old/distributed-1.25.2/docs/source/setup.rst 2019-01-03 18:30:24.000000000 +0100 +++ new/distributed-1.25.3/docs/source/setup.rst 2019-01-31 19:43:36.000000000 +0100 @@ -21,16 +21,16 @@ node that hosts ``dask-scheduler``:: $ dask-worker 192.168.0.1:8786 - Start worker at: 192.168.0.2:12345 - Registered with center at: 192.168.0.1:8786 + Start worker at: 192.168.0.2:12345 + Registered with Scheduler at: 192.168.0.1:8786 $ dask-worker 192.168.0.1:8786 - Start worker at: 192.168.0.3:12346 - Registered with center at: 192.168.0.1:8786 + Start worker at: 192.168.0.3:12346 + Registered with Scheduler at: 192.168.0.1:8786 $ dask-worker 192.168.0.1:8786 - Start worker at: 192.168.0.4:12347 - Registered with center at: 192.168.0.1:8786 + Start worker at: 192.168.0.4:12347 + Registered with Scheduler at: 192.168.0.1:8786 There are various mechanisms to deploy these executables on a cluster, ranging from manually SSH-ing into all of the nodes to more automated systems like