This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 13c8c8123b8 [SPARK-44731][PYTHON][CONNECT] Make TimestampNTZ work with literals in Python Spark Connect
13c8c8123b8 is described below

commit 13c8c8123b875f2fa4fa75caeaa74ce0a68b88ac
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Fri Aug 11 17:35:00 2023 +0800

    [SPARK-44731][PYTHON][CONNECT] Make TimestampNTZ work with literals in Python Spark Connect
    
    ### What changes were proposed in this pull request?
    
    This PR proposes:
    - Share the namespaces for `to_timestamp_ntz`, `to_timestamp_ltz` and `to_unix_timestamp` in Spark Connect; they were previously missed.
    - Add support for `TimestampNTZ` in literal handling in Python Spark Connect (by respecting `spark.sql.timestampType`); see the sketch below.
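
    A minimal usage sketch of the literal change (assuming a local Spark Connect server at `sc://localhost:15002`; any Connect session behaves the same way):

        import datetime
        import pyspark.sql.functions as F
        from pyspark.sql import SparkSession

        # Hypothetical endpoint; substitute your own Spark Connect URL.
        spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
        spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

        # The literal is now inferred as TimestampNTZType instead of TimestampType.
        df = spark.range(1).select(F.lit(datetime.datetime(2023, 8, 11, 12, 0)).alias("ts"))
        df.printSchema()  # ts: timestamp_ntz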
    
    ### Why are the changes needed?
    
    For feature parity, and to respect TimestampNTZ in resampling in the pandas API on Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this fixes the same bug as https://github.com/apache/spark/pull/42392, but in Spark Connect with Python.
    
    ### How was this patch tested?
    
    Unit tests re-enabled.
    
    Closes #42445 from HyukjinKwon/SPARK-44731.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
    (cherry picked from commit 73b0376ec1527197c215495c7957efbe9d3bfab7)
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/tests/connect/test_parity_resample.py |  4 +---
 python/pyspark/sql/connect/expressions.py                   |  3 +++
 python/pyspark/sql/functions.py                             | 12 ++++++++++++
 python/pyspark/sql/utils.py                                 | 13 +++++++++++--
 4 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/pandas/tests/connect/test_parity_resample.py b/python/pyspark/pandas/tests/connect/test_parity_resample.py
index d5c901f113a..caca2f957b5 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_resample.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_resample.py
@@ -30,9 +30,7 @@ class ResampleParityTests(
 class ResampleWithTimezoneTests(
     ResampleWithTimezoneMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase
 ):
-    @unittest.skip("SPARK-44731: Support 'spark.sql.timestampType' in Python Spark Connect client")
-    def test_series_resample_with_timezone(self):
-        super().test_series_resample_with_timezone()
+    pass
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/sql/connect/expressions.py b/python/pyspark/sql/connect/expressions.py
index 44e6e174f70..d0a9b1d69ae 100644
--- a/python/pyspark/sql/connect/expressions.py
+++ b/python/pyspark/sql/connect/expressions.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 from pyspark.sql.connect.utils import check_dependencies
+from pyspark.sql.utils import is_timestamp_ntz_preferred
 
 check_dependencies(__name__)
 
@@ -295,6 +296,8 @@ class LiteralExpression(Expression):
             return StringType()
         elif isinstance(value, decimal.Decimal):
             return DecimalType()
+        elif isinstance(value, datetime.datetime) and is_timestamp_ntz_preferred():
+            return TimestampNTZType()
         elif isinstance(value, datetime.datetime):
             return TimestampType()
         elif isinstance(value, datetime.date):
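
For reference, a trimmed, standalone sketch of the inference order above (the helper name `_infer_literal_type` is illustrative, not the actual method in `LiteralExpression`):

    import datetime
    import decimal

    from pyspark.sql.types import (
        DataType,
        DateType,
        DecimalType,
        StringType,
        TimestampNTZType,
        TimestampType,
    )
    from pyspark.sql.utils import is_timestamp_ntz_preferred


    def _infer_literal_type(value) -> DataType:
        if isinstance(value, str):
            return StringType()
        elif isinstance(value, decimal.Decimal):
            return DecimalType()
        elif isinstance(value, datetime.datetime) and is_timestamp_ntz_preferred():
            # New branch: honor spark.sql.timestampType=TIMESTAMP_NTZ.
            return TimestampNTZType()
        elif isinstance(value, datetime.datetime):
            return TimestampType()
        elif isinstance(value, datetime.date):
            return DateType()
        raise TypeError(f"Unsupported literal type: {type(value)}")
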
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b45e1daa0fd..0a8eccacbc0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -7757,6 +7757,7 @@ def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str])
     return _invoke_function("session_window", time_col, gap_duration)
 
 
+@try_remote_functions
 def to_unix_timestamp(
     timestamp: "ColumnOrName",
     format: Optional["ColumnOrName"] = None,
@@ -7766,6 +7767,9 @@ def to_unix_timestamp(
 
     .. versionadded:: 3.5.0
 
+    .. versionchanged:: 3.5.0
+        Supports Spark Connect.
+
     Parameters
     ----------
     timestamp : :class:`~pyspark.sql.Column` or str
@@ -7793,6 +7797,7 @@ def to_unix_timestamp(
         return _invoke_function_over_columns("to_unix_timestamp", timestamp)
 
 
+@try_remote_functions
 def to_timestamp_ltz(
     timestamp: "ColumnOrName",
     format: Optional["ColumnOrName"] = None,
@@ -7803,6 +7808,9 @@ def to_timestamp_ltz(
 
     .. versionadded:: 3.5.0
 
+    .. versionchanged:: 3.5.0
+        Supports Spark Connect.
+
     Parameters
     ----------
     timestamp : :class:`~pyspark.sql.Column` or str
@@ -7830,6 +7838,7 @@ def to_timestamp_ltz(
         return _invoke_function_over_columns("to_timestamp_ltz", timestamp)
 
 
+@try_remote_functions
 def to_timestamp_ntz(
     timestamp: "ColumnOrName",
     format: Optional["ColumnOrName"] = None,
@@ -7840,6 +7849,9 @@ def to_timestamp_ntz(
 
     .. versionadded:: 3.5.0
 
+    .. versionchanged:: 3.5.0
+        Supports Spark Connect.
+
     Parameters
     ----------
     timestamp : :class:`~pyspark.sql.Column` or str
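
With `@try_remote_functions` applied, the three functions above dispatch to Spark Connect as well. A minimal usage sketch (assuming an active Spark Connect session bound to `spark`):

    import pyspark.sql.functions as F

    df = spark.createDataFrame([("2023-08-11 17:35:00",)], ["ts"])
    df.select(
        F.to_timestamp_ntz(df.ts).alias("ntz"),
        F.to_timestamp_ltz(df.ts).alias("ltz"),
        F.to_unix_timestamp(df.ts).alias("unix"),
    ).show()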
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index b72d8d9a7c8..61486d6a8c6 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -140,8 +140,17 @@ def is_timestamp_ntz_preferred() -> bool:
     """
     Return a bool if TimestampNTZType is preferred according to the SQL configuration set.
     """
-    jvm = SparkContext._jvm
-    return jvm is not None and jvm.PythonSQLUtils.isTimestampNTZPreferred()
+    if is_remote():
+        from pyspark.sql.connect.session import SparkSession as ConnectSparkSession
+
+        session = ConnectSparkSession.getActiveSession()
+        if session is None:
+            return False
+        else:
+            return session.conf.get("spark.sql.timestampType", None) == "TIMESTAMP_NTZ"
+    else:
+        jvm = SparkContext._jvm
+        return jvm is not None and jvm.PythonSQLUtils.isTimestampNTZPreferred()
 
 
 def is_remote() -> bool:
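
A quick way to exercise the new remote branch of `is_timestamp_ntz_preferred` (assumes an active Spark Connect session bound to `spark`; in local mode the helper still goes through the JVM as before):

    from pyspark.sql.utils import is_timestamp_ntz_preferred

    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
    print(is_timestamp_ntz_preferred())  # False

    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    print(is_timestamp_ntz_preferred())  # True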


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
