Hello community,

here is the log from the commit of package python-dask for openSUSE:Factory 
checked in at 2020-04-13 12:53:26
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-dask (Old)
 and      /work/SRC/openSUSE:Factory/.python-dask.new.3248 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-dask"

Mon Apr 13 12:53:26 2020 rev:32 rq:793342 version:2.14.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-dask/python-dask.changes  2020-03-29 14:27:54.510165399 +0200
+++ /work/SRC/openSUSE:Factory/.python-dask.new.3248/python-dask.changes        2020-04-13 12:53:28.476653032 +0200
@@ -1,0 +2,18 @@
+Sat Apr 11 21:45:43 UTC 2020 - Arun Persaud <[email protected]>
+
+- update to version 2.14.0:
+  * Array
+    + Added np.iscomplexobj implementation (:pr:`6045`) Tom Augspurger
+  * Core
+    + Update test_rearrange_disk_cleanup_with_exception to pass
+      without cloudpickle installed (:pr:`6052`) James Bourbeau
+    + Fixed flaky test-rearrange (:pr:`5977`) Tom Augspurger
+  * DataFrame
+    + Use _meta_nonempty for dtype casting in stack_partitions
+      (:pr:`6061`) mlondschien
+    + Fix bugs in _metadata creation and filtering in parquet
+      ArrowEngine (:pr:`6023`) Richard (Rick) Zamora
+  * Documentation
+    + DOC: Add name caveats (:pr:`6040`) Tom Augspurger
+
+-------------------------------------------------------------------

Old:
----
  dask-2.13.0.tar.gz

New:
----
  dask-2.14.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-dask.spec ++++++
--- /var/tmp/diff_new_pack.haRUdJ/_old  2020-04-13 12:53:29.352653414 +0200
+++ /var/tmp/diff_new_pack.haRUdJ/_new  2020-04-13 12:53:29.356653415 +0200
@@ -27,7 +27,7 @@
 %endif
 %define         skip_python2 1
 Name:           python-dask%{psuffix}
-Version:        2.13.0
+Version:        2.14.0
 Release:        0
 Summary:        Minimal task scheduling abstraction
 License:        BSD-3-Clause

++++++ dask-2.13.0.tar.gz -> dask-2.14.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/PKG-INFO new/dask-2.14.0/PKG-INFO
--- old/dask-2.13.0/PKG-INFO    2020-03-25 20:23:38.611427800 +0100
+++ new/dask-2.14.0/PKG-INFO    2020-04-03 22:41:01.324165000 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: dask
-Version: 2.13.0
+Version: 2.14.0
 Summary: Parallel PyData with Task Scheduling
 Home-page: https://github.com/dask/dask/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/_version.py 
new/dask-2.14.0/dask/_version.py
--- old/dask-2.13.0/dask/_version.py    2020-03-25 20:23:38.614493400 +0100
+++ new/dask-2.14.0/dask/_version.py    2020-04-03 22:41:01.328230600 +0200
@@ -11,8 +11,8 @@
 {
  "dirty": false,
  "error": null,
- "full-revisionid": "c743feab0a92d20e248d32e97dcfbe567f4dfbd0",
- "version": "2.13.0"
+ "full-revisionid": "824a69cdc258d9cae1b33f5052e0bd78c203d2e1",
+ "version": "2.14.0"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/array/core.py 
new/dask-2.14.0/dask/array/core.py
--- old/dask-2.13.0/dask/array/core.py  2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/array/core.py  2020-03-27 18:08:57.000000000 +0100
@@ -412,7 +412,7 @@
     drop_axis=[],
     new_axis=None,
     meta=None,
-    **kwargs
+    **kwargs,
 ):
     """ Map a function across all blocks of a dask array.
 
@@ -811,7 +811,7 @@
     regions=None,
     compute=True,
     return_stored=False,
-    **kwargs
+    **kwargs,
 ):
     """ Store dask arrays in array-like objects, overwrite data in target
 
@@ -2696,6 +2696,15 @@
         changed by installing cityhash, xxhash or murmurhash. If installed,
         a large-factor speedup can be obtained in the tokenisation step.
         Use ``name=False`` to generate a random name instead of hashing (fast)
+
+        .. note::
+
+           Because this ``name`` is used as the key in task graphs, you should
+           ensure that it uniquely identifies the data contained within. If
+           you'd like to provide a descriptive name that is still unique, combine
+           the descriptive name with :func:`dask.base.tokenize` of the
+           ``array_like``. See :ref:`graphs` for more.
+
     lock : bool or Lock, optional
         If ``x`` doesn't support concurrent reads then provide a lock here, or
         pass in True to have dask.array create one for you.
@@ -2731,6 +2740,12 @@
     >>> a = da.from_array(x, chunks='auto')  # doctest: +SKIP
     >>> a = da.from_array(x, chunks='100 MiB')  # doctest: +SKIP
     >>> a = da.from_array(x)  # doctest: +SKIP
+
+    If providing a name, ensure that it is unique
+
+    >>> import dask.base
+    >>> token = dask.base.tokenize(x)  # doctest: +SKIP
+    >>> a = da.from_array(x, name='myarray-' + token)  # doctest: +SKIP
     """
     if isinstance(x, Array):
         raise ValueError(
@@ -2852,7 +2867,7 @@
     overwrite=False,
     compute=True,
     return_stored=False,
-    **kwargs
+    **kwargs,
 ):
     """Save array to the zarr storage format
 
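
For reference, the note added to ``da.from_array`` above means that ``name`` becomes the
key prefix in the task graph, so a hand-picked name must uniquely identify the underlying
data. A minimal sketch of the recommended pattern (illustrative only; the array contents
and the ``myarray`` prefix are made-up examples):

    import numpy as np
    import dask.array as da
    import dask.base

    x = np.arange(1000).reshape(100, 10)

    # Combine a readable prefix with a content hash so equal names imply equal data.
    a = da.from_array(x, chunks=(50, 10), name="myarray-" + dask.base.tokenize(x))
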
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/array/routines.py 
new/dask-2.14.0/dask/array/routines.py
--- old/dask-2.13.0/dask/array/routines.py      2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/array/routines.py      2020-04-01 00:34:20.000000000 +0200
@@ -808,6 +808,12 @@
     return a.map_blocks(np.round, decimals=decimals, dtype=a.dtype)
 
 
+@implements(np.iscomplexobj)
+@derived_from(np)
+def iscomplexobj(x):
+    return issubclass(x.dtype.type, np.complexfloating)
+
+
 def _unique_internal(ar, indices, counts, return_inverse=False):
     """
     Helper/wrapper function for :func:`numpy.unique`.
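
The new ``iscomplexobj`` above is registered through dask's ``@implements`` hook, so
``np.iscomplexobj`` dispatches to it via NumPy's ``__array_function__`` protocol and only
inspects the dtype, without computing any chunks. A minimal sketch of the resulting
behaviour (illustrative; assumes dask >= 2.14.0 and a NumPy version with
``__array_function__`` enabled):

    import numpy as np
    import dask.array as da

    x = da.from_array(np.arange(4, dtype=np.float64), chunks=2)

    # Dispatches to the dask implementation; only the dtype is checked.
    print(np.iscomplexobj(x))        # False
    print(np.iscomplexobj(x * 1j))   # True, the dtype is now complex128
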
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/array/tests/test_routines.py 
new/dask-2.14.0/dask/array/tests/test_routines.py
--- old/dask-2.13.0/dask/array/tests/test_routines.py   2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/array/tests/test_routines.py   2020-04-01 00:34:20.000000000 +0200
@@ -1740,3 +1740,11 @@
 
     with pytest.warns(RuntimeWarning):
         da.average(d_a, weights=da.zeros_like(d_a)).compute()
+
+
+def test_iscomplexobj():
+    a = da.from_array(np.array([1, 2]), 2)
+    assert np.iscomplexobj(a) is False
+
+    a = da.from_array(np.array([1, 2 + 0j]), 2)
+    assert np.iscomplexobj(a) is True
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/io/parquet/arrow.py 
new/dask-2.14.0/dask/dataframe/io/parquet/arrow.py
--- old/dask-2.13.0/dask/dataframe/io/parquet/arrow.py  2020-03-11 00:05:35.000000000 +0100
+++ new/dask-2.14.0/dask/dataframe/io/parquet/arrow.py  2020-04-01 00:34:21.000000000 +0200
@@ -1,10 +1,12 @@
 from functools import partial
 from collections import OrderedDict
 import json
+import warnings
 
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+from pyarrow.compat import guid
 from ....utils import natural_sort_key, getargspec
 from ..utils import _get_pyarrow_dtypes, _meta_from_dtypes
 from ...utils import clear_known_categories
@@ -61,6 +63,37 @@
     return tuple(result.values())
 
 
+def _merge_statistics(stats, s):
+    """ Update `stats` with vaules in `s`
+    """
+    stats[-1]["total_byte_size"] += s["total_byte_size"]
+    stats[-1]["num-rows"] += s["num-rows"]
+    ncols = len(stats[-1]["columns"])
+    ncols_n = len(s["columns"])
+    if ncols != ncols_n:
+        raise ValueError(f"Column count not equal ({ncols} vs {ncols_n})")
+    for i in range(ncols):
+        name = stats[-1]["columns"][i]["name"]
+        j = i
+        for ii in range(ncols):
+            if name == s["columns"][j]["name"]:
+                break
+            if ii == ncols - 1:
+                raise KeyError(f"Column statistics missing for {name}")
+            j = (j + 1) % ncols
+
+        min_n = s["columns"][j]["min"]
+        max_n = s["columns"][j]["max"]
+        null_count_n = s["columns"][j]["null_count"]
+
+        min_i = stats[-1]["columns"][i]["min"]
+        max_i = stats[-1]["columns"][i]["max"]
+        stats[-1]["columns"][i]["min"] = min(min_i, min_n)
+        stats[-1]["columns"][i]["max"] = min(max_i, max_n)
+        stats[-1]["columns"][i]["null_count"] += null_count_n
+    return True
+
+
 def _determine_dataset_parts(fs, paths, gather_statistics, filters, dataset_kwargs):
     """ Determine how to access metadata and break read into ``parts``
 
@@ -119,6 +152,55 @@
     return parts, dataset
 
 
+def _write_partitioned(
+    table, root_path, partition_cols, fs, preserve_index=True, **kwargs
+):
+    """ Write table to a partitioned dataset with pyarrow.
+
+        Logic copied from pyarrow.parquet.
+        (arrow/python/pyarrow/parquet.py::write_to_dataset)
+
+        TODO: Remove this in favor of pyarrow's `write_to_dataset`
+              once ARROW-8244 is addressed.
+    """
+    fs.mkdirs(root_path, exist_ok=True)
+
+    df = table.to_pandas(ignore_metadata=True)
+    partition_keys = [df[col] for col in partition_cols]
+    data_df = df.drop(partition_cols, axis="columns")
+    data_cols = df.columns.drop(partition_cols)
+    if len(data_cols) == 0 and not preserve_index:
+        raise ValueError("No data left to save outside partition columns")
+
+    subschema = table.schema
+    for col in table.schema.names:
+        if col in partition_cols:
+            subschema = subschema.remove(subschema.get_field_index(col))
+
+    md_list = []
+    for keys, subgroup in data_df.groupby(partition_keys):
+        if not isinstance(keys, tuple):
+            keys = (keys,)
+        subdir = fs.sep.join(
+            [
+                "{colname}={value}".format(colname=name, value=val)
+                for name, val in zip(partition_cols, keys)
+            ]
+        )
+        subtable = pa.Table.from_pandas(
+            subgroup, preserve_index=False, schema=subschema, safe=False
+        )
+        prefix = fs.sep.join([root_path, subdir])
+        fs.mkdir(prefix, exists_ok=True)
+        outfile = guid() + ".parquet"
+        full_path = fs.sep.join([prefix, outfile])
+        with fs.open(full_path, "wb") as f:
+            pq.write_table(subtable, f, metadata_collector=md_list, **kwargs)
+        md_list[-1].set_file_path(fs.sep.join([subdir, outfile]))
+
+    return md_list
+
+
 class ArrowEngine(Engine):
     @staticmethod
     def read_metadata(
@@ -138,6 +220,15 @@
         parts, dataset = _determine_dataset_parts(
             fs, paths, gather_statistics, filters, kwargs.get("dataset", {})
         )
+        # Check if the column-chunk file_path's are set in "_metadata".
+        # If available, we can use the path to sort the row-groups
+        col_chunk_paths = False
+        if dataset.metadata:
+            col_chunk_paths = all(
+                dataset.metadata.row_group(i).column(0).file_path is not None
+                for i in range(dataset.metadata.num_row_groups)
+            )
+
         # TODO: Call to `_determine_dataset_parts` uses `pq.ParquetDataset`
         # to define the `dataset` object. `split_row_groups` should be passed
         # to that constructor once it is supported (see ARROW-2801).
@@ -145,11 +236,39 @@
             partitions = [
                 n for n in dataset.partitions.partition_names if n is not None
             ]
-            if partitions:
-                split_row_groups = False
+            if partitions and dataset.metadata:
+                # Don't use dataset.metadata for partitioned datasets, unless
+                # the column-chunk metadata includes the `"file_path"`.
+                # The order of dataset.metadata.row_group items is often
+                # different than the order of `dataset.pieces`.
+                if not col_chunk_paths or (
+                    len(dataset.pieces) != dataset.metadata.num_row_groups
+                ):
+                    dataset.schema = dataset.metadata.schema
+                    dataset.metadata = None
         else:
             partitions = []
 
+        # Statistics are currently collected at the row-group level only.
+        # Therefore, we cannot perform filtering with split_row_groups=False.
+        # For "partitioned" datasets, each file (usually) corresponds to a
+        # row-group anyway.
+        # TODO: Map row-group statistics onto file pieces for filtering.
+        #       This shouldn't be difficult if `col_chunk_paths==True`
+        if not split_row_groups and not col_chunk_paths:
+            if gather_statistics is None and not partitions:
+                gather_statistics = False
+                if filters:
+                    raise ValueError(
+                        "Filters not supported with split_row_groups=False "
+                        "(unless proper _metadata is available)."
+                    )
+            if gather_statistics and not partitions:
+                raise ValueError(
+                    "Statistics not supported with split_row_groups=False."
+                    "(unless proper _metadata is available)."
+                )
+
         if dataset.metadata:
             schema = dataset.metadata.schema.to_arrow_schema()
         else:
@@ -218,12 +337,21 @@
             and dataset.metadata.num_row_groups >= len(pieces)
         ):
             gather_statistics = True
-            # Don't gather stats by default if this is a partitioned dataset
-            if dataset.metadata.num_row_groups != len(pieces) and partitions:
-                gather_statistics = False
         if not pieces:
             gather_statistics = False
 
+        if filters:
+            # Filters may require us to gather statistics
+            if gather_statistics is False and partitions:
+                warnings.warn(
+                    "Filtering with gather_statistics=False. "
+                    "Only partition columns will be filtered correctly."
+                )
+            elif gather_statistics is False:
+                raise ValueError("Cannot apply filters with 
gather_statistics=False")
+            elif not gather_statistics:
+                gather_statistics = True
+
         row_groups_per_piece = None
         if gather_statistics:
             # Read from _metadata file
@@ -232,6 +360,16 @@
                     dataset.metadata.row_group(i)
                     for i in range(dataset.metadata.num_row_groups)
                 ]
+
+                # Re-order row-groups by path name if known
+                if col_chunk_paths:
+                    row_groups = sorted(
+                        row_groups,
+                        key=lambda row_group: natural_sort_key(
+                            row_group.column(0).file_path
+                        ),
+                    )
+
                 if split_row_groups and len(dataset.paths) == 1:
                     row_groups_per_piece = _get_row_groups_per_piece(
                         pieces, dataset.metadata, dataset.paths[0], fs
@@ -250,6 +388,7 @@
         if gather_statistics:
             stats = []
             skip_cols = set()  # Columns with min/max = None detected
+            path_last = None
             for ri, row_group in enumerate(row_groups):
                 s = {"num-rows": row_group.num_rows, "columns": []}
                 for i, name in enumerate(names):
@@ -274,6 +413,17 @@
                             )
                         s["columns"].append(d)
                 s["total_byte_size"] = row_group.total_byte_size
+                if col_chunk_paths:
+                    s["file_path_0"] = row_group.column(0).file_path
+                    if not split_row_groups and (s["file_path_0"] == path_last):
+                        # Rather than appending a new "row-group", just merge
+                        # new `s` statistics into last element of `stats`.
+                        # Note that each stats element will now correspond to an
+                        # entire file (rather than actual "row-groups")
+                        _merge_statistics(stats, s)
+                        continue
+                    else:
+                        path_last = s["file_path_0"]
                 stats.append(s)
         else:
             stats = None
@@ -307,7 +457,8 @@
                         parts.append((piece.path, rg, piece.partition_keys))
                         # Setting file_path here, because it may be
                         # missing from the row-group/column-chunk stats
-                        stats[rg_tot]["file_path_0"] = piece.path
+                        if "file_path_0" not in stats[rg_tot]:
+                            stats[rg_tot]["file_path_0"] = piece.path
                         rg_tot += 1
             else:
                 parts = [
@@ -484,22 +635,22 @@
         schema=None,
         **kwargs,
     ):
-        md_list = []
+        _meta = None
         preserve_index = False
         if index_cols:
             df = df.set_index(index_cols)
             preserve_index = True
         t = pa.Table.from_pandas(df, preserve_index=preserve_index, schema=schema)
         if partition_on:
-            pq.write_to_dataset(
-                t,
-                path,
-                partition_cols=partition_on,
-                filesystem=fs,
-                metadata_collector=md_list,
-                **kwargs,
+            md_list = _write_partitioned(
+                t, path, partition_on, fs, preserve_index=preserve_index, **kwargs
             )
+            if md_list:
+                _meta = md_list[0]
+                for i in range(1, len(md_list)):
+                    _meta.append_row_groups(md_list[i])
         else:
+            md_list = []
             with fs.open(fs.sep.join([path, filename]), "wb") as fil:
                 pq.write_table(
                     t,
@@ -509,10 +660,11 @@
                     **kwargs,
                 )
             if md_list:
-                md_list[0].set_file_path(filename)
+                _meta = md_list[0]
+                _meta.set_file_path(filename)
         # Return the schema needed to write the metadata
         if return_metadata:
-            return [{"schema": t.schema, "meta": md_list[0]}]
+            return [{"schema": t.schema, "meta": _meta}]
         else:
             return []
 
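
Taken together, the ``ArrowEngine`` changes above let ``filters=`` use row-group
statistics (or a proper ``_metadata`` file) instead of silently requiring partition
columns. A rough usage sketch, closely following the new test below (illustrative only;
the path and column names are made up, and statistics-based filtering prunes whole
row groups rather than individual rows):

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    df = pd.DataFrame({"i32": np.arange(800, dtype=np.int32),
                       "f": np.arange(800, dtype=np.float64)})
    dd.from_pandas(df, npartitions=4).to_parquet(
        "/tmp/example-parquet", engine="pyarrow", row_group_size=50
    )

    # Row groups whose min/max statistics exclude the predicate are skipped.
    ddf = dd.read_parquet(
        "/tmp/example-parquet",
        engine="pyarrow",
        gather_statistics=True,
        split_row_groups=False,
        filters=[("f", "==", 600)],
    )
    print(len(ddf.compute()))
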
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/io/tests/test_parquet.py 
new/dask-2.14.0/dask/dataframe/io/tests/test_parquet.py
--- old/dask-2.13.0/dask/dataframe/io/tests/test_parquet.py     2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/dataframe/io/tests/test_parquet.py     2020-04-01 00:34:21.000000000 +0200
@@ -639,7 +639,9 @@
         engine=engine,
     )
 
-    out = dd.read_parquet(tmp, engine=engine, gather_statistics=True).compute()
+    out = dd.read_parquet(
+        tmp, engine=engine, index="index", gather_statistics=True
+    ).compute()
     out["lon"] = out.lon.astype("int")  # just to pass assert
     # sort required since partitioning breaks index order
     assert_eq(
@@ -1000,9 +1002,14 @@
             ("partition_column", pa.int64()),
         ]
     )
-    ddf.to_parquet(
-        str(tmpdir), engine="pyarrow", partition_on="partition_column", schema=schema
+    fut = ddf.to_parquet(
+        str(tmpdir),
+        compute=False,
+        engine="pyarrow",
+        partition_on="partition_column",
+        schema=schema,
     )
+    fut.compute(scheduler="single-threaded")
     ddf_after_write = (
         dd.read_parquet(str(tmpdir), engine="pyarrow", gather_statistics=False)
         .compute()
@@ -1091,7 +1098,7 @@
         assert set(df.bb[df.aa == val]) == set(out.bb[out.aa == val])
 
 
-@write_read_engines_xfail
+@write_read_engines()
 def test_filters_categorical(tmpdir, write_engine, read_engine):
     tmpdir = str(tmpdir)
     cats = ["2018-01-01", "2018-01-02", "2018-01-03", "2018-01-04"]
@@ -1104,7 +1111,10 @@
     ddftest = dd.from_pandas(dftest, npartitions=4).set_index("dummy")
     ddftest.to_parquet(tmpdir, partition_on="DatePart", engine=write_engine)
     ddftest_read = dd.read_parquet(
-        tmpdir, engine=read_engine, filters=[(("DatePart", "<=", "2018-01-02"))]
+        tmpdir,
+        index="dummy",
+        engine=read_engine,
+        filters=[(("DatePart", "<=", "2018-01-02"))],
     )
     assert len(ddftest_read) == 2
 
@@ -1762,9 +1772,9 @@
     }
     pdf = pd.DataFrame(data)
     ddf = dd.from_pandas(pdf, npartitions=2)
-    ddf.to_parquet(path, engine="pyarrow", partition_on="p")
+    ddf.to_parquet(path, engine="pyarrow", write_index=False, partition_on="p")
 
-    ddf = dd.read_parquet(path, engine="pyarrow")
+    ddf = dd.read_parquet(path, index=False, engine="pyarrow")
 
     ddf.astype({"b": np.float32}).compute()
 
@@ -2122,6 +2132,36 @@
     assert ddf3.npartitions == 4
 
 
+def test_split_row_groups_filter_pyarrow(tmpdir):
+    check_pyarrow()
+    tmp = str(tmpdir)
+    df = pd.DataFrame(
+        {"i32": np.arange(800, dtype=np.int32), "f": np.arange(800, 
dtype=np.float64)}
+    )
+    df.index.name = "index"
+    search_val = 600
+    filters = [("f", "==", search_val)]
+
+    dd.from_pandas(df, npartitions=4).to_parquet(
+        tmp, append=True, engine="pyarrow", row_group_size=50
+    )
+
+    ddf2 = dd.read_parquet(tmp, engine="pyarrow")
+    ddf3 = dd.read_parquet(
+        tmp,
+        engine="pyarrow",
+        gather_statistics=True,
+        split_row_groups=False,
+        filters=filters,
+    )
+
+    assert search_val in ddf3["i32"]
+    assert_eq(
+        ddf2[ddf2["i32"] == search_val].compute(),
+        ddf3[ddf3["i32"] == search_val].compute(),
+    )
+
+
 def test_optimize_getitem_and_nonblockwise(tmpdir):
     check_engine()
     path = os.path.join(tmpdir, "path.parquet")
@@ -2260,3 +2300,32 @@
 
     ddf = dd.read_parquet(path, engine=engine)
     a, b = dask.optimize(ddf["A"], ddf)
+
+
+@pytest.mark.parametrize("gather_statistics", [None, True])
+@write_read_engines()
+def test_filter_nonpartition_columns(
+    tmpdir, write_engine, read_engine, gather_statistics
+):
+    tmpdir = str(tmpdir)
+    df_write = pd.DataFrame(
+        {
+            "id": [1, 2, 3, 4] * 4,
+            "time": np.arange(16),
+            "random": np.random.choice(["cat", "dog"], size=16),
+        }
+    )
+    ddf_write = dd.from_pandas(df_write, npartitions=4)
+    ddf_write.to_parquet(
+        tmpdir, write_index=False, partition_on=["id"], engine=write_engine
+    )
+    ddf_read = dd.read_parquet(
+        tmpdir,
+        index=False,
+        engine=read_engine,
+        gather_statistics=gather_statistics,
+        filters=[(("time", "<", 5))],
+    )
+    df_read = ddf_read.compute()
+    assert len(df_read) == len(df_read[df_read["time"] < 5])
+    assert df_read["time"].max() < 5
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/multi.py 
new/dask-2.14.0/dask/dataframe/multi.py
--- old/dask-2.13.0/dask/dataframe/multi.py     2020-03-25 20:06:02.000000000 +0100
+++ new/dask-2.14.0/dask/dataframe/multi.py     2020-04-03 22:23:54.000000000 +0200
@@ -85,6 +85,7 @@
     is_series_like,
     asciitable,
     is_dataframe_like,
+    make_meta,
 )
 
 
@@ -927,7 +928,13 @@
 
 def stack_partitions(dfs, divisions, join="outer"):
     """Concatenate partitions on axis=0 by doing a simple stack"""
-    meta = methods.concat([df._meta for df in dfs], join=join, filter_warning=False)
+    # Use _meta_nonempty as pandas.concat will incorrectly cast float to datetime
+    # for empty data frames. See https://github.com/pandas-dev/pandas/issues/32934.
+    meta = make_meta(
+        methods.concat(
+            [df._meta_nonempty for df in dfs], join=join, filter_warning=False
+        )
+    )
     empty = strip_unknown_categories(meta)
 
     name = "concat-{0}".format(tokenize(*dfs))
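
The comment above refers to a pandas quirk: concatenating zero-row frames can promote
dtypes differently from concatenating real data, e.g. a float column ending up as
datetime, which corrupted the ``_meta`` of the stacked frame. A standalone sketch of the
idea behind the fix, concatenating non-empty dummies and then re-emptying the result
(illustrative; the exact pandas behaviour depends on the pandas version, see
pandas-dev/pandas#32934):

    import pandas as pd

    # Zero-row metadata frames, as dask's _meta would be
    meta_float = pd.DataFrame({"a": pd.Series(dtype="float64")})
    meta_dt = pd.DataFrame({"a": pd.Series(dtype="datetime64[ns]")})
    print(pd.concat([meta_float, meta_dt]).dtypes)   # may report datetime64[ns]

    # The workaround: concatenate one-row dummies, then keep only the dtypes
    dummies = [pd.DataFrame({"a": [1.0]}),
               pd.DataFrame({"a": [pd.Timestamp("1970-01-01")]})]
    meta = pd.concat(dummies).iloc[:0]               # object dtype, zero rows
    print(meta.dtypes)
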
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/shuffle.py 
new/dask-2.14.0/dask/dataframe/shuffle.py
--- old/dask-2.13.0/dask/dataframe/shuffle.py   2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/dataframe/shuffle.py   2020-04-01 00:34:21.000000000 +0200
@@ -1,6 +1,10 @@
+import contextlib
+import logging
 import math
+import shutil
 from operator import getitem
 import uuid
+import tempfile
 import warnings
 
 import tlz as toolz
@@ -17,6 +21,8 @@
 from ..utils import digit, insert, M
 from .utils import hash_object_dispatch, group_split_dispatch
 
+logger = logging.getLogger(__name__)
+
 
 def set_index(
     df,
@@ -340,6 +346,8 @@
     def __call__(self, *args, **kwargs):
         import partd
 
+        path = tempfile.mkdtemp(suffix=".partd", dir=self.tempdir)
+
         try:
             partd_compression = (
                 getattr(partd.compressed, self.compression)
@@ -353,10 +361,8 @@
                     self.compression
                 )
             )
-        if self.tempdir:
-            file = partd.File(dir=self.tempdir)
-        else:
-            file = partd.File()
+        file = partd.File(path)
+        partd.file.cleanup_files.append(path)
         # Envelope partd file with compression, if set and available
         if partd_compression:
             file = partd_compression(file)
@@ -407,19 +413,33 @@
     dsk3 = {barrier_token: (barrier, list(dsk2))}
 
     # Collect groups
-    name = "shuffle-collect-" + token
+    name1 = "shuffle-collect-1" + token
     dsk4 = {
-        (name, i): (collect, p, i, df._meta, barrier_token) for i in range(npartitions)
+        (name1, i): (collect, p, i, df._meta, barrier_token) for i in range(npartitions)
     }
+    cleanup_token = "cleanup-" + always_new_token
+    barrier_token2 = "barrier2-" + always_new_token
+    # A task that depends on `cleanup-`, but has a small output
+    dsk5 = {(barrier_token2, i): (barrier, part) for i, part in enumerate(dsk4)}
+    # This indirectly depends on `cleanup-` and so runs after we're done using the disk
+    dsk6 = {cleanup_token: (cleanup_partd_files, p, list(dsk5))}
 
+    name = "shuffle-collect-2" + token
+    dsk7 = {(name, i): (_noop, (name1, i), cleanup_token) for i in range(npartitions)}
     divisions = (None,) * (npartitions + 1)
 
-    layer = toolz.merge(dsk1, dsk2, dsk3, dsk4)
+    layer = toolz.merge(dsk1, dsk2, dsk3, dsk4, dsk5, dsk6, dsk7)
     graph = HighLevelGraph.from_collections(name, layer, dependencies=dependencies)
-
     return DataFrame(graph, name, df._meta, divisions)
 
 
+def _noop(x, cleanup_token):
+    """
+    A task that does nothing.
+    """
+    return x
+
+
 def rearrange_by_column_tasks(
     df, column, max_branch=32, npartitions=None, ignore_index=False
 ):
@@ -611,10 +631,38 @@
     return 0
 
 
+def cleanup_partd_files(p, keys):
+    """
+    Cleanup the files in a partd.File dataset.
+
+    Parameters
+    ----------
+    p : partd.Interface
+        File or Encode wrapping a file should be OK.
+    keys: List
+        Just for scheduling purposes, not actually used.
+    """
+    import partd
+
+    if isinstance(p, partd.Encode):
+        maybe_file = p.partd
+    else:
+        maybe_file = p
+
+    if isinstance(maybe_file, partd.File):
+        path = maybe_file.path
+    else:
+        path = None
+
+    if path:
+        shutil.rmtree(path, ignore_errors=True)
+
+
 def collect(p, part, meta, barrier_token):
     """ Collect partitions from partd, yield dataframes """
-    res = p.get(part)
-    return res if len(res) > 0 else meta
+    with ensure_cleanup_on_exception(p):
+        res = p.get(part)
+        return res if len(res) > 0 else meta
 
 
 def set_partitions_pre(s, divisions):
@@ -683,10 +731,31 @@
     return group_split_dispatch(df, c.astype(np.int64), k, ignore_index=ignore_index)
 
 
+@contextlib.contextmanager
+def ensure_cleanup_on_exception(p):
+    """Ensure a partd.File is cleaned up.
+
+    We have several tasks referring to a `partd.File` instance. We want to
+    ensure that the file is cleaned up if and only if there's an exception
+    in the tasks using the `partd.File`.
+    """
+    try:
+        yield
+    except Exception:
+        # the function (e.g. shuffle_group_3) had an internal exception.
+        # We'll cleanup our temporary files and re-raise.
+        try:
+            p.drop()
+        except Exception:
+            logger.exception("ignoring exception in 
ensure_cleanup_on_exception")
+        raise
+
+
 def shuffle_group_3(df, col, npartitions, p):
-    g = df.groupby(col)
-    d = {i: g.get_group(i) for i in g.groups}
-    p.append(d, fsync=True)
+    with ensure_cleanup_on_exception(p):
+        g = df.groupby(col)
+        d = {i: g.get_group(i) for i in g.groups}
+        p.append(d, fsync=True)
 
 
 def set_index_post_scalar(df, index_name, drop, column_dtype):
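
The shuffle changes above move the ``.partd`` directory handling into explicit tasks:
every task that writes to or reads from the ``partd.File`` runs inside
``ensure_cleanup_on_exception``, and a final ``cleanup_partd_files`` task removes the
directory once the collect tasks are done. The cleanup-on-error half is ordinary
``contextlib``; a minimal, self-contained sketch of the same pattern (illustrative, not
the dask code itself):

    import contextlib
    import shutil
    import tempfile

    @contextlib.contextmanager
    def cleanup_on_exception(path):
        """Delete `path` only if the wrapped block raises."""
        try:
            yield path
        except Exception:
            shutil.rmtree(path, ignore_errors=True)
            raise

    workdir = tempfile.mkdtemp(suffix=".partd")
    with cleanup_on_exception(workdir):
        pass  # stand-in for writing/reading intermediate shuffle data

    # On success the directory is removed later, by a dedicated cleanup step.
    shutil.rmtree(workdir, ignore_errors=True)
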
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/tests/test_multi.py 
new/dask-2.14.0/dask/dataframe/tests/test_multi.py
--- old/dask-2.13.0/dask/dataframe/tests/test_multi.py  2020-03-25 20:06:02.000000000 +0100
+++ new/dask-2.14.0/dask/dataframe/tests/test_multi.py  2020-04-03 22:23:54.000000000 +0200
@@ -585,9 +585,7 @@
     [
         (1.0, 1),
         (1.0, "one"),
-        # See https://github.com/dask/dask/issues/5968 and
-        # https://github.com/pandas-dev/pandas/issues/32934
-        pytest.param(1.0, pd.to_datetime("1970-01-01"), marks=pytest.mark.xfail),
+        (1.0, pd.to_datetime("1970-01-01")),
         (1, "one"),
         (1, pd.to_datetime("1970-01-01")),
         ("one", pd.to_datetime("1970-01-01")),
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/dataframe/tests/test_shuffle.py 
new/dask-2.14.0/dask/dataframe/tests/test_shuffle.py
--- old/dask-2.13.0/dask/dataframe/tests/test_shuffle.py        2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/dataframe/tests/test_shuffle.py        2020-04-02 01:11:50.000000000 +0200
@@ -1,6 +1,8 @@
 import itertools
 import os
 import random
+import tempfile
+from unittest import mock
 
 import pandas as pd
 import pytest
@@ -24,7 +26,6 @@
     remove_nans,
 )
 from dask.dataframe.utils import assert_eq, make_meta
-from dask.compatibility import PY_VERSION
 
 
 dsk = {
@@ -277,6 +278,44 @@
         assert sum(i in set(part._partitions) for part in parts) == 1
 
 
+def test_rearrange_cleanup():
+    df = pd.DataFrame({"x": np.random.random(10)})
+    ddf = dd.from_pandas(df, npartitions=4)
+    ddf2 = ddf.assign(_partitions=ddf.x % 4)
+
+    tmpdir = tempfile.mkdtemp()
+
+    with dask.config.set(temporary_directory=str(tmpdir)):
+        result = rearrange_by_column(ddf2, "_partitions", max_branch=32, shuffle="disk")
+        result.compute(scheduler="processes")
+
+    assert len(os.listdir(tmpdir)) == 0
+
+
+def mock_shuffle_group_3(df, col, npartitions, p):
+    raise ValueError("Mock exception!")
+
+
+def test_rearrange_disk_cleanup_with_exception():
+    # ensure temporary files are cleaned up when there's an internal exception.
+
+    with mock.patch("dask.dataframe.shuffle.shuffle_group_3", 
new=mock_shuffle_group_3):
+        df = pd.DataFrame({"x": np.random.random(10)})
+        ddf = dd.from_pandas(df, npartitions=4)
+        ddf2 = ddf.assign(_partitions=ddf.x % 4)
+
+        tmpdir = tempfile.mkdtemp()
+
+        with dask.config.set(temporary_directory=str(tmpdir)):
+            with pytest.raises(ValueError, match="Mock exception!"):
+                result = rearrange_by_column(
+                    ddf2, "_partitions", max_branch=32, shuffle="disk"
+                )
+                result.compute(scheduler="processes")
+
+    assert len(os.listdir(tmpdir)) == 0
+
+
 def test_rearrange_by_column_with_narrow_divisions():
     from dask.dataframe.tests.test_multi import list_eq
 
@@ -779,34 +818,6 @@
         compute_divisions(c)
 
 
-# TODO: Fix sporadic failure on Python 3.8 and remove this xfail mark
-@pytest.mark.xfail(PY_VERSION >= "3.8", reason="Flaky test", strict=False)
-def test_temporary_directory(tmpdir):
-    from multiprocessing.pool import Pool
-
-    df = pd.DataFrame(
-        {
-            "x": np.random.random(100),
-            "y": np.random.random(100),
-            "z": np.random.random(100),
-        }
-    )
-    ddf = dd.from_pandas(df, npartitions=10, name="x", sort=False)
-
-    # We use a pool to avoid a race condition between the pool close
-    # cleaning up files, and the assert below.
-    pool = Pool(4)
-    with pool:
-        with dask.config.set(
-            temporary_directory=str(tmpdir), scheduler="processes", pool=pool
-        ):
-            ddf2 = ddf.set_index("x", shuffle="disk")
-            ddf2.compute()
-            assert any(
-                fn.endswith(".partd") for fn in os.listdir(str(tmpdir))
-            ), os.listdir(str(tmpdir))
-
-
 def test_empty_partitions():
     # See https://github.com/dask/dask/issues/2408
     df = pd.DataFrame({"a": list(range(10))})
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask/delayed.py 
new/dask-2.14.0/dask/delayed.py
--- old/dask-2.13.0/dask/delayed.py     2020-03-21 17:11:50.000000000 +0100
+++ new/dask-2.14.0/dask/delayed.py     2020-03-27 18:08:57.000000000 +0100
@@ -231,6 +231,15 @@
         to hashing content. Note that this only affects the name of the object
         wrapped by this call to delayed, and *not* the output of delayed
         function calls - for that use ``dask_key_name=`` as described below.
+
+        .. note::
+
+           Because this ``name`` is used as the key in task graphs, you should
+           ensure that it uniquely identifies ``obj``. If you'd like to provide
+           a descriptive name that is still unique, combine the descriptive
+           name with :func:`dask.base.tokenize` of
+           ``obj``. See :ref:`graphs` for more.
+
     pure : bool, optional
         Indicates whether calling the resulting ``Delayed`` object is a pure
         operation. If True, arguments to the call are hashed to produce
@@ -338,11 +347,15 @@
 
     The key name of a delayed object is hashed by default if ``pure=True`` or
     is generated randomly if ``pure=False`` (default).  To explicitly set the
-    name, you can use the ``name`` keyword:
-
-    >>> a = delayed([1, 2, 3], name='mylist')
-    >>> a
-    Delayed('mylist')
+    name, you can use the ``name`` keyword. To ensure that the key is unique
+    you should include the tokenized value as well, or otherwise ensure that
+    it's unique:
+
+    >>> from dask.base import tokenize
+    >>> data = [1, 2, 3]
+    >>> a = delayed(data, name='mylist-' + tokenize(data))
+    >>> a  # doctest: +SKIP
+    Delayed('mylist-55af65871cb378a4fa6de1660c3e8fb7')
 
     Delayed results act as a proxy to the underlying object. Many operators
     are supported:
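
As with ``da.from_array``, the updated ``delayed`` docstring stresses that ``name`` is
the task key, so reusing a fixed name for different objects can make the scheduler treat
them as the same task. A short sketch of the hazard and the recommended form
(illustrative data):

    import dask
    from dask.base import tokenize

    data1, data2 = [1, 2, 3], [4, 5, 6]

    # Risky: identical keys for different data, so results may be conflated.
    a = dask.delayed(data1, name="mylist")
    b = dask.delayed(data2, name="mylist")

    # Safe: the content hash makes the key unique per object.
    a = dask.delayed(data1, name="mylist-" + tokenize(data1))
    b = dask.delayed(data2, name="mylist-" + tokenize(data2))
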
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/dask.egg-info/PKG-INFO 
new/dask-2.14.0/dask.egg-info/PKG-INFO
--- old/dask-2.13.0/dask.egg-info/PKG-INFO      2020-03-25 20:23:38.000000000 +0100
+++ new/dask-2.14.0/dask.egg-info/PKG-INFO      2020-04-03 22:40:58.000000000 +0200
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: dask
-Version: 2.13.0
+Version: 2.14.0
 Summary: Parallel PyData with Task Scheduling
 Home-page: https://github.com/dask/dask/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/docs/source/changelog.rst 
new/dask-2.14.0/docs/source/changelog.rst
--- old/dask-2.13.0/docs/source/changelog.rst   2020-03-25 20:21:24.000000000 +0100
+++ new/dask-2.14.0/docs/source/changelog.rst   2020-04-03 22:37:39.000000000 +0200
@@ -1,6 +1,32 @@
 Changelog
 =========
 
+2.14.0 / 2020-04-03
+-------------------
+
+Array
++++++
+
+- Added ``np.iscomplexobj`` implementation (:pr:`6045`) `Tom Augspurger`_
+
+Core
+++++
+
+- Update ``test_rearrange_disk_cleanup_with_exception`` to pass without cloudpickle installed (:pr:`6052`) `James Bourbeau`_
+- Fixed flaky ``test-rearrange`` (:pr:`5977`) `Tom Augspurger`_
+
+DataFrame
++++++++++
+
+- Use ``_meta_nonempty`` for dtype casting in ``stack_partitions`` (:pr:`6061`) `mlondschien`_
+- Fix bugs in ``_metadata`` creation and filtering in parquet ``ArrowEngine`` (:pr:`6023`) `Richard (Rick) Zamora`_
+
+Documentation
++++++++++++++
+
+- DOC: Add name caveats (:pr:`6040`) `Tom Augspurger`_
+
+
 2.13.0 / 2020-03-25
 -------------------
 
@@ -3045,3 +3071,4 @@
 .. _`psimaj`: https://github.com/psimaj
 .. _`mlondschien`: https://github.com/mlondschien
 .. _`petiop`: https://github.com/petiop
+.. _`Richard (Rick) Zamora`: https://github.com/rjzamora
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.13.0/docs/source/graphs.rst 
new/dask-2.14.0/docs/source/graphs.rst
--- old/dask-2.13.0/docs/source/graphs.rst      2020-01-28 18:29:39.000000000 +0100
+++ new/dask-2.14.0/docs/source/graphs.rst      2020-03-27 18:08:57.000000000 +0100
@@ -1,3 +1,5 @@
+.. _graphs:
+
 Task Graphs
 ===========
 

