This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4d21b94  [SPARK-35475][PYTHON] Fix disallow_untyped_defs mypy checks
4d21b94 is described below

commit 4d21b94d13daf958edf6bbf49c4aa17f5cb012b5
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Fri Jun 11 11:07:11 2021 -0700

    [SPARK-35475][PYTHON] Fix disallow_untyped_defs mypy checks

    ### What changes were proposed in this pull request?

    Adds more type annotations in the file `python/pyspark/pandas/namespace.py` and fixes the mypy check failures.

    ### Why are the changes needed?

    We should enable more disallow_untyped_defs mypy checks.

    ### Does this PR introduce _any_ user-facing change?

    Yes. This PR adds more type annotations in pandas APIs on Spark module, which can impact interaction with development tools for users.

    ### How was this patch tested?

    The mypy check with a new configuration and existing tests should pass.

    Closes #32871 from ueshin/issues/SPARK-35475/disallow_untyped_defs.

    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 python/mypy.ini                    |   3 -
 python/pyspark/pandas/frame.py     |   8 +-
 python/pyspark/pandas/namespace.py | 281 ++++++++++++++++++++++---------------
 python/pyspark/pandas/utils.py     |   2 +-
 4 files changed, 175 insertions(+), 119 deletions(-)

diff --git a/python/mypy.ini b/python/mypy.ini
index 86156fa..ae76f41 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -174,9 +174,6 @@ disallow_untyped_defs = False
 [mypy-pyspark.pandas.groupby]
 disallow_untyped_defs = False
 
-[mypy-pyspark.pandas.namespace]
-disallow_untyped_defs = False
-
 [mypy-pyspark.pandas.series]
 disallow_untyped_defs = False
 
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index d61b80c..0773901 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -8929,7 +8929,13 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         else:
             raise TypeError("other must be a pandas-on-Spark DataFrame")
 
-    def melt(self, id_vars=None, value_vars=None, var_name=None, value_name="value") -> "DataFrame":
+    def melt(
+        self,
+        id_vars: Optional[Union[Any, Tuple, List[Union[Any, Tuple]]]] = None,
+        value_vars: Optional[Union[Any, Tuple, List[Union[Any, Tuple]]]] = None,
+        var_name: Optional[Union[str, List[str]]] = None,
+        value_name: str = "value",
+    ) -> "DataFrame":
         """
         Unpivot a DataFrame from wide format to long format, optionally
         leaving identifier variables set.
diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index 090a1bf..41ba186 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -20,6 +20,7 @@ Wrappers around spark that correspond to common pandas functions.
""" from typing import ( # noqa: F401 (SPARK-34943) Any, + Callable, Dict, List, Optional, @@ -28,9 +29,11 @@ from typing import ( # noqa: F401 (SPARK-34943) Type, Union, cast, + no_type_check, ) from collections import OrderedDict from collections.abc import Iterable +from datetime import tzinfo from distutils.version import LooseVersion from functools import reduce from io import BytesIO @@ -39,6 +42,7 @@ import json import numpy as np import pandas as pd from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype, is_list_like +from pandas.tseries.offsets import DateOffset import pyarrow as pa import pyarrow.parquet as pq from pyspark import sql as spark @@ -79,6 +83,7 @@ from pyspark.pandas.internal import ( ) from pyspark.pandas.series import Series, first_series from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale +from pyspark.pandas.typedef.typehints import Dtype from pyspark.pandas.indexes import Index, DatetimeIndex @@ -197,21 +202,21 @@ def range( def read_csv( - path, - sep=",", - header="infer", - names=None, - index_col=None, - usecols=None, - squeeze=False, - mangle_dupe_cols=True, - dtype=None, - nrows=None, - parse_dates=False, - quotechar=None, - escapechar=None, - comment=None, - **options + path: str, + sep: str = ",", + header: Union[str, int, None] = "infer", + names: Optional[Union[str, List[str]]] = None, + index_col: Optional[Union[str, List[str]]] = None, + usecols: Optional[Union[List[int], List[str], Callable[[str], bool]]] = None, + squeeze: bool = False, + mangle_dupe_cols: bool = True, + dtype: Optional[Union[str, Dtype, Dict[str, Union[str, Dtype]]]] = None, + nrows: Optional[int] = None, + parse_dates: bool = False, + quotechar: Optional[str] = None, + escapechar: Optional[str] = None, + comment: Optional[str] = None, + **options: Any ) -> Union[DataFrame, Series]: """Read CSV (comma-separated) file into DataFrame or Series. @@ -221,7 +226,7 @@ def read_csv( The path string storing the CSV file to be read. sep : str, default ‘,’ Delimiter to use. Must be a single character. - header : int, list of int, default ‘infer’ + header : int, default ‘infer’ Whether to to use as the column names, and the start of the data. 
         Default behavior is to infer the column names: if no names are passed
         the behavior is identical to `header=0` and column names are inferred from
@@ -287,7 +292,7 @@ def read_csv(
         raise ValueError("parse_dates can only be `False`: %s" % parse_dates)
 
     if usecols is not None and not callable(usecols):
-        usecols = list(usecols)
+        usecols = list(usecols)  # type: ignore
 
     if usecols is None or callable(usecols) or len(usecols) > 0:
         reader = default_session().read
@@ -341,18 +346,21 @@ def read_csv(
                 column_labels = OrderedDict(
                     (label, col) for label, col in column_labels.items() if usecols(label)
                 )
-                missing = []
+                missing = []  # type: List[Union[int, str]]
             elif all(isinstance(col, int) for col in usecols):
+                usecols_ints = cast(List[int], usecols)
                 new_column_labels = OrderedDict(
                     (label, col)
                     for i, (label, col) in enumerate(column_labels.items())
-                    if i in usecols
+                    if i in usecols_ints
                 )
                 missing = [
                     col
-                    for col in usecols
-                    if col >= len(column_labels)
-                    or list(column_labels)[col] not in new_column_labels
+                    for col in usecols_ints
+                    if (
+                        col >= len(column_labels)
+                        or list(column_labels)[col] not in new_column_labels
+                    )
                 ]
                 column_labels = new_column_labels
             elif all(isinstance(col, str) for col in usecols):
@@ -424,7 +432,7 @@ def read_csv(
 
 
 def read_json(
-    path: str, lines: bool = True, index_col: Optional[Union[str, List[str]]] = None, **options
+    path: str, lines: bool = True, index_col: Optional[Union[str, List[str]]] = None, **options: Any
 ) -> DataFrame:
     """
     Convert a JSON string to DataFrame.
@@ -486,7 +494,7 @@ def read_delta(
     version: Optional[str] = None,
     timestamp: Optional[str] = None,
     index_col: Optional[Union[str, List[str]]] = None,
-    **options
+    **options: Any
 ) -> DataFrame:
     """
     Read a Delta Lake table on some file system and return a DataFrame.
@@ -615,7 +623,7 @@ def read_spark_io(
     format: Optional[str] = None,
     schema: Union[str, "StructType"] = None,
     index_col: Optional[Union[str, List[str]]] = None,
-    **options
+    **options: Any
 ) -> DataFrame:
     """Load a DataFrame from a Spark data source.
@@ -694,7 +702,7 @@
 )
 
 
-def read_parquet(path, columns=None, index_col=None, pandas_metadata=False, **options) -> DataFrame:
+def read_parquet(
+    path: str,
+    columns: Optional[List[str]] = None,
+    index_col: Optional[List[str]] = None,
+    pandas_metadata: bool = False,
+    **options: Any
+) -> DataFrame:
     """Load a parquet object from the file path, returning a DataFrame.
 
     Parameters
@@ -748,6 +762,7 @@ def read_parquet(path, columns=None, index_col=None, pandas_metadata=False, **op
     if index_col is None and pandas_metadata:
         # Try to read pandas metadata
 
+        @no_type_check
         @pandas_udf("index_col array<string>, index_names array<string>", PandasUDFType.SCALAR)
         def read_index_metadata(pser):
             binary = pser.iloc[0]
@@ -801,7 +816,7 @@ def read_parquet(path, columns=None, index_col=None, pandas_metadata=False, **op
     return psdf
 
 
-def read_clipboard(sep=r"\s+", **kwargs) -> DataFrame:
+def read_clipboard(sep: str = r"\s+", **kwargs: Any) -> DataFrame:
     r"""
     Read text from clipboard and pass to read_csv.
     See read_csv for the full argument list
@@ -824,31 +839,31 @@ def read_clipboard(sep=r"\s+", **kwargs) -> DataFrame:
 
 
 def read_excel(
-    io,
-    sheet_name=0,
-    header=0,
-    names=None,
-    index_col=None,
-    usecols=None,
-    squeeze=False,
-    dtype=None,
-    engine=None,
-    converters=None,
-    true_values=None,
-    false_values=None,
-    skiprows=None,
-    nrows=None,
-    na_values=None,
-    keep_default_na=True,
-    verbose=False,
-    parse_dates=False,
-    date_parser=None,
-    thousands=None,
-    comment=None,
-    skipfooter=0,
-    convert_float=True,
-    mangle_dupe_cols=True,
-    **kwds
+    io: Union[str, Any],
+    sheet_name: Union[str, int, List[Union[str, int]], None] = 0,
+    header: Union[int, List[int]] = 0,
+    names: Optional[List] = None,
+    index_col: Optional[List[int]] = None,
+    usecols: Optional[Union[int, str, List[Union[int, str]], Callable[[str], bool]]] = None,
+    squeeze: bool = False,
+    dtype: Optional[Dict[str, Union[str, Dtype]]] = None,
+    engine: Optional[str] = None,
+    converters: Optional[Dict] = None,
+    true_values: Optional[Any] = None,
+    false_values: Optional[Any] = None,
+    skiprows: Optional[Union[int, List[int]]] = None,
+    nrows: Optional[int] = None,
+    na_values: Optional[Any] = None,
+    keep_default_na: bool = True,
+    verbose: bool = False,
+    parse_dates: Union[bool, List, Dict] = False,
+    date_parser: Optional[Callable] = None,
+    thousands: Optional[str] = None,
+    comment: Optional[str] = None,
+    skipfooter: int = 0,
+    convert_float: bool = True,
+    mangle_dupe_cols: bool = True,
+    **kwds: Any
 ) -> Union[DataFrame, Series, OrderedDict]:
     """
     Read an Excel file into a pandas-on-Spark DataFrame or Series.
@@ -1050,7 +1065,9 @@ def read_excel(
     2 None NaN
     """
 
-    def pd_read_excel(io_or_bin, sn, sq):
+    def pd_read_excel(
+        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None], sq: bool
+    ) -> pd.DataFrame:
         return pd.read_excel(
             io=BytesIO(io_or_bin) if isinstance(io_or_bin, (bytes, bytearray)) else io_or_bin,
             sheet_name=sn,
@@ -1099,8 +1116,10 @@ def read_excel(
             return cast(Union[DataFrame, Series], from_pandas(pdf_or_psers))
     else:
 
-        def read_excel_on_spark(pdf_or_pser, sn):
-
+        def read_excel_on_spark(
+            pdf_or_pser: Union[pd.DataFrame, pd.Series],
+            sn: Union[str, int, List[Union[str, int]], None],
+        ) -> Union[DataFrame, Series]:
             if isinstance(pdf_or_pser, pd.Series):
                 pdf = pdf_or_pser.to_frame()
             else:
@@ -1111,7 +1130,7 @@ def read_excel(
                 as_nullable_spark_type(psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema)
             )
 
-            def output_func(pdf):
+            def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
                 pdf = pd.concat(
                     [pd_read_excel(bin, sn=sn, sq=False) for bin in pdf[pdf.columns[0]]]
                 )
@@ -1153,21 +1172,21 @@ def read_excel(
 
 
 def read_html(
-    io,
-    match=".+",
-    flavor=None,
-    header=None,
-    index_col=None,
-    skiprows=None,
-    attrs=None,
-    parse_dates=False,
-    thousands=",",
-    encoding=None,
-    decimal=".",
-    converters=None,
-    na_values=None,
-    keep_default_na=True,
-    displayed_only=True,
+    io: Union[str, Any],
+    match: str = ".+",
+    flavor: Optional[str] = None,
+    header: Optional[Union[int, List[int]]] = None,
+    index_col: Optional[Union[int, List[int]]] = None,
+    skiprows: Optional[Union[int, List[int], slice]] = None,
+    attrs: Optional[Dict[str, str]] = None,
+    parse_dates: bool = False,
+    thousands: str = ",",
+    encoding: Optional[str] = None,
+    decimal: str = ".",
+    converters: Optional[Dict] = None,
+    na_values: Optional[Any] = None,
+    keep_default_na: bool = True,
+    displayed_only: bool = True,
 ) -> List[DataFrame]:
     r"""Read HTML tables into a ``list`` of ``DataFrame`` objects.
@@ -1290,7 +1309,12 @@ def read_html(
 
 # TODO: add `coerce_float` and 'parse_dates' parameters
 def read_sql_table(
-    table_name, con, schema=None, index_col=None, columns=None, **options
+    table_name: str,
+    con: str,
+    schema: Optional[str] = None,
+    index_col: Optional[Union[str, List[str]]] = None,
+    columns: Optional[Union[str, List[str]]] = None,
+    **options: Any
 ) -> DataFrame:
     """
     Read SQL database table into a DataFrame.
@@ -1355,7 +1379,9 @@ def read_sql_table(
 
 
 # TODO: add `coerce_float`, `params`, and 'parse_dates' parameters
-def read_sql_query(sql, con, index_col=None, **options) -> DataFrame:
+def read_sql_query(
+    sql: str, con: str, index_col: Optional[Union[str, List[str]]] = None, **options: Any
+) -> DataFrame:
     """Read SQL query into a DataFrame.
 
     Returns a DataFrame corresponding to the result set of the query
@@ -1408,7 +1434,13 @@ def read_sql_query(sql, con, index_col=None, **options) -> DataFrame:
 
 
 # TODO: add `coerce_float`, `params`, and 'parse_dates' parameters
-def read_sql(sql, con, index_col=None, columns=None, **options) -> DataFrame:
+def read_sql(
+    sql: str,
+    con: str,
+    index_col: Optional[Union[str, List[str]]] = None,
+    columns: Optional[Union[str, List[str]]] = None,
+    **options: Any
+) -> DataFrame:
     """
     Read SQL query or database table into a DataFrame.
@@ -1462,8 +1494,14 @@ def read_sql(sql, con, index_col=None, columns=None, **options) -> DataFrame:
     return read_sql_query(sql, con, index_col=index_col, **options)
 
 
+@no_type_check
 def to_datetime(
-    arg, errors="raise", format=None, unit=None, infer_datetime_format=False, origin="unix"
+    arg,
+    errors: str = "raise",
+    format: Optional[str] = None,
+    unit: Optional[str] = None,
+    infer_datetime_format: bool = False,
+    origin: str = "unix",
 ):
     """
     Convert argument to datetime.
@@ -1579,7 +1617,7 @@ def to_datetime(
     DatetimeIndex(['1960-01-02', '1960-01-03', '1960-01-04'], dtype='datetime64[ns]', freq=None)
     """
 
-    def pandas_to_datetime(pser_or_pdf) -> Series[np.datetime64]:
+    def pandas_to_datetime(pser_or_pdf: Union[pd.DataFrame, pd.Series]) -> Series[np.datetime64]:
         if isinstance(pser_or_pdf, pd.DataFrame):
             pser_or_pdf = pser_or_pdf[["year", "month", "day"]]
         return pd.to_datetime(
@@ -1607,15 +1645,15 @@ def to_datetime(
 
 
 def date_range(
-    start=None,
-    end=None,
-    periods=None,
-    freq=None,
-    tz=None,
-    normalize=False,
-    name=None,
-    closed=None,
-    **kwargs
+    start: Union[str, Any] = None,
+    end: Union[str, Any] = None,
+    periods: Optional[int] = None,
+    freq: Optional[Union[str, DateOffset]] = None,
+    tz: Optional[Union[str, tzinfo]] = None,
+    normalize: bool = False,
+    name: Optional[str] = None,
+    closed: Optional[str] = None,
+    **kwargs: Any
 ) -> DatetimeIndex:
     """
     Return a fixed frequency DatetimeIndex.
@@ -1770,14 +1808,14 @@ def date_range(
 
 
 def get_dummies(
-    data,
-    prefix=None,
-    prefix_sep="_",
-    dummy_na=False,
-    columns=None,
-    sparse=False,
-    drop_first=False,
-    dtype=None,
+    data: Union[DataFrame, Series],
+    prefix: Optional[Union[str, List[str], Dict[str, str]]] = None,
+    prefix_sep: str = "_",
+    dummy_na: bool = False,
+    columns: Optional[Union[Any, Tuple, List[Union[Any, Tuple]]]] = None,
+    sparse: bool = False,
+    drop_first: bool = False,
+    dtype: Optional[Union[str, Dtype]] = None,
 ) -> DataFrame:
     """
     Convert categorical variable into dummy/indicator variables, also
@@ -1976,11 +2014,11 @@ def get_dummies(
         if drop_first:
             values = values[1:]
 
-        def column_name(value):
-            if prefix is None or prefix[i] == "":
+        def column_name(value: str) -> str:
+            if prefix is None or cast(List[str], prefix)[i] == "":
                 return value
             else:
-                return "{}{}{}".format(prefix[i], prefix_sep, value)
+                return "{}{}{}".format(cast(List[str], prefix)[i], prefix_sep, value)
 
         for value in values:
             remaining_columns.append(
@@ -1995,7 +2033,13 @@ def get_dummies(
 
 
 # TODO: there are many parameters to implement and support. See pandas's pd.concat.
-def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[Series, DataFrame]:
+def concat(
+    objs: List[Union[DataFrame, Series]],
+    axis: Union[int, str] = 0,
+    join: str = "outer",
+    ignore_index: bool = False,
+    sort: bool = False,
+) -> Union[Series, DataFrame]:
     """
     Concatenate pandas-on-Spark objects along a particular axis with optional
     set logic along the other axes.
@@ -2206,6 +2250,7 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
     if len(psdfs_not_same_anchor) > 0:
 
+        @no_type_check
         def resolve_func(psdf, this_column_labels, that_column_labels):
             raise AssertionError("This should not happen.")
 
@@ -2252,16 +2297,15 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
             series_names.add(obj.name)
             obj = obj.to_frame(DEFAULT_SERIES_NAME)
         new_objs.append(obj)
-    objs = new_objs
 
-    column_labels_levels = set(obj._internal.column_labels_level for obj in objs)
+    column_labels_levels = set(obj._internal.column_labels_level for obj in new_objs)
     if len(column_labels_levels) != 1:
         raise ValueError("MultiIndex columns should have the same levels")
 
     # DataFrame, DataFrame, ...
     # All Series are converted into DataFrame and then compute concat.
     if not ignore_index:
-        indices_of_psdfs = [psdf.index for psdf in objs]
+        indices_of_psdfs = [psdf.index for psdf in new_objs]
         index_of_first_psdf = indices_of_psdfs[0]
         for index_of_psdf in indices_of_psdfs:
             if index_of_first_psdf.names != index_of_psdf.names:
@@ -2274,20 +2318,20 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
                 )
             )
 
-    column_labels_of_psdfs = [psdf._internal.column_labels for psdf in objs]
+    column_labels_of_psdfs = [psdf._internal.column_labels for psdf in new_objs]
     if ignore_index:
-        index_names_of_psdfs = [[] for _ in objs]  # type: List
+        index_names_of_psdfs = [[] for _ in new_objs]  # type: List
     else:
-        index_names_of_psdfs = [psdf._internal.index_names for psdf in objs]
+        index_names_of_psdfs = [psdf._internal.index_names for psdf in new_objs]
 
     if all(name == index_names_of_psdfs[0] for name in index_names_of_psdfs) and all(
         idx == column_labels_of_psdfs[0] for idx in column_labels_of_psdfs
     ):
         # If all columns are in the same order and values, use it.
-        psdfs = objs
+        psdfs = new_objs
     else:
         if join == "inner":
-            interested_columns = set.intersection(*map(set, column_labels_of_psdfs))
+            interested_columns = set.intersection(*map(lambda x: set(x), column_labels_of_psdfs))
             # Keep the column order with its firsts DataFrame.
             merged_columns = [
                 label for label in column_labels_of_psdfs[0] if label in interested_columns
@@ -2299,7 +2343,7 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
                 # FIXME: better ordering
                 merged_columns = sorted(merged_columns, key=name_like_string)
 
-            psdfs = [psdf[merged_columns] for psdf in objs]
+            psdfs = [psdf[merged_columns] for psdf in new_objs]
         elif join == "outer":
             merged_columns = []
             for labels in column_labels_of_psdfs:
@@ -2320,7 +2364,7 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
                 merged_columns = sorted(merged_columns, key=name_like_string)
 
             psdfs = []
-            for psdf in objs:
+            for psdf in new_objs:
                 columns_to_add = list(set(merged_columns) - set(psdf._internal.column_labels))
 
                 # TODO: NaN and None difference for missing values. pandas seems filling NaN.
@@ -2391,13 +2435,20 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=False) -> Union[
 
     return result_psdf
 
 
-def melt(frame, id_vars=None, value_vars=None, var_name=None, value_name="value") -> DataFrame:
+def melt(
+    frame: DataFrame,
+    id_vars: Optional[Union[Any, Tuple, List[Union[Any, Tuple]]]] = None,
+    value_vars: Optional[Union[Any, Tuple, List[Union[Any, Tuple]]]] = None,
+    var_name: Optional[Union[str, List[str]]] = None,
+    value_name: str = "value",
+) -> DataFrame:
     return DataFrame.melt(frame, id_vars, value_vars, var_name, value_name)
 
 
 melt.__doc__ = DataFrame.melt.__doc__
 
 
+@no_type_check
 def isna(obj):
     """
     Detect missing values for an array-like object.
@@ -2480,6 +2531,7 @@ def isna(obj):
 isnull = isna
 
 
+@no_type_check
 def notna(obj):
     """
     Detect existing (non-missing) values.
@@ -2555,8 +2607,8 @@ notnull = notna
 
 
 def merge(
-    obj,
-    right: "DataFrame",
+    obj: DataFrame,
+    right: DataFrame,
     how: str = "inner",
     on: Union[Any, List[Any], Tuple, List[Tuple]] = None,
     left_on: Union[Any, List[Any], Tuple, List[Tuple]] = None,
@@ -2687,6 +2739,7 @@
     )
 
 
+@no_type_check
 def to_numeric(arg):
     """
     Convert argument to a numeric type.
@@ -2759,7 +2812,7 @@ def to_numeric(arg):
     return pd.to_numeric(arg)
 
 
-def broadcast(obj) -> DataFrame:
+def broadcast(obj: DataFrame) -> DataFrame:
     """
     Marks a DataFrame as small enough for use in broadcast joins.
@@ -2801,10 +2854,10 @@ def read_orc(
-    path,
+    path: str,
     columns: Optional[List[str]] = None,
     index_col: Optional[Union[str, List[str]]] = None,
-    **options
+    **options: Any
 ) -> "DataFrame":
     """
     Load an ORC object from the file path, returning a DataFrame.
@@ -2892,7 +2945,7 @@ _get_dummies_acceptable_types = _get_dummies_default_accept_types + (
 )
 
 
-def _test():
+def _test() -> None:
     import os
     import doctest
     import shutil
diff --git a/python/pyspark/pandas/utils.py b/python/pyspark/pandas/utils.py
index 8acc967..794c61f 100644
--- a/python/pyspark/pandas/utils.py
+++ b/python/pyspark/pandas/utils.py
@@ -600,7 +600,7 @@ def column_labels_level(column_labels: List[Tuple]) -> int:
     return list(levels)[0]
 
 
-def name_like_string(name: Optional[Union[str, Tuple]]) -> str:
+def name_like_string(name: Optional[Union[Any, Tuple]]) -> str:
     """
     Return the name-like strings from str or tuple of str

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
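
For readers unfamiliar with the mypy option this patch enables: deleting the
[mypy-pyspark.pandas.namespace] override in python/mypy.ini means the stricter
project-wide disallow_untyped_defs setting now applies to that module, so every
function definition in it must carry a fully annotated signature. A minimal sketch
of what the check enforces (hypothetical function names, not taken from the patch):

    from typing import Any, Optional

    # Rejected when disallow_untyped_defs is on:
    # mypy reports "Function is missing a type annotation".
    def load_table(path, columns=None):
        ...

    # Accepted: every parameter (including **options) and the return value are annotated.
    def load_table_typed(path: str, columns: Optional[str] = None, **options: Any) -> None:
        ...

Functions that are hard to annotate precisely can instead be exempted one at a time by
decorating them with typing.no_type_check, which is what the patch does for helpers such
as to_datetime, isna, notna, and to_numeric in the diff above.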