Hello community, here is the log from the commit of package python-dask for openSUSE:Factory checked in at 2019-11-30 10:39:24 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-dask (Old) and /work/SRC/openSUSE:Factory/.python-dask.new.26869 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-dask" Sat Nov 30 10:39:24 2019 rev:24 rq:751835 version:2.8.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-dask/python-dask.changes 2019-11-17 19:23:28.898857695 +0100 +++ /work/SRC/openSUSE:Factory/.python-dask.new.26869/python-dask.changes 2019-11-30 10:40:48.708147466 +0100 @@ -1,0 +2,29 @@ +Sun Nov 24 17:35:04 UTC 2019 - Arun Persaud <[email protected]> + +- update to version 2.8.1: + * Array + + Use auto rechunking in da.rechunk if no value given (:pr:`5605`) + Matthew Rocklin + * Core + + Add simple action to activate GH actions (:pr:`5619`) James + Bourbeau + * DataFrame + + Fix "file_path_0" bug in aggregate_row_groups (:pr:`5627`) + Richard J Zamora + + Add chunksize argument to read_parquet (:pr:`5607`) Richard J + Zamora + + Change test_repartition_npartitions to support arch64 + architecture (:pr:`5620`) ossdev07 + + Categories lost after groupby + agg (:pr:`5423`) Oliver Hofkens + + Fixed relative path issue with parquet metadata file + (:pr:`5608`) Nuno Gomes Silva + + Enable gpu-backed covariance/correlation in dataframes + (:pr:`5597`) Richard J Zamora + * Documentation + + Fix institutional faq and unknown doc warnings (:pr:`5616`) + James Bourbeau + + Add doc for some utils (:pr:`5609`) Tom Augspurger + + Removes html_extra_path (:pr:`5614`) James Bourbeau + + Fixed See Also referencence (:pr:`5612`) Tom Augspurger + +------------------------------------------------------------------- Old: ---- dask-2.8.0.tar.gz New: ---- dask-2.8.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-dask.spec ++++++ --- /var/tmp/diff_new_pack.cjxRzs/_old 2019-11-30 10:40:49.172147411 +0100 +++ /var/tmp/diff_new_pack.cjxRzs/_new 2019-11-30 10:40:49.172147411 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-dask # -# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany. 
+# Copyright (c) 2019 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -27,7 +27,7 @@ %endif %define skip_python2 1 Name: python-dask%{psuffix} -Version: 2.8.0 +Version: 2.8.1 Release: 0 Summary: Minimal task scheduling abstraction License: BSD-3-Clause ++++++ dask-2.8.0.tar.gz -> dask-2.8.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/PKG-INFO new/dask-2.8.1/PKG-INFO --- old/dask-2.8.0/PKG-INFO 2019-11-14 23:57:18.000000000 +0100 +++ new/dask-2.8.1/PKG-INFO 2019-11-23 05:31:55.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: dask -Version: 2.8.0 +Version: 2.8.1 Summary: Parallel PyData with Task Scheduling Home-page: https://github.com/dask/dask/ Maintainer: Matthew Rocklin @@ -43,10 +43,10 @@ Classifier: Programming Language :: Python :: 3.6 Classifier: Programming Language :: Python :: 3.7 Requires-Python: >=3.6 -Provides-Extra: complete Provides-Extra: array -Provides-Extra: diagnostics -Provides-Extra: dataframe Provides-Extra: bag -Provides-Extra: delayed +Provides-Extra: dataframe Provides-Extra: distributed +Provides-Extra: diagnostics +Provides-Extra: delayed +Provides-Extra: complete diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/_version.py new/dask-2.8.1/dask/_version.py --- old/dask-2.8.0/dask/_version.py 2019-11-14 23:57:18.000000000 +0100 +++ new/dask-2.8.1/dask/_version.py 2019-11-23 05:31:55.000000000 +0100 @@ -11,8 +11,8 @@ { "dirty": false, "error": null, - "full-revisionid": "539d1e27a8ccce01de5f3d49f1748057c27552f2", - "version": "2.8.0" + "full-revisionid": "eee9b78da60c24897e1df984f01dd9f36245fcb1", + "version": "2.8.1" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/array/core.py new/dask-2.8.1/dask/array/core.py --- old/dask-2.8.0/dask/array/core.py 2019-11-13 21:17:45.000000000 +0100 +++ new/dask-2.8.1/dask/array/core.py 2019-11-23 05:10:40.000000000 +0100 @@ -2164,7 +2164,7 @@ return squeeze(self, axis) - def rechunk(self, chunks, threshold=None, block_size_limit=None): + def rechunk(self, chunks="auto", threshold=None, block_size_limit=None): """ See da.rechunk for docstring """ from . 
import rechunk # avoid circular import diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/array/linalg.py new/dask-2.8.1/dask/array/linalg.py --- old/dask-2.8.0/dask/array/linalg.py 2019-11-05 22:48:29.000000000 +0100 +++ new/dask-2.8.1/dask/array/linalg.py 2019-11-20 16:56:28.000000000 +0100 @@ -96,9 +96,12 @@ See Also -------- - dask.array.linalg.qr - Powered by this algorithm - dask.array.linalg.svd - Powered by this algorithm - dask.array.linalg.sfqr - Variant for short-and-fat arrays + dask.array.linalg.qr + Powered by this algorithm + dask.array.linalg.svd + Powered by this algorithm + dask.array.linalg.sfqr + Variant for short-and-fat arrays """ nr, nc = len(data.chunks[0]), len(data.chunks[1]) cr_max, cc = max(data.chunks[0]), data.chunks[1][0] @@ -519,8 +522,10 @@ See Also -------- - dask.array.linalg.qr - Main user API that uses this function - dask.array.linalg.tsqr - Variant for tall-and-skinny case + dask.array.linalg.qr + Main user API that uses this function + dask.array.linalg.tsqr + Variant for tall-and-skinny case """ nr, nc = len(data.chunks[0]), len(data.chunks[1]) cr, cc = data.chunks[0][0], data.chunks[1][0] @@ -742,21 +747,22 @@ """ Compute the qr factorization of a matrix. - Examples - -------- - - >>> q, r = da.linalg.qr(x) # doctest: +SKIP + Parameters + ---------- + a : Array Returns ------- - q: Array, orthonormal r: Array, upper-triangular - See Also + Examples -------- + >>> q, r = da.linalg.qr(x) # doctest: +SKIP - np.linalg.qr: Equivalent NumPy Operation + See Also + -------- + numpy.linalg.qr: Equivalent NumPy Operation dask.array.linalg.tsqr: Implementation for tall-and-skinny arrays dask.array.linalg.sfqr: Implementation for short-and-fat arrays """ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/array/rechunk.py new/dask-2.8.1/dask/array/rechunk.py --- old/dask-2.8.0/dask/array/rechunk.py 2019-10-11 05:14:07.000000000 +0200 +++ new/dask-2.8.1/dask/array/rechunk.py 2019-11-20 16:14:05.000000000 +0100 @@ -181,7 +181,7 @@ return cross -def rechunk(x, chunks, threshold=None, block_size_limit=None): +def rechunk(x, chunks="auto", threshold=None, block_size_limit=None): """ Convert blocks in dask array x for new chunks. @@ -189,13 +189,14 @@ ---------- x: dask array Array to be rechunked. - chunks: int, tuple or dict + chunks: int, tuple, dict or str, optional The new block dimensions to create. -1 indicates the full size of the - corresponding dimension. - threshold: int + corresponding dimension. Default is "auto" which automatically + determines chunk sizes. + threshold: int, optional The graph growth factor under which we don't bother introducing an intermediate step. 
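
A minimal sketch of the new default described above: calling rechunk() with no argument now behaves like chunks="auto", consolidating blocks toward the array.chunk-size configuration value (the array and chunk sizes below are illustrative, not taken from this changeset):

    import dask.array as da

    x = da.ones((10000, 10000), chunks=(100, 100))   # many small 100x100 blocks
    y = x.rechunk()                                   # chunks now defaults to "auto"
    # "auto" merges the tiny blocks toward the array.chunk-size target
    print(x.chunksize, "->", y.chunksize)
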
- block_size_limit: int + block_size_limit: int, optional The maximum block size (in bytes) we want to produce Defaults to the configuration value ``array.chunk-size`` diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/array/tests/test_array_core.py new/dask-2.8.1/dask/array/tests/test_array_core.py --- old/dask-2.8.0/dask/array/tests/test_array_core.py 2019-11-13 21:17:45.000000000 +0100 +++ new/dask-2.8.1/dask/array/tests/test_array_core.py 2019-11-20 16:14:05.000000000 +0100 @@ -4224,3 +4224,10 @@ y[:3, :] y.compute_chunk_sizes() y[:3, :] + + +def test_rechunk_auto(): + x = da.ones(10, chunks=(1,)) + y = x.rechunk() + + assert y.npartitions == 1 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/core.py new/dask-2.8.1/dask/dataframe/core.py --- old/dask-2.8.0/dask/dataframe/core.py 2019-11-13 18:07:07.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/core.py 2019-11-23 05:10:40.000000000 +0100 @@ -43,7 +43,7 @@ skip_doctest, ) from ..array.core import Array, normalize_arg -from ..array.utils import empty_like_safe +from ..array.utils import empty_like_safe, zeros_like_safe from ..blockwise import blockwise, Blockwise from ..base import DaskMethodsMixin, tokenize, dont_optimize, is_dask_collection from ..delayed import delayed, Delayed, unpack_collections @@ -5125,9 +5125,9 @@ """Chunk part of a covariance or correlation computation """ shape = (df.shape[1], df.shape[1]) - sums = np.zeros(shape) - counts = np.zeros(shape) df = df.astype("float64", copy=False) + sums = zeros_like_safe(df.values, shape=shape) + counts = zeros_like_safe(df.values, shape=shape) for idx, col in enumerate(df): mask = df.iloc[:, idx].notnull() sums[idx] = df[mask].sum().values @@ -5138,27 +5138,34 @@ with warnings.catch_warnings(record=True): warnings.simplefilter("always") mu = (sums / counts).T - m = np.zeros(shape) + m = zeros_like_safe(df.values, shape=shape) mask = df.isnull().values for idx, x in enumerate(df): - # Use .values to get the ndarray for the ufunc. 
- mu_discrepancy = np.subtract.outer(df.iloc[:, idx].values, mu[idx]) ** 2 + # Avoid using ufunc.outer (not supported by cupy) + mu_discrepancy = ( + np.subtract(df.iloc[:, idx].values[:, None], mu[idx][None, :]) ** 2 + ) mu_discrepancy[mask] = np.nan m[idx] = np.nansum(mu_discrepancy, axis=0) m = m.T dtype.append(("m", m.dtype)) - out = np.empty(counts.shape, dtype=dtype) - out["sum"] = sums - out["count"] = counts - out["cov"] = cov * (counts - 1) + out = {"sum": sums, "count": counts, "cov": cov * (counts - 1)} if corr: out["m"] = m return out -def cov_corr_combine(data, corr=False): - data = np.concatenate(data).reshape((len(data),) + data[0].shape) +def cov_corr_combine(data_in, corr=False): + + data = {"sum": None, "count": None, "cov": None} + if corr: + data["m"] = None + + for k in data.keys(): + data[k] = [d[k] for d in data_in] + data[k] = np.concatenate(data[k]).reshape((len(data[k]),) + data[k][0].shape) + sums = np.nan_to_num(data["sum"]) counts = data["count"] @@ -5175,10 +5182,7 @@ (n1 * n2) / (n1 + n2) * (d * d.transpose((0, 2, 1))), 0 ) + np.nansum(data["cov"], 0) - out = np.empty(C.shape, dtype=data.dtype) - out["sum"] = cum_sums[-1] - out["count"] = cum_counts[-1] - out["cov"] = C + out = {"sum": cum_sums[-1], "count": cum_counts[-1], "cov": C} if corr: nobs = np.where(cum_counts[-1], cum_counts[-1], np.nan) @@ -5202,7 +5206,7 @@ with np.errstate(invalid="ignore", divide="ignore"): mat = C / den if scalar: - return mat[0, 1] + return float(mat[0, 1]) return pd.DataFrame(mat, columns=cols, index=cols) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/io/parquet/arrow.py new/dask-2.8.1/dask/dataframe/io/parquet/arrow.py --- old/dask-2.8.0/dask/dataframe/io/parquet/arrow.py 2019-11-05 22:48:30.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/io/parquet/arrow.py 2019-11-23 05:12:29.000000000 +0100 @@ -77,8 +77,7 @@ ) else: base, fns = _analyze_paths(paths, fs) - relpaths = [path.replace(base, "").lstrip("/") for path in paths] - if "_metadata" in relpaths: + if "_metadata" in fns: # We have a _metadata file, lets use it dataset = pq.ParquetDataset( base + fs.sep + "_metadata", @@ -91,36 +90,32 @@ # Will need to pass a list of paths to read_partition dataset = pq.ParquetDataset(paths[0], filesystem=fs, **dataset_kwargs) parts = [base + fs.sep + fn for fn in fns] - else: - if fs.isdir(paths[0]): - # This is a directory, check for _metadata, then _common_metadata - allpaths = fs.glob(paths[0] + fs.sep + "*") - base, fns = _analyze_paths(allpaths, fs) - relpaths = [path.replace(base, "").lstrip("/") for path in allpaths] - if "_metadata" in relpaths and "validate_schema" not in dataset_kwargs: - dataset_kwargs["validate_schema"] = False - if "_metadata" in relpaths or gather_statistics is not False: - # Let arrow do its thing (use _metadata or scan files) + elif fs.isdir(paths[0]): + # This is a directory, check for _metadata, then _common_metadata + allpaths = fs.glob(paths[0] + fs.sep + "*") + base, fns = _analyze_paths(allpaths, fs) + if "_metadata" in fns and "validate_schema" not in dataset_kwargs: + dataset_kwargs["validate_schema"] = False + if "_metadata" in fns or gather_statistics is not False: + # Let arrow do its thing (use _metadata or scan files) + dataset = pq.ParquetDataset( + paths, filesystem=fs, filters=filters, **dataset_kwargs + ) + else: + # Use _common_metadata file if it is available. 
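
The cov_corr_chunk change above replaces np.subtract.outer, which cupy does not implement, with an explicit broadcast; on NumPy arrays the two spellings give identical results. A small illustrative check:

    import numpy as np

    a = np.array([1.0, 2.0, 3.0])
    b = np.array([10.0, 20.0])

    outer = np.subtract.outer(a, b)                  # ufunc.outer form (NumPy only)
    broadcast = np.subtract(a[:, None], b[None, :])  # broadcasting form, cupy-friendly
    assert np.array_equal(outer, broadcast)          # same (3, 2) result
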
+ # Otherwise, just use 0th file + if "_common_metadata" in fns: dataset = pq.ParquetDataset( - paths, filesystem=fs, filters=filters, **dataset_kwargs + base + fs.sep + "_common_metadata", filesystem=fs, **dataset_kwargs ) else: - # Use _common_metadata file if it is available. - # Otherwise, just use 0th file - if "_common_metadata" in relpaths: - dataset = pq.ParquetDataset( - base + fs.sep + "_common_metadata", - filesystem=fs, - **dataset_kwargs, - ) - else: - dataset = pq.ParquetDataset( - allpaths[0], filesystem=fs, **dataset_kwargs - ) - parts = [base + fs.sep + fn for fn in fns] - else: - # There is only one file to read - dataset = pq.ParquetDataset(paths, filesystem=fs, **dataset_kwargs) + dataset = pq.ParquetDataset( + allpaths[0], filesystem=fs, **dataset_kwargs + ) + parts = [base + fs.sep + fn for fn in fns] + else: + # There is only one file to read + dataset = pq.ParquetDataset(paths, filesystem=fs, **dataset_kwargs) return parts, dataset @@ -268,6 +263,7 @@ } ) s["columns"].append(d) + s["total_byte_size"] = row_group.total_byte_size stats.append(s) else: stats = None @@ -294,10 +290,15 @@ if split_row_groups and row_groups_per_piece: # TODO: This block can be removed after ARROW-2801 parts = [] + rg_tot = 0 for i, piece in enumerate(pieces): num_row_groups = row_groups_per_piece[i] for rg in range(num_row_groups): parts.append((piece.path, rg, piece.partition_keys)) + # Setting file_path here, because it may be + # missing from the row-group/column-chunk stats + stats[rg_tot]["file_path_0"] = piece.path + rg_tot += 1 else: parts = [ (piece.path, piece.row_group, piece.partition_keys) @@ -326,12 +327,14 @@ ) else: # `piece` contains (path, row_group, partition_keys) + (path, row_group, partition_keys) = piece piece = pq.ParquetDatasetPiece( - piece[0], - row_group=piece[1], - partition_keys=piece[2], + path, + row_group=row_group, + partition_keys=partition_keys, open_file_func=partial(fs.open, mode="rb"), ) + df = piece.read( columns=columns, partitions=partitions, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/io/parquet/core.py new/dask-2.8.1/dask/dataframe/io/parquet/core.py --- old/dask-2.8.0/dask/dataframe/io/parquet/core.py 2019-11-05 22:48:30.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/io/parquet/core.py 2019-11-23 05:14:40.000000000 +0100 @@ -8,8 +8,9 @@ from ...core import DataFrame, new_dd_object from ....base import tokenize -from ....utils import import_required, natural_sort_key +from ....utils import import_required, natural_sort_key, parse_bytes from collections.abc import Mapping +from ...methods import concat try: @@ -62,16 +63,18 @@ raise KeyError(key) part = self.parts[i] + if not isinstance(part, list): + part = [part] return ( read_parquet_part, self.engine.read_partition, self.fs, self.meta, - part["piece"], + [p["piece"] for p in part], self.columns, self.index, - toolz.merge(part["kwargs"], self.kwargs or {}), + toolz.merge(part[0]["kwargs"], self.kwargs or {}), ) def __len__(self): @@ -92,6 +95,7 @@ engine="auto", gather_statistics=None, split_row_groups=True, + chunksize=None, **kwargs ): """ @@ -144,6 +148,10 @@ to parquet-file row-groups (when enough row-group metadata is available). Otherwise, partitions correspond to distinct files. Only the "pyarrow" engine currently supports this argument. + chunksize : int, str + The target task partition size. 
If set, consecutive row-groups + from the same file will be aggregated into the same output + partition until the aggregate size reaches this value. **kwargs: dict (of dicts) Passthrough key-word arguments for read backend. The top-level keys correspond to the appropriate operation type, and @@ -218,116 +226,16 @@ if meta.index.name is not None: index = meta.index.name - ignore_index_column_intersection = False - if columns is None: - # User didn't specify columns, so ignore any intersection - # of auto-detected values with the index (if necessary) - ignore_index_column_intersection = True - columns = [c for c in meta.columns] - - if not set(columns).issubset(set(meta.columns)): - raise ValueError( - "The following columns were not found in the dataset %s\n" - "The following columns were found %s" - % (set(columns) - set(meta.columns), meta.columns) - ) - # Parse dataset statistics from metadata (if available) - index_in_columns = False - if statistics: - result = list( - zip( - *[ - (part, stats) - for part, stats in zip(parts, statistics) - if stats["num-rows"] > 0 - ] - ) - ) - parts, statistics = result or [[], []] - if filters: - parts, statistics = apply_filters(parts, statistics, filters) - - out = sorted_columns(statistics) - - if index and isinstance(index, str): - index = [index] - if index and out: - # Only one valid column - out = [o for o in out if o["name"] in index] - if index is not False and len(out) == 1: - # Use only sorted column with statistics as the index - divisions = out[0]["divisions"] - if index is None: - index_in_columns = True - index = [out[0]["name"]] - elif index != [out[0]["name"]]: - raise ValueError("Specified index is invalid.\nindex: {}".format(index)) - elif index is not False and len(out) > 1: - if any(o["name"] == "index" for o in out): - # Use sorted column named "index" as the index - [o] = [o for o in out if o["name"] == "index"] - divisions = o["divisions"] - if index is None: - index = [o["name"]] - index_in_columns = True - elif index != [o["name"]]: - raise ValueError( - "Specified index is invalid.\nindex: {}".format(index) - ) - else: - # Multiple sorted columns found, cannot autodetect the index - warnings.warn( - "Multiple sorted columns found %s, cannot\n " - "autodetect index. Will continue without an index.\n" - "To pick an index column, use the index= keyword; to \n" - "silence this warning use index=False." 
- "" % [o["name"] for o in out], - RuntimeWarning, - ) - index = False - divisions = [None] * (len(parts) + 1) - else: - divisions = [None] * (len(parts) + 1) - else: - divisions = [None] * (len(parts) + 1) - - if index: - if isinstance(index, str): - index = [index] - if isinstance(columns, str): - columns = [columns] - - if ignore_index_column_intersection: - columns = [col for col in columns if col not in index] - if set(index).intersection(columns): - if auto_index_allowed: - raise ValueError( - "Specified index and column arguments must not intersect" - " (set index=False or remove the detected index from columns).\n" - "index: {} | column: {}".format(index, columns) - ) - else: - raise ValueError( - "Specified index and column arguments must not intersect.\n" - "index: {} | column: {}".format(index, columns) - ) - - # Leaving index as a column in `meta`, because the index - # will be reset below (in case the index was detected after - # meta was created) - if index_in_columns: - meta = meta[columns + index] - else: - meta = meta[columns] - - else: - meta = meta[list(columns)] + parts, divisions, index, index_in_columns = process_statistics( + parts, statistics, filters, index, chunksize + ) - def _merge_kwargs(x, y): - z = x.copy() - z.update(y) - return z + # Account for index and columns arguments. + # Modify `meta` dataframe accordingly + meta, index, columns = set_index_columns( + meta, index, columns, index_in_columns, auto_index_allowed + ) subgraph = ParquetSubgraph(name, engine, fs, meta, columns, index, parts, kwargs) @@ -347,7 +255,12 @@ """ Read a part of a parquet dataset This function is used by `read_parquet`.""" - df = func(fs, part, columns, index, **kwargs) + if isinstance(part, list): + dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part] + df = concat(dfs, axis=0) + else: + df = func(fs, part, columns, index, **kwargs) + if meta.columns.name: df.columns.name = meta.columns.name columns = columns or [] @@ -684,4 +597,166 @@ return parts, statistics +def process_statistics(parts, statistics, filters, index, chunksize): + """Process row-group column statistics in metadata + Used in read_parquet. 
+ """ + index_in_columns = False + if statistics: + result = list( + zip( + *[ + (part, stats) + for part, stats in zip(parts, statistics) + if stats["num-rows"] > 0 + ] + ) + ) + parts, statistics = result or [[], []] + if filters: + parts, statistics = apply_filters(parts, statistics, filters) + + # Aggregate parts/statistics if we are splitting by row-group + if chunksize: + parts, statistics = aggregate_row_groups(parts, statistics, chunksize) + + out = sorted_columns(statistics) + + if index and isinstance(index, str): + index = [index] + if index and out: + # Only one valid column + out = [o for o in out if o["name"] in index] + if index is not False and len(out) == 1: + # Use only sorted column with statistics as the index + divisions = out[0]["divisions"] + if index is None: + index_in_columns = True + index = [out[0]["name"]] + elif index != [out[0]["name"]]: + raise ValueError("Specified index is invalid.\nindex: {}".format(index)) + elif index is not False and len(out) > 1: + if any(o["name"] == "index" for o in out): + # Use sorted column named "index" as the index + [o] = [o for o in out if o["name"] == "index"] + divisions = o["divisions"] + if index is None: + index = [o["name"]] + index_in_columns = True + elif index != [o["name"]]: + raise ValueError( + "Specified index is invalid.\nindex: {}".format(index) + ) + else: + # Multiple sorted columns found, cannot autodetect the index + warnings.warn( + "Multiple sorted columns found %s, cannot\n " + "autodetect index. Will continue without an index.\n" + "To pick an index column, use the index= keyword; to \n" + "silence this warning use index=False." + "" % [o["name"] for o in out], + RuntimeWarning, + ) + index = False + divisions = [None] * (len(parts) + 1) + else: + divisions = [None] * (len(parts) + 1) + else: + divisions = [None] * (len(parts) + 1) + + return parts, divisions, index, index_in_columns + + +def set_index_columns(meta, index, columns, index_in_columns, auto_index_allowed): + """Handle index/column arguments, and modify `meta` + Used in read_parquet. 
+ """ + ignore_index_column_intersection = False + if columns is None: + # User didn't specify columns, so ignore any intersection + # of auto-detected values with the index (if necessary) + ignore_index_column_intersection = True + columns = [c for c in meta.columns] + + if not set(columns).issubset(set(meta.columns)): + raise ValueError( + "The following columns were not found in the dataset %s\n" + "The following columns were found %s" + % (set(columns) - set(meta.columns), meta.columns) + ) + + if index: + if isinstance(index, str): + index = [index] + if isinstance(columns, str): + columns = [columns] + + if ignore_index_column_intersection: + columns = [col for col in columns if col not in index] + if set(index).intersection(columns): + if auto_index_allowed: + raise ValueError( + "Specified index and column arguments must not intersect" + " (set index=False or remove the detected index from columns).\n" + "index: {} | column: {}".format(index, columns) + ) + else: + raise ValueError( + "Specified index and column arguments must not intersect.\n" + "index: {} | column: {}".format(index, columns) + ) + + # Leaving index as a column in `meta`, because the index + # will be reset below (in case the index was detected after + # meta was created) + if index_in_columns: + meta = meta[columns + index] + else: + meta = meta[columns] + + else: + meta = meta[list(columns)] + + return meta, index, columns + + +def aggregate_row_groups(parts, stats, chunksize): + if not stats[0].get("file_path_0", None): + return parts, stats + + parts_agg = [] + stats_agg = [] + chunksize = parse_bytes(chunksize) + next_part, next_stat = [parts[0].copy()], stats[0].copy() + for i in range(1, len(parts)): + stat, part = stats[i], parts[i] + if (stat["file_path_0"] == next_stat["file_path_0"]) and ( + (next_stat["total_byte_size"] + stat["total_byte_size"]) <= chunksize + ): + # Update part list + next_part.append(part) + + # Update Statistics + next_stat["total_byte_size"] += stat["total_byte_size"] + next_stat["num-rows"] += stat["num-rows"] + for col, col_add in zip(next_stat["columns"], stat["columns"]): + if col["name"] != col_add["name"]: + raise ValueError("Columns are different!!") + if "null_count" in col: + col["null_count"] += col_add["null_count"] + if "min" in col: + col["min"] = min(col["min"], col_add["min"]) + if "max" in col: + col["max"] = max(col["max"], col_add["max"]) + else: + parts_agg.append(next_part) + stats_agg.append(next_stat) + next_part, next_stat = [part.copy()], stat.copy() + + parts_agg.append(next_part) + stats_agg.append(next_stat) + + return parts_agg, stats_agg + + DataFrame.to_parquet.__doc__ = to_parquet.__doc__ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/io/parquet/fastparquet.py new/dask-2.8.1/dask/dataframe/io/parquet/fastparquet.py --- old/dask-2.8.0/dask/dataframe/io/parquet/fastparquet.py 2019-11-05 22:48:30.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/io/parquet/fastparquet.py 2019-11-23 05:12:29.000000000 +0100 @@ -91,17 +91,16 @@ fast_metadata = True if len(paths) > 1: base, fns = _analyze_paths(paths, fs) - relpaths = [path.replace(base, "").lstrip("/") for path in paths] if gather_statistics is not False: # This scans all the files, allowing index/divisions # and filtering pf = ParquetFile( paths, open_with=fs.open, sep=fs.sep, **kwargs.get("file", {}) ) - if "_metadata" not in relpaths: + if "_metadata" not in fns: fast_metadata = False else: - if "_metadata" in relpaths: + if 
"_metadata" in fns: # We have a _metadata file, lets use it pf = ParquetFile( base + fs.sep + "_metadata", @@ -117,49 +116,45 @@ pf.file_scheme = scheme pf.cats = _paths_to_cats(fns, scheme) parts = paths.copy() - else: - if fs.isdir(paths[0]): - # This is a directory, check for _metadata, then _common_metadata - paths = fs.glob(paths[0] + fs.sep + "*") - base, fns = _analyze_paths(paths, fs) - relpaths = [path.replace(base, "").lstrip("/") for path in paths] - if "_metadata" in relpaths: - # Using _metadata file (best-case scenario) + elif fs.isdir(paths[0]): + # This is a directory, check for _metadata, then _common_metadata + paths = fs.glob(paths[0] + fs.sep + "*") + base, fns = _analyze_paths(paths, fs) + if "_metadata" in fns: + # Using _metadata file (best-case scenario) + pf = ParquetFile( + base + fs.sep + "_metadata", + open_with=fs.open, + sep=fs.sep, + **kwargs.get("file", {}) + ) + if gather_statistics is None: + gather_statistics = True + + elif gather_statistics is not False: + # Scan every file + pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {})) + fast_metadata = False + else: + # Use _common_metadata file if it is available. + # Otherwise, just use 0th file + if "_common_metadata" in fns: pf = ParquetFile( - base + fs.sep + "_metadata", + base + fs.sep + "_common_metadata", open_with=fs.open, - sep=fs.sep, **kwargs.get("file", {}) ) - if gather_statistics is None: - gather_statistics = True - - elif gather_statistics is not False: - # Scan every file - pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {})) - fast_metadata = False else: - # Use _common_metadata file if it is available. - # Otherwise, just use 0th file - if "_common_metadata" in relpaths: - pf = ParquetFile( - base + fs.sep + "_common_metadata", - open_with=fs.open, - **kwargs.get("file", {}) - ) - else: - pf = ParquetFile( - paths[0], open_with=fs.open, **kwargs.get("file", {}) - ) - scheme = get_file_scheme(fns) - pf.file_scheme = scheme - pf.cats = _paths_to_cats(fns, scheme) - parts = paths.copy() - else: - # There is only one file to read - pf = ParquetFile( - paths[0], open_with=fs.open, sep=fs.sep, **kwargs.get("file", {}) - ) + pf = ParquetFile(paths[0], open_with=fs.open, **kwargs.get("file", {})) + scheme = get_file_scheme(fns) + pf.file_scheme = scheme + pf.cats = _paths_to_cats(fns, scheme) + parts = paths.copy() + else: + # There is only one file to read + pf = ParquetFile( + paths[0], open_with=fs.open, sep=fs.sep, **kwargs.get("file", {}) + ) return parts, pf, gather_statistics, fast_metadata @@ -296,6 +291,8 @@ s["columns"].append(d) # Need this to filter out partitioned-on categorical columns s["filter"] = fastparquet.api.filter_out_cats(row_group, filters) + s["total_byte_size"] = row_group.total_byte_size + s["file_path_0"] = row_group.columns[0].file_path # 0th column only stats.append(s) else: @@ -345,20 +342,18 @@ pf.file_scheme = scheme pf.cats = _paths_to_cats(fns, scheme) pf.fn = base - df = pf.to_pandas(columns, categories, index=index) + return pf.to_pandas(columns, categories, index=index) else: if isinstance(pf, tuple): pf = _determine_pf_parts(fs, pf[0], pf[1], **kwargs)[1] pf._dtypes = lambda *args: pf.dtypes # ugly patch, could be fixed pf.fmd.row_groups = None - piece = pf.row_groups[piece] + rg_piece = pf.row_groups[piece] pf.fmd.key_value_metadata = None - df = pf.read_row_group_file( - piece, columns, categories, index=index, **kwargs.get("read", {}) + return pf.read_row_group_file( + rg_piece, columns, categories, index=index, 
**kwargs.get("read", {}) ) - return df - @staticmethod def initialize_write( df, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/io/tests/test_parquet.py new/dask-2.8.1/dask/dataframe/io/tests/test_parquet.py --- old/dask-2.8.0/dask/dataframe/io/tests/test_parquet.py 2019-11-05 22:48:30.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/io/tests/test_parquet.py 2019-11-23 05:14:40.000000000 +0100 @@ -14,7 +14,7 @@ from dask.dataframe.io.parquet.utils import _parse_pandas_metadata from dask.dataframe.optimize import optimize_read_parquet_getitem from dask.dataframe.io.parquet.core import ParquetSubgraph -from dask.utils import natural_sort_key +from dask.utils import natural_sort_key, parse_bytes try: import fastparquet @@ -1915,7 +1915,10 @@ def test_timeseries_nulls_in_schema(tmpdir, engine): - tmp_path = str(tmpdir) + # GH#5608: relative path failing _metadata/_common_metadata detection. + tmp_path = str(tmpdir.mkdir("files")) + tmp_path = os.path.join(tmp_path, "../", "files") + ddf2 = ( dask.datasets.timeseries(start="2000-01-01", end="2000-01-03", freq="1h") .reset_index() @@ -2079,7 +2082,11 @@ ) ddf3 = dd.read_parquet( - tmp, engine="pyarrow", gather_statistics=True, split_row_groups=True + tmp, + engine="pyarrow", + gather_statistics=True, + split_row_groups=True, + chunksize=1, ) assert ddf3.npartitions == 4 @@ -2093,7 +2100,11 @@ ) ddf3 = dd.read_parquet( - tmp, engine="pyarrow", gather_statistics=True, split_row_groups=True + tmp, + engine="pyarrow", + gather_statistics=True, + split_row_groups=True, + chunksize=1, ) assert ddf3.npartitions == 12 @@ -2101,3 +2112,80 @@ tmp, engine="pyarrow", gather_statistics=True, split_row_groups=False ) assert ddf3.npartitions == 4 + + [email protected]("metadata", [True, False]) [email protected]("chunksize", [None, 1024, 4096, "1MiB"]) +def test_chunksize(tmpdir, chunksize, engine, metadata): + check_pyarrow() # Need pyarrow for write phase in this test + + nparts = 2 + df_size = 100 + row_group_size = 5 + row_group_byte_size = 451 # Empirically measured + + df = pd.DataFrame( + { + "a": np.random.choice(["apple", "banana", "carrot"], size=df_size), + "b": np.random.random(size=df_size), + "c": np.random.randint(1, 5, size=df_size), + "index": np.arange(0, df_size), + } + ).set_index("index") + + ddf1 = dd.from_pandas(df, npartitions=nparts) + ddf1.to_parquet( + str(tmpdir), + engine="pyarrow", + row_group_size=row_group_size, + write_metadata_file=metadata, + ) + + if metadata: + path = str(tmpdir) + else: + dirname = str(tmpdir) + files = os.listdir(dirname) + assert "_metadata" not in files + path = os.path.join(dirname, "*.parquet") + + ddf2 = dd.read_parquet( + path, + engine=engine, + chunksize=chunksize, + split_row_groups=True, + gather_statistics=True, + index="index", + ) + + assert_eq(ddf1, ddf2, check_divisions=False) + + num_row_groups = df_size // row_group_size + if not chunksize: + assert ddf2.npartitions == num_row_groups + else: + # Check that we are really aggregating + df_byte_size = row_group_byte_size * num_row_groups + expected = df_byte_size // parse_bytes(chunksize) + remainder = (df_byte_size % parse_bytes(chunksize)) > 0 + expected += int(remainder) * nparts + assert ddf2.npartitions == max(nparts, expected) + + +@write_read_engines() +def test_roundtrip_pandas_chunksize(tmpdir, write_engine, read_engine): + path = str(tmpdir.join("test.parquet")) + pdf = df.copy() + pdf.index.name = "index" + pdf.to_parquet(path, engine=write_engine) + + 
ddf_read = dd.read_parquet( + path, + engine=read_engine, + chunksize="10 kiB", + gather_statistics=True, + split_row_groups=True, + index="index", + ) + + assert_eq(pdf, ddf_read) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/tests/test_dataframe.py new/dask-2.8.1/dask/dataframe/tests/test_dataframe.py --- old/dask-2.8.0/dask/dataframe/tests/test_dataframe.py 2019-11-05 22:48:30.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/tests/test_dataframe.py 2019-11-22 04:35:27.000000000 +0100 @@ -1734,7 +1734,7 @@ def test_repartition_npartitions(use_index, n, k, dtype, transform): df = pd.DataFrame( {"x": [1, 2, 3, 4, 5, 6] * 10, "y": list("abdabd") * 10}, - index=pd.Series([10, 20, 30, 40, 50, 60] * 10, dtype=dtype), + index=pd.Series([1, 2, 3, 4, 5, 6] * 10, dtype=dtype), ) df = transform(df) a = dd.from_pandas(df, npartitions=n, sort=use_index) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask/dataframe/tests/test_groupby.py new/dask-2.8.1/dask/dataframe/tests/test_groupby.py --- old/dask-2.8.0/dask/dataframe/tests/test_groupby.py 2019-11-13 18:07:11.000000000 +0100 +++ new/dask-2.8.1/dask/dataframe/tests/test_groupby.py 2019-11-22 04:35:27.000000000 +0100 @@ -2189,6 +2189,48 @@ ) [email protected]( + "grouping,agg", + [ + ( + lambda df: df.drop(columns="category_2").groupby("category_1"), + lambda grp: grp.mean(), + ), + ( + lambda df: df.drop(columns="category_2").groupby("category_1"), + lambda grp: grp.agg("mean"), + ), + (lambda df: df.groupby(["category_1", "category_2"]), lambda grp: grp.mean()), + pytest.param( + lambda df: df.groupby(["category_1", "category_2"]), + lambda grp: grp.agg("mean"), + marks=pytest.mark.xfail( + not dask.dataframe.utils.PANDAS_GT_100, + reason=( + "Should work starting from pandas 1.0.0: " + "https://github.com/dask/dask/pull/5423" + ), + ), + ), + ], +) +def test_groupby_aggregate_categoricals(grouping, agg): + pdf = pd.DataFrame( + { + "category_1": pd.Categorical(list("AABBCC")), + "category_2": pd.Categorical(list("ABCABC")), + "value": np.random.uniform(size=6), + } + ) + ddf = dd.from_pandas(pdf, 2) + + # DataFrameGroupBy + assert_eq(agg(grouping(pdf)), agg(grouping(ddf))) + + # SeriesGroupBy + assert_eq(agg(grouping(pdf)["value"]), agg(grouping(ddf)["value"])) + + @pytest.mark.xfail(reason="dropna kwarg not supported in pandas groupby.") @pytest.mark.parametrize("dropna", [False, True]) def test_groupby_dropna_pandas(dropna): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/dask.egg-info/PKG-INFO new/dask-2.8.1/dask.egg-info/PKG-INFO --- old/dask-2.8.0/dask.egg-info/PKG-INFO 2019-11-14 23:57:18.000000000 +0100 +++ new/dask-2.8.1/dask.egg-info/PKG-INFO 2019-11-23 05:31:54.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: dask -Version: 2.8.0 +Version: 2.8.1 Summary: Parallel PyData with Task Scheduling Home-page: https://github.com/dask/dask/ Maintainer: Matthew Rocklin @@ -43,10 +43,10 @@ Classifier: Programming Language :: Python :: 3.6 Classifier: Programming Language :: Python :: 3.7 Requires-Python: >=3.6 -Provides-Extra: complete Provides-Extra: array -Provides-Extra: diagnostics -Provides-Extra: dataframe Provides-Extra: bag -Provides-Extra: delayed +Provides-Extra: dataframe Provides-Extra: distributed +Provides-Extra: diagnostics +Provides-Extra: delayed +Provides-Extra: complete diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dask-2.8.0/docs/source/api.rst new/dask-2.8.1/docs/source/api.rst --- old/dask-2.8.0/docs/source/api.rst 2019-10-11 05:14:07.000000000 +0200 +++ new/dask-2.8.1/docs/source/api.rst 2019-11-21 03:32:04.000000000 +0100 @@ -52,7 +52,27 @@ .. autofunction:: persist .. autofunction:: visualize -Finally, Dask has a few helpers for generating demo datasets +Datasets +-------- -.. autofunction:: datasets.make_people -.. autofunction:: datasets.timeseries +Dask has a few helpers for generating demo datasets + +.. currentmodule:: dask.datasets + +.. autofunction:: make_people +.. autofunction:: timeseries + +.. _api.utilities: + +Utilities +--------- + +Dask has some public utility methods. These are primarily used for parsing +configuration values. + +.. currentmodule:: dask.utils + +.. autofunction:: format_bytes +.. autofunction:: format_time +.. autofunction:: parse_bytes +.. autofunction:: parse_timedelta diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/docs/source/changelog.rst new/dask-2.8.1/docs/source/changelog.rst --- old/dask-2.8.0/docs/source/changelog.rst 2019-11-14 23:55:24.000000000 +0100 +++ new/dask-2.8.1/docs/source/changelog.rst 2019-11-23 05:27:27.000000000 +0100 @@ -1,6 +1,34 @@ Changelog ========= +2.8.1 / 2019-11-22 +------------------ + +Array ++++++ +- Use auto rechunking in ``da.rechunk`` if no value given (:pr:`5605`) `Matthew Rocklin`_ + +Core +++++ +- Add simple action to activate GH actions (:pr:`5619`) `James Bourbeau`_ + +DataFrame ++++++++++ +- Fix "file_path_0" bug in ``aggregate_row_groups`` (:pr:`5627`) `Richard J Zamora`_ +- Add ``chunksize`` argument to ``read_parquet`` (:pr:`5607`) `Richard J Zamora`_ +- Change ``test_repartition_npartitions`` to support arch64 architecture (:pr:`5620`) `ossdev07`_ +- Categories lost after groupby + agg (:pr:`5423`) `Oliver Hofkens`_ +- Fixed relative path issue with parquet metadata file (:pr:`5608`) `Nuno Gomes Silva`_ +- Enable gpu-backed covariance/correlation in dataframes (:pr:`5597`) `Richard J Zamora`_ + +Documentation ++++++++++++++ +- Fix institutional faq and unknown doc warnings (:pr:`5616`) `James Bourbeau`_ +- Add doc for some utils (:pr:`5609`) `Tom Augspurger`_ +- Removes ``html_extra_path`` (:pr:`5614`) `James Bourbeau`_ +- Fixed See Also referencence (:pr:`5612`) `Tom Augspurger`_ + + 2.8.0 / 2019-11-14 ------------------ @@ -2700,3 +2728,5 @@ .. _`Prithvi MK`: https://github.com/pmk21 .. _`Eric Dill`: https://github.com/ericdill .. _`Gina Helfrich`: https://github.com/Dr-G +.. _`ossdev07`: https://github.com/ossdev07 +.. _`Nuno Gomes Silva`: https://github.com/mgsnuno diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/docs/source/conf.py new/dask-2.8.1/docs/source/conf.py --- old/dask-2.8.0/docs/source/conf.py 2019-10-11 05:14:07.000000000 +0200 +++ new/dask-2.8.1/docs/source/conf.py 2019-11-20 18:05:56.000000000 +0100 @@ -48,7 +48,6 @@ # The master toctree document. master_doc = "index" -html_extra_path = ["index.html"] # General information about the project. 
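
The demo-dataset helpers now documented under the new Datasets section generate small frames like the ones used in the parquet tests above; a minimal sketch:

    import dask

    # Random hourly timeseries over two days (same helper the parquet tests call)
    ddf = dask.datasets.timeseries(start="2000-01-01", end="2000-01-03", freq="1h")
    print(ddf.head())
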
project = u"Dask" @@ -359,10 +358,9 @@ """ html_context = { - 'css_files': [ - '_static/theme_overrides.css', # override wide tables in RTD theme - ], - } + "css_files": ["_static/theme_overrides.css"] # override wide tables in RTD theme +} + def copy_legacy_redirects(app, docname): if app.builder.name == "html": diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/docs/source/configuration.rst new/dask-2.8.1/docs/source/configuration.rst --- old/dask-2.8.0/docs/source/configuration.rst 2019-10-11 05:14:07.000000000 +0200 +++ new/dask-2.8.1/docs/source/configuration.rst 2019-11-21 03:32:04.000000000 +0100 @@ -66,6 +66,8 @@ For example, ``dask.config.get('num_workers')`` is equivalent to ``dask.config.get('num-workers')``. +Values like ``"128 MiB"`` and ``"10s"`` are parsed using the functions in +:ref:`api.utilities`. Specify Configuration --------------------- diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/docs/source/dataframe-create.rst new/dask-2.8.1/docs/source/dataframe-create.rst --- old/dask-2.8.0/docs/source/dataframe-create.rst 2019-10-11 02:10:15.000000000 +0200 +++ new/dask-2.8.1/docs/source/dataframe-create.rst 2019-11-21 03:32:04.000000000 +0100 @@ -7,7 +7,7 @@ File System (HDFS), and Amazon's S3 (excepting HDF, which is only available on POSIX like file systems). -See the :doc:`Overview section <dataframe-overview>` for an in depth +See the :doc:`DataFrame overview page <dataframe>` for an in depth discussion of ``dask.dataframe`` scope, use, and limitations. API diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/docs/source/institutional-faq.rst new/dask-2.8.1/docs/source/institutional-faq.rst --- old/dask-2.8.0/docs/source/institutional-faq.rst 2019-11-05 22:48:30.000000000 +0100 +++ new/dask-2.8.1/docs/source/institutional-faq.rst 2019-11-21 03:32:04.000000000 +0100 @@ -154,10 +154,10 @@ SLLURM, PBS, LSF, Torque, Condor, or other job batch queuing systems, then users can launch Dask on these systems today using either: - - `Dask Jobqueue <https://jobqueue.dask.org>`_ , which uses typical + - `Dask Jobqueue <https://jobqueue.dask.org>`_ , which uses typical ``qsub``, ``sbatch``, ``bsub`` or other submission tools in interactive settings. - - `Dask MPI <https://mpi.dask.org>`_ which uses MPI for deployment in + - `Dask MPI <https://mpi.dask.org>`_ which uses MPI for deployment in batch settings For more information see :doc:`setup/hpc` @@ -167,10 +167,10 @@ all of which provide hosted Kubernetes as a service. People today use Dask on Kubernetes using either of the following: - - **Helm**: an easy way to stand up a long-running Dask cluster and + - **Helm**: an easy way to stand up a long-running Dask cluster and Jupyter notebook - - **Dask-Kubernetes**: for native Kubernetes integration for fast moving + - **Dask-Kubernetes**: for native Kubernetes integration for fast moving or ephemeral deployments. 
For more information see :doc:`setup/kubernetes` diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/dask-2.8.0/docs/source/setup/custom-startup.rst new/dask-2.8.1/docs/source/setup/custom-startup.rst --- old/dask-2.8.0/docs/source/setup/custom-startup.rst 2019-10-11 05:14:07.000000000 +0200 +++ new/dask-2.8.1/docs/source/setup/custom-startup.rst 2019-11-21 03:32:04.000000000 +0100 @@ -28,7 +28,8 @@ As an example, consider the following file that creates a -:doc:`scheduler plugin <plugins>` and registers it with the scheduler +`scheduler plugin <https://distributed.dask.org/en/latest/plugins.html>`_ +and registers it with the scheduler .. code-block:: python
