Hello community,

here is the log from the commit of package python-joblib for openSUSE:Factory checked in at 2020-07-18 21:02:15
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-joblib (Old)
 and      /work/SRC/openSUSE:Factory/.python-joblib.new.3592 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-joblib" Sat Jul 18 21:02:15 2020 rev:12 rq:821624 version:0.16.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-joblib/python-joblib.changes 2020-06-24 15:48:00.528187576 +0200 +++ /work/SRC/openSUSE:Factory/.python-joblib.new.3592/python-joblib.changes 2020-07-18 21:02:50.151600709 +0200 @@ -1,0 +2,19 @@ +Sat Jul 18 09:12:26 UTC 2020 - Dirk Mueller <[email protected]> + +- update to 0.16.0 + - Fix a problem in the constructors of of Parallel backends classes that + inherit from the `AutoBatchingMixin` that prevented the dask backend to + properly batch short tasks. + https://github.com/joblib/joblib/pull/1062 + - Fix a problem in the way the joblib dask backend batches calls that would + badly interact with the dask callable pickling cache and lead to wrong + results or errors. + https://github.com/joblib/joblib/pull/1055 + - Prevent a dask.distributed bug from surfacing in joblib's dask backend + during nested Parallel calls (due to joblib's auto-scattering feature) + https://github.com/joblib/joblib/pull/1061 + - Workaround for a race condition after Parallel calls with the dask backend + that would cause low level warnings from asyncio coroutines: + https://github.com/joblib/joblib/pull/1078 + +------------------------------------------------------------------- Old: ---- joblib-0.15.1.tar.gz New: ---- joblib-0.16.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-joblib.spec ++++++ --- /var/tmp/diff_new_pack.ZPlVl6/_old 2020-07-18 21:02:52.195602906 +0200 +++ /var/tmp/diff_new_pack.ZPlVl6/_new 2020-07-18 21:02:52.199602910 +0200 @@ -17,8 +17,9 @@ %{?!python_module:%define python_module() python-%{**} python3-%{**}} +%global skip_python2 1 Name: python-joblib -Version: 0.15.1 +Version: 0.16.0 Release: 0 Summary: Module for using Python functions as pipeline jobs License: BSD-3-Clause ++++++ joblib-0.15.1.tar.gz -> joblib-0.16.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/.codecov.yml new/joblib-0.16.0/.codecov.yml --- old/joblib-0.15.1/.codecov.yml 2020-05-04 15:38:39.000000000 +0200 +++ new/joblib-0.16.0/.codecov.yml 2020-07-01 18:02:32.000000000 +0200 @@ -1,2 +1,9 @@ codecov: token: 1b7eb264-fd77-469a-829a-e9cd5efd7cef +coverage: + status: + project: + default: + # Allow coverage to drop by up to 1% in a PR before marking it as + # failed + threshold: '1%' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/CHANGES.rst new/joblib-0.16.0/CHANGES.rst --- old/joblib-0.15.1/CHANGES.rst 2020-05-16 14:38:07.000000000 +0200 +++ new/joblib-0.16.0/CHANGES.rst 2020-07-01 18:04:07.000000000 +0200 @@ -1,6 +1,27 @@ Latest changes ============== +Release 0.16.0 +-------------- + +- Fix a problem in the constructors of of Parallel backends classes that + inherit from the `AutoBatchingMixin` that prevented the dask backend to + properly batch short tasks. + https://github.com/joblib/joblib/pull/1062 + +- Fix a problem in the way the joblib dask backend batches calls that would + badly interact with the dask callable pickling cache and lead to wrong + results or errors. 
+ https://github.com/joblib/joblib/pull/1055 + +- Prevent a dask.distributed bug from surfacing in joblib's dask backend + during nested Parallel calls (due to joblib's auto-scattering feature) + https://github.com/joblib/joblib/pull/1061 + +- Workaround for a race condition after Parallel calls with the dask backend + that would cause low level warnings from asyncio coroutines: + https://github.com/joblib/joblib/pull/1078 + Release 0.15.1 -------------- diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/PKG-INFO new/joblib-0.16.0/PKG-INFO --- old/joblib-0.15.1/PKG-INFO 2020-05-16 14:39:28.057508000 +0200 +++ new/joblib-0.16.0/PKG-INFO 2020-07-01 18:05:31.295775700 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: joblib -Version: 0.15.1 +Version: 0.16.0 Summary: Lightweight pipelining: using Python functions as pipeline jobs. Home-page: https://joblib.readthedocs.io Author: Gael Varoquaux diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/azure-pipelines.yml new/joblib-0.16.0/azure-pipelines.yml --- old/joblib-0.15.1/azure-pipelines.yml 2020-05-16 14:37:43.000000000 +0200 +++ new/joblib-0.16.0/azure-pipelines.yml 2020-07-01 11:58:50.000000000 +0200 @@ -38,10 +38,12 @@ PYTHON_VERSION: "pypy3" LOKY_MAX_CPU_COUNT: "2" - linux_py38: + linux_py38_distributed: + # To be updated regularly to use the most recent versions of the + # dependencies. imageName: 'ubuntu-latest' PYTHON_VERSION: "3.8" - EXTRA_CONDA_PACKAGES: "numpy=1.18" + EXTRA_CONDA_PACKAGES: "numpy=1.18 distributed=2.17" linux_py37_sklearn_tests: imageName: 'ubuntu-latest' PYTHON_VERSION: "3.7" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/__init__.py new/joblib-0.16.0/joblib/__init__.py --- old/joblib-0.15.1/joblib/__init__.py 2020-05-16 14:38:22.000000000 +0200 +++ new/joblib-0.16.0/joblib/__init__.py 2020-07-01 18:04:07.000000000 +0200 @@ -106,7 +106,7 @@ # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer. 
# 'X.Y.dev0' is the canonical version of 'X.Y.dev' # -__version__ = '0.15.1' +__version__ = '0.16.0' import os diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/_dask.py new/joblib-0.16.0/joblib/_dask.py --- old/joblib-0.15.1/joblib/_dask.py 2020-05-15 09:32:16.000000000 +0200 +++ new/joblib-0.16.0/joblib/_dask.py 2020-07-01 18:02:32.000000000 +0200 @@ -4,6 +4,7 @@ import concurrent.futures import contextlib +import time from uuid import uuid4 import weakref @@ -89,30 +90,45 @@ def _funcname(x): try: - if isinstance(x, BatchedCalls): - x = x.items[0][0] + if isinstance(x, list): + x = x[0][0] except Exception: pass return funcname(x) -class Batch(object): +def _make_tasks_summary(tasks): + """Summarize of list of (func, args, kwargs) function calls""" + unique_funcs = {func for func, args, kwargs in tasks} + + if len(unique_funcs) == 1: + mixed = False + else: + mixed = True + return len(tasks), mixed, _funcname(tasks) + + +class Batch: + """dask-compatible wrapper that executes a batch of tasks""" def __init__(self, tasks): - self.tasks = tasks + # collect some metadata from the tasks to ease Batch calls + # introspection when debugging + self._num_tasks, self._mixed, self._funcname = _make_tasks_summary( + tasks + ) - def __call__(self, *data): + def __call__(self, tasks=None): results = [] with parallel_backend('dask'): - for func, args, kwargs in self.tasks: - args = [a(data) if isinstance(a, itemgetter) else a - for a in args] - kwargs = {k: v(data) if isinstance(v, itemgetter) else v - for (k, v) in kwargs.items()} + for func, args, kwargs in tasks: results.append(func(*args, **kwargs)) return results - def __reduce__(self): - return Batch, (self.tasks,) + def __repr__(self): + descr = f"batch_of_{self._funcname}_{self._num_tasks}_calls" + if self._mixed: + descr = "mixed_" + descr + return descr def _joblib_probe_task(): @@ -120,7 +136,7 @@ pass -class DaskDistributedBackend(ParallelBackendBase, AutoBatchingMixin): +class DaskDistributedBackend(AutoBatchingMixin, ParallelBackendBase): MIN_IDEAL_BATCH_DURATION = 0.2 MAX_IDEAL_BATCH_DURATION = 1.0 supports_timeout = True @@ -128,6 +144,8 @@ def __init__(self, scheduler_host=None, scatter=None, client=None, loop=None, wait_for_workers_timeout=10, **submit_kwargs): + super().__init__() + if distributed is None: msg = ("You are trying to use 'dask' as a joblib parallel backend " "but dask is not installed. Please install dask " @@ -141,14 +159,14 @@ else: try: client = get_client() - except ValueError: + except ValueError as e: msg = ("To use Joblib with Dask first create a Dask Client" "\n\n" " from dask.distributed import Client\n" " client = Client()\n" "or\n" " client = Client('scheduler-address:8786')") - raise ValueError(msg) + raise ValueError(msg) from e self.client = client @@ -195,6 +213,7 @@ return DaskDistributedBackend(client=self.client), -1 def configure(self, n_jobs=1, parallel=None, **backend_args): + self.parallel = parallel return self.effective_n_jobs(n_jobs) def start_call(self): @@ -206,6 +225,10 @@ # The explicit call to clear is required to break a cycling reference # to the futures. 
self._continue = False + # wait for the future collection routine (self._backend._collect) to + # finish in order to limit asyncio warnings due to aborting _collect + # during a following backend termination call + time.sleep(0.01) self.call_data_futures.clear() def effective_n_jobs(self, n_jobs): @@ -219,7 +242,7 @@ try: self.client.submit(_joblib_probe_task).result( timeout=self.wait_for_workers_timeout) - except _TimeoutError: + except _TimeoutError as e: error_msg = ( "DaskDistributedBackend has no worker after {} seconds. " "Make sure that workers are started and can properly connect " @@ -228,11 +251,10 @@ "parallel_backend('dask', wait_for_workers_timeout={})" ).format(self.wait_for_workers_timeout, max(10, 2 * self.wait_for_workers_timeout)) - raise TimeoutError(error_msg) + raise TimeoutError(error_msg) from e return sum(self.client.ncores().values()) async def _to_func_args(self, func): - collected_futures = [] itemgetters = dict() # Futures that are dynamically generated during a single call to @@ -252,24 +274,29 @@ try: f = call_data_futures[arg] except KeyError: + pass + if f is None: if is_weakrefable(arg) and sizeof(arg) > 1e3: # Automatically scatter large objects to some of # the workers to avoid duplicated data transfers. # Rely on automated inter-worker data stealing if # more workers need to reuse this data # concurrently. + # set hash=False - nested scatter calls (i.e + # calling client.scatter inside a dask worker) + # using hash=True often raise CancelledError, + # see dask/distributed#3703 [f] = await self.client.scatter( [arg], - asynchronous=True + asynchronous=True, + hash=False ) call_data_futures[arg] = f if f is not None: - getter = itemgetter(len(collected_futures)) - collected_futures.append(f) - itemgetters[arg_id] = getter - arg = getter - out.append(arg) + out.append(f) + else: + out.append(arg) return out tasks = [] @@ -279,21 +306,19 @@ await maybe_to_futures(kwargs.values()))) tasks.append((f, args, kwargs)) - if not collected_futures: - return func, () - return (Batch(tasks), collected_futures) + return (Batch(tasks), tasks) def apply_async(self, func, callback=None): - key = '%s-batch-%s' % (_funcname(func), uuid4().hex) cf_future = concurrent.futures.Future() cf_future.get = cf_future.result # achieve AsyncResult API async def f(func, callback): - func, args = await self._to_func_args(func) + batch, tasks = await self._to_func_args(func) + key = f'{repr(batch)}-{uuid4().hex}' dask_future = self.client.submit( - func, *args, key=key, **self.submit_kwargs + batch, tasks=tasks, key=key, **self.submit_kwargs ) self.waiting_futures.add(dask_future) self._callbacks[dask_future] = callback diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/_multiprocessing_helpers.py new/joblib-0.16.0/joblib/_multiprocessing_helpers.py --- old/joblib-0.15.1/joblib/_multiprocessing_helpers.py 2020-05-04 15:38:39.000000000 +0200 +++ new/joblib-0.16.0/joblib/_multiprocessing_helpers.py 2020-07-01 11:30:33.000000000 +0200 @@ -36,10 +36,10 @@ _sem = SemLock(0, 0, 1, name=name, unlink=True) del _sem # cleanup break - except FileExistsError: # pragma: no cover + except FileExistsError as e: # pragma: no cover if i >= 99: raise FileExistsError( - 'cannot find name for semaphore') + 'cannot find name for semaphore') from e except (FileExistsError, AttributeError, ImportError, OSError) as e: mp = None warnings.warn('%s. 
joblib will operate in serial mode' % (e,)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/_parallel_backends.py new/joblib-0.16.0/joblib/_parallel_backends.py --- old/joblib-0.15.1/joblib/_parallel_backends.py 2020-05-15 09:32:16.000000000 +0200 +++ new/joblib-0.16.0/joblib/_parallel_backends.py 2020-07-01 11:30:33.000000000 +0200 @@ -31,7 +31,9 @@ supports_inner_max_num_threads = False nesting_level = None - def __init__(self, nesting_level=None, inner_max_num_threads=None): + def __init__(self, nesting_level=None, inner_max_num_threads=None, + **kwargs): + super().__init__(**kwargs) self.nesting_level = nesting_level self.inner_max_num_threads = inner_max_num_threads @@ -276,9 +278,9 @@ _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0 def __init__(self, **kwargs): + super().__init__(**kwargs) self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION - super(AutoBatchingMixin, self).__init__(**kwargs) def compute_batch_size(self): """Determine the optimal batch size""" @@ -538,8 +540,8 @@ AsyncResults.get from multiprocessing.""" try: return future.result(timeout=timeout) - except CfTimeoutError: - raise TimeoutError() + except CfTimeoutError as e: + raise TimeoutError from e def terminate(self): if self._workers is not None: @@ -591,11 +593,11 @@ def __call__(self, *args, **kwargs): try: return self.func(*args, **kwargs) - except KeyboardInterrupt: + except KeyboardInterrupt as e: # We capture the KeyboardInterrupt and reraise it as # something different, as multiprocessing does not # interrupt processing for a KeyboardInterrupt - raise WorkerInterrupt() + raise WorkerInterrupt() from e except BaseException: # Rely on Python 3 built-in Remote Traceback reporting raise diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/disk.py new/joblib-0.16.0/joblib/disk.py --- old/joblib-0.15.1/joblib/disk.py 2020-05-14 18:47:29.000000000 +0200 +++ new/joblib-0.16.0/joblib/disk.py 2020-07-01 11:30:33.000000000 +0200 @@ -46,10 +46,10 @@ units = dict(K=kilo, M=kilo ** 2, G=kilo ** 3) try: size = int(units[text[-1]] * float(text[:-1])) - except (KeyError, ValueError): + except (KeyError, ValueError) as e: raise ValueError( "Invalid literal for size give: %s (type %s) should be " - "alike '10G', '500M', '50K'." % (text, type(text))) + "alike '10G', '500M', '50K'." % (text, type(text))) from e return size diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/func_inspect.py new/joblib-0.16.0/joblib/func_inspect.py --- old/joblib-0.15.1/joblib/func_inspect.py 2020-05-04 15:38:39.000000000 +0200 +++ new/joblib-0.16.0/joblib/func_inspect.py 2020-07-01 11:30:33.000000000 +0200 @@ -253,14 +253,14 @@ else: try: arg_dict[arg_name] = arg_defaults[position] - except (IndexError, KeyError): + except (IndexError, KeyError) as e: # Missing argument raise ValueError( 'Wrong number of arguments for %s:\n' ' %s was called.' 
% (_signature_str(name, arg_spec), _function_called_str(name, args, kwargs)) - ) + ) from e varkwargs = dict() for arg_name, arg_value in sorted(kwargs.items()): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/numpy_pickle.py new/joblib-0.16.0/joblib/numpy_pickle.py --- old/joblib-0.15.1/joblib/numpy_pickle.py 2020-05-04 15:38:39.000000000 +0200 +++ new/joblib-0.16.0/joblib/numpy_pickle.py 2020-06-15 16:56:41.000000000 +0200 @@ -100,7 +100,7 @@ 'zerosize_ok'], buffersize=buffersize, order=self.order): - pickler.file_handle.write(chunk.tostring('C')) + pickler.file_handle.write(chunk.tobytes('C')) def read_array(self, unpickler): """Read array from unpickler file handle. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/numpy_pickle_compat.py new/joblib-0.16.0/joblib/numpy_pickle_compat.py --- old/joblib-0.15.1/joblib/numpy_pickle_compat.py 2020-05-04 15:38:39.000000000 +0200 +++ new/joblib-0.16.0/joblib/numpy_pickle_compat.py 2020-06-15 16:56:41.000000000 +0200 @@ -126,7 +126,7 @@ retrieve it. The reason that we store the raw buffer data of the array and the meta information, rather than array representation routine - (tostring) is that it enables us to use completely the strided + (tobytes) is that it enables us to use completely the strided model to avoid memory copies (a and a.T store as fast). In addition saving the heavy information separately can avoid creating large temporary buffers when unpickling data with diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/parallel.py new/joblib-0.16.0/joblib/parallel.py --- old/joblib-0.15.1/joblib/parallel.py 2020-05-15 09:32:16.000000000 +0200 +++ new/joblib-0.16.0/joblib/parallel.py 2020-07-01 17:19:11.000000000 +0200 @@ -60,12 +60,12 @@ try: from ._dask import DaskDistributedBackend register_parallel_backend('dask', DaskDistributedBackend) - except ImportError: + except ImportError as e: msg = ("To use the dask.distributed backend you must install both " "the `dask` and distributed modules.\n\n" "See https://dask.pydata.org/en/latest/install.html for more " "information.") - raise ImportError(msg) + raise ImportError(msg) from e EXTERNAL_BACKENDS = { @@ -679,9 +679,9 @@ else: try: backend_factory = BACKENDS[backend] - except KeyError: + except KeyError as e: raise ValueError("Invalid backend: %s, expected one of %r" - % (backend, sorted(BACKENDS.keys()))) + % (backend, sorted(BACKENDS.keys()))) from e backend = backend_factory(nesting_level=nesting_level) if (require == 'sharedmem' and diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/data/create_numpy_pickle.py new/joblib-0.16.0/joblib/test/data/create_numpy_pickle.py --- old/joblib-0.15.1/joblib/test/data/create_numpy_pickle.py 2020-05-04 15:38:39.000000000 +0200 +++ new/joblib-0.16.0/joblib/test/data/create_numpy_pickle.py 2020-06-15 16:56:41.000000000 +0200 @@ -86,10 +86,7 @@ np.arange(5, dtype=np.dtype('<f8')), np.array([1, 'abc', {'a': 1, 'b': 2}], dtype='O'), # all possible bytes as a byte string - # .tostring actually returns bytes and is a - # compatibility alias for .tobytes which was - # added in 1.9.0 - np.arange(256, dtype=np.uint8).tostring(), + np.arange(256, dtype=np.uint8).tobytes(), np.matrix([0, 1, 2], dtype=np.dtype('<i8')), # unicode string with non-ascii chars u"C'est l'\xe9t\xe9 !"] diff -urN '--exclude=CVS' 
'--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/test_dask.py new/joblib-0.16.0/joblib/test/test_dask.py --- old/joblib-0.15.1/joblib/test/test_dask.py 2020-05-15 09:32:16.000000000 +0200 +++ new/joblib-0.16.0/joblib/test/test_dask.py 2020-07-01 16:57:44.000000000 +0200 @@ -3,14 +3,15 @@ import pytest from random import random +from uuid import uuid4 from time import sleep from .. import Parallel, delayed, parallel_backend -from ..parallel import ThreadingBackend +from ..parallel import ThreadingBackend, AutoBatchingMixin from .._dask import DaskDistributedBackend distributed = pytest.importorskip('distributed') -from distributed import Client, LocalCluster +from distributed import Client, LocalCluster, get_client from distributed.metrics import time from distributed.utils_test import cluster, inc @@ -25,6 +26,15 @@ raise ValueError("condition evaluated to True") +def count_events(event_name, client): + worker_events = client.run(lambda dask_worker: dask_worker.log) + event_counts = {} + for w, events in worker_events.items(): + event_counts[w] = len([event for event in list(events) + if event[1] == event_name]) + return event_counts + + def test_simple(loop): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: # noqa: F841 @@ -40,6 +50,30 @@ assert seq == [inc(i) for i in range(10)] +def test_dask_backend_uses_autobatching(loop): + assert (DaskDistributedBackend.compute_batch_size + is AutoBatchingMixin.compute_batch_size) + + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as client: # noqa: F841 + with parallel_backend('dask') as (ba, _): + with Parallel() as parallel: + # The backend should be initialized with a default + # batch size of 1: + backend = parallel._backend + assert isinstance(backend, DaskDistributedBackend) + assert backend.parallel is parallel + assert backend._effective_batch_size == 1 + + # Launch many short tasks that should trigger + # auto-batching: + parallel( + delayed(lambda: None)() + for _ in range(int(1e4)) + ) + assert backend._effective_batch_size > 10 + + def random2(): return random() @@ -52,20 +86,85 @@ assert x != y -def test_dask_funcname(loop): [email protected]("mixed", [True, False]) +def test_dask_funcname(loop, mixed): + from joblib._dask import Batch + if not mixed: + tasks = [delayed(inc)(i) for i in range(4)] + batch_repr = 'batch_of_inc_4_calls' + else: + tasks = [ + delayed(abs)(i) if i % 2 else delayed(inc)(i) for i in range(4) + ] + batch_repr = 'mixed_batch_of_inc_4_calls' + + assert repr(Batch(tasks)) == batch_repr + with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: with parallel_backend('dask') as (ba, _): - x, y = Parallel()(delayed(inc)(i) for i in range(2)) + _ = Parallel(batch_size=2, pre_dispatch='all')(tasks) def f(dask_scheduler): return list(dask_scheduler.transition_log) + batch_repr = batch_repr.replace('4', '2') log = client.run_on_scheduler(f) - assert all(tup[0].startswith('inc-batch') for tup in log) + assert all('batch_of_inc' in tup[0] for tup in log) -def add5(a, b, c, d=0, e=0): - return a + b + c + d + e +def test_no_undesired_distributed_cache_hit(loop): + # Dask has a pickle cache for callables that are called many times. 
Because + # the dask backends used to wrapp both the functions and the arguments + # under instances of the Batch callable class this caching mechanism could + # lead to bugs as described in: https://github.com/joblib/joblib/pull/1055 + # The joblib-dask backend has been refactored to avoid bundling the + # arguments as an attribute of the Batch instance to avoid this problem. + # This test serves as non-regression problem. + + # Use a large number of input arguments to give the AutoBatchingMixin + # enough tasks to kick-in. + lists = [[] for _ in range(100)] + np = pytest.importorskip('numpy') + X = np.arange(int(1e6)) + + def isolated_operation(list_, X=None): + list_.append(uuid4().hex) + return list_ + + cluster = LocalCluster(n_workers=1, threads_per_worker=2) + client = Client(cluster) + try: + with parallel_backend('dask') as (ba, _): + # dispatches joblib.parallel.BatchedCalls + res = Parallel()( + delayed(isolated_operation)(list_) for list_ in lists + ) + + # The original arguments should not have been mutated as the mutation + # happens in the dask worker process. + assert lists == [[] for _ in range(100)] + + # Here we did not pass any large numpy array as argument to + # isolated_operation so no scattering event should happen under the + # hood. + counts = count_events('receive-from-scatter', client) + assert sum(counts.values()) == 0 + assert all([len(r) == 1 for r in res]) + + with parallel_backend('dask') as (ba, _): + # Append a large array which will be scattered by dask, and + # dispatch joblib._dask.Batch + res = Parallel()( + delayed(isolated_operation)(list_, X=X) for list_ in lists + ) + + # This time, auto-scattering should have kicked it. + counts = count_events('receive-from-scatter', client) + assert sum(counts.values()) > 0 + assert all([len(r) == 1 for r in res]) + finally: + client.close() + cluster.close() class CountSerialized(object): @@ -83,6 +182,10 @@ return (CountSerialized, (self.x,)) +def add5(a, b, c, d=0, e=0): + return a + b + c + d + e + + def test_manual_scatter(loop): x = CountSerialized(1) y = CountSerialized(2) @@ -110,7 +213,11 @@ # Scattered variables only serialized once assert x.count == 1 assert y.count == 1 - assert z.count == 4 + # Depending on the version of distributed, the unscattered z variable + # is either pickled 4 or 6 times, possibly because of the memoization + # of objects that appear several times in the arguments of a delayed + # task. 
+ assert z.count in (4, 6) def test_auto_scatter(loop): @@ -119,14 +226,6 @@ data2 = np.ones(int(1e4), dtype=np.uint8) data_to_process = ([data1] * 3) + ([data2] * 3) - def count_events(event_name, client): - worker_events = client.run(lambda dask_worker: dask_worker.log) - event_counts = {} - for w, events in worker_events.items(): - event_counts[w] = len([event for event in list(events) - if event[1] == event_name]) - return event_counts - with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: with parallel_backend('dask') as (ba, _): @@ -152,6 +251,36 @@ assert counts[b['address']] == 0 [email protected]("retry_no", list(range(2))) +def test_nested_scatter(loop, retry_no): + + np = pytest.importorskip('numpy') + + NUM_INNER_TASKS = 10 + NUM_OUTER_TASKS = 10 + + def my_sum(x, i, j): + return np.sum(x) + + def outer_function_joblib(array, i): + client = get_client() # noqa + with parallel_backend("dask"): + results = Parallel()( + delayed(my_sum)(array[j:], i, j) for j in range( + NUM_INNER_TASKS) + ) + return sum(results) + + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as _: + with parallel_backend("dask"): + my_array = np.ones(10000) + _ = Parallel()( + delayed(outer_function_joblib)( + my_array[i:], i) for i in range(NUM_OUTER_TASKS) + ) + + def test_nested_backend_context_manager(loop): def get_nested_pids(): pids = set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2))) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/test_numpy_pickle.py new/joblib-0.16.0/joblib/test/test_numpy_pickle.py --- old/joblib-0.15.1/joblib/test/test_numpy_pickle.py 2020-05-16 14:37:43.000000000 +0200 +++ new/joblib-0.16.0/joblib/test/test_numpy_pickle.py 2020-06-15 16:56:41.000000000 +0200 @@ -340,10 +340,7 @@ np.arange(5, dtype=np.dtype('<f8')), np.arange(5, dtype=np.dtype('>f8')), np.array([1, 'abc', {'a': 1, 'b': 2}], dtype='O'), - # .tostring actually returns bytes and is a - # compatibility alias for .tobytes which was - # added in 1.9.0 - np.arange(256, dtype=np.uint8).tostring(), + np.arange(256, dtype=np.uint8).tobytes(), # np.matrix is a subclass of np.ndarray, here we want # to verify this type of object is correctly unpickled # among versions. @@ -436,10 +433,7 @@ expected_list = [np.arange(5, dtype=np.dtype('<i8')), np.arange(5, dtype=np.dtype('<f8')), np.array([1, 'abc', {'a': 1, 'b': 2}], dtype='O'), - # .tostring actually returns bytes and is a - # compatibility alias for .tobytes which was - # added in 1.9.0 - np.arange(256, dtype=np.uint8).tostring(), + np.arange(256, dtype=np.uint8).tobytes(), # np.matrix is a subclass of np.ndarray, here we want # to verify this type of object is correctly unpickled # among versions. 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib/test/test_parallel.py new/joblib-0.16.0/joblib/test/test_parallel.py --- old/joblib-0.15.1/joblib/test/test_parallel.py 2020-05-14 18:47:29.000000000 +0200 +++ new/joblib-0.16.0/joblib/test/test_parallel.py 2020-07-01 11:58:50.000000000 +0200 @@ -31,6 +31,7 @@ from joblib.testing import (parametrize, raises, check_subprocess_call, skipif, SkipTest, warns) +from joblib.externals.loky.process_executor import TerminatedWorkerError from queue import Queue @@ -530,6 +531,7 @@ @with_multiprocessing @parametrize('backend', PARALLEL_BACKENDS) [email protected](reason="https://github.com/joblib/loky/pull/255") def test_nested_exception_dispatch(backend): """Ensure errors for nested joblib cases gets propagated @@ -1468,8 +1470,18 @@ # saturating the operating system resources by creating a unbounded number # of threads. with parallel_backend(backend, n_jobs=2): - with raises(RecursionError): + with raises(BaseException) as excinfo: _recursive_parallel() + exc = excinfo.value + if backend == "loky" and isinstance(exc, TerminatedWorkerError): + # The recursion exception can itself cause an error when pickling it to + # be send back to the parent process. In this case the worker crashes + # but the original traceback is still printed on stderr. This could be + # improved but does not seem simple to do and this is is not critical + # for users (as long as there is no process or thread bomb happening). + pytest.xfail("Loky worker crash when serializing RecursionError") + else: + assert isinstance(exc, RecursionError) def _run_parallel_sum(): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/joblib-0.15.1/joblib.egg-info/PKG-INFO new/joblib-0.16.0/joblib.egg-info/PKG-INFO --- old/joblib-0.15.1/joblib.egg-info/PKG-INFO 2020-05-16 14:39:27.000000000 +0200 +++ new/joblib-0.16.0/joblib.egg-info/PKG-INFO 2020-07-01 18:05:30.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: joblib -Version: 0.15.1 +Version: 0.16.0 Summary: Lightweight pipelining: using Python functions as pipeline jobs. Home-page: https://joblib.readthedocs.io Author: Gael Varoquaux
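
For context on the dask-backend fixes listed in the 0.16.0 changelog above (batching of short tasks, the pickling-cache interaction, nested Parallel calls and auto-scattering), the sketch below shows the usage pattern those fixes concern. It is not taken from the diff above: it is a minimal illustration that assumes dask, distributed and numpy are installed alongside joblib 0.16.0, and all function and variable names in it are made up for the example.

    import numpy as np
    from dask.distributed import Client, LocalCluster
    from joblib import Parallel, delayed, parallel_backend

    def inner_sum(chunk):
        # Short, cheap task: many such calls are what AutoBatchingMixin
        # groups into larger batches on the dask backend.
        return float(np.sum(chunk))

    def outer(data):
        # Runs on a dask worker; the nested Parallel call re-enters the
        # dask backend, the scenario exercised by the nested-scatter and
        # asyncio-warning fixes referenced above.
        with parallel_backend("dask"):
            parts = Parallel()(delayed(inner_sum)(data[j:]) for j in range(5))
        return sum(parts)

    if __name__ == "__main__":
        cluster = LocalCluster(n_workers=2, threads_per_worker=1)
        client = Client(cluster)
        try:
            # Arrays this size are large enough for joblib to auto-scatter
            # them to workers instead of shipping them with every task.
            data = np.ones(10_000)
            with parallel_backend("dask"):
                totals = Parallel()(delayed(outer)(data[i:]) for i in range(10))
            print(totals)
        finally:
            client.close()
            cluster.close()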
