Hello community,

here is the log from the commit of package python-joblib for openSUSE:Factory 
checked in at 2020-07-18 21:02:15
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-joblib (Old)
 and      /work/SRC/openSUSE:Factory/.python-joblib.new.3592 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-joblib"

Sat Jul 18 21:02:15 2020 rev:12 rq:821624 version:0.16.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-joblib/python-joblib.changes      2020-06-24 15:48:00.528187576 +0200
+++ /work/SRC/openSUSE:Factory/.python-joblib.new.3592/python-joblib.changes    2020-07-18 21:02:50.151600709 +0200
@@ -1,0 +2,19 @@
+Sat Jul 18 09:12:26 UTC 2020 - Dirk Mueller <[email protected]>
+
+- update to 0.16.0
+  - Fix a problem in the constructors of Parallel backend classes that
+    inherit from `AutoBatchingMixin`, which prevented the dask backend from
+    properly batching short tasks.
+    https://github.com/joblib/joblib/pull/1062
+  - Fix a problem in the way the joblib dask backend batches calls that would
+    badly interact with the dask callable pickling cache and lead to wrong
+    results or errors.
+    https://github.com/joblib/joblib/pull/1055
+  - Prevent a dask.distributed bug from surfacing in joblib's dask backend
+    during nested Parallel calls (due to joblib's auto-scattering feature)
+    https://github.com/joblib/joblib/pull/1061
+  - Workaround for a race condition after Parallel calls with the dask backend
+    that would cause low level warnings from asyncio coroutines:
+    https://github.com/joblib/joblib/pull/1078
+
+-------------------------------------------------------------------

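The fixes above all target joblib's dask backend. For reference, here is a
minimal sketch of the workload they concern (not taken from the package;
it requires the dask and distributed modules, and Client() starts a local
cluster):

  from dask.distributed import Client
  from joblib import Parallel, delayed, parallel_backend

  def inc(i):
      return i + 1

  if __name__ == '__main__':
      client = Client()  # or Client('scheduler-address:8786')
      with parallel_backend('dask'):
          # Many short tasks: the case the AutoBatchingMixin fix above
          # (joblib/joblib#1062) is about, since such tasks are only
          # efficient when batched automatically.
          results = Parallel()(delayed(inc)(i) for i in range(100))
      assert results == [inc(i) for i in range(100)]
      client.close()
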
Old:
----
  joblib-0.15.1.tar.gz

New:
----
  joblib-0.16.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-joblib.spec ++++++
--- /var/tmp/diff_new_pack.ZPlVl6/_old  2020-07-18 21:02:52.195602906 +0200
+++ /var/tmp/diff_new_pack.ZPlVl6/_new  2020-07-18 21:02:52.199602910 +0200
@@ -17,8 +17,9 @@
 
 
 %{?!python_module:%define python_module() python-%{**} python3-%{**}}
+%global skip_python2 1
 Name:           python-joblib
-Version:        0.15.1
+Version:        0.16.0
 Release:        0
 Summary:        Module for using Python functions as pipeline jobs
 License:        BSD-3-Clause

++++++ joblib-0.15.1.tar.gz -> joblib-0.16.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/.codecov.yml new/joblib-0.16.0/.codecov.yml
--- old/joblib-0.15.1/.codecov.yml      2020-05-04 15:38:39.000000000 +0200
+++ new/joblib-0.16.0/.codecov.yml      2020-07-01 18:02:32.000000000 +0200
@@ -1,2 +1,9 @@
 codecov:
   token: 1b7eb264-fd77-469a-829a-e9cd5efd7cef
+coverage:
+  status:
+    project:
+      default:
+        # Allow coverage to drop by up to 1% in a PR before marking it as
+        # failed
+        threshold: '1%'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/CHANGES.rst new/joblib-0.16.0/CHANGES.rst
--- old/joblib-0.15.1/CHANGES.rst       2020-05-16 14:38:07.000000000 +0200
+++ new/joblib-0.16.0/CHANGES.rst       2020-07-01 18:04:07.000000000 +0200
@@ -1,6 +1,27 @@
 Latest changes
 ==============
 
+Release 0.16.0
+--------------
+
+- Fix a problem in the constructors of Parallel backend classes that
+  inherit from `AutoBatchingMixin`, which prevented the dask backend from
+  properly batching short tasks.
+  https://github.com/joblib/joblib/pull/1062
+
+- Fix a problem in the way the joblib dask backend batches calls that would
+  badly interact with the dask callable pickling cache and lead to wrong
+  results or errors.
+  https://github.com/joblib/joblib/pull/1055
+
+- Prevent a dask.distributed bug from surfacing in joblib's dask backend
+  during nested Parallel calls (due to joblib's auto-scattering feature)
+  https://github.com/joblib/joblib/pull/1061
+
+- Workaround for a race condition after Parallel calls with the dask backend
+  that would cause low level warnings from asyncio coroutines:
+  https://github.com/joblib/joblib/pull/1078
+
 Release 0.15.1
 --------------
 
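The first entry above corresponds to the super().__init__() calls and the
base-class reordering visible in the _dask.py and _parallel_backends.py
hunks further down. A generic illustration (names are made up; this is not
joblib code) of the cooperative-initialization pattern the fix relies on:
every __init__ forwards to super() so the mixin's state gets set up, and
the mixin is listed first so its methods take precedence in the MRO:

  class AutoBatchingMixinLike:
      def __init__(self, **kwargs):
          super().__init__(**kwargs)
          self.effective_batch_size = 1       # state the mixin relies on

      def compute_batch_size(self):
          return self.effective_batch_size * 2   # adaptive behaviour

  class BackendBaseLike:
      def __init__(self, nesting_level=None, **kwargs):
          super().__init__(**kwargs)
          self.nesting_level = nesting_level

      def compute_batch_size(self):
          return 1                            # static default

  class DaskBackendLike(AutoBatchingMixinLike, BackendBaseLike):
      def __init__(self):
          super().__init__()                  # runs both __init__ methods

  backend = DaskBackendLike()
  assert backend.effective_batch_size == 1
  assert backend.compute_batch_size() == 2    # mixin version wins in the MRO
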
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/PKG-INFO new/joblib-0.16.0/PKG-INFO
--- old/joblib-0.15.1/PKG-INFO  2020-05-16 14:39:28.057508000 +0200
+++ new/joblib-0.16.0/PKG-INFO  2020-07-01 18:05:31.295775700 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: joblib
-Version: 0.15.1
+Version: 0.16.0
 Summary: Lightweight pipelining: using Python functions as pipeline jobs.
 Home-page: https://joblib.readthedocs.io
 Author: Gael Varoquaux
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/azure-pipelines.yml new/joblib-0.16.0/azure-pipelines.yml
--- old/joblib-0.15.1/azure-pipelines.yml       2020-05-16 14:37:43.000000000 +0200
+++ new/joblib-0.16.0/azure-pipelines.yml       2020-07-01 11:58:50.000000000 +0200
@@ -38,10 +38,12 @@
         PYTHON_VERSION: "pypy3"
         LOKY_MAX_CPU_COUNT: "2"
 
-      linux_py38:
+      linux_py38_distributed:
+        # To be updated regularly to use the most recent versions of the
+        # dependencies.
         imageName: 'ubuntu-latest'
         PYTHON_VERSION: "3.8"
-        EXTRA_CONDA_PACKAGES: "numpy=1.18"
+        EXTRA_CONDA_PACKAGES: "numpy=1.18 distributed=2.17"
       linux_py37_sklearn_tests:
         imageName: 'ubuntu-latest'
         PYTHON_VERSION: "3.7"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/__init__.py new/joblib-0.16.0/joblib/__init__.py
--- old/joblib-0.15.1/joblib/__init__.py        2020-05-16 14:38:22.000000000 +0200
+++ new/joblib-0.16.0/joblib/__init__.py        2020-07-01 18:04:07.000000000 +0200
@@ -106,7 +106,7 @@
 # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer.
 # 'X.Y.dev0' is the canonical version of 'X.Y.dev'
 #
-__version__ = '0.15.1'
+__version__ = '0.16.0'
 
 
 import os
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/_dask.py new/joblib-0.16.0/joblib/_dask.py
--- old/joblib-0.15.1/joblib/_dask.py   2020-05-15 09:32:16.000000000 +0200
+++ new/joblib-0.16.0/joblib/_dask.py   2020-07-01 18:02:32.000000000 +0200
@@ -4,6 +4,7 @@
 import concurrent.futures
 import contextlib
 
+import time
 from uuid import uuid4
 import weakref
 
@@ -89,30 +90,45 @@
 
 def _funcname(x):
     try:
-        if isinstance(x, BatchedCalls):
-            x = x.items[0][0]
+        if isinstance(x, list):
+            x = x[0][0]
     except Exception:
         pass
     return funcname(x)
 
 
-class Batch(object):
+def _make_tasks_summary(tasks):
+    """Summarize of list of (func, args, kwargs) function calls"""
+    unique_funcs = {func for func, args, kwargs in tasks}
+
+    if len(unique_funcs) == 1:
+        mixed = False
+    else:
+        mixed = True
+    return len(tasks), mixed, _funcname(tasks)
+
+
+class Batch:
+    """dask-compatible wrapper that executes a batch of tasks"""
     def __init__(self, tasks):
-        self.tasks = tasks
+        # collect some metadata from the tasks to ease Batch calls
+        # introspection when debugging
+        self._num_tasks, self._mixed, self._funcname = _make_tasks_summary(
+            tasks
+        )
 
-    def __call__(self, *data):
+    def __call__(self, tasks=None):
         results = []
         with parallel_backend('dask'):
-            for func, args, kwargs in self.tasks:
-                args = [a(data) if isinstance(a, itemgetter) else a
-                        for a in args]
-                kwargs = {k: v(data) if isinstance(v, itemgetter) else v
-                          for (k, v) in kwargs.items()}
+            for func, args, kwargs in tasks:
                 results.append(func(*args, **kwargs))
         return results
 
-    def __reduce__(self):
-        return Batch, (self.tasks,)
+    def __repr__(self):
+        descr = f"batch_of_{self._funcname}_{self._num_tasks}_calls"
+        if self._mixed:
+            descr = "mixed_" + descr
+        return descr
 
 
 def _joblib_probe_task():
@@ -120,7 +136,7 @@
     pass
 
 
-class DaskDistributedBackend(ParallelBackendBase, AutoBatchingMixin):
+class DaskDistributedBackend(AutoBatchingMixin, ParallelBackendBase):
     MIN_IDEAL_BATCH_DURATION = 0.2
     MAX_IDEAL_BATCH_DURATION = 1.0
     supports_timeout = True
@@ -128,6 +144,8 @@
     def __init__(self, scheduler_host=None, scatter=None,
                  client=None, loop=None, wait_for_workers_timeout=10,
                  **submit_kwargs):
+        super().__init__()
+
         if distributed is None:
             msg = ("You are trying to use 'dask' as a joblib parallel backend "
                    "but dask is not installed. Please install dask "
@@ -141,14 +159,14 @@
             else:
                 try:
                     client = get_client()
-                except ValueError:
+                except ValueError as e:
                     msg = ("To use Joblib with Dask first create a Dask Client"
                            "\n\n"
                            "    from dask.distributed import Client\n"
                            "    client = Client()\n"
                            "or\n"
                            "    client = Client('scheduler-address:8786')")
-                    raise ValueError(msg)
+                    raise ValueError(msg) from e
 
         self.client = client
 
@@ -195,6 +213,7 @@
         return DaskDistributedBackend(client=self.client), -1
 
     def configure(self, n_jobs=1, parallel=None, **backend_args):
+        self.parallel = parallel
         return self.effective_n_jobs(n_jobs)
 
     def start_call(self):
@@ -206,6 +225,10 @@
         # The explicit call to clear is required to break a cycling reference
         # to the futures.
         self._continue = False
+        # wait for the future collection routine (self._backend._collect) to
+        # finish in order to limit asyncio warnings due to aborting _collect
+        # during a following backend termination call
+        time.sleep(0.01)
         self.call_data_futures.clear()
 
     def effective_n_jobs(self, n_jobs):
@@ -219,7 +242,7 @@
         try:
             self.client.submit(_joblib_probe_task).result(
                 timeout=self.wait_for_workers_timeout)
-        except _TimeoutError:
+        except _TimeoutError as e:
             error_msg = (
                 "DaskDistributedBackend has no worker after {} seconds. "
                 "Make sure that workers are started and can properly connect "
@@ -228,11 +251,10 @@
                 "parallel_backend('dask', wait_for_workers_timeout={})"
             ).format(self.wait_for_workers_timeout,
                      max(10, 2 * self.wait_for_workers_timeout))
-            raise TimeoutError(error_msg)
+            raise TimeoutError(error_msg) from e
         return sum(self.client.ncores().values())
 
     async def _to_func_args(self, func):
-        collected_futures = []
         itemgetters = dict()
 
         # Futures that are dynamically generated during a single call to
@@ -252,24 +274,29 @@
                     try:
                         f = call_data_futures[arg]
                     except KeyError:
+                        pass
+                    if f is None:
                         if is_weakrefable(arg) and sizeof(arg) > 1e3:
                             # Automatically scatter large objects to some of
                             # the workers to avoid duplicated data transfers.
                             # Rely on automated inter-worker data stealing if
                             # more workers need to reuse this data
                             # concurrently.
+                            # set hash=False - nested scatter calls (i.e
+                            # calling client.scatter inside a dask worker)
+                            # using hash=True often raise CancelledError,
+                            # see dask/distributed#3703
                             [f] = await self.client.scatter(
                                 [arg],
-                                asynchronous=True
+                                asynchronous=True,
+                                hash=False
                             )
                             call_data_futures[arg] = f
 
                 if f is not None:
-                    getter = itemgetter(len(collected_futures))
-                    collected_futures.append(f)
-                    itemgetters[arg_id] = getter
-                    arg = getter
-                out.append(arg)
+                    out.append(f)
+                else:
+                    out.append(arg)
             return out
 
         tasks = []
@@ -279,21 +306,19 @@
                               await maybe_to_futures(kwargs.values())))
             tasks.append((f, args, kwargs))
 
-        if not collected_futures:
-            return func, ()
-        return (Batch(tasks), collected_futures)
+        return (Batch(tasks), tasks)
 
     def apply_async(self, func, callback=None):
-        key = '%s-batch-%s' % (_funcname(func), uuid4().hex)
 
         cf_future = concurrent.futures.Future()
         cf_future.get = cf_future.result  # achieve AsyncResult API
 
         async def f(func, callback):
-            func, args = await self._to_func_args(func)
+            batch, tasks = await self._to_func_args(func)
+            key = f'{repr(batch)}-{uuid4().hex}'
 
             dask_future = self.client.submit(
-                func, *args, key=key, **self.submit_kwargs
+                batch, tasks=tasks, key=key, **self.submit_kwargs
             )
             self.waiting_futures.add(dask_future)
             self._callbacks[dask_future] = callback
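A small sketch (it requires dask and distributed to be installed) of what
the reworked Batch wrapper above contributes: its repr feeds the dask task
key, which is what test_dask_funcname below checks. The task items are the
(func, args, kwargs) tuples produced by joblib.delayed:

  from joblib import delayed
  from joblib._dask import Batch

  tasks = [delayed(abs)(i) for i in range(4)]   # [(abs, (0,), {}), ...]
  batch = Batch(tasks)
  print(repr(batch))                            # batch_of_abs_4_calls
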
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/_multiprocessing_helpers.py new/joblib-0.16.0/joblib/_multiprocessing_helpers.py
--- old/joblib-0.15.1/joblib/_multiprocessing_helpers.py        2020-05-04 15:38:39.000000000 +0200
+++ new/joblib-0.16.0/joblib/_multiprocessing_helpers.py        2020-07-01 11:30:33.000000000 +0200
@@ -36,10 +36,10 @@
                 _sem = SemLock(0, 0, 1, name=name, unlink=True)
                 del _sem  # cleanup
                 break
-            except FileExistsError:  # pragma: no cover
+            except FileExistsError as e:  # pragma: no cover
                 if i >= 99:
                     raise FileExistsError(
-                        'cannot find name for semaphore')
+                        'cannot find name for semaphore') from e
     except (FileExistsError, AttributeError, ImportError, OSError) as e:
         mp = None
         warnings.warn('%s.  joblib will operate in serial mode' % (e,))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/_parallel_backends.py new/joblib-0.16.0/joblib/_parallel_backends.py
--- old/joblib-0.15.1/joblib/_parallel_backends.py      2020-05-15 09:32:16.000000000 +0200
+++ new/joblib-0.16.0/joblib/_parallel_backends.py      2020-07-01 11:30:33.000000000 +0200
@@ -31,7 +31,9 @@
     supports_inner_max_num_threads = False
     nesting_level = None
 
-    def __init__(self, nesting_level=None, inner_max_num_threads=None):
+    def __init__(self, nesting_level=None, inner_max_num_threads=None,
+                 **kwargs):
+        super().__init__(**kwargs)
         self.nesting_level = nesting_level
         self.inner_max_num_threads = inner_max_num_threads
 
@@ -276,9 +278,9 @@
     _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0
 
     def __init__(self, **kwargs):
+        super().__init__(**kwargs)
         self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
         self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION
-        super(AutoBatchingMixin, self).__init__(**kwargs)
 
     def compute_batch_size(self):
         """Determine the optimal batch size"""
@@ -538,8 +540,8 @@
         AsyncResults.get from multiprocessing."""
         try:
             return future.result(timeout=timeout)
-        except CfTimeoutError:
-            raise TimeoutError()
+        except CfTimeoutError as e:
+            raise TimeoutError from e
 
     def terminate(self):
         if self._workers is not None:
@@ -591,11 +593,11 @@
     def __call__(self, *args, **kwargs):
         try:
             return self.func(*args, **kwargs)
-        except KeyboardInterrupt:
+        except KeyboardInterrupt as e:
             # We capture the KeyboardInterrupt and reraise it as
             # something different, as multiprocessing does not
             # interrupt processing for a KeyboardInterrupt
-            raise WorkerInterrupt()
+            raise WorkerInterrupt() from e
         except BaseException:
             # Rely on Python 3 built-in Remote Traceback reporting
             raise
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/disk.py new/joblib-0.16.0/joblib/disk.py
--- old/joblib-0.15.1/joblib/disk.py    2020-05-14 18:47:29.000000000 +0200
+++ new/joblib-0.16.0/joblib/disk.py    2020-07-01 11:30:33.000000000 +0200
@@ -46,10 +46,10 @@
     units = dict(K=kilo, M=kilo ** 2, G=kilo ** 3)
     try:
         size = int(units[text[-1]] * float(text[:-1]))
-    except (KeyError, ValueError):
+    except (KeyError, ValueError) as e:
         raise ValueError(
             "Invalid literal for size give: %s (type %s) should be "
-            "alike '10G', '500M', '50K'." % (text, type(text)))
+            "alike '10G', '500M', '50K'." % (text, type(text))) from e
     return size
 
 
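Most of the remaining hunks apply the same exception-chaining pattern. A
standalone sketch modelled on memstr_to_bytes above (simplified, not the
joblib implementation): "raise ... from e" keeps the original error as
__cause__ instead of discarding it:

  def parse_size(text):
      units = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}
      try:
          return int(units[text[-1]] * float(text[:-1]))
      except (KeyError, ValueError) as e:
          raise ValueError("invalid size literal: %r" % (text,)) from e

  try:
      parse_size('10X')
  except ValueError as err:
      print(type(err.__cause__).__name__)   # KeyError, kept by the chaining
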
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/func_inspect.py new/joblib-0.16.0/joblib/func_inspect.py
--- old/joblib-0.15.1/joblib/func_inspect.py    2020-05-04 15:38:39.000000000 +0200
+++ new/joblib-0.16.0/joblib/func_inspect.py    2020-07-01 11:30:33.000000000 +0200
@@ -253,14 +253,14 @@
             else:
                 try:
                     arg_dict[arg_name] = arg_defaults[position]
-                except (IndexError, KeyError):
+                except (IndexError, KeyError) as e:
                     # Missing argument
                     raise ValueError(
                         'Wrong number of arguments for %s:\n'
                         '     %s was called.'
                         % (_signature_str(name, arg_spec),
                            _function_called_str(name, args, kwargs))
-                    )
+                    ) from e
 
     varkwargs = dict()
     for arg_name, arg_value in sorted(kwargs.items()):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/numpy_pickle.py new/joblib-0.16.0/joblib/numpy_pickle.py
--- old/joblib-0.15.1/joblib/numpy_pickle.py    2020-05-04 15:38:39.000000000 +0200
+++ new/joblib-0.16.0/joblib/numpy_pickle.py    2020-06-15 16:56:41.000000000 +0200
@@ -100,7 +100,7 @@
                                                   'zerosize_ok'],
                                            buffersize=buffersize,
                                            order=self.order):
-                pickler.file_handle.write(chunk.tostring('C'))
+                pickler.file_handle.write(chunk.tobytes('C'))
 
     def read_array(self, unpickler):
         """Read array from unpickler file handle.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/numpy_pickle_compat.py new/joblib-0.16.0/joblib/numpy_pickle_compat.py
--- old/joblib-0.15.1/joblib/numpy_pickle_compat.py     2020-05-04 15:38:39.000000000 +0200
+++ new/joblib-0.16.0/joblib/numpy_pickle_compat.py     2020-06-15 16:56:41.000000000 +0200
@@ -126,7 +126,7 @@
     retrieve it.
     The reason that we store the raw buffer data of the array and
     the meta information, rather than array representation routine
-    (tostring) is that it enables us to use completely the strided
+    (tobytes) is that it enables us to use completely the strided
     model to avoid memory copies (a and a.T store as fast). In
     addition saving the heavy information separately can avoid
     creating large temporary buffers when unpickling data with
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/parallel.py new/joblib-0.16.0/joblib/parallel.py
--- old/joblib-0.15.1/joblib/parallel.py        2020-05-15 09:32:16.000000000 +0200
+++ new/joblib-0.16.0/joblib/parallel.py        2020-07-01 17:19:11.000000000 +0200
@@ -60,12 +60,12 @@
     try:
         from ._dask import DaskDistributedBackend
         register_parallel_backend('dask', DaskDistributedBackend)
-    except ImportError:
+    except ImportError as e:
         msg = ("To use the dask.distributed backend you must install both "
                "the `dask` and distributed modules.\n\n"
                "See https://dask.pydata.org/en/latest/install.html for more "
                "information.")
-        raise ImportError(msg)
+        raise ImportError(msg) from e
 
 
 EXTERNAL_BACKENDS = {
@@ -679,9 +679,9 @@
         else:
             try:
                 backend_factory = BACKENDS[backend]
-            except KeyError:
+            except KeyError as e:
                 raise ValueError("Invalid backend: %s, expected one of %r"
-                                 % (backend, sorted(BACKENDS.keys())))
+                                 % (backend, sorted(BACKENDS.keys()))) from e
             backend = backend_factory(nesting_level=nesting_level)
 
         if (require == 'sharedmem' and
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/data/create_numpy_pickle.py new/joblib-0.16.0/joblib/test/data/create_numpy_pickle.py
--- old/joblib-0.15.1/joblib/test/data/create_numpy_pickle.py   2020-05-04 15:38:39.000000000 +0200
+++ new/joblib-0.16.0/joblib/test/data/create_numpy_pickle.py   2020-06-15 16:56:41.000000000 +0200
@@ -86,10 +86,7 @@
                  np.arange(5, dtype=np.dtype('<f8')),
                  np.array([1, 'abc', {'a': 1, 'b': 2}], dtype='O'),
                  # all possible bytes as a byte string
-                 # .tostring actually returns bytes and is a
-                 # compatibility alias for .tobytes which was
-                 # added in 1.9.0
-                 np.arange(256, dtype=np.uint8).tostring(),
+                 np.arange(256, dtype=np.uint8).tobytes(),
                  np.matrix([0, 1, 2], dtype=np.dtype('<i8')),
                  # unicode string with non-ascii chars
                  u"C'est l'\xe9t\xe9 !"]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/test_dask.py new/joblib-0.16.0/joblib/test/test_dask.py
--- old/joblib-0.15.1/joblib/test/test_dask.py  2020-05-15 09:32:16.000000000 +0200
+++ new/joblib-0.16.0/joblib/test/test_dask.py  2020-07-01 16:57:44.000000000 +0200
@@ -3,14 +3,15 @@
 
 import pytest
 from random import random
+from uuid import uuid4
 from time import sleep
 
 from .. import Parallel, delayed, parallel_backend
-from ..parallel import ThreadingBackend
+from ..parallel import ThreadingBackend, AutoBatchingMixin
 from .._dask import DaskDistributedBackend
 
 distributed = pytest.importorskip('distributed')
-from distributed import Client, LocalCluster
+from distributed import Client, LocalCluster, get_client
 from distributed.metrics import time
 from distributed.utils_test import cluster, inc
 
@@ -25,6 +26,15 @@
         raise ValueError("condition evaluated to True")
 
 
+def count_events(event_name, client):
+    worker_events = client.run(lambda dask_worker: dask_worker.log)
+    event_counts = {}
+    for w, events in worker_events.items():
+        event_counts[w] = len([event for event in list(events)
+                               if event[1] == event_name])
+    return event_counts
+
+
 def test_simple(loop):
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as client:  # noqa: F841
@@ -40,6 +50,30 @@
                 assert seq == [inc(i) for i in range(10)]
 
 
+def test_dask_backend_uses_autobatching(loop):
+    assert (DaskDistributedBackend.compute_batch_size
+            is AutoBatchingMixin.compute_batch_size)
+
+    with cluster() as (s, [a, b]):
+        with Client(s['address'], loop=loop) as client:  # noqa: F841
+            with parallel_backend('dask') as (ba, _):
+                with Parallel() as parallel:
+                    # The backend should be initialized with a default
+                    # batch size of 1:
+                    backend = parallel._backend
+                    assert isinstance(backend, DaskDistributedBackend)
+                    assert backend.parallel is parallel
+                    assert backend._effective_batch_size == 1
+
+                    # Launch many short tasks that should trigger
+                    # auto-batching:
+                    parallel(
+                        delayed(lambda: None)()
+                        for _ in range(int(1e4))
+                    )
+                    assert backend._effective_batch_size > 10
+
+
 def random2():
     return random()
 
@@ -52,20 +86,85 @@
                 assert x != y
 
 
-def test_dask_funcname(loop):
+@pytest.mark.parametrize("mixed", [True, False])
+def test_dask_funcname(loop, mixed):
+    from joblib._dask import Batch
+    if not mixed:
+        tasks = [delayed(inc)(i) for i in range(4)]
+        batch_repr = 'batch_of_inc_4_calls'
+    else:
+        tasks = [
+            delayed(abs)(i) if i % 2 else delayed(inc)(i) for i in range(4)
+        ]
+        batch_repr = 'mixed_batch_of_inc_4_calls'
+
+    assert repr(Batch(tasks)) == batch_repr
+
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as client:
             with parallel_backend('dask') as (ba, _):
-                x, y = Parallel()(delayed(inc)(i) for i in range(2))
+                _ = Parallel(batch_size=2, pre_dispatch='all')(tasks)
 
             def f(dask_scheduler):
                 return list(dask_scheduler.transition_log)
+            batch_repr = batch_repr.replace('4', '2')
             log = client.run_on_scheduler(f)
-            assert all(tup[0].startswith('inc-batch') for tup in log)
+            assert all('batch_of_inc' in tup[0] for tup in log)
 
 
-def add5(a, b, c, d=0, e=0):
-    return a + b + c + d + e
+def test_no_undesired_distributed_cache_hit(loop):
+    # Dask has a pickle cache for callables that are called many times. Because
+    # the dask backend used to wrap both the functions and the arguments
+    # under instances of the Batch callable class, this caching mechanism could
+    # lead to bugs as described in: https://github.com/joblib/joblib/pull/1055
+    # The joblib-dask backend has been refactored to avoid bundling the
+    # arguments as an attribute of the Batch instance to avoid this problem.
+    # This test serves as a non-regression check.
+
+    # Use a large number of input arguments to give the AutoBatchingMixin
+    # enough tasks to kick in.
+    lists = [[] for _ in range(100)]
+    np = pytest.importorskip('numpy')
+    X = np.arange(int(1e6))
+
+    def isolated_operation(list_, X=None):
+        list_.append(uuid4().hex)
+        return list_
+
+    cluster = LocalCluster(n_workers=1, threads_per_worker=2)
+    client = Client(cluster)
+    try:
+        with parallel_backend('dask') as (ba, _):
+            # dispatches joblib.parallel.BatchedCalls
+            res = Parallel()(
+                delayed(isolated_operation)(list_) for list_ in lists
+            )
+
+        # The original arguments should not have been mutated as the mutation
+        # happens in the dask worker process.
+        assert lists == [[] for _ in range(100)]
+
+        # Here we did not pass any large numpy array as argument to
+        # isolated_operation so no scattering event should happen under the
+        # hood.
+        counts = count_events('receive-from-scatter', client)
+        assert sum(counts.values()) == 0
+        assert all([len(r) == 1 for r in res])
+
+        with parallel_backend('dask') as (ba, _):
+            # Append a large array which will be scattered by dask, and
+            # dispatch joblib._dask.Batch
+            res = Parallel()(
+                delayed(isolated_operation)(list_, X=X) for list_ in lists
+            )
+
+        # This time, auto-scattering should have kicked in.
+        counts = count_events('receive-from-scatter', client)
+        assert sum(counts.values()) > 0
+        assert all([len(r) == 1 for r in res])
+    finally:
+        client.close()
+        cluster.close()
 
 
 class CountSerialized(object):
@@ -83,6 +182,10 @@
         return (CountSerialized, (self.x,))
 
 
+def add5(a, b, c, d=0, e=0):
+    return a + b + c + d + e
+
+
 def test_manual_scatter(loop):
     x = CountSerialized(1)
     y = CountSerialized(2)
@@ -110,7 +213,11 @@
     # Scattered variables only serialized once
     assert x.count == 1
     assert y.count == 1
-    assert z.count == 4
+    # Depending on the version of distributed, the unscattered z variable
+    # is either pickled 4 or 6 times, possibly because of the memoization
+    # of objects that appear several times in the arguments of a delayed
+    # task.
+    assert z.count in (4, 6)
 
 
 def test_auto_scatter(loop):
@@ -119,14 +226,6 @@
     data2 = np.ones(int(1e4), dtype=np.uint8)
     data_to_process = ([data1] * 3) + ([data2] * 3)
 
-    def count_events(event_name, client):
-        worker_events = client.run(lambda dask_worker: dask_worker.log)
-        event_counts = {}
-        for w, events in worker_events.items():
-            event_counts[w] = len([event for event in list(events)
-                                   if event[1] == event_name])
-        return event_counts
-
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as client:
             with parallel_backend('dask') as (ba, _):
@@ -152,6 +251,36 @@
             assert counts[b['address']] == 0
 
 
+@pytest.mark.parametrize("retry_no", list(range(2)))
+def test_nested_scatter(loop, retry_no):
+
+    np = pytest.importorskip('numpy')
+
+    NUM_INNER_TASKS = 10
+    NUM_OUTER_TASKS = 10
+
+    def my_sum(x, i, j):
+        return np.sum(x)
+
+    def outer_function_joblib(array, i):
+        client = get_client()  # noqa
+        with parallel_backend("dask"):
+            results = Parallel()(
+                delayed(my_sum)(array[j:], i, j) for j in range(
+                    NUM_INNER_TASKS)
+            )
+        return sum(results)
+
+    with cluster() as (s, [a, b]):
+        with Client(s['address'], loop=loop) as _:
+            with parallel_backend("dask"):
+                my_array = np.ones(10000)
+                _ = Parallel()(
+                    delayed(outer_function_joblib)(
+                        my_array[i:], i) for i in range(NUM_OUTER_TASKS)
+                )
+
+
 def test_nested_backend_context_manager(loop):
     def get_nested_pids():
         pids = set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))
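A condensed, standalone version of the nested-Parallel scenario that
test_nested_scatter above exercises (the situation behind joblib/joblib#1061):
outer dask-backed tasks that each run an inner Parallel under the dask
backend, with the large array auto-scattered by joblib. Cluster size and
task counts here are illustrative only:

  import numpy as np
  from dask.distributed import Client, LocalCluster, get_client
  from joblib import Parallel, delayed, parallel_backend

  def inner_sum(chunk, j):
      return float(np.sum(chunk[j:]))

  def outer(chunk):
      get_client()  # the dask worker reuses its client for the nested call
      with parallel_backend('dask'):
          return sum(Parallel()(delayed(inner_sum)(chunk, j)
                                for j in range(5)))

  if __name__ == '__main__':
      cluster = LocalCluster(n_workers=2, threads_per_worker=2)
      client = Client(cluster)
      data = np.ones(10000)
      with parallel_backend('dask'):
          totals = Parallel()(delayed(outer)(data[i:]) for i in range(5))
      print(totals)
      client.close()
      cluster.close()
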
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/test_numpy_pickle.py new/joblib-0.16.0/joblib/test/test_numpy_pickle.py
--- old/joblib-0.15.1/joblib/test/test_numpy_pickle.py  2020-05-16 14:37:43.000000000 +0200
+++ new/joblib-0.16.0/joblib/test/test_numpy_pickle.py  2020-06-15 16:56:41.000000000 +0200
@@ -340,10 +340,7 @@
                      np.arange(5, dtype=np.dtype('<f8')),
                      np.arange(5, dtype=np.dtype('>f8')),
                      np.array([1, 'abc', {'a': 1, 'b': 2}], dtype='O'),
-                     # .tostring actually returns bytes and is a
-                     # compatibility alias for .tobytes which was
-                     # added in 1.9.0
-                     np.arange(256, dtype=np.uint8).tostring(),
+                     np.arange(256, dtype=np.uint8).tobytes(),
                      # np.matrix is a subclass of np.ndarray, here we want
                      # to verify this type of object is correctly unpickled
                      # among versions.
@@ -436,10 +433,7 @@
     expected_list = [np.arange(5, dtype=np.dtype('<i8')),
                      np.arange(5, dtype=np.dtype('<f8')),
                      np.array([1, 'abc', {'a': 1, 'b': 2}], dtype='O'),
-                     # .tostring actually returns bytes and is a
-                     # compatibility alias for .tobytes which was
-                     # added in 1.9.0
-                     np.arange(256, dtype=np.uint8).tostring(),
+                     np.arange(256, dtype=np.uint8).tobytes(),
                      # np.matrix is a subclass of np.ndarray, here we want
                      # to verify this type of object is correctly unpickled
                      # among versions.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/test_parallel.py new/joblib-0.16.0/joblib/test/test_parallel.py
--- old/joblib-0.15.1/joblib/test/test_parallel.py      2020-05-14 18:47:29.000000000 +0200
+++ new/joblib-0.16.0/joblib/test/test_parallel.py      2020-07-01 11:58:50.000000000 +0200
@@ -31,6 +31,7 @@
 from joblib.testing import (parametrize, raises, check_subprocess_call,
                             skipif, SkipTest, warns)
 
+from joblib.externals.loky.process_executor import TerminatedWorkerError
 
 from queue import Queue
 
@@ -530,6 +531,7 @@
 
 @with_multiprocessing
 @parametrize('backend', PARALLEL_BACKENDS)
+@pytest.mark.xfail(reason="https://github.com/joblib/loky/pull/255")
 def test_nested_exception_dispatch(backend):
     """Ensure errors for nested joblib cases gets propagated
 
@@ -1468,8 +1470,18 @@
     # saturating the operating system resources by creating a unbounded number
     # of threads.
     with parallel_backend(backend, n_jobs=2):
-        with raises(RecursionError):
+        with raises(BaseException) as excinfo:
             _recursive_parallel()
+    exc = excinfo.value
+    if backend == "loky" and isinstance(exc, TerminatedWorkerError):
+        # The recursion exception can itself cause an error when pickling it to
+        # be sent back to the parent process. In this case the worker crashes
+        # but the original traceback is still printed on stderr. This could be
+        # improved but does not seem simple to do and this is not critical
+        # for users (as long as there is no process or thread bomb happening).
+        pytest.xfail("Loky worker crash when serializing RecursionError")
+    else:
+        assert isinstance(exc, RecursionError)
 
 
 def _run_parallel_sum():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib.egg-info/PKG-INFO new/joblib-0.16.0/joblib.egg-info/PKG-INFO
--- old/joblib-0.15.1/joblib.egg-info/PKG-INFO  2020-05-16 14:39:27.000000000 +0200
+++ new/joblib-0.16.0/joblib.egg-info/PKG-INFO  2020-07-01 18:05:30.000000000 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: joblib
-Version: 0.15.1
+Version: 0.16.0
 Summary: Lightweight pipelining: using Python functions as pipeline jobs.
 Home-page: https://joblib.readthedocs.io
 Author: Gael Varoquaux

