This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 74d3d87171d [SPARK-45166][PYTHON] Clean up unused code paths for pyarrow<4
74d3d87171d is described below

commit 74d3d87171ddc6e648669bdb77ea476236bd6add
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Sep 15 16:13:26 2023 +0900

    [SPARK-45166][PYTHON] Clean up unused code paths for pyarrow<4
    
    ### What changes were proposed in this pull request?
    Clean up unused code paths for pyarrow<4
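
    For context, every removed path followed the same shape: a runtime gate on pyarrow.__version__ protecting types (MapType, ArrayType of StructType) that old pyarrow could not convert. A minimal sketch of the before/after pattern, illustrative only (the real call sites are in the diff below; LooseVersion appears here only because that is what the removed guards used):

    ```python
    # Sketch of the kind of version guard this change removes (illustrative only).
    from distutils.version import LooseVersion

    import pyarrow as pa

    # Before: MapType / ArrayType-of-StructType conversions had to be gated,
    # because pyarrow < 2.0.0 could not represent those Arrow types.
    if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
        raise TypeError("MapType is only supported with pyarrow 2.0.0 and above")

    # After: the minimum supported pyarrow is 4.0.0 (SPARK-44183), so the guard
    # and the LooseVersion import can simply be deleted.
    ```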
    
    ### Why are the changes needed?
    The minimum pyarrow version was upgraded to 4.0.0 in https://issues.apache.org/jira/browse/SPARK-44183
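
    With 4.0.0 as the floor, a single import-time check covers what the per-call-site gates used to do. A rough sketch (assumption: the helper name mirrors PySpark's require_minimum_pyarrow_version, and the real check may differ in detail):

    ```python
    # Minimal sketch of an import-time minimum-version check (assumed helper name).
    from distutils.version import LooseVersion

    def require_minimum_pyarrow_version(minimum: str = "4.0.0") -> None:
        """Raise if pyarrow is missing or older than the supported minimum."""
        try:
            import pyarrow as pa
        except ImportError:
            raise ImportError(f"pyarrow >= {minimum} must be installed") from None
        if LooseVersion(pa.__version__) < LooseVersion(minimum):
            raise ImportError(f"pyarrow >= {minimum} is required; found {pa.__version__}")
    ```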
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42928 from zhengruifeng/py_pa_version.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/pandas/__init__.py                  |   9 +-
 python/pyspark/sql/pandas/types.py                 |  20 ----
 .../sql/tests/pandas/test_pandas_udf_scalar.py     |  12 +--
 python/pyspark/sql/tests/test_arrow.py             | 108 ++-------------------
 4 files changed, 13 insertions(+), 136 deletions(-)

diff --git a/python/pyspark/pandas/__init__.py b/python/pyspark/pandas/__init__.py
index d8ce385639c..44365c8e6b8 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -23,7 +23,6 @@
 import os
 import sys
 import warnings
-from distutils.version import LooseVersion
 from typing import Any
 
 from pyspark.pandas.missing.general_functions import MissingPandasLikeGeneralFunctions
@@ -40,13 +39,7 @@ except ImportError as e:
     else:
         raise
 
-
-import pyarrow
-
-if (
-    LooseVersion(pyarrow.__version__) >= LooseVersion("2.0.0")
-    and "PYARROW_IGNORE_TIMEZONE" not in os.environ
-):
+if "PYARROW_IGNORE_TIMEZONE" not in os.environ:
     warnings.warn(
         "'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "
         "set this environment variable to '1' in both driver and executor sides if you use "
diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py
index 54cd6fa7016..92e2ef1dc44 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -60,7 +60,6 @@ if TYPE_CHECKING:
 
 def to_arrow_type(dt: DataType) -> "pa.DataType":
     """Convert Spark data type to pyarrow type"""
-    from distutils.version import LooseVersion
     import pyarrow as pa
 
     if type(dt) == BooleanType:
@@ -93,21 +92,9 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
     elif type(dt) == DayTimeIntervalType:
         arrow_type = pa.duration("us")
     elif type(dt) == ArrayType:
-        if type(dt.elementType) == StructType and LooseVersion(pa.__version__) < LooseVersion(
-            "2.0.0"
-        ):
-            raise PySparkTypeError(
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "Array of StructType"},
-            )
         field = pa.field("element", to_arrow_type(dt.elementType), nullable=dt.containsNull)
         arrow_type = pa.list_(field)
     elif type(dt) == MapType:
-        if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-            raise PySparkTypeError(
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "MapType"},
-            )
         key_field = pa.field("key", to_arrow_type(dt.keyType), nullable=False)
         value_field = pa.field("value", to_arrow_type(dt.valueType), nullable=dt.valueContainsNull)
         arrow_type = pa.map_(key_field, value_field)
@@ -142,8 +129,6 @@ def to_arrow_schema(schema: StructType) -> "pa.Schema":
 
 def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> DataType:
     """Convert pyarrow type to Spark data type."""
-    from distutils.version import LooseVersion
-    import pyarrow as pa
     import pyarrow.types as types
 
     spark_type: DataType
@@ -182,11 +167,6 @@ def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> Da
     elif types.is_list(at):
         spark_type = ArrayType(from_arrow_type(at.value_type, prefer_timestamp_ntz))
     elif types.is_map(at):
-        if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-            raise PySparkTypeError(
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "MapType"},
-            )
         spark_type = MapType(
             from_arrow_type(at.key_type, prefer_timestamp_ntz),
             from_arrow_type(at.item_type, prefer_timestamp_ntz),
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index 8cb397ab95d..0e0018d80da 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -22,7 +22,6 @@ import time
 import unittest
 from datetime import date, datetime
 from decimal import Decimal
-from distutils.version import LooseVersion
 from typing import cast
 
 from pyspark import TaskContext
@@ -599,14 +598,9 @@ class ScalarPandasUDFTestsMixin:
         schema = StructType([StructField("map", MapType(StringType(), LongType()))])
         df = self.spark.createDataFrame(data, schema=schema)
         for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
-            if LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                with QuietTest(self.sc):
-                    with self.assertRaisesRegex(Exception, "MapType.*not supported"):
-                        pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
-            else:
-                map_f = pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
-                result = df.select(map_f(col("map")))
-                self.assertEqual(df.collect(), result.collect())
+            map_f = pandas_udf(lambda x: x, MapType(StringType(), LongType()), udf_type)
+            result = df.select(map_f(col("map")))
+            self.assertEqual(df.collect(), result.collect())
 
     def test_vectorized_udf_complex(self):
         df = self.spark.range(10).select(
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index a30f9316401..28244b14385 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -20,8 +20,6 @@ import os
 import threading
 import time
 import unittest
-import warnings
-from distutils.version import LooseVersion
 from typing import cast
 from collections import namedtuple
 
@@ -195,46 +193,6 @@ class ArrowTestsMixin:
             + [np.array([[0.1, 0.1, 0.1], [0.2, 0.2, 0.2]]).astype(t) for t in float_dtypes]
         )
 
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_toPandas_fallback_enabled(self):
-        with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
-            schema = StructType([StructField("a", ArrayType(StructType()), True)])
-            df = self.spark.createDataFrame([([Row()],)], schema=schema)
-            with QuietTest(self.sc):
-                with self.warnings_lock:
-                    with warnings.catch_warnings(record=True) as warns:
-                        # we want the warnings to appear even if this test is run from a subclass
-                        warnings.simplefilter("always")
-                        pdf = df.toPandas()
-                        # Catch and check the last UserWarning.
-                        user_warns = [
-                            warn.message for warn in warns if isinstance(warn.message, UserWarning)
-                        ]
-                        self.assertTrue(len(user_warns) > 0)
-                        self.assertTrue("Attempting non-optimization" in str(user_warns[-1]))
-                        assert_frame_equal(pdf, pd.DataFrame({"a": [[Row()]]}))
-
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_toPandas_fallback_disabled(self):
-        schema = StructType([StructField("a", ArrayType(StructType()), True)])
-        df = self.spark.createDataFrame([(None,)], schema=schema)
-        with QuietTest(self.sc):
-            with self.warnings_lock:
-                with self.assertRaises(PySparkTypeError) as pe:
-                    df.toPandas()
-
-                self.check_error(
-                    exception=pe.exception,
-                    error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                    message_parameters={"data_type": "Array of StructType"},
-                )
-
     def test_toPandas_empty_df_arrow_enabled(self):
         for arrow_enabled in [True, False]:
             with self.subTest(arrow_enabled=arrow_enabled):
@@ -654,17 +612,13 @@ class ArrowTestsMixin:
         ):
             with self.subTest(schema=schema):
                 with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
-                    if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                        with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
-                            self.spark.createDataFrame(pdf, schema=schema).collect()
-                    else:
-                        df = self.spark.createDataFrame(pdf, schema=schema)
+                    df = self.spark.createDataFrame(pdf, schema=schema)
 
-                        result = df.collect()
+                    result = df.collect()
 
-                        for row in result:
-                            i, m = row
-                            self.assertEqual(m, map_data[i])
+                    for row in result:
+                        i, m = row
+                        self.assertEqual(m, map_data[i])
 
     def test_createDataFrame_with_struct_type(self):
         for arrow_enabled in [True, False]:
@@ -732,12 +686,8 @@ class ArrowTestsMixin:
                 df = self.spark.createDataFrame(origin, schema=schema)
 
             with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
-                if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                    with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
-                        df.toPandas()
-                else:
-                    pdf = df.toPandas()
-                    assert_frame_equal(origin, pdf)
+                pdf = df.toPandas()
+                assert_frame_equal(origin, pdf)
 
     def test_toPandas_with_map_type_nulls(self):
         with QuietTest(self.sc):
@@ -758,12 +708,8 @@ class ArrowTestsMixin:
                 df = self.spark.createDataFrame(origin, schema=schema)
 
             with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrow_enabled}):
-                if arrow_enabled and LooseVersion(pa.__version__) < LooseVersion("2.0.0"):
-                    with self.assertRaisesRegex(Exception, "MapType.*only.*pyarrow 2.0.0"):
-                        df.toPandas()
-                else:
-                    pdf = df.toPandas()
-                    assert_frame_equal(origin, pdf)
+                pdf = df.toPandas()
+                assert_frame_equal(origin, pdf)
 
     def test_createDataFrame_with_int_col_names(self):
         for arrow_enabled in [True, False]:
@@ -779,42 +725,6 @@ class ArrowTestsMixin:
         pdf_col_names = [str(c) for c in pdf.columns]
         self.assertEqual(pdf_col_names, df.columns)
 
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_createDataFrame_fallback_enabled(self):
-        with QuietTest(self.sc):
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.fallback.enabled": True}):
-                with warnings.catch_warnings(record=True) as warns:
-                    # we want the warnings to appear even if this test is run from a subclass
-                    warnings.simplefilter("always")
-                    df = self.spark.createDataFrame(
-                        pd.DataFrame({"a": [[Row()]]}), "a: array<struct<>>"
-                    )
-                    # Catch and check the last UserWarning.
-                    user_warns = [
-                        warn.message for warn in warns if isinstance(warn.message, UserWarning)
-                    ]
-                    self.assertTrue(len(user_warns) > 0)
-                    self.assertTrue("Attempting non-optimization" in str(user_warns[-1]))
-                    self.assertEqual(df.collect(), [Row(a=[Row()])])
-
-    @unittest.skipIf(
-        not have_pyarrow or LooseVersion(pa.__version__) >= "2.0",
-        "will not fallback with pyarrow>=2.0",
-    )
-    def test_createDataFrame_fallback_disabled(self):
-        with QuietTest(self.sc):
-            with self.assertRaises(PySparkTypeError) as pe:
-                self.spark.createDataFrame(pd.DataFrame({"a": [[Row()]]}), "a: array<struct<>>")
-
-            self.check_error(
-                exception=pe.exception,
-                error_class="UNSUPPORTED_DATA_TYPE_FOR_ARROW_VERSION",
-                message_parameters={"data_type": "Array of StructType"},
-            )
-
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
