Hello community,

Here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2019-02-25 17:50:30
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.28833 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Mon Feb 25 17:50:30 2019 rev:15 rq:670743 version:1.25.3

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    
2019-01-08 12:31:21.972083379 +0100
+++ 
/work/SRC/openSUSE:Factory/.python-distributed.new.28833/python-distributed.changes
 2019-02-25 17:50:33.850779045 +0100
@@ -1,0 +2,29 @@
+Sat Feb  2 17:10:13 UTC 2019 - Arun Persaud <a...@gmx.de>
+
+- update to version 1.25.3:
+  * Fix excess threading on missing connections (:pr:`2403`) Daniel
+    Farrell
+  * Fix typo in doc (:pr:`2457`) Loïc Estève
+  * Start fewer but larger workers with LocalCluster (:pr:`2452`)
+    Matthew Rocklin
+  * Check for non-zero length first in read loop (:pr:`2465`) John
+    Kirkham
+  * DOC: Use of local cluster in script (:pr:`2462`) Peter Killick
+  * DOC/API: Signature for base class write / read (:pr:`2472`) Tom
+    Augspurger
+  * Support Pytest 4 in Tests (:pr:`2478`) Adam Beberg
+  * Ensure async behavior in event loop with LocalCluster (:pr:`2484`)
+    Matthew Rocklin
+  * Fix spurious CancelledError (:pr:`2485`) Loïc Estève
+  * Properly reset dask.config scheduler and shuffle when closing the
+    client (:pr:`2475`) George Sakkis
+  * Make it more explicit that resources are per worker. (:pr:`2470`)
+    Loïc Estève
+  * Remove references to center (:pr:`2488`) Matthew Rocklin
+  * Expand client clearing timeout to 10s in testing (:pr:`2493`)
+    Matthew Rocklin
+  * Propagate key keyword in progressbar (:pr:`2492`) Matthew Rocklin
+  * Use provided cluster's IOLoop if present in Client (:pr:`2494`)
+    Matthew Rocklin
+
+-------------------------------------------------------------------

Old:
----
  distributed-1.25.2.tar.gz

New:
----
  distributed-1.25.3.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.DnnGMM/_old  2019-02-25 17:50:34.358778768 +0100
+++ /var/tmp/diff_new_pack.DnnGMM/_new  2019-02-25 17:50:34.362778766 +0100
@@ -20,7 +20,7 @@
 # Test requires network connection
 %bcond_with     test
 Name:           python-distributed
-Version:        1.25.2
+Version:        1.25.3
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-1.25.2.tar.gz -> distributed-1.25.3.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/PKG-INFO 
new/distributed-1.25.3/PKG-INFO
--- old/distributed-1.25.2/PKG-INFO     2019-01-04 23:14:54.000000000 +0100
+++ new/distributed-1.25.3/PKG-INFO     2019-01-31 21:17:27.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 1.25.2
+Version: 1.25.3
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.readthedocs.io/en/latest/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/_version.py 
new/distributed-1.25.3/distributed/_version.py
--- old/distributed-1.25.2/distributed/_version.py      2019-01-04 
23:14:54.000000000 +0100
+++ new/distributed-1.25.3/distributed/_version.py      2019-01-31 
21:17:27.000000000 +0100
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2019-01-04T14:14:00-0800",
+ "date": "2019-01-31T12:16:55-0800",
  "dirty": false,
  "error": null,
- "full-revisionid": "4e38022ed91b7d90ffe54703e9975d94a37fb9c3",
- "version": "1.25.2"
+ "full-revisionid": "f7abbd68b824dc03c8535b57f9e914bddd1d447c",
+ "version": "1.25.3"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/bokeh/components.py 
new/distributed-1.25.3/distributed/bokeh/components.py
--- old/distributed-1.25.2/distributed/bokeh/components.py      2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/bokeh/components.py      2019-01-31 
19:43:38.000000000 +0100
@@ -311,7 +311,7 @@
         state = profile.create()
         data = profile.plot_data(state, profile_interval)
         self.states = data.pop('states')
-        self.source = ColumnDataSource(data=data)
+        self.root, self.source = profile.plot_figure(data, **kwargs)
 
         @without_property_validation
         def cb(attr, old, new):
@@ -335,45 +335,6 @@
         else:
             self.source.on_change('selected', cb)
 
-        self.root = figure(tools='tap', **kwargs)
-        self.root.quad('left', 'right', 'top', 'bottom', color='color',
-                      line_color='black', line_width=2, source=self.source)
-
-        hover = HoverTool(
-            point_policy="follow_mouse",
-            tooltips="""
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Name:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@name</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Filename:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@filename</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: bold;">Line 
number:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@line_number</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Line:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@line</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Time:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@time</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Percentage:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@width</span>
-                </div>
-                """
-        )
-        self.root.add_tools(hover)
-
-        self.root.xaxis.visible = False
-        self.root.yaxis.visible = False
-        self.root.grid.visible = False
-
     @without_property_validation
     def update(self, state):
         with log_errors():
@@ -412,7 +373,7 @@
         self.state = profile.create()
         data = profile.plot_data(self.state, profile_interval)
         self.states = data.pop('states')
-        self.source = ColumnDataSource(data=data)
+        self.profile_plot, self.source = profile.plot_figure(data, **kwargs)
 
         changing = [False]  # avoid repeated changes from within callback
 
@@ -445,47 +406,6 @@
         else:
             self.source.on_change('selected', cb)
 
-        self.profile_plot = figure(tools='tap', height=400, **kwargs)
-        r = self.profile_plot.quad('left', 'right', 'top', 'bottom', 
color='color',
-                                   line_color='black', source=self.source)
-        r.selection_glyph = None
-        r.nonselection_glyph = None
-
-        hover = HoverTool(
-            point_policy="follow_mouse",
-            tooltips="""
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Name:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@name</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Filename:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@filename</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: bold;">Line 
number:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@line_number</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Line:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@line</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Time:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@time</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Percentage:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@percentage</span>
-                </div>
-                """
-        )
-        self.profile_plot.add_tools(hover)
-
-        self.profile_plot.xaxis.visible = False
-        self.profile_plot.yaxis.visible = False
-        self.profile_plot.grid.visible = False
-
         self.ts_source = ColumnDataSource({'time': [], 'count': []})
         self.ts_plot = figure(title='Activity over time', height=100,
                               x_axis_type='datetime', 
active_drag='xbox_select',
@@ -591,7 +511,7 @@
         self.state = profile.get_profile(self.log)
         data = profile.plot_data(self.state, profile_interval)
         self.states = data.pop('states')
-        self.source = ColumnDataSource(data=data)
+        self.profile_plot, self.source = profile.plot_figure(data, **kwargs)
 
         changing = [False]  # avoid repeated changes from within callback
 
@@ -624,47 +544,6 @@
         else:
             self.source.on_change('selected', cb)
 
-        self.profile_plot = figure(tools='tap', height=400, **kwargs)
-        r = self.profile_plot.quad('left', 'right', 'top', 'bottom', 
color='color',
-                                   line_color='black', source=self.source)
-        r.selection_glyph = None
-        r.nonselection_glyph = None
-
-        hover = HoverTool(
-            point_policy="follow_mouse",
-            tooltips="""
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Name:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@name</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Filename:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@filename</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: bold;">Line 
number:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@line_number</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Line:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@line</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Time:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@time</span>
-                </div>
-                <div>
-                    <span style="font-size: 14px; font-weight: 
bold;">Percentage:</span>&nbsp;
-                    <span style="font-size: 10px; font-family: Monaco, 
monospace;">@percentage</span>
-                </div>
-                """
-        )
-        self.profile_plot.add_tools(hover)
-
-        self.profile_plot.xaxis.visible = False
-        self.profile_plot.yaxis.visible = False
-        self.profile_plot.grid.visible = False
-
         self.ts_source = ColumnDataSource({'time': [], 'count': []})
         self.ts_plot = figure(title='Activity over time', height=100,
                               x_axis_type='datetime', 
active_drag='xbox_select',
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/distributed/bokeh/scheduler_html.py 
new/distributed-1.25.3/distributed/bokeh/scheduler_html.py
--- old/distributed-1.25.2/distributed/bokeh/scheduler_html.py  2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/bokeh/scheduler_html.py  2019-01-31 
19:43:38.000000000 +0100
@@ -27,6 +27,7 @@
         with log_errors():
             self.render('workers.html',
                         title='Workers',
+                        scheduler=self.server,
                         **toolz.merge(self.server.__dict__, ns, self.extra))
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/distributed/bokeh/templates/workers.html 
new/distributed-1.25.3/distributed/bokeh/templates/workers.html
--- old/distributed-1.25.2/distributed/bokeh/templates/workers.html     
2019-01-03 18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/bokeh/templates/workers.html     
2019-01-31 19:43:38.000000000 +0100
@@ -1,6 +1,8 @@
 {% extends main.html %}
 {% block content %}
 
+  <h1 class="title"> Scheduler {{scheduler.address}} </h1>
+
   <a class="button is-primary" href="logs.html">Logs</a>
   <a class="button is-primary" href="../../status">Bokeh</a>
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/cli/dask_worker.py 
new/distributed-1.25.3/distributed/cli/dask_worker.py
--- old/distributed-1.25.2/distributed/cli/dask_worker.py       2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/cli/dask_worker.py       2019-01-30 
00:39:16.000000000 +0100
@@ -82,7 +82,9 @@
 @click.option('--local-directory', default='', type=str,
               help="Directory to place worker files")
 @click.option('--resources', type=str, default='',
-              help='Resources for task constraints like "GPU=2 MEM=10e9"')
+              help='Resources for task constraints like "GPU=2 MEM=10e9". '
+                   'Resources are applied separately to each worker process '
+                   "(only relevant when starting multiple worker processes 
with '--nprocs').")
 @click.option('--scheduler-file', type=str, default='',
               help='Filename to JSON encoded scheduler information. '
                    'Use with dask-scheduler --scheduler-file')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/client.py 
new/distributed-1.25.3/distributed/client.py
--- old/distributed-1.25.2/distributed/client.py        2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/client.py        2019-01-31 
19:43:38.000000000 +0100
@@ -572,6 +572,20 @@
         else:
             self.connection_args = self.security.get_connection_args('client')
 
+        if address is None:
+            address = dask.config.get('scheduler-address', None)
+            if address:
+                logger.info("Config value `scheduler-address` found: %s",
+                            address)
+
+        if isinstance(address, (rpc, PooledRPCCall)):
+            self.scheduler = address
+        elif hasattr(address, "scheduler_address"):
+            # It's a LocalCluster or LocalCluster-compatible object
+            self.cluster = address
+            with ignoring(AttributeError):
+                loop = address.loop
+
         self._connecting_to_scheduler = False
         self._asynchronous = asynchronous
         self._should_close_loop = not loop
@@ -592,25 +606,10 @@
                 io_loop=self.loop
         )
 
-        if address is None:
-            address = dask.config.get('scheduler-address', None)
-            if address:
-                logger.info("Config value `scheduler-address` found: %s",
-                            address)
-
-        if isinstance(address, (rpc, PooledRPCCall)):
-            self.scheduler = address
-        elif hasattr(address, "scheduler_address"):
-            # It's a LocalCluster or LocalCluster-compatible object
-            self.cluster = address
-
         self._start_arg = address
         if set_as_default:
-            self._previous_scheduler = dask.config.get('scheduler', None)
-            dask.config.set(scheduler='dask.distributed')
-
-            self._previous_shuffle = dask.config.get('shuffle', None)
-            dask.config.set(shuffle='tasks')
+            self._set_config = dask.config.set(scheduler='dask.distributed',
+                                               shuffle='tasks')
 
         self._stream_handlers = {
             'key-in-memory': self._handle_key_in_memory,
@@ -1074,9 +1073,9 @@
                 pc.stop()
             self._scheduler_identity = {}
             with ignoring(AttributeError):
-                dask.config.set(scheduler=self._previous_scheduler)
-            with ignoring(AttributeError):
-                dask.config.set(shuffle=self._previous_shuffle)
+                # clear the dask.config set keys
+                with self._set_config:
+                    pass
             if self.get == dask.config.get('get', None):
                 del dask.config.config['get']
             if self.status == 'closed':
@@ -1090,7 +1089,8 @@
             # This makes the shutdown slightly smoother and quieter
             with ignoring(AttributeError, gen.TimeoutError):
                 yield gen.with_timeout(timedelta(milliseconds=100),
-                                       self._handle_scheduler_coroutine)
+                                       self._handle_scheduler_coroutine,
+                                       quiet_exceptions=(CancelledError,))
 
             if self.scheduler_comm and self.scheduler_comm.comm and not 
self.scheduler_comm.comm.closed():
                 yield self.scheduler_comm.close()
@@ -1159,13 +1159,6 @@
         if self._should_close_loop and not shutting_down():
             self._loop_runner.stop()
 
-        with ignoring(AttributeError):
-            dask.config.set(scheduler=self._previous_scheduler)
-        with ignoring(AttributeError):
-            dask.config.set(shuffle=self._previous_shuffle)
-        if self.get == dask.config.get('get', None):
-            del dask.config.config['get']
-
     def shutdown(self, *args, **kwargs):
         """ Deprecated, see close instead
 
@@ -2984,7 +2977,8 @@
             keys += list(map(tokey, {f.key for f in futures}))
         return self.sync(self.scheduler.call_stack, keys=keys or None)
 
-    def profile(self, key=None, start=None, stop=None, workers=None, 
merge_workers=True):
+    def profile(self, key=None, start=None, stop=None, workers=None,
+                merge_workers=True, plot=False, filename=None):
         """ Collect statistical profiling information about recent work
 
         Parameters
@@ -2996,16 +2990,49 @@
         stop: time
         workers: list
             List of workers to restrict profile information
+        plot: boolean or string
+            Whether or not to return a plot object
+        filename: str
+            Filename to save the plot
 
         Examples
         --------
         >>> client.profile()  # call on collections
+        >>> client.profile(filename='dask-profile.html')  # save to html file
         """
         if isinstance(workers, six.string_types + (Number,)):
             workers = [workers]
 
-        return self.sync(self.scheduler.profile, key=key, workers=workers,
-                         merge_workers=merge_workers, start=start, stop=stop)
+        return self.sync(self._profile, key=key, workers=workers,
+                         merge_workers=merge_workers, start=start, stop=stop,
+                         plot=plot, filename=filename)
+
+    @gen.coroutine
+    def _profile(self, key=None, start=None, stop=None, workers=None,
+                 merge_workers=True, plot=False, filename=None):
+        if isinstance(workers, six.string_types + (Number,)):
+            workers = [workers]
+
+        state = yield self.scheduler.profile(key=key, workers=workers,
+                merge_workers=merge_workers, start=start, stop=stop)
+
+        if filename:
+            plot = True
+
+        if plot:
+            from . import profile
+            data = profile.plot_data(state)
+            figure, source = profile.plot_figure(data, 
sizing_mode='stretch_both')
+
+            if plot == 'save' and not filename:
+                filename = 'dask-profile.html'
+
+            from bokeh.plotting import save
+            save(figure, title='Dask Profile', filename=filename)
+            raise gen.Return((state, figure))
+
+        else:
+            raise gen.Return(state)
 
     def scheduler_info(self, **kwargs):
         """ Basic information about the workers in the cluster
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/comm/core.py 
new/distributed-1.25.3/distributed/comm/core.py
--- old/distributed-1.25.2/distributed/comm/core.py     2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/comm/core.py     2019-01-28 
19:00:36.000000000 +0100
@@ -40,19 +40,32 @@
     # XXX add set_close_callback()?
 
     @abstractmethod
-    def read(self):
+    def read(self, deserializers=None):
         """
         Read and return a message (a Python object).
 
         This method is a coroutine.
+
+        Parameters
+        ----------
+        deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]]
+            An optional dict appropriate for distributed.protocol.deserialize.
+            See :ref:`serialization` for more.
         """
 
     @abstractmethod
-    def write(self, msg):
+    def write(self, msg, on_error=None):
         """
         Write a message (a Python object).
 
         This method is a coroutine.
+
+        Parameters
+        ----------
+        msg :
+        on_error : Optional[str]
+            The behavior when serialization fails. See
+            ``distributed.protocol.core.dumps`` for valid values.
         """
 
     @abstractmethod
@@ -181,6 +194,7 @@
                % (addr, timeout, error))
         raise IOError(msg)
 
+    # This starts a thread
     while True:
         try:
             future = connector.connect(loc, deserialize=deserialize,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/comm/tcp.py 
new/distributed-1.25.3/distributed/comm/tcp.py
--- old/distributed-1.25.2/distributed/comm/tcp.py      2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/comm/tcp.py      2019-01-19 
18:17:58.000000000 +0100
@@ -11,6 +11,8 @@
 except ImportError:
     ssl = None
 
+from concurrent.futures import ThreadPoolExecutor
+
 import dask
 import tornado
 from tornado import gen, netutil
@@ -184,16 +186,15 @@
 
             frames = []
             for length in lengths:
-                if PY3 and self._iostream_has_read_into:
-                    frame = bytearray(length)
-                    if length:
+                if length:
+                    if PY3 and self._iostream_has_read_into:
+                        frame = bytearray(length)
                         n = yield stream.read_into(frame)
                         assert n == length, (n, length)
-                else:
-                    if length:
-                        frame = yield stream.read_bytes(length)
                     else:
-                        frame = b''
+                        frame = yield stream.read_bytes(length)
+                else:
+                    frame = b''
                 frames.append(frame)
         except StreamClosedError as e:
             self.stream = None
@@ -320,6 +321,13 @@
 
 
 class BaseTCPConnector(Connector, RequireEncryptionMixin):
+    if PY3:  # see github PR #2403 discussion for more info
+        _executor = ThreadPoolExecutor(2)
+        _resolver = netutil.ExecutorResolver(close_executor=False,
+                                             executor=_executor)
+    else:
+        _resolver = None
+    client = TCPClient(resolver=_resolver)
 
     @gen.coroutine
     def connect(self, address, deserialize=True, **connection_args):
@@ -327,11 +335,11 @@
         ip, port = parse_host_port(address)
         kwargs = self._get_connect_args(**connection_args)
 
-        client = TCPClient()
         try:
-            stream = yield client.connect(ip, port,
+            stream = yield BaseTCPConnector.client.connect(ip, port,
                                           max_buffer_size=MAX_BUFFER_SIZE,
                                           **kwargs)
+
             # Under certain circumstances tornado will have a closed 
connnection with an error and not raise
             # a StreamClosedError.
             #
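The `BaseTCPConnector` hunk above addresses the excess-threading fix (:pr:`2403`) by creating one two-thread executor and one resolver at class definition time, so every connect attempt reuses them instead of building a fresh `TCPClient` per call. The class below is a hedged, self-contained sketch of that class-level sharing pattern; the names are illustrative, not distributed's API:

```python
import socket
from concurrent.futures import ThreadPoolExecutor


class SharedResolverConnector:
    # One pool for *all* instances: repeated (or failing) connect
    # attempts reuse these two threads rather than spawning a new
    # pool each time, which is the leak the PR fixed.
    _executor = ThreadPoolExecutor(2)

    def resolve(self, host):
        # Run the blocking DNS lookup on the shared executor and
        # return the Future, keeping the caller's loop unblocked.
        return SharedResolverConnector._executor.submit(
            socket.gethostbyname, host)
```

Because `_executor` lives on the class, not the instance, the total thread count stays bounded no matter how many connector objects are created.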
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/distributed/comm/tests/test_comms.py 
new/distributed-1.25.3/distributed/comm/tests/test_comms.py
--- old/distributed-1.25.2/distributed/comm/tests/test_comms.py 2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/comm/tests/test_comms.py 2019-01-09 
18:45:51.000000000 +0100
@@ -11,6 +11,7 @@
 from tornado import gen, ioloop, locks, queues
 from tornado.concurrent import Future
 
+from distributed.compatibility import PY3
 from distributed.metrics import time
 from distributed.utils import get_ip, get_ipv6
 from distributed.utils_test import (gen_test, requires_ipv6, has_ipv6,
@@ -289,6 +290,46 @@
     assert set(l) == {1234} | set(range(N))
 
 
+@gen_test()
+def test_comm_failure_threading():
+    """
+    When we fail to connect, make sure we don't make a lot
+    of threads.
+
+    We only assert for PY3, because the thread limit only is
+    set for python 3.  See github PR #2403 discussion for info.
+    """
+
+    @gen.coroutine
+    def sleep_for_60ms():
+        max_thread_count = 0
+        for x in range(60):
+            yield gen.sleep(0.001)
+            thread_count = threading.active_count()
+            if thread_count > max_thread_count:
+                max_thread_count = thread_count
+        raise gen.Return(max_thread_count)
+    original_thread_count = threading.active_count()
+
+    # tcp.TCPConnector()
+    sleep_future = sleep_for_60ms()
+    with pytest.raises(IOError):
+        yield connect("tcp://localhost:28400", 0.052)
+    max_thread_count = yield sleep_future
+    # 2 is the number set by BaseTCPConnector.executor (ThreadPoolExecutor)
+    if PY3:
+        assert max_thread_count <= 2 + original_thread_count
+
+    # tcp.TLSConnector()
+    sleep_future = sleep_for_60ms()
+    with pytest.raises(IOError):
+        yield connect("tls://localhost:28400", 0.052,
+                                 connection_args={'ssl_context': 
get_client_ssl_context()})
+    max_thread_count = yield sleep_future
+    if PY3:
+        assert max_thread_count <= 2 + original_thread_count
+
+
 @gen.coroutine
 def check_inproc_specific(run_client):
     """
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/deploy/local.py 
new/distributed-1.25.3/distributed/deploy/local.py
--- old/distributed-1.25.2/distributed/deploy/local.py  2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/deploy/local.py  2019-01-31 
19:43:36.000000000 +0100
@@ -8,9 +8,11 @@
 import weakref
 import toolz
 
+from dask.utils import factors
 from tornado import gen
 
 from .cluster import Cluster
+from ..compatibility import get_thread_identity
 from ..core import CommClosedError
 from ..utils import (sync, ignoring, All, silence_logging, LoopRunner,
         log_errors, thread_state, parse_timedelta)
@@ -99,8 +101,7 @@
             self._old_logging_level = silence_logging(level=silence_logs)
         if n_workers is None and threads_per_worker is None:
             if processes:
-                n_workers = _ncores
-                threads_per_worker = 1
+                n_workers, threads_per_worker = nprocesses_nthreads(_ncores)
             else:
                 n_workers = 1
                 threads_per_worker = _ncores
@@ -153,7 +154,11 @@
 
     @property
     def asynchronous(self):
-        return self._asynchronous or getattr(thread_state, 'asynchronous', 
False)
+        return (
+            self._asynchronous or
+            getattr(thread_state, 'asynchronous', False) or
+            hasattr(self.loop, '_thread_identity') and 
self.loop._thread_identity == get_thread_identity()
+        )
 
     def sync(self, func, *args, **kwargs):
         if kwargs.pop('asynchronous', None) or self.asynchronous:
@@ -372,6 +377,34 @@
             return '<unstarted>'
 
 
+def nprocesses_nthreads(n):
+    """
+    The default breakdown of processes and threads for a given number of cores
+
+    Parameters
+    ----------
+    n: int
+        Number of available cores
+
+    Examples
+    --------
+    >>> nprocesses_nthreads(4)
+    (4, 1)
+    >>> nprocesses_nthreads(32)
+    (8, 4)
+
+    Returns
+    -------
+    nprocesses, nthreads
+    """
+    if n <= 4:
+        processes = n
+    else:
+        processes = min(f for f in factors(n) if f >= math.sqrt(n))
+    threads = n // processes
+    return (processes, threads)
+
+
 clusters_to_close = weakref.WeakSet()
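The new `nprocesses_nthreads` helper above uses all-processes for five or fewer cores, and otherwise picks the smallest factor of `n` that is at least √n as the process count, giving fewer but larger workers. A self-contained sketch of that breakdown, with a local `factors` helper standing in for the `dask.utils.factors` import (an assumption about its behavior):

```python
import math


def factors(n):
    # All divisors of n (assumed to match dask.utils.factors).
    result = set()
    for i in range(1, int(math.sqrt(n)) + 1):
        if n % i == 0:
            result.add(i)
            result.add(n // i)
    return result


def nprocesses_nthreads(n):
    """Split n cores into (processes, threads):
    small n -> one single-threaded process per core; larger n ->
    the smallest divisor of n that is >= sqrt(n) processes."""
    if n <= 4:
        processes = n
    else:
        processes = min(f for f in factors(n) if f >= math.sqrt(n))
    return processes, n // processes
```

For example, 32 cores become 8 processes of 4 threads each, matching the docstring examples in the diff.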
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/distributed/deploy/tests/test_local.py 
new/distributed-1.25.3/distributed/deploy/tests/test_local.py
--- old/distributed-1.25.2/distributed/deploy/tests/test_local.py       
2019-01-03 18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/deploy/tests/test_local.py       
2019-01-31 19:43:36.000000000 +0100
@@ -14,7 +14,7 @@
 import pytest
 
 from distributed import Client, Worker, Nanny
-from distributed.deploy.local import LocalCluster
+from distributed.deploy.local import LocalCluster, nprocesses_nthreads
 from distributed.metrics import time
 from distributed.utils_test import (inc, gen_test, slowinc,
                                     assert_cannot_connect,
@@ -32,12 +32,14 @@
 def test_simple(loop):
     with LocalCluster(4, scheduler_port=0, processes=False, silence_logs=False,
                       diagnostics_port=None, loop=loop) as c:
-        with Client(c.scheduler_address, loop=loop) as e:
+        with Client(c) as e:
             x = e.submit(inc, 1)
             x.result()
             assert x.key in c.scheduler.tasks
             assert any(w.data == {x.key: 2} for w in c.workers)
 
+            assert e.loop is c.loop
+
 
 @pytest.mark.skipif('sys.version_info[0] == 2', reason='fork issues')
 def test_close_twice():
@@ -131,7 +133,7 @@
 def test_Client_with_local(loop):
     with LocalCluster(1, scheduler_port=0, silence_logs=False,
                       diagnostics_port=None, loop=loop) as c:
-        with Client(c, loop=loop) as e:
+        with Client(c) as e:
             assert len(e.ncores()) == len(c.workers)
             assert c.scheduler_address in repr(c)
 
@@ -276,7 +278,7 @@
     cluster = yield LocalCluster(0, scheduler_port=0, processes=False,
                                  silence_logs=False, diagnostics_port=None,
                                  loop=loop, asynchronous=True)
-    c = yield Client(cluster, loop=loop, asynchronous=True)
+    c = yield Client(cluster, asynchronous=True)
 
     assert not cluster.workers
 
@@ -492,7 +494,7 @@
     cluster = yield MyCluster(0, scheduler_port=0, processes=False,
                               silence_logs=False, diagnostics_port=None,
                               loop=loop, asynchronous=True)
-    c = yield Client(cluster, loop=loop, asynchronous=True)
+    c = yield Client(cluster, asynchronous=True)
 
     assert not cluster.workers
 
@@ -529,5 +531,29 @@
             assert workers_before != workers_after
 
 
+def test_default_process_thread_breakdown():
+    assert nprocesses_nthreads(1) == (1, 1)
+    assert nprocesses_nthreads(4) == (4, 1)
+    assert nprocesses_nthreads(5) == (5, 1)
+    assert nprocesses_nthreads(8) == (4, 2)
+    assert nprocesses_nthreads(12) in ((6, 2), (4, 3))
+    assert nprocesses_nthreads(20) == (5, 4)
+    assert nprocesses_nthreads(24) in ((6, 4), (8, 3))
+    assert nprocesses_nthreads(32) == (8, 4)
+    assert nprocesses_nthreads(40) in ((8, 5), (10, 4))
+    assert nprocesses_nthreads(80) in ((10, 8), (16, 5))
+
+
+def test_asynchronous_property(loop):
+    with LocalCluster(4, scheduler_port=0, processes=False, silence_logs=False,
+                      diagnostics_port=None, loop=loop) as cluster:
+
+        @gen.coroutine
+        def _():
+            assert cluster.asynchronous
+
+        cluster.sync(_)
+
+
 if sys.version_info >= (3, 5):
     from distributed.deploy.tests.py3_test_deploy import *  # noqa F401
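The `test_default_process_thread_breakdown` assertions above pin down the new process/thread split that LocalCluster uses after PR 2452 ("start fewer but larger workers"). As a hedged sketch only (the real `nprocesses_nthreads` implementation may differ), a heuristic consistent with every assertion in that test is: up to 4 cores, use one thread per process; beyond that, take the smallest divisor of n that is at least sqrt(n) as the process count.

```python
import math


def nprocesses_nthreads_sketch(n):
    """Split n cores into (processes, threads).

    Hedged sketch only, not the distributed implementation: for n <= 4
    use n single-threaded processes; otherwise pick the smallest divisor
    of n that is >= sqrt(n) as the process count, giving fewer, larger
    worker processes.
    """
    if n <= 4:
        processes = n
    else:
        processes = min(p for p in range(2, n + 1)
                        if n % p == 0 and p >= math.sqrt(n))
    return processes, n // processes
```

This reproduces the breakdowns asserted in the diff above, e.g. 8 cores become 4 processes of 2 threads and 20 cores become 5 processes of 4 threads.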
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/distributed/diagnostics/progress.py 
new/distributed-1.25.3/distributed/diagnostics/progress.py
--- old/distributed-1.25.2/distributed/diagnostics/progress.py  2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/diagnostics/progress.py  2019-01-31 
19:43:36.000000000 +0100
@@ -116,7 +116,7 @@
 
         if key in self.keys and finish == 'forgotten':
             logger.debug("A task was cancelled (%s), stopping progress", key)
-            self.stop(exception=True)
+            self.stop(exception=True, key=key)
 
     def restart(self, scheduler):
         self.stop()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/profile.py 
new/distributed-1.25.3/distributed/profile.py
--- old/distributed-1.25.2/distributed/profile.py       2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/profile.py       2019-01-31 
19:43:38.000000000 +0100
@@ -24,6 +24,8 @@
                    'children': {...}}}
     }
 """
+from __future__ import print_function, division, absolute_import
+
 import bisect
 from collections import defaultdict, deque
 import linecache
@@ -299,3 +301,57 @@
         prof = merge(prof, recent)
 
     return prof
+
+
+def plot_figure(data, **kwargs):
+    from bokeh.plotting import ColumnDataSource, figure
+    from bokeh.models import HoverTool
+
+    if 'states' in data:
+        data = toolz.dissoc(data, 'states')
+
+    source = ColumnDataSource(data=data)
+
+    fig = figure(tools='tap', **kwargs)
+    r = fig.quad('left', 'right', 'top', 'bottom', color='color',
+             line_color='black', line_width=2, source=source)
+
+    r.selection_glyph = None
+    r.nonselection_glyph = None
+
+    hover = HoverTool(
+        point_policy="follow_mouse",
+        tooltips="""
+            <div>
+                <span style="font-size: 14px; font-weight: bold;">Name:</span>&nbsp;
+                <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
+            </div>
+            <div>
+                <span style="font-size: 14px; font-weight: bold;">Filename:</span>&nbsp;
+                <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span>
+            </div>
+            <div>
+                <span style="font-size: 14px; font-weight: bold;">Line number:</span>&nbsp;
+                <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span>
+            </div>
+            <div>
+                <span style="font-size: 14px; font-weight: bold;">Line:</span>&nbsp;
+                <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span>
+            </div>
+            <div>
+                <span style="font-size: 14px; font-weight: bold;">Time:</span>&nbsp;
+                <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span>
+            </div>
+            <div>
+                <span style="font-size: 14px; font-weight: bold;">Percentage:</span>&nbsp;
+                <span style="font-size: 10px; font-family: Monaco, monospace;">@width</span>
+            </div>
+            """
+    )
+    fig.add_tools(hover)
+
+    fig.xaxis.visible = False
+    fig.yaxis.visible = False
+    fig.grid.visible = False
+
+    return fig, source
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/protocol/serialize.py 
new/distributed-1.25.3/distributed/protocol/serialize.py
--- old/distributed-1.25.2/distributed/protocol/serialize.py    2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/protocol/serialize.py    2019-01-23 
01:35:56.000000000 +0100
@@ -171,6 +171,9 @@
     ----------
     header: dict
     frames: list of bytes
+    deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]]
+        An optional dict mapping a name to a (de)serializer.
+        See `dask_serialize` and `dask_deserialize` for more.
 
     See Also
     --------
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/scheduler.py 
new/distributed-1.25.3/distributed/scheduler.py
--- old/distributed-1.25.2/distributed/scheduler.py     2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/scheduler.py     2019-01-31 
19:43:36.000000000 +0100
@@ -764,7 +764,6 @@
 
     def __init__(
             self,
-            center=None,
             loop=None,
             delete_interval='500ms',
             synchronize_worker_interval='60s',
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_client.py 
new/distributed-1.25.3/distributed/tests/test_client.py
--- old/distributed-1.25.2/distributed/tests/test_client.py     2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/tests/test_client.py     2019-01-31 
19:43:38.000000000 +0100
@@ -3243,17 +3243,17 @@
 def test_default_get():
     with cluster() as (s, [a, b]):
         pre_get = dask.base.get_scheduler()
-        pre_shuffle = dask.config.get('shuffle', None)
+        pytest.raises(KeyError, dask.config.get, 'shuffle')
         with Client(s['address'], set_as_default=True) as c:
             assert dask.base.get_scheduler() == c.get
             assert dask.config.get('shuffle') == 'tasks'
 
         assert dask.base.get_scheduler() == pre_get
-        assert dask.config.get('shuffle') == pre_shuffle
+        pytest.raises(KeyError, dask.config.get, 'shuffle')
 
         c = Client(s['address'], set_as_default=False)
         assert dask.base.get_scheduler() == pre_get
-        assert dask.config.get('shuffle') == pre_shuffle
+        pytest.raises(KeyError, dask.config.get, 'shuffle')
         c.close()
 
         c = Client(s['address'], set_as_default=True)
@@ -3261,7 +3261,7 @@
         assert dask.base.get_scheduler() == c.get
         c.close()
         assert dask.base.get_scheduler() == pre_get
-        assert dask.config.get('shuffle') == pre_shuffle
+        pytest.raises(KeyError, dask.config.get, 'shuffle')
 
         with Client(s['address']) as c:
             assert dask.base.get_scheduler() == c.get
@@ -4579,6 +4579,22 @@
             ), line
 
 
+def test_quiet_client_close_when_cluster_is_closed_before_client(loop):
+    n_attempts = 5
+    # Trying a few times to reduce the flakiness of the test. Without the bug
+    # fix in #2477 and with 5 attempts, this test passes by chance in about 10%
+    # of the cases.
+    for _ in range(n_attempts):
+        with captured_logger(logging.getLogger('tornado.application')) as logger:
+            cluster = LocalCluster(loop=loop)
+            client = Client(cluster, loop=loop)
+            cluster.close()
+            client.close()
+
+        out = logger.getvalue()
+        assert 'CancelledError' not in out
+
+
 @gen_cluster()
 def test_close(s, a, b):
     c = yield Client(s.address, asynchronous=True)
@@ -5469,5 +5485,18 @@
         assert result == 101
 
 
+@gen_cluster(client=True, check_new_threads=False)
+def test_profile_bokeh(c, s, a, b):
+    pytest.importorskip('bokeh.plotting')
+    from bokeh.model import Model
+    yield c.map(slowinc, range(10), delay=0.2)
+    state, figure = yield c.profile(plot=True)
+    assert isinstance(figure, Model)
+
+    with tmpfile('html') as fn:
+        yield c.profile(filename=fn)
+        assert os.path.exists(fn)
+
+
 if sys.version_info >= (3, 5):
     from distributed.tests.py3_test_client import *  # noqa F401
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_counter.py 
new/distributed-1.25.3/distributed/tests/test_counter.py
--- old/distributed-1.25.2/distributed/tests/test_counter.py    2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/tests/test_counter.py    2019-01-25 
19:01:55.000000000 +0100
@@ -11,9 +11,11 @@
     Digest = None
 
 
-@pytest.mark.parametrize('CD,size', [(Counter, lambda d: sum(d.values())),
-                                     pytest.mark.skipif(not Digest, reason="no crick library")(
-                                     (Digest, lambda x: x.size()))])
+@pytest.mark.parametrize('CD,size', [
+    (Counter, lambda d: sum(d.values())),
+    pytest.param(Digest, lambda x: x.size(),
+                 marks=pytest.mark.skipif(not Digest, reason="no crick library"))
+])
 def test_digest(loop, CD, size):
     c = CD(loop=loop)
     c.add(1)
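This hunk (and the two test hunks that follow) applies the pytest 4 migration from PR 2478: marking an individual parametrize case by calling `pytest.mark.<name>(value, ...)` is gone in pytest 4, and marks are attached via `pytest.param(value, marks=...)` instead. A small self-contained illustration of the new pattern (the test name and values here are made up for the example):

```python
import pytest

# pytest >= 3.1 spelling for per-case marks, mandatory under pytest 4:
# wrap the case in pytest.param() and pass the mark via `marks=`.
cases = [
    pytest.param(1, id="plain-case"),
    pytest.param(2, marks=pytest.mark.skipif(False, reason="example guard")),
]


@pytest.mark.parametrize("value", cases)
def test_value_is_positive(value):
    assert value > 0
```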
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/distributed/tests/test_resources.py 
new/distributed-1.25.3/distributed/tests/test_resources.py
--- old/distributed-1.25.2/distributed/tests/test_resources.py  2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/tests/test_resources.py  2019-01-25 
19:01:55.000000000 +0100
@@ -277,8 +277,10 @@
 
 
 @pytest.mark.parametrize('optimize_graph', [
-    pytest.mark.xfail(True, reason="don't track resources through optimization"),
-    pytest.mark.skipif(WINDOWS, False, reason="intermittent failure"),
+    pytest.param(True,
+        marks=pytest.mark.xfail(reason="don't track resources through optimization")),
+    pytest.param(False,
+        marks=pytest.mark.skipif(WINDOWS, reason="intermittent failure"))
 ])
 def test_collections_get(client, optimize_graph, s, a, b):
     da = pytest.importorskip('dask.array')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/tests/test_steal.py 
new/distributed-1.25.3/distributed/tests/test_steal.py
--- old/distributed-1.25.2/distributed/tests/test_steal.py      2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/tests/test_steal.py      2019-01-25 
19:01:55.000000000 +0100
@@ -468,12 +468,9 @@
       [1],
       [1]]),
 
-    pytest.mark.xfail(([[1, 1, 1, 1, 1, 1, 1],
-      [1, 1], [1, 1], [1, 1],
-      []],
-     [[1, 1, 1, 1, 1],
-      [1, 1], [1, 1], [1, 1],
-      [1, 1]]), reason="Some uncertainty based on executing stolen task")
+    pytest.param([[1, 1, 1, 1, 1, 1, 1], [1, 1], [1, 1], [1, 1], []],
+                 [[1, 1, 1, 1, 1], [1, 1], [1, 1], [1, 1], [1, 1]],
+                 marks=pytest.mark.xfail(reason="Some uncertainty based on executing stolen task"))
 ])
 def test_balance(inp, expected):
     test = lambda *args, **kwargs: assert_balanced(inp, expected, *args, **kwargs)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed/utils_test.py 
new/distributed-1.25.3/distributed/utils_test.py
--- old/distributed-1.25.2/distributed/utils_test.py    2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/distributed/utils_test.py    2019-01-31 
19:43:36.000000000 +0100
@@ -136,7 +136,7 @@
     start = time()
     while set(_global_clients):
         sleep(0.1)
-        assert time() < start + 5
+        assert time() < start + 10
 
     _cleanup_dangling()
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/distributed.egg-info/PKG-INFO 
new/distributed-1.25.3/distributed.egg-info/PKG-INFO
--- old/distributed-1.25.2/distributed.egg-info/PKG-INFO        2019-01-04 
23:14:53.000000000 +0100
+++ new/distributed-1.25.3/distributed.egg-info/PKG-INFO        2019-01-31 
21:17:27.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 1.25.2
+Version: 1.25.3
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.readthedocs.io/en/latest/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/docs/source/changelog.rst 
new/distributed-1.25.3/docs/source/changelog.rst
--- old/distributed-1.25.2/docs/source/changelog.rst    2019-01-04 
23:13:19.000000000 +0100
+++ new/distributed-1.25.3/docs/source/changelog.rst    2019-01-31 
21:15:58.000000000 +0100
@@ -1,6 +1,26 @@
 Changelog
 =========
 
+1.25.3 - 2019-01-31
+-------------------
+
+-  Fix excess threading on missing connections (:pr:`2403`) `Daniel Farrell`_
+-  Fix typo in doc (:pr:`2457`) `Loïc Estève`_
+-  Start fewer but larger workers with LocalCluster (:pr:`2452`) `Matthew Rocklin`_
+-  Check for non-zero ``length`` first in ``read`` loop (:pr:`2465`) `John Kirkham`_
+-  DOC: Use of local cluster in script (:pr:`2462`) `Peter Killick`_
+-  DOC/API: Signature for base class write / read (:pr:`2472`) `Tom Augspurger`_
+-  Support Pytest 4 in Tests (:pr:`2478`) `Adam Beberg`_
+-  Ensure async behavior in event loop with LocalCluster (:pr:`2484`) `Matthew Rocklin`_
+-  Fix spurious CancelledError (:pr:`2485`) `Loïc Estève`_
+-  Properly reset dask.config scheduler and shuffle when closing the client (:pr:`2475`) `George Sakkis`_
+-  Make it more explict that resources are per worker. (:pr:`2470`) `Loïc Estève`_
+-  Remove references to center (:pr:`2488`)  `Matthew Rocklin`_
+-  Expand client clearing timeout to 10s in testing (:pr:`2493`) `Matthew Rocklin`_
+-  Propagate key keyword in progressbar (:pr:`2492`) `Matthew Rocklin`_
+-  Use provided cluster's IOLoop if present in Client (:pr:`2494`) `Matthew Rocklin`_
+
+
 1.25.2 - 2019-01-04
 -------------------
 
@@ -897,3 +917,6 @@
 .. _`Stephan Hoyer`: https://github.com/shoyer
 .. _`tjb900`: https://github.com/tjb900
 .. _`Dirk Petersen`: https://github.com/dirkpetersen
+.. _`Daniel Farrell`: https://github.com/danpf
+.. _`George Sakkis`: https://github.com/gsakkis
+.. _`Adam Beberg`: https://github.com/beberg
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/distributed-1.25.2/docs/source/diagnosing-performance.rst 
new/distributed-1.25.3/docs/source/diagnosing-performance.rst
--- old/distributed-1.25.2/docs/source/diagnosing-performance.rst       
2019-01-03 18:30:24.000000000 +0100
+++ new/distributed-1.25.3/docs/source/diagnosing-performance.rst       
2019-01-31 19:43:38.000000000 +0100
@@ -84,7 +84,9 @@
 
 Users can also query this data directly using the :doc:`Client.profile <api>`
 function.  This will deliver the raw data structure used to produce these
-plots.
+plots.  They can also pass a filename to save the plot as an HTML file
+directly.  Note that this file will have to be served from a webserver like
+``python -m http.server`` to be visible.
 
 The 10ms and 1s parameters can be controlled by the ``profile-interval`` and
 ``profile-cycle-interval`` entries in the config.yaml file.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/docs/source/local-cluster.rst 
new/distributed-1.25.3/docs/source/local-cluster.rst
--- old/distributed-1.25.2/docs/source/local-cluster.rst        2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/docs/source/local-cluster.rst        2019-01-19 
18:17:58.000000000 +0100
@@ -28,6 +28,18 @@
    >>> client
    <Client: scheduler=127.0.0.1:8786 processes=8 cores=8>
 
+.. note::
+
+   Within a Python script you need to start a local cluster in the
+   ``if __name__ == '__main__'`` block:
+
+   .. code-block:: python
+
+      if __name__ == '__main__':
+          cluster = LocalCluster()
+          client = Client(cluster)
+          # Your code follows here
+
 API
 ---
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/docs/source/resources.rst 
new/distributed-1.25.3/docs/source/resources.rst
--- old/distributed-1.25.2/docs/source/resources.rst    2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/docs/source/resources.rst    2019-01-30 
00:39:16.000000000 +0100
@@ -48,6 +48,34 @@
    final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})
 
 
+Resources are applied separately to each worker process
+-------------------------------------------------------
+
+If you are using ``dask-worker --nprocs <nprocs>`` the resource will be applied
+separately to each of the ``nprocs`` worker processes. Suppose you have 2 GPUs
+on your machine, if you want to use two worker processes, you have 1 GPU per
+worker process so you need to do something like this::
+
+   dask-worker scheduler:8786 --nprocs 2 --resources "GPU=1"
+
+Here is an example that illustrates how to use resources to ensure each task is
+run inside a separate process, which is useful to execute non thread-safe tasks
+or tasks that uses multithreading internally::
+
+   dask-worker scheduler:8786 --nprocs 3 --nthreads 2 --resources "process=1"
+
+With the code below, there will be at most 3 tasks running concurrently and
+each task will run in a separate process:
+
+.. code-block:: python
+
+   from distributed import Client
+   client = Client('scheduler:8786')
+
+   futures = [client.submit(non_thread_safe_function, arg,
+                            resources={'process': 1}) for arg in args]
+
+
 Resources are Abstract
 ----------------------
 
@@ -71,7 +99,7 @@
 
     x = dd.read_csv(...)
     y = x.map_partitions(func1)
-    z = y.map_parititons(func2)
+    z = y.map_partitions(func2)
 
     z.compute(resources={tuple(y.__dask_keys__()): {'GPU': 1})
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/docs/source/serialization.rst 
new/distributed-1.25.3/docs/source/serialization.rst
--- old/distributed-1.25.2/docs/source/serialization.rst        2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/docs/source/serialization.rst        2019-01-23 
01:35:56.000000000 +0100
@@ -1,3 +1,5 @@
+.. _serialization:
+
 Serialization
 =============
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/distributed-1.25.2/docs/source/setup.rst 
new/distributed-1.25.3/docs/source/setup.rst
--- old/distributed-1.25.2/docs/source/setup.rst        2019-01-03 
18:30:24.000000000 +0100
+++ new/distributed-1.25.3/docs/source/setup.rst        2019-01-31 
19:43:36.000000000 +0100
@@ -21,16 +21,16 @@
 node that hosts ``dask-scheduler``::
 
    $ dask-worker 192.168.0.1:8786
-   Start worker at:            192.168.0.2:12345
-   Registered with center at:  192.168.0.1:8786
+   Start worker at:               192.168.0.2:12345
+   Registered with Scheduler at:  192.168.0.1:8786
 
    $ dask-worker 192.168.0.1:8786
-   Start worker at:            192.168.0.3:12346
-   Registered with center at:  192.168.0.1:8786
+   Start worker at:               192.168.0.3:12346
+   Registered with Scheduler at:  192.168.0.1:8786
 
    $ dask-worker 192.168.0.1:8786
-   Start worker at:            192.168.0.4:12347
-   Registered with center at:  192.168.0.1:8786
+   Start worker at:               192.168.0.4:12347
+   Registered with Scheduler at:  192.168.0.1:8786
 
 There are various mechanisms to deploy these executables on a cluster, ranging
 from manualy SSH-ing into all of the nodes to more automated systems like

