This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 74d3d87171d [SPARK-45166][PYTHON] Clean up unused code paths for pyarrow<4
74d3d87171d is described below

commit 74d3d87171ddc6e648669bdb77ea476236bd6add
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Sep 15 16:13:26 2023 +0900

    [SPARK-45166][PYTHON] Clean up unused code paths for pyarrow<4

    ### What changes were proposed in this pull request?
    Clean up the unused code paths for pyarrow<4.

    ### Why are the changes needed?
    The minimum supported pyarrow version was upgraded to 4.0.0 in
    https://issues.apache.org/jira/browse/SPARK-44183, so version checks
    against older releases are dead code.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Updated CI

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #42928 from zhengruifeng/py_pa_version.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/pandas/__init__.py                  |   9 +-
 python/pyspark/sql/pandas/types.py                 |  20 ----
 .../sql/tests/pandas/test_pandas_udf_scalar.py     |  12 +--
 python/pyspark/sql/tests/test_arrow.py             | 108 ++------------------
 4 files changed, 13 insertions(+), 136 deletions(-)
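For context, every guard removed in this patch has the same shape: compare
the installed pyarrow version against a floor using distutils' LooseVersion,
then branch or raise. A minimal sketch of the now-dead pattern (the helper
name is illustrative, not from the patch; the "2.0.0" floor mirrors the
removed checks):

    from distutils.version import LooseVersion

    import pyarrow as pa

    def require_min_pyarrow(minimum: str = "2.0.0") -> None:
        # With PySpark's floor now at pyarrow>=4.0.0, this branch can never
        # be taken, which is why the checks below are deleted wholesale.
        if LooseVersion(pa.__version__) < LooseVersion(minimum):
            raise ImportError(
                f"pyarrow>={minimum} is required, found {pa.__version__}"
            )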
diff --git a/python/pyspark/pandas/__init__.py b/python/pyspark/pandas/__init__.py
index d8ce385639c..44365c8e6b8 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -23,7 +23,6 @@ import os
 import sys
 import warnings
-from distutils.version import LooseVersion
 from typing import Any
 
 from pyspark.pandas.missing.general_functions import MissingPandasLikeGeneralFunctions
@@ -40,13 +39,7 @@ except ImportError as e:
     else:
         raise
 
-
-import pyarrow
-
-if (
-    LooseVersion(pyarrow.__version__) >= LooseVersion("2.0.0")
-    and "PYARROW_IGNORE_TIMEZONE" not in os.environ
-):
+if "PYARROW_IGNORE_TIMEZONE" not in os.environ:
     warnings.warn(
         "'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "
         "set this environment variable to '1' in both driver and executor sides if you use "
diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py
index 54cd6fa7016..92e2ef1dc44 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -60,7 +60,6 @@ if TYPE_CHECKING:
 
 def to_arrow_type(dt: DataType) -> "pa.DataType":
     """Convert Spark data type to pyarrow type"""
-    from distutils.version import LooseVersion
     import pyarrow as pa
 
     if type(dt) == BooleanType:
@@ -93,21 +92,9 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
     elif type(dt) == DayTimeIntervalType:
         arrow_type = pa.duration("us")
     elif type(dt) == ArrayType:
-        if type(dt.elementType) == StructType and LooseVersion(pa.__version__) < LooseVersion(
-            "2.0.0"
-        ):
-            raise PySparkTypeError(
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "Array of StructType"},
-            )
         field = pa.field("element", to_arrow_type(dt.elementType), nullable=dt.containsNull)
         arrow_type = pa.list_(field)
     elif type(dt) == MapType:
-        if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-            raise PySparkTypeError(
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "MapType"},
-            )
         key_field = pa.field("key", to_arrow_type(dt.keyType), nullable=False)
         value_field = pa.field("value", to_arrow_type(dt.valueType), nullable=dt.valueContainsNull)
         arrow_type = pa.map_(key_field, value_field)
@@ -142,8 +129,6 @@ def to_arrow_schema(schema: StructType) -> "pa.Schema":
 
 def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> DataType:
     """Convert pyarrow type to Spark data type."""
-    from distutils.version import LooseVersion
-
-    import pyarrow as pa
     import pyarrow.types as types
 
     spark_type: DataType
@@ -182,11 +167,6 @@ def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> Da
     elif types.is_list(at):
         spark_type = ArrayType(from_arrow_type(at.value_type, prefer_timestamp_ntz))
     elif types.is_map(at):
-        if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-            raise PySparkTypeError(
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "MapType"},
-            )
         spark_type = MapType(
             from_arrow_type(at.key_type, prefer_timestamp_ntz),
             from_arrow_type(at.item_type, prefer_timestamp_ntz),
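With the guards gone, to_arrow_type builds Arrow list and map types
unconditionally. A minimal standalone sketch of the pyarrow calls the
cleaned-up code relies on (the element/key/value types here are arbitrary
placeholders, not taken from the patch):

    import pyarrow as pa

    # The same constructions to_arrow_type now performs with no version check:
    element = pa.field("element", pa.int64(), nullable=True)
    print(pa.list_(element))  # list<element: int64>

    key_field = pa.field("key", pa.string(), nullable=False)
    value_field = pa.field("value", pa.int64(), nullable=True)
    print(pa.map_(key_field, value_field))  # map<string, int64>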
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index 8cb397ab95d..0e0018d80da 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -22,7 +22,6 @@ import time
 import unittest
 from datetime import date, datetime
 from decimal import Decimal
-from distutils.version import LooseVersion
 from typing import cast
 
 from pyspark import TaskContext
@@ -599,14 +598,9 @@ class ScalarPandasUDFTestsMixin:
         schema = StructType([StructField("map", MapType(StringType(), LongType()))])
         df = self.spark.createDataFrame(data, schema=schema)
         for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
-            if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                with QuietTest(self.sc):
-                    with self.assertRaisesRegex(Exception, "MapType.*not supported"):
-                        pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
-            else:
-                map_f = pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
-                result = df.select(map_f(col("map")))
-                self.assertEqual(df.collect(), result.collect())
+            map_f = pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
+            result = df.select(map_f(col("map")))
+            self.assertEqual(df.collect(), result.collect())
 
     def test_vectorized_udf_complex(self):
         df = self.spark.range(10).select(
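Since a MapType pandas UDF is now assumed to work unconditionally, a minimal
usage sketch outside the test harness might look like the following (the
session setup and sample data are illustrative, not part of the patch; it
needs pandas and pyarrow>=4.0.0 installed):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType, MapType, StringType

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([({"a": 1},), ({"b": 2},)], "map: map<string,bigint>")

    # Identity pandas UDF over a map column, mirroring the test above.
    identity = pandas_udf(lambda s: s, MapType(StringType(), LongType()))
    df.select(identity(col("map")).alias("map")).show()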
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index a30f9316401..28244b14385 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -20,8 +20,6 @@ import os
 import threading
 import time
 import unittest
-import warnings
-from distutils.version import LooseVersion
 from typing import cast
 from collections import namedtuple
 
@@ -195,46 +193,6 @@ class ArrowTestsMixin:
             + [np.array([[0.1, 0.1, 0.1], [0.2, 0.2, 0.2]]).astype(t) for t in float_dtypes]
         )
 
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_toPandas_fallback_enabled(self):
-        with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
-            schema = StructType([StructField("a", ArrayType(StructType()), True)])
-            df = self.spark.createDataFrame([([Row()],)], schema=schema)
-            with QuietTest(self.sc):
-                with self.warnings_lock:
-                    with warnings.catch_warnings(record=True) as warns:
-                        # we want the warnings to appear even if this test is run from a subclass
-                        warnings.simplefilter("always")
-                        pdf = df.toPandas()
-                        # Catch and check the last UserWarning.
-                        user_warns = [
-                            warn.message for warn in warns if isinstance(warn.message, UserWarning)
-                        ]
-                        self.assertTrue(len(user_warns) > 0)
-                        self.assertTrue("Attempting non-optimization" in str(user_warns[-1]))
-                        assert_frame_equal(pdf, pd.DataFrame({"a": [[Row()]]}))
-
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_toPandas_fallback_disabled(self):
-        schema = StructType([StructField("a", ArrayType(StructType()), True)])
-        df = self.spark.createDataFrame([(None,)], schema=schema)
-        with QuietTest(self.sc):
-            with self.warnings_lock:
-                with self.assertRaises(PySparkTypeError) as pe:
-                    df.toPandas()
-
-        self.check_error(
-            exception=pe.exception,
-            error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-            message_parameters={"data_type": "Array of StructType"},
-        )
-
     def test_toPandas_empty_df_arrow_enabled(self):
         for arrow_enabled in [True, False]:
             with self.subTest(arrow_enabled=arrow_enabled):
@@ -654,17 +612,13 @@ class ArrowTestsMixin:
         ):
             with self.subTest(schema=schema):
                 with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
-                    if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                        with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
-                            self.spark.createDataFrame(pdf, schema=schema).collect()
-                    else:
-                        df = self.spark.createDataFrame(pdf, schema=schema)
+                    df = self.spark.createDataFrame(pdf, schema=schema)
 
-                        result = df.collect()
+                    result = df.collect()
 
-                        for row in result:
-                            i, m = row
-                            self.assertEqual(m, map_data[i])
+                    for row in result:
+                        i, m = row
+                        self.assertEqual(m, map_data[i])
 
     def test_createDataFrame_with_struct_type(self):
         for arrow_enabled in [True, False]:
@@ -732,12 +686,8 @@ class ArrowTestsMixin:
             df = self.spark.createDataFrame(origin, schema=schema)
 
             with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
-                if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                    with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
-                        df.toPandas()
-                else:
-                    pdf = df.toPandas()
-                    assert_frame_equal(origin, pdf)
+                pdf = df.toPandas()
+                assert_frame_equal(origin, pdf)
 
     def test_toPandas_with_map_type_nulls(self):
         with QuietTest(self.sc):
@@ -758,12 +708,8 @@ class ArrowTestsMixin:
             df = self.spark.createDataFrame(origin, schema=schema)
 
             with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
-                if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                    with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
-                        df.toPandas()
-                else:
-                    pdf = df.toPandas()
-                    assert_frame_equal(origin, pdf)
+                pdf = df.toPandas()
+                assert_frame_equal(origin, pdf)
 
     def test_createDataFrame_with_int_col_names(self):
         for arrow_enabled in [True, False]:
@@ -779,42 +725,6 @@ class ArrowTestsMixin:
                 pdf_col_names = [str(c) for c in pdf.columns]
                 self.assertEqual(pdf_col_names, df.columns)
 
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_createDataFrame_fallback_enabled(self):
-        with QuietTest(self.sc):
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
-                with warnings.catch_warnings(record=True) as warns:
-                    # we want the warnings to appear even if this test is run from a subclass
-                    warnings.simplefilter("always")
-                    df = self.spark.createDataFrame(
-                        pd.DataFrame({"a": [[Row()]]}), "a: array<struct<>>"
-                    )
-                    # Catch and check the last UserWarning.
-                    user_warns = [
-                        warn.message for warn in warns if isinstance(warn.message, UserWarning)
-                    ]
-                    self.assertTrue(len(user_warns) > 0)
-                    self.assertTrue("Attempting non-optimization" in str(user_warns[-1]))
-                    self.assertEqual(df.collect(), [Row(a=[Row()])])
-
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_createDataFrame_fallback_disabled(self):
-        with QuietTest(self.sc):
-            with self.assertRaises(PySparkTypeError) as pe:
-                self.spark.createDataFrame(pd.DataFrame({"a": [[Row()]]}), "a: array<struct<>>")
-
-        self.check_error(
-            exception=pe.exception,
-            error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-            message_parameters={"data_type": "Array of StructType"},
-        )
-
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org