Hello community,

here is the log from the commit of package python-dask for openSUSE:Factory checked in at 2020-04-13 12:53:26
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-dask (Old)
 and      /work/SRC/openSUSE:Factory/.python-dask.new.3248 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-dask" Mon Apr 13 12:53:26 2020 rev:32 rq:793342 version:2.14.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-dask/python-dask.changes 2020-03-29 14:27:54.510165399 +0200 +++ /work/SRC/openSUSE:Factory/.python-dask.new.3248/python-dask.changes 2020-04-13 12:53:28.476653032 +0200 @@ -1,0 +2,18 @@ +Sat Apr 11 21:45:43 UTC 2020 - Arun Persaud <[email protected]> + +- update to version 2.14.0: + * Array + + Added np.iscomplexobj implementation (:pr:`6045`) Tom Augspurger + * Core + + Update test_rearrange_disk_cleanup_with_exception to pass + without cloudpickle installed (:pr:`6052`) James Bourbeau + + Fixed flaky test-rearrange (:pr:`5977`) Tom Augspurger + * DataFrame + + Use _meta_nonempty for dtype casting in stack_partitions + (:pr:`6061`) mlondschien + + Fix bugs in _metadata creation and filtering in parquet + ArrowEngine (:pr:`6023`) Richard (Rick) Zamora + * Documentation + + DOC: Add name caveats (:pr:`6040`) Tom Augspurger + +------------------------------------------------------------------- Old: ---- dask-2.13.0.tar.gz New: ---- dask-2.14.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-dask.spec ++++++ --- /var/tmp/diff_new_pack.haRUdJ/_old 2020-04-13 12:53:29.352653414 +0200 +++ /var/tmp/diff_new_pack.haRUdJ/_new 2020-04-13 12:53:29.356653415 +0200 @@ -27,7 +27,7 @@ %endif %define skip_python2 1 Name: python-dask%{psuffix} -Version: 2.13.0 +Version: 2.14.0 Release: 0 Summary: Minimal task scheduling abstraction License: BSD-3-Clause ++++++ dask-2.13.0.tar.gz -> dask-2.14.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/PKG-INFO new/dask-2.14.0/PKG-INFO --- old/dask-2.13.0/PKG-INFO 2020-03-25 20:23:38.611427800 +0100 +++ new/dask-2.14.0/PKG-INFO 2020-04-03 22:41:01.324165000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: dask -Version: 2.13.0 +Version: 2.14.0 Summary: Parallel PyData with Task Scheduling Home-page: https://github.com/dask/dask/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/_version.py new/dask-2.14.0/dask/_version.py --- old/dask-2.13.0/dask/_version.py 2020-03-25 20:23:38.614493400 +0100 +++ new/dask-2.14.0/dask/_version.py 2020-04-03 22:41:01.328230600 +0200 @@ -11,8 +11,8 @@ { "dirty": false, "error": null, - "full-revisionid": "c743feab0a92d20e248d32e97dcfbe567f4dfbd0", - "version": "2.13.0" + "full-revisionid": "824a69cdc258d9cae1b33f5052e0bd78c203d2e1", + "version": "2.14.0" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/array/core.py new/dask-2.14.0/dask/array/core.py --- old/dask-2.13.0/dask/array/core.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/array/core.py 2020-03-27 18:08:57.000000000 +0100 @@ -412,7 +412,7 @@ drop_axis=[], new_axis=None, meta=None, - **kwargs + **kwargs, ): """ Map a function across all blocks of a dask array. @@ -811,7 +811,7 @@ regions=None, compute=True, return_stored=False, - **kwargs + **kwargs, ): """ Store dask arrays in array-like objects, overwrite data in target @@ -2696,6 +2696,15 @@ changed by installing cityhash, xxhash or murmurhash. If installed, a large-factor speedup can be obtained in the tokenisation step. Use ``name=False`` to generate a random name instead of hashing (fast) + + .. 
note:: + + Because this ``name`` is used as the key in task graphs, you should + ensure that it uniquely identifies the data contained within. If + you'd like to provide a descriptive name that is still unique, combine + the descriptive name with :func:`dask.base.tokenize` of the + ``array_like``. See :ref:`graphs` for more. + lock : bool or Lock, optional If ``x`` doesn't support concurrent reads then provide a lock here, or pass in True to have dask.array create one for you. @@ -2731,6 +2740,12 @@ >>> a = da.from_array(x, chunks='auto') # doctest: +SKIP >>> a = da.from_array(x, chunks='100 MiB') # doctest: +SKIP >>> a = da.from_array(x) # doctest: +SKIP + + If providing a name, ensure that it is unique + + >>> import dask.base + >>> token = dask.base.tokenize(x) # doctest: +SKIP + >>> a = da.from_array('myarray-' + token) # doctest: +SKIP """ if isinstance(x, Array): raise ValueError( @@ -2852,7 +2867,7 @@ overwrite=False, compute=True, return_stored=False, - **kwargs + **kwargs, ): """Save array to the zarr storage format diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/array/routines.py new/dask-2.14.0/dask/array/routines.py --- old/dask-2.13.0/dask/array/routines.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/array/routines.py 2020-04-01 00:34:20.000000000 +0200 @@ -808,6 +808,12 @@ return a.map_blocks(np.round, decimals=decimals, dtype=a.dtype) +@implements(np.iscomplexobj) +@derived_from(np) +def iscomplexobj(x): + return issubclass(x.dtype.type, np.complexfloating) + + def _unique_internal(ar, indices, counts, return_inverse=False): """ Helper/wrapper function for :func:`numpy.unique`. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/array/tests/test_routines.py new/dask-2.14.0/dask/array/tests/test_routines.py --- old/dask-2.13.0/dask/array/tests/test_routines.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/array/tests/test_routines.py 2020-04-01 00:34:20.000000000 +0200 @@ -1740,3 +1740,11 @@ with pytest.warns(RuntimeWarning): da.average(d_a, weights=da.zeros_like(d_a)).compute() + + +def test_iscomplexobj(): + a = da.from_array(np.array([1, 2]), 2) + assert np.iscomplexobj(a) is False + + a = da.from_array(np.array([1, 2 + 0j]), 2) + assert np.iscomplexobj(a) is True diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/io/parquet/arrow.py new/dask-2.14.0/dask/dataframe/io/parquet/arrow.py --- old/dask-2.13.0/dask/dataframe/io/parquet/arrow.py 2020-03-11 00:05:35.000000000 +0100 +++ new/dask-2.14.0/dask/dataframe/io/parquet/arrow.py 2020-04-01 00:34:21.000000000 +0200 @@ -1,10 +1,12 @@ from functools import partial from collections import OrderedDict import json +import warnings import pandas as pd import pyarrow as pa import pyarrow.parquet as pq +from pyarrow.compat import guid from ....utils import natural_sort_key, getargspec from ..utils import _get_pyarrow_dtypes, _meta_from_dtypes from ...utils import clear_known_categories @@ -61,6 +63,37 @@ return tuple(result.values()) +def _merge_statistics(stats, s): + """ Update `stats` with vaules in `s` + """ + stats[-1]["total_byte_size"] += s["total_byte_size"] + stats[-1]["num-rows"] += s["num-rows"] + ncols = len(stats[-1]["columns"]) + ncols_n = len(s["columns"]) + if ncols != ncols_n: + raise ValueError(f"Column count not equal ({ncols} vs {ncols_n})") + for i in range(ncols): + name = 
stats[-1]["columns"][i]["name"] + j = i + for ii in range(ncols): + if name == s["columns"][j]["name"]: + break + if ii == ncols - 1: + raise KeyError(f"Column statistics missing for {name}") + j = (j + 1) % ncols + + min_n = s["columns"][j]["min"] + max_n = s["columns"][j]["max"] + null_count_n = s["columns"][j]["null_count"] + + min_i = stats[-1]["columns"][i]["min"] + max_i = stats[-1]["columns"][i]["max"] + stats[-1]["columns"][i]["min"] = min(min_i, min_n) + stats[-1]["columns"][i]["max"] = min(max_i, max_n) + stats[-1]["columns"][i]["null_count"] += null_count_n + return True + + def _determine_dataset_parts(fs, paths, gather_statistics, filters, dataset_kwargs): """ Determine how to access metadata and break read into ``parts`` @@ -119,6 +152,55 @@ return parts, dataset +def _write_partitioned( + table, root_path, partition_cols, fs, preserve_index=True, **kwargs +): + """ Write table to a partitioned dataset with pyarrow. + + Logic copied from pyarrow.parquet. + (arrow/python/pyarrow/parquet.py::write_to_dataset) + + TODO: Remove this in favor of pyarrow's `write_to_dataset` + once ARROW-8244 is addressed. + """ + fs.mkdirs(root_path, exist_ok=True) + + df = table.to_pandas(ignore_metadata=True) + partition_keys = [df[col] for col in partition_cols] + data_df = df.drop(partition_cols, axis="columns") + data_cols = df.columns.drop(partition_cols) + if len(data_cols) == 0 and not preserve_index: + raise ValueError("No data left to save outside partition columns") + + subschema = table.schema + for col in table.schema.names: + if col in partition_cols: + subschema = subschema.remove(subschema.get_field_index(col)) + + md_list = [] + for keys, subgroup in data_df.groupby(partition_keys): + if not isinstance(keys, tuple): + keys = (keys,) + subdir = fs.sep.join( + [ + "{colname}={value}".format(colname=name, value=val) + for name, val in zip(partition_cols, keys) + ] + ) + subtable = pa.Table.from_pandas( + subgroup, preserve_index=False, schema=subschema, safe=False + ) + prefix = fs.sep.join([root_path, subdir]) + fs.mkdir(prefix, exists_ok=True) + outfile = guid() + ".parquet" + full_path = fs.sep.join([prefix, outfile]) + with fs.open(full_path, "wb") as f: + pq.write_table(subtable, f, metadata_collector=md_list, **kwargs) + md_list[-1].set_file_path(fs.sep.join([subdir, outfile])) + + return md_list + + class ArrowEngine(Engine): @staticmethod def read_metadata( @@ -138,6 +220,15 @@ parts, dataset = _determine_dataset_parts( fs, paths, gather_statistics, filters, kwargs.get("dataset", {}) ) + # Check if the column-chunk file_path's are set in "_metadata". + # If available, we can use the path to sort the row-groups + col_chunk_paths = False + if dataset.metadata: + col_chunk_paths = all( + dataset.metadata.row_group(i).column(0).file_path is not None + for i in range(dataset.metadata.num_row_groups) + ) + # TODO: Call to `_determine_dataset_parts` uses `pq.ParquetDataset` # to define the `dataset` object. `split_row_groups` should be passed # to that constructor once it is supported (see ARROW-2801). @@ -145,11 +236,39 @@ partitions = [ n for n in dataset.partitions.partition_names if n is not None ] - if partitions: - split_row_groups = False + if partitions and dataset.metadata: + # Dont use dataset.metadata for partitioned datasets, unless + # the column-chunk metadata includes the `"file_path"`. + # The order of dataset.metadata.row_group items is often + # different than the order of `dataset.pieces`. 
+ if not col_chunk_paths or ( + len(dataset.pieces) != dataset.metadata.num_row_groups + ): + dataset.schema = dataset.metadata.schema + dataset.metadata = None else: partitions = [] + # Statistics are currently collected at the row-group level only. + # Therefore, we cannot perform filtering with split_row_groups=False. + # For "partitioned" datasets, each file (usually) corresponds to a + # row-group anyway. + # TODO: Map row-group statistics onto file pieces for filtering. + # This shouldn't be difficult if `col_chunk_paths==True` + if not split_row_groups and not col_chunk_paths: + if gather_statistics is None and not partitions: + gather_statistics = False + if filters: + raise ValueError( + "Filters not supported with split_row_groups=False " + "(unless proper _metadata is available)." + ) + if gather_statistics and not partitions: + raise ValueError( + "Statistics not supported with split_row_groups=False." + "(unless proper _metadata is available)." + ) + if dataset.metadata: schema = dataset.metadata.schema.to_arrow_schema() else: @@ -218,12 +337,21 @@ and dataset.metadata.num_row_groups >= len(pieces) ): gather_statistics = True - # Don't gather stats by default if this is a partitioned dataset - if dataset.metadata.num_row_groups != len(pieces) and partitions: - gather_statistics = False if not pieces: gather_statistics = False + if filters: + # Filters may require us to gather statistics + if gather_statistics is False and partitions: + warnings.warn( + "Filtering with gather_statistics=False. " + "Only partition columns will be filtered correctly." + ) + elif gather_statistics is False: + raise ValueError("Cannot apply filters with gather_statistics=False") + elif not gather_statistics: + gather_statistics = True + row_groups_per_piece = None if gather_statistics: # Read from _metadata file @@ -232,6 +360,16 @@ dataset.metadata.row_group(i) for i in range(dataset.metadata.num_row_groups) ] + + # Re-order row-groups by path name if known + if col_chunk_paths: + row_groups = sorted( + row_groups, + key=lambda row_group: natural_sort_key( + row_group.column(0).file_path + ), + ) + if split_row_groups and len(dataset.paths) == 1: row_groups_per_piece = _get_row_groups_per_piece( pieces, dataset.metadata, dataset.paths[0], fs @@ -250,6 +388,7 @@ if gather_statistics: stats = [] skip_cols = set() # Columns with min/max = None detected + path_last = None for ri, row_group in enumerate(row_groups): s = {"num-rows": row_group.num_rows, "columns": []} for i, name in enumerate(names): @@ -274,6 +413,17 @@ ) s["columns"].append(d) s["total_byte_size"] = row_group.total_byte_size + if col_chunk_paths: + s["file_path_0"] = row_group.column(0).file_path + if not split_row_groups and (s["file_path_0"] == path_last): + # Rather than appending a new "row-group", just merge + # new `s` statistics into last element of `stats`. 
+ # Note that each stats element will now correspond to an + # entire file (rather than actual "row-groups") + _merge_statistics(stats, s) + continue + else: + path_last = s["file_path_0"] stats.append(s) else: stats = None @@ -307,7 +457,8 @@ parts.append((piece.path, rg, piece.partition_keys)) # Setting file_path here, because it may be # missing from the row-group/column-chunk stats - stats[rg_tot]["file_path_0"] = piece.path + if "file_path_0" not in stats[rg_tot]: + stats[rg_tot]["file_path_0"] = piece.path rg_tot += 1 else: parts = [ @@ -484,22 +635,22 @@ schema=None, **kwargs, ): - md_list = [] + _meta = None preserve_index = False if index_cols: df = df.set_index(index_cols) preserve_index = True t = pa.Table.from_pandas(df, preserve_index=preserve_index, schema=schema) if partition_on: - pq.write_to_dataset( - t, - path, - partition_cols=partition_on, - filesystem=fs, - metadata_collector=md_list, - **kwargs, + md_list = _write_partitioned( + t, path, partition_on, fs, preserve_index=preserve_index, **kwargs ) + if md_list: + _meta = md_list[0] + for i in range(1, len(md_list)): + _meta.append_row_groups(md_list[i]) else: + md_list = [] with fs.open(fs.sep.join([path, filename]), "wb") as fil: pq.write_table( t, @@ -509,10 +660,11 @@ **kwargs, ) if md_list: - md_list[0].set_file_path(filename) + _meta = md_list[0] + _meta.set_file_path(filename) # Return the schema needed to write the metadata if return_metadata: - return [{"schema": t.schema, "meta": md_list[0]}] + return [{"schema": t.schema, "meta": _meta}] else: return [] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/io/tests/test_parquet.py new/dask-2.14.0/dask/dataframe/io/tests/test_parquet.py --- old/dask-2.13.0/dask/dataframe/io/tests/test_parquet.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/dataframe/io/tests/test_parquet.py 2020-04-01 00:34:21.000000000 +0200 @@ -639,7 +639,9 @@ engine=engine, ) - out = dd.read_parquet(tmp, engine=engine, gather_statistics=True).compute() + out = dd.read_parquet( + tmp, engine=engine, index="index", gather_statistics=True + ).compute() out["lon"] = out.lon.astype("int") # just to pass assert # sort required since partitioning breaks index order assert_eq( @@ -1000,9 +1002,14 @@ ("partition_column", pa.int64()), ] ) - ddf.to_parquet( - str(tmpdir), engine="pyarrow", partition_on="partition_column", schema=schema + fut = ddf.to_parquet( + str(tmpdir), + compute=False, + engine="pyarrow", + partition_on="partition_column", + schema=schema, ) + fut.compute(scheduler="single-threaded") ddf_after_write = ( dd.read_parquet(str(tmpdir), engine="pyarrow", gather_statistics=False) .compute() @@ -1091,7 +1098,7 @@ assert set(df.bb[df.aa == val]) == set(out.bb[out.aa == val]) -@write_read_engines_xfail +@write_read_engines() def test_filters_categorical(tmpdir, write_engine, read_engine): tmpdir = str(tmpdir) cats = ["2018-01-01", "2018-01-02", "2018-01-03", "2018-01-04"] @@ -1104,7 +1111,10 @@ ddftest = dd.from_pandas(dftest, npartitions=4).set_index("dummy") ddftest.to_parquet(tmpdir, partition_on="DatePart", engine=write_engine) ddftest_read = dd.read_parquet( - tmpdir, engine=read_engine, filters=[(("DatePart", "<=", "2018-01-02"))] + tmpdir, + index="dummy", + engine=read_engine, + filters=[(("DatePart", "<=", "2018-01-02"))], ) assert len(ddftest_read) == 2 @@ -1762,9 +1772,9 @@ } pdf = pd.DataFrame(data) ddf = dd.from_pandas(pdf, npartitions=2) - ddf.to_parquet(path, engine="pyarrow", 
partition_on="p") + ddf.to_parquet(path, engine="pyarrow", write_index=False, partition_on="p") - ddf = dd.read_parquet(path, engine="pyarrow") + ddf = dd.read_parquet(path, index=False, engine="pyarrow") ddf.astype({"b": np.float32}).compute() @@ -2122,6 +2132,36 @@ assert ddf3.npartitions == 4 +def test_split_row_groups_filter_pyarrow(tmpdir): + check_pyarrow() + tmp = str(tmpdir) + df = pd.DataFrame( + {"i32": np.arange(800, dtype=np.int32), "f": np.arange(800, dtype=np.float64)} + ) + df.index.name = "index" + search_val = 600 + filters = [("f", "==", search_val)] + + dd.from_pandas(df, npartitions=4).to_parquet( + tmp, append=True, engine="pyarrow", row_group_size=50 + ) + + ddf2 = dd.read_parquet(tmp, engine="pyarrow") + ddf3 = dd.read_parquet( + tmp, + engine="pyarrow", + gather_statistics=True, + split_row_groups=False, + filters=filters, + ) + + assert search_val in ddf3["i32"] + assert_eq( + ddf2[ddf2["i32"] == search_val].compute(), + ddf3[ddf3["i32"] == search_val].compute(), + ) + + def test_optimize_getitem_and_nonblockwise(tmpdir): check_engine() path = os.path.join(tmpdir, "path.parquet") @@ -2260,3 +2300,32 @@ ddf = dd.read_parquet(path, engine=engine) a, b = dask.optimize(ddf["A"], ddf) + + [email protected]("gather_statistics", [None, True]) +@write_read_engines() +def test_filter_nonpartition_columns( + tmpdir, write_engine, read_engine, gather_statistics +): + tmpdir = str(tmpdir) + df_write = pd.DataFrame( + { + "id": [1, 2, 3, 4] * 4, + "time": np.arange(16), + "random": np.random.choice(["cat", "dog"], size=16), + } + ) + ddf_write = dd.from_pandas(df_write, npartitions=4) + ddf_write.to_parquet( + tmpdir, write_index=False, partition_on=["id"], engine=write_engine + ) + ddf_read = dd.read_parquet( + tmpdir, + index=False, + engine=read_engine, + gather_statistics=gather_statistics, + filters=[(("time", "<", 5))], + ) + df_read = ddf_read.compute() + assert len(df_read) == len(df_read[df_read["time"] < 5]) + assert df_read["time"].max() < 5 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/multi.py new/dask-2.14.0/dask/dataframe/multi.py --- old/dask-2.13.0/dask/dataframe/multi.py 2020-03-25 20:06:02.000000000 +0100 +++ new/dask-2.14.0/dask/dataframe/multi.py 2020-04-03 22:23:54.000000000 +0200 @@ -85,6 +85,7 @@ is_series_like, asciitable, is_dataframe_like, + make_meta, ) @@ -927,7 +928,13 @@ def stack_partitions(dfs, divisions, join="outer"): """Concatenate partitions on axis=0 by doing a simple stack""" - meta = methods.concat([df._meta for df in dfs], join=join, filter_warning=False) + # Use _meta_nonempty as pandas.concat will incorrectly cast float to datetime + # for empty data frames. See https://github.com/pandas-dev/pandas/issues/32934. 
+ meta = make_meta( + methods.concat( + [df._meta_nonempty for df in dfs], join=join, filter_warning=False + ) + ) empty = strip_unknown_categories(meta) name = "concat-{0}".format(tokenize(*dfs)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/shuffle.py new/dask-2.14.0/dask/dataframe/shuffle.py --- old/dask-2.13.0/dask/dataframe/shuffle.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/dataframe/shuffle.py 2020-04-01 00:34:21.000000000 +0200 @@ -1,6 +1,10 @@ +import contextlib +import logging import math +import shutil from operator import getitem import uuid +import tempfile import warnings import tlz as toolz @@ -17,6 +21,8 @@ from ..utils import digit, insert, M from .utils import hash_object_dispatch, group_split_dispatch +logger = logging.getLogger(__name__) + def set_index( df, @@ -340,6 +346,8 @@ def __call__(self, *args, **kwargs): import partd + path = tempfile.mkdtemp(suffix=".partd", dir=self.tempdir) + try: partd_compression = ( getattr(partd.compressed, self.compression) @@ -353,10 +361,8 @@ self.compression ) ) - if self.tempdir: - file = partd.File(dir=self.tempdir) - else: - file = partd.File() + file = partd.File(path) + partd.file.cleanup_files.append(path) # Envelope partd file with compression, if set and available if partd_compression: file = partd_compression(file) @@ -407,19 +413,33 @@ dsk3 = {barrier_token: (barrier, list(dsk2))} # Collect groups - name = "shuffle-collect-" + token + name1 = "shuffle-collect-1" + token dsk4 = { - (name, i): (collect, p, i, df._meta, barrier_token) for i in range(npartitions) + (name1, i): (collect, p, i, df._meta, barrier_token) for i in range(npartitions) } + cleanup_token = "cleanup-" + always_new_token + barrier_token2 = "barrier2-" + always_new_token + # A task that depends on `cleanup-`, but has a small output + dsk5 = {(barrier_token2, i): (barrier, part) for i, part in enumerate(dsk4)} + # This indirectly depends on `cleanup-` and so runs after we're done using the disk + dsk6 = {cleanup_token: (cleanup_partd_files, p, list(dsk5))} + name = "shuffle-collect-2" + token + dsk7 = {(name, i): (_noop, (name1, i), cleanup_token) for i in range(npartitions)} divisions = (None,) * (npartitions + 1) - layer = toolz.merge(dsk1, dsk2, dsk3, dsk4) + layer = toolz.merge(dsk1, dsk2, dsk3, dsk4, dsk5, dsk6, dsk7) graph = HighLevelGraph.from_collections(name, layer, dependencies=dependencies) - return DataFrame(graph, name, df._meta, divisions) +def _noop(x, cleanup_token): + """ + A task that does nothing. + """ + return x + + def rearrange_by_column_tasks( df, column, max_branch=32, npartitions=None, ignore_index=False ): @@ -611,10 +631,38 @@ return 0 +def cleanup_partd_files(p, keys): + """ + Cleanup the files in a partd.File dataset. + + Parameters + ---------- + p : partd.Interface + File or Encode wrapping a file should be OK. + keys: List + Just for scheduling purposes, not actually used. 
+ """ + import partd + + if isinstance(p, partd.Encode): + maybe_file = p.partd + else: + maybe_file + + if isinstance(maybe_file, partd.File): + path = maybe_file.path + else: + path = None + + if path: + shutil.rmtree(path, ignore_errors=True) + + def collect(p, part, meta, barrier_token): """ Collect partitions from partd, yield dataframes """ - res = p.get(part) - return res if len(res) > 0 else meta + with ensure_cleanup_on_exception(p): + res = p.get(part) + return res if len(res) > 0 else meta def set_partitions_pre(s, divisions): @@ -683,10 +731,31 @@ return group_split_dispatch(df, c.astype(np.int64), k, ignore_index=ignore_index) [email protected] +def ensure_cleanup_on_exception(p): + """Ensure a partd.File is cleaned up. + + We have several tasks referring to a `partd.File` instance. We want to + ensure that the file is cleaned up if and only if there's an exception + in the tasks using the `partd.File`. + """ + try: + yield + except Exception: + # the function (e.g. shuffle_group_3) had an internal exception. + # We'll cleanup our temporary files and re-raise. + try: + p.drop() + except Exception: + logger.exception("ignoring exception in ensure_cleanup_on_exception") + raise + + def shuffle_group_3(df, col, npartitions, p): - g = df.groupby(col) - d = {i: g.get_group(i) for i in g.groups} - p.append(d, fsync=True) + with ensure_cleanup_on_exception(p): + g = df.groupby(col) + d = {i: g.get_group(i) for i in g.groups} + p.append(d, fsync=True) def set_index_post_scalar(df, index_name, drop, column_dtype): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/tests/test_multi.py new/dask-2.14.0/dask/dataframe/tests/test_multi.py --- old/dask-2.13.0/dask/dataframe/tests/test_multi.py 2020-03-25 20:06:02.000000000 +0100 +++ new/dask-2.14.0/dask/dataframe/tests/test_multi.py 2020-04-03 22:23:54.000000000 +0200 @@ -585,9 +585,7 @@ [ (1.0, 1), (1.0, "one"), - # See https://github.com/dask/dask/issues/5968 and - # https://github.com/pandas-dev/pandas/issues/32934 - pytest.param(1.0, pd.to_datetime("1970-01-01"), marks=pytest.mark.xfail), + (1.0, pd.to_datetime("1970-01-01")), (1, "one"), (1, pd.to_datetime("1970-01-01")), ("one", pd.to_datetime("1970-01-01")), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/tests/test_shuffle.py new/dask-2.14.0/dask/dataframe/tests/test_shuffle.py --- old/dask-2.13.0/dask/dataframe/tests/test_shuffle.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/dataframe/tests/test_shuffle.py 2020-04-02 01:11:50.000000000 +0200 @@ -1,6 +1,8 @@ import itertools import os import random +import tempfile +from unittest import mock import pandas as pd import pytest @@ -24,7 +26,6 @@ remove_nans, ) from dask.dataframe.utils import assert_eq, make_meta -from dask.compatibility import PY_VERSION dsk = { @@ -277,6 +278,44 @@ assert sum(i in set(part._partitions) for part in parts) == 1 +def test_rearrange_cleanup(): + df = pd.DataFrame({"x": np.random.random(10)}) + ddf = dd.from_pandas(df, npartitions=4) + ddf2 = ddf.assign(_partitions=ddf.x % 4) + + tmpdir = tempfile.mkdtemp() + + with dask.config.set(temporay_directory=str(tmpdir)): + result = rearrange_by_column(ddf2, "_partitions", max_branch=32, shuffle="disk") + result.compute(scheduler="processes") + + assert len(os.listdir(tmpdir)) == 0 + + +def mock_shuffle_group_3(df, col, npartitions, p): + raise ValueError("Mock exception!") + + +def 
test_rearrange_disk_cleanup_with_exception(): + # ensure temporary files are cleaned up when there's an internal exception. + + with mock.patch("dask.dataframe.shuffle.shuffle_group_3", new=mock_shuffle_group_3): + df = pd.DataFrame({"x": np.random.random(10)}) + ddf = dd.from_pandas(df, npartitions=4) + ddf2 = ddf.assign(_partitions=ddf.x % 4) + + tmpdir = tempfile.mkdtemp() + + with dask.config.set(temporay_directory=str(tmpdir)): + with pytest.raises(ValueError, match="Mock exception!"): + result = rearrange_by_column( + ddf2, "_partitions", max_branch=32, shuffle="disk" + ) + result.compute(scheduler="processes") + + assert len(os.listdir(tmpdir)) == 0 + + def test_rearrange_by_column_with_narrow_divisions(): from dask.dataframe.tests.test_multi import list_eq @@ -779,34 +818,6 @@ compute_divisions(c) -# TODO: Fix sporadic failure on Python 3.8 and remove this xfail mark [email protected](PY_VERSION >= "3.8", reason="Flaky test", strict=False) -def test_temporary_directory(tmpdir): - from multiprocessing.pool import Pool - - df = pd.DataFrame( - { - "x": np.random.random(100), - "y": np.random.random(100), - "z": np.random.random(100), - } - ) - ddf = dd.from_pandas(df, npartitions=10, name="x", sort=False) - - # We use a pool to avoid a race condition between the pool close - # cleaning up files, and the assert below. - pool = Pool(4) - with pool: - with dask.config.set( - temporary_directory=str(tmpdir), scheduler="processes", pool=pool - ): - ddf2 = ddf.set_index("x", shuffle="disk") - ddf2.compute() - assert any( - fn.endswith(".partd") for fn in os.listdir(str(tmpdir)) - ), os.listdir(str(tmpdir)) - - def test_empty_partitions(): # See https://github.com/dask/dask/issues/2408 df = pd.DataFrame({"a": list(range(10))}) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask/delayed.py new/dask-2.14.0/dask/delayed.py --- old/dask-2.13.0/dask/delayed.py 2020-03-21 17:11:50.000000000 +0100 +++ new/dask-2.14.0/dask/delayed.py 2020-03-27 18:08:57.000000000 +0100 @@ -231,6 +231,15 @@ to hashing content. Note that this only affects the name of the object wrapped by this call to delayed, and *not* the output of delayed function calls - for that use ``dask_key_name=`` as described below. + + .. note:: + + Because this ``name`` is used as the key in task graphs, you should + ensure that it uniquely identifies ``obj``. If you'd like to provide + a descriptive name that is still unique, combine the descriptive name + with :func:`dask.base.tokenize` of the ``array_like``. See + :ref:`graphs` for more. + pure : bool, optional Indicates whether calling the resulting ``Delayed`` object is a pure operation. If True, arguments to the call are hashed to produce @@ -338,11 +347,15 @@ The key name of a delayed object is hashed by default if ``pure=True`` or is generated randomly if ``pure=False`` (default). To explicitly set the - name, you can use the ``name`` keyword: - - >>> a = delayed([1, 2, 3], name='mylist') - >>> a - Delayed('mylist') + name, you can use the ``name`` keyword. To ensure that the key is unique + you should include the tokenized value as well, or otherwise ensure that + it's unique: + + >>> from dask.base import tokenize + >>> data = [1, 2, 3] + >>> a = delayed(data, name='mylist-' + tokenize(data)) + >>> a # doctest: +SKIP + Delayed('mylist-55af65871cb378a4fa6de1660c3e8fb7') Delayed results act as a proxy to the underlying object. 
Many operators are supported: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/dask.egg-info/PKG-INFO new/dask-2.14.0/dask.egg-info/PKG-INFO --- old/dask-2.13.0/dask.egg-info/PKG-INFO 2020-03-25 20:23:38.000000000 +0100 +++ new/dask-2.14.0/dask.egg-info/PKG-INFO 2020-04-03 22:40:58.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: dask -Version: 2.13.0 +Version: 2.14.0 Summary: Parallel PyData with Task Scheduling Home-page: https://github.com/dask/dask/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/docs/source/changelog.rst new/dask-2.14.0/docs/source/changelog.rst --- old/dask-2.13.0/docs/source/changelog.rst 2020-03-25 20:21:24.000000000 +0100 +++ new/dask-2.14.0/docs/source/changelog.rst 2020-04-03 22:37:39.000000000 +0200 @@ -1,6 +1,32 @@ Changelog ========= +2.14.0 / 2020-04-03 +------------------- + +Array ++++++ + +- Added ``np.iscomplexobj`` implementation (:pr:`6045`) `Tom Augspurger`_ + +Core +++++ + +- Update ``test_rearrange_disk_cleanup_with_exception`` to pass without cloudpickle installed (:pr:`6052`) `James Bourbeau`_ +- Fixed flaky ``test-rearrange`` (:pr:`5977`) `Tom Augspurger`_ + +DataFrame ++++++++++ + +- Use ``_meta_nonempty`` for dtype casting in ``stack_partitions`` (:pr:`6061`) `mlondschien`_ +- Fix bugs in ``_metadata`` creation and filtering in parquet ``ArrowEngine`` (:pr:`6023`) `Richard (Rick) Zamora`_ + +Documentation ++++++++++++++ + +- DOC: Add name caveats (:pr:`6040`) `Tom Augspurger`_ + + 2.13.0 / 2020-03-25 ------------------- @@ -3045,3 +3071,4 @@ .. _`psimaj`: https://github.com/psimaj .. _`mlondschien`: https://github.com/mlondschien .. _`petiop`: https://github.com/petiop +.. _`Richard (Rick) Zamora`: https://github.com/rjzamora diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.13.0/docs/source/graphs.rst new/dask-2.14.0/docs/source/graphs.rst --- old/dask-2.13.0/docs/source/graphs.rst 2020-01-28 18:29:39.000000000 +0100 +++ new/dask-2.14.0/docs/source/graphs.rst 2020-03-27 18:08:57.000000000 +0100 @@ -1,3 +1,5 @@ +.. _graphs: + Task Graphs ===========
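
As a quick illustration of the ``np.iscomplexobj`` support added in this release (see the ``dask/array/routines.py`` hunk above), the following sketch mirrors the new ``test_iscomplexobj`` test. It assumes dask 2.14.0 and a NumPy version that provides the ``__array_function__`` protocol, so the NumPy call dispatches to the dask implementation:

    import numpy as np
    import dask.array as da

    # Real-valued input: the dask implementation only inspects the dtype,
    # so no chunk computation is triggered.
    a = da.from_array(np.array([1, 2]), chunks=2)
    print(np.iscomplexobj(a))   # False

    # Complex-valued input dispatches to the same dtype check.
    b = da.from_array(np.array([1, 2 + 0j]), chunks=2)
    print(np.iscomplexobj(b))   # True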

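The name caveats documented above in ``dask/array/core.py`` and ``dask/delayed.py`` recommend combining a descriptive ``name`` with ``dask.base.tokenize`` so the task-graph key stays unique per input. A minimal sketch of that pattern, assuming dask 2.14.0 (the ``myarray-``/``mylist-`` prefixes are arbitrary labels chosen here, not anything dask requires):

    import numpy as np
    import dask.array as da
    from dask import delayed
    from dask.base import tokenize

    x = np.arange(1000)
    # Descriptive yet unique: the token changes whenever the data changes.
    a = da.from_array(x, chunks=100, name="myarray-" + tokenize(x))
    print(a.name)   # myarray-<token>

    data = [1, 2, 3]
    d = delayed(data, name="mylist-" + tokenize(data))
    print(d.key)    # mylist-<token>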
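Likewise, the ``stack_partitions`` change builds ``meta`` from ``_meta_nonempty``, working around pandas mis-casting float to datetime when concatenating empty frames (pandas-dev/pandas#32934). A minimal sketch of the kind of case the updated ``test_multi.py`` parameters exercise, assuming dask 2.14.0; the non-overlapping indexes are chosen here so the ordered ``stack_partitions`` path is taken:

    import pandas as pd
    import dask.dataframe as dd

    a = dd.from_pandas(pd.DataFrame({"x": [1.0, 2.0]}, index=[0, 1]), npartitions=1)
    b = dd.from_pandas(
        pd.DataFrame({"x": [pd.to_datetime("1970-01-01")]}, index=[2]), npartitions=1
    )

    # With meta derived from non-empty frames, the reported dtype matches what
    # compute() returns (object) instead of a spurious datetime64 produced by
    # concatenating empty frames.
    out = dd.concat([a, b])
    print(out.dtypes)
    print(out.compute())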