[GitHub] [spark] moskvax commented on a change in pull request #28743: [SPARK-31920][PYTHON] Fix pandas conversion using Arrow with __arrow_array__ columns
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r438840313

## File path: python/pyspark/sql/pandas/conversion.py

```diff
@@ -394,10 +394,11 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            inferred_types = [pa.infer_type(s, mask=s.isna(), from_pandas=True)
```

Review comment:

> For the second case above, so `pa.Schema.from_pandas` returns correct types in the case? When `pa.infer_type` infers the specified array types, will it just throw error or return a wrong array type?

`pa.infer_type` will throw an error for these arrays.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
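As background for the mask discussion: a pure-Python sketch (no pyarrow required; `NA` and `infer_type` here are illustrative stand-ins, not pyarrow's implementation) of how passing a mask lets inference skip missing slots the way `pa.infer_type(s, mask=s.isna(), from_pandas=True)` does:

```python
# Illustrative stand-ins only: `NA` mimics pd.NA and `infer_type` mimics
# the mask behaviour of pa.infer_type, which skips masked (missing) slots
# instead of trying to infer a type for them.
NA = object()

def infer_type(values, mask):
    """Return the Python type of the first unmasked value, or None if all are masked."""
    for value, is_missing in zip(values, mask):
        if not is_missing:
            return type(value)
    return None

col = [NA, 2, 3]
mask = [v is NA for v in col]
print(infer_type(col, mask))  # <class 'int'>: the NA slot is skipped
```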
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r438051160

## File path: python/pyspark/sql/pandas/serializers.py

```diff
@@ -150,15 +151,22 @@ def _create_batch(self, series):
         series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)

         def create_array(s, t):
-            mask = s.isnull()
+            # Create with __arrow_array__ if the series' backing array implements it
+            series_array = getattr(s, 'array', s._values)
+            if hasattr(series_array, "__arrow_array__"):
+                return series_array.__arrow_array__(type=t)
+
             # Ensure timestamp series are in expected form for Spark internal representation
             if t is not None and pa.types.is_timestamp(t):
                 s = _check_series_convert_timestamps_internal(s, self._timezone)
-            elif type(s.dtype) == pd.CategoricalDtype:
+            elif is_categorical_dtype(s.dtype):
                 # Note: This can be removed once minimum pyarrow version is >= 0.16.1
                 s = s.astype(s.dtypes.categories.dtype)
             try:
-                array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
+                mask = s.isnull()
+                # pass _ndarray_values to avoid potential failed type checks from pandas array types
```

Review comment:

This is a workaround for `IntegerArray` in pre-1.0.0 pandas, which did not yet implement `__arrow_array__`, so pyarrow expects it to be a NumPy array:

```pycon
>>> import pandas as pd
>>> import pyarrow as pa
>>> print(pd.__version__, pa.__version__)
0.25.0 0.17.1
>>> s = pd.Series(range(3), dtype=pd.Int64Dtype())
>>> pa.Array.from_pandas(s)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/array.pxi", line 805, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/types.pxi", line 76, in pyarrow.lib._datatype_to_pep3118
  File "pyarrow/array.pxi", line 64, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 108, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
>>> pa.Array.from_pandas(s, type=pa.int64())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/array.pxi", line 805, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 85, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array
>>> pa.Array.from_pandas(s._ndarray_values, type=pa.int64())
[
  0,
  1,
  2
]
```

I'll update the comment to mention this.
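The dispatch order in the diff above (try `__arrow_array__` first, otherwise fall through to the NumPy-based path) can be sketched with stand-in classes; `ArrowBackedArray`, `PlainArray`, and the tuple return values are hypothetical, for illustration only, and no pandas/pyarrow is needed:

```python
# Stand-in classes (hypothetical): the dispatch mirrors create_array --
# arrays exposing __arrow_array__ convert themselves; everything else
# takes the NumPy-based fallback path.
class ArrowBackedArray:
    def __init__(self, data):
        self.data = data

    def __arrow_array__(self, type=None):
        # A real implementation would return a pyarrow.Array here.
        return ("arrow-protocol", self.data, type)

class PlainArray:
    def __init__(self, data):
        self.data = data

def create_array(backing_array, t=None):
    if hasattr(backing_array, "__arrow_array__"):
        return backing_array.__arrow_array__(type=t)
    return ("numpy-fallback", backing_array.data, t)

print(create_array(ArrowBackedArray([1, 2]), "int64"))  # ('arrow-protocol', [1, 2], 'int64')
print(create_array(PlainArray([1, 2]), "int64"))        # ('numpy-fallback', [1, 2], 'int64')
```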
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r438051119

## File path: python/pyspark/sql/pandas/conversion.py

```diff
@@ -394,10 +394,11 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            inferred_types = [pa.infer_type(s, mask=s.isna(), from_pandas=True)
```

Review comment:

pyarrow < 0.17.0 cannot handle either ([ARROW-8159](https://issues.apache.org/jira/browse/ARROW-8159)). pyarrow 0.17.x works as long as the columns that contain `pd.NA` values are not `object`-dtyped, which is the case by default as of pandas 1.0.4 (cf. pandas-dev/pandas#32931). `pa.infer_type` can take a mask and thus avoids trying to infer the type of `pd.NA` values, which is what causes `pa.Schema.from_pandas` to fail here.

`pa.Schema.from_pandas` returns different types from `pa.infer_type` in two cases:

1. `Categorical` arrays
   * `pa.Schema.from_pandas` returns a `DictionaryType`
   * `pa.infer_type` returns the `value_type` of the `DictionaryType`, which is what is already used to determine the Spark type of the resulting column
2. `__arrow_array__`-implementing arrays which return a specialised Arrow type (`IntervalArray`, `PeriodArray`)
   * `pa.Schema.from_pandas` returns the type of the array returned by `__arrow_array__`
   * `pa.infer_type` does not check for `__arrow_array__` and thus fails with these arrays; however, these types cannot currently be converted to Spark types anyway

Neither of these cases causes a regression, which is why I propose replacing `pa.Schema.from_pandas` with `pa.infer_type` here.
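A toy model of the `Categorical` disagreement described in case 1, assuming nothing about pyarrow internals: `ToyCategorical` is a made-up class standing in for a dictionary-encoded array, showing why the encoded view and the decoded values yield different "types":

```python
# ToyCategorical is a made-up dictionary-encoded array: codes index into
# categories. Schema.from_pandas reports the encoded (dictionary) view;
# infer_type effectively sees only the decoded values.
class ToyCategorical:
    def __init__(self, values):
        self.categories = sorted(set(values))  # unique values
        self.codes = [self.categories.index(v) for v in values]

    def decoded(self):
        return [self.categories[c] for c in self.codes]

cat = ToyCategorical(["a", "b", "a"])
encoded_view = (type(cat.codes[0]), type(cat.categories[0]))  # (int codes, str categories)
value_type = type(cat.decoded()[0])                           # str: the "value_type"
print(encoded_view, value_type)
```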
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r438051033

## File path: python/pyspark/sql/pandas/conversion.py

```diff
@@ -394,10 +394,11 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            inferred_types = [pa.infer_type(s, mask=s.isna(), from_pandas=True)
+                              for s in (pdf[c] for c in pdf)]
             struct = StructType()
-            for name, field in zip(schema, arrow_schema):
-                struct.add(name, from_arrow_type(field.type), nullable=field.nullable)
+            for name, t in zip(schema, inferred_types):
+                struct.add(name, from_arrow_type(t), nullable=True)
```

Review comment:

Sounds good, will update with a comment. Alternatively, `any(s.isna())` could be checked if we wanted to actively infer nullability here. However, that would change existing behaviour and be inconsistent with the non-Arrow path, which similarly defaults to treating inferred types as nullable:

https://github.com/apache/spark/blob/43063e2db2bf7469f985f1954d8615b95cf5c578/python/pyspark/sql/types.py#L1069
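The two nullability policies mentioned, sketched side by side (plain-Python stand-ins; `nullable_default` and `nullable_inferred` are hypothetical names, not Spark APIs):

```python
# Hypothetical helper names: the current behaviour keeps every inferred
# field nullable; the alternative would inspect the data instead.
def nullable_default(column):
    # Matches Schema.from_pandas and the non-Arrow inference path.
    return True

def nullable_inferred(column, is_na=lambda v: v is None):
    # The `any(s.isna())` alternative mentioned above.
    return any(is_na(v) for v in column)

print(nullable_default([1, 2, 3]))      # True
print(nullable_inferred([1, 2, 3]))     # False
print(nullable_inferred([1, None, 3]))  # True
```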
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r437872692

## File path: python/pyspark/sql/pandas/serializers.py

```diff
@@ -150,15 +151,22 @@ def _create_batch(self, series):
         series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)

         def create_array(s, t):
-            mask = s.isnull()
+            # Create with __arrow_array__ if the series' backing array implements it
+            series_array = getattr(s, 'array', s._values)
+            if hasattr(series_array, "__arrow_array__"):
+                return series_array.__arrow_array__(type=t)
+
             # Ensure timestamp series are in expected form for Spark internal representation
             if t is not None and pa.types.is_timestamp(t):
                 s = _check_series_convert_timestamps_internal(s, self._timezone)
-            elif type(s.dtype) == pd.CategoricalDtype:
+            elif is_categorical_dtype(s.dtype):
```

Review comment:

By the way, this change was made because `CategoricalDtype` is only imported into the root pandas namespace as of pandas 0.24.0, which was causing an `AttributeError` when testing with earlier versions.
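The failure mode being avoided can be sketched without importing pandas at all; `old_pandas` below is a stand-in namespace emulating a pandas module that predates `CategoricalDtype` being exported at the top level:

```python
import types

# `old_pandas` is a stand-in (hypothetical) namespace emulating a
# pre-0.24 pandas module with no top-level CategoricalDtype.
old_pandas = types.SimpleNamespace()

def dtype_is_categorical(pd_module, dtype):
    try:
        # On old pandas this attribute access raises AttributeError,
        # which is what switching to is_categorical_dtype avoids.
        return type(dtype) == pd_module.CategoricalDtype
    except AttributeError:
        return None  # comparison not possible on this "version"

print(dtype_is_categorical(old_pandas, object()))  # None
```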
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r437858625

## File path: python/pyspark/sql/tests/test_arrow.py

```diff
@@ -30,10 +30,14 @@
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest
 from pyspark.util import _exception_message
+from distutils.version import LooseVersion

 if have_pandas:
     import pandas as pd
     from pandas.util.testing import assert_frame_equal
+    pandas_version = LooseVersion(pd.__version__)
+else:
+    pandas_version = LooseVersion("0")
```

Review comment:

Nice, will update.
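As an aside, `distutils.version.LooseVersion` was removed along with `distutils` in Python 3.12; for plain dotted numeric versions like these, a minimal stand-in comparison can be sketched as follows (an assumption: real version strings with suffixes such as "1.0.0rc1" need a fuller parser, e.g. `packaging.version`):

```python
# Assumption: plain dotted numeric versions only; non-numeric parts
# are simply dropped in this sketch.
def version_tuple(v):
    return tuple(int(part) for part in v.split(".") if part.isdigit())

print(version_tuple("1.0.4") >= version_tuple("0.24.0"))   # True
print(version_tuple("0.23.2") >= version_tuple("0.24.0"))  # False
```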
moskvax commented on a change in pull request #28743:
URL: https://github.com/apache/spark/pull/28743#discussion_r437858389

## File path: python/pyspark/sql/pandas/conversion.py

```diff
@@ -394,10 +394,11 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            inferred_types = [pa.infer_type(s, mask=s.isna(), from_pandas=True)
+                              for s in (pdf[c] for c in pdf)]
             struct = StructType()
-            for name, field in zip(schema, arrow_schema):
-                struct.add(name, from_arrow_type(field.type), nullable=field.nullable)
+            for name, t in zip(schema, inferred_types):
+                struct.add(name, from_arrow_type(t), nullable=True)
```

Review comment:

`infer_type` only returns a type, not a `field`, which is what would carry nullability information. But in the implementation of `Schema.from_pandas` ([link](https://github.com/apache/arrow/blob/b058cf0d1c26ad7984c104bb84322cc7dcc66f00/python/pyarrow/types.pxi#L1328)), nullability was never actually inferred and the default `nullable=True` was always returned. So this change just follows the existing behaviour of `Schema.from_pandas`.