Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19607#discussion_r153167040
--- Diff: python/pyspark/sql/tests.py ---
@@ -3683,6 +3808,47 @@ def check_records_per_batch(x):
            else:
                self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value)
+    def test_vectorized_udf_timestamps_respect_session_timezone(self):
+        from pyspark.sql.functions import pandas_udf, col
+        from datetime import datetime
+        import pandas as pd
+        schema = StructType([
+            StructField("idx", LongType(), True),
+            StructField("timestamp", TimestampType(), True)])
+        data = [(1, datetime(1969, 1, 1, 1, 1, 1)),
+                (2, datetime(2012, 2, 2, 2, 2, 2)),
+                (3, None),
+                (4, datetime(2100, 3, 3, 3, 3, 3))]
+        df = self.spark.createDataFrame(data, schema=schema)
+
+        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
+        internal_value = pandas_udf(
+            lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None),
+            LongType())
+
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone",
+                                "false")
+            try:
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
--- End diff ---
Here too. It took me a while to work out where this 3 came from. Could we add a short comment explaining what the value represents?
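
For illustration only (not code from the PR), a minimal sketch of what a self-documenting version might look like, assuming the 3 hours is the gap between the test's America/Los_Angeles default zone and the America/New_York session time zone, with nanoseconds matching pandas' `Timestamp.value`:

```python
# Illustrative sketch only -- the names are hypothetical, not from the PR.
# Assumption: the 3-hour gap is the offset between America/Los_Angeles
# (the test's default zone) and America/New_York. pandas Timestamp.value
# is in nanoseconds, hence the seconds-to-nanoseconds conversion.
NANOS_PER_SECOND = 1000 * 1000 * 1000
LA_TO_NY_OFFSET_HOURS = 3
diff = LA_TO_NY_OFFSET_HOURS * 60 * 60 * NANOS_PER_SECOND
```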