cloud-fan commented on a change in pull request #28593:
URL: https://github.com/apache/spark/pull/28593#discussion_r436653299



##########
File path: python/pyspark/sql/functions.py
##########
@@ -1427,6 +1427,19 @@ def to_utc_timestamp(timestamp, tz):
     return 
Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
 
 
+@since(3.1)
+def timestamp_seconds(col):

Review comment:
       I vaguely remeber that pyspark can access scala SQL functions 
automatically. cc @HyukjinKwon 

##########
File path: python/pyspark/sql/functions.py
##########
@@ -1427,6 +1427,19 @@ def to_utc_timestamp(timestamp, tz):
     return 
Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
 
 
+@since(3.1)
+def timestamp_seconds(col):

Review comment:
       I vaguely remember that pyspark can access scala SQL functions 
automatically. cc @HyukjinKwon 

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
##########
@@ -411,19 +411,48 @@ abstract class NumberToTimestampBase extends 
UnaryExpression
 
   protected def upScaleFactor: Long
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(IntegralType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
 
   override def dataType: DataType = TimestampType
 
   override def nullSafeEval(input: Any): Any = {
-    Math.multiplyExact(input.asInstanceOf[Number].longValue(), upScaleFactor)
+      child.dataType match {
+        case ByteType | ShortType | IntegerType | LongType =>
+          Math.multiplyExact(input.asInstanceOf[Number].longValue(), 
upScaleFactor).longValue()
+        case DecimalType() =>

Review comment:
       we shouldn't support these fraction types. Users should add manual cast 
if they want to convert fraction numbers to timestamp.

##########
File path: python/pyspark/sql/dataframe.py
##########
@@ -534,7 +534,8 @@ def withWatermark(self, eventTime, delayThreshold):
 
         .. note:: Evolving
 
-        >>> sdf.select('name', 
sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
+        >>> from pyspark.sql.functions import timestamp_seconds
+        >>> sdf.select('name', 
timestamp_seconds($"time")).withWatermark('time', '10 minutes')

Review comment:
       does `$"time"` work in pyspark?

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
##########
@@ -376,29 +409,46 @@ abstract class CastSuiteBase extends SparkFunSuite with 
ExpressionEvalHelper {
     checkEvaluation(cast(ts, LongType), 15.toLong)
     checkEvaluation(cast(ts, FloatType), 15.003f)
     checkEvaluation(cast(ts, DoubleType), 15.003)
-    checkEvaluation(cast(cast(tss, ShortType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
-    checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
-    checkEvaluation(cast(cast(tss, LongType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
-    checkEvaluation(
-      cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
-      millis.toFloat / MILLIS_PER_SECOND)
-    checkEvaluation(
-      cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), 
DoubleType),
-      millis.toDouble / MILLIS_PER_SECOND)
-    checkEvaluation(
-      cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
-      Decimal(1))
 
-    // A test for higher precision than millis
-    checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
+    withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
+      checkEvaluation(cast(cast(tss, ShortType), TimestampType),
+        DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+      checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+        DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+      checkEvaluation(cast(cast(tss, LongType), TimestampType),
+        DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+      checkEvaluation(
+        cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), 
FloatType),
+        millis.toFloat / MILLIS_PER_SECOND)
+      checkEvaluation(
+        cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), 
DoubleType),
+        millis.toDouble / MILLIS_PER_SECOND)
+      checkEvaluation(
+        cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
+        Decimal(1))
+
+      // A test for higher precision than millis
+      checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 
0.000001)
+
+      checkEvaluation(cast(Double.NaN, TimestampType), null)
+      checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
+      checkEvaluation(cast(Float.NaN, TimestampType), null)
+      checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+    }
 
-    checkEvaluation(cast(Double.NaN, TimestampType), null)
-    checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
-    checkEvaluation(cast(Float.NaN, TimestampType), null)
-    checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+    withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "false") 
{
+      assert(!cast(cast(tss, ShortType), TimestampType).resolved)

Review comment:
       can we have a dedicated test case to explicitly test the new cast type 
check? Then we don't need to repeat the existing test cases with the config set 
to false.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
##########
@@ -376,29 +409,46 @@ abstract class CastSuiteBase extends SparkFunSuite with 
ExpressionEvalHelper {
     checkEvaluation(cast(ts, LongType), 15.toLong)
     checkEvaluation(cast(ts, FloatType), 15.003f)
     checkEvaluation(cast(ts, DoubleType), 15.003)
-    checkEvaluation(cast(cast(tss, ShortType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
-    checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
-    checkEvaluation(cast(cast(tss, LongType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
-    checkEvaluation(
-      cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
-      millis.toFloat / MILLIS_PER_SECOND)
-    checkEvaluation(
-      cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), 
DoubleType),
-      millis.toDouble / MILLIS_PER_SECOND)
-    checkEvaluation(
-      cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
-      Decimal(1))
 
-    // A test for higher precision than millis
-    checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
+    withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
+      checkEvaluation(cast(cast(tss, ShortType), TimestampType),
+        DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+      checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+        DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+      checkEvaluation(cast(cast(tss, LongType), TimestampType),
+        DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+      checkEvaluation(
+        cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), 
FloatType),
+        millis.toFloat / MILLIS_PER_SECOND)
+      checkEvaluation(
+        cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), 
DoubleType),
+        millis.toDouble / MILLIS_PER_SECOND)
+      checkEvaluation(
+        cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
+        Decimal(1))
+
+      // A test for higher precision than millis
+      checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 
0.000001)
+
+      checkEvaluation(cast(Double.NaN, TimestampType), null)
+      checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
+      checkEvaluation(cast(Float.NaN, TimestampType), null)
+      checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+    }
 
-    checkEvaluation(cast(Double.NaN, TimestampType), null)
-    checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
-    checkEvaluation(cast(Float.NaN, TimestampType), null)
-    checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+    withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "false") 
{
+      assert(!cast(cast(tss, ShortType), TimestampType).resolved)

Review comment:
       ah we already have. Then let's just remove these duplicated test cases.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##########
@@ -3358,6 +3358,17 @@ object functions {
     window(timeColumn, windowDuration, windowDuration, "0 second")
   }
 
+   /**

Review comment:
       can you follow the indentation of other methods in this file?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##########
@@ -3358,6 +3358,17 @@ object functions {
     window(timeColumn, windowDuration, windowDuration, "0 second")
   }
 
+   /**
+    * Creates timestamp from the number of seconds since UTC epoch.",
+    * @group = "datetime_funcs",
+    * @since = "3.1.0")
+    */
+  def timestamp_seconds(e: Column): Column = withExpr { 
SecondsToTimestamp(e.expr) }
+
+  def array_contains1(column: Column, value: Any): Column = withExpr {

Review comment:
       why do we add this?

##########
File path: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
##########
@@ -58,13 +60,16 @@ class HiveQuerySuite extends HiveComparisonTest with 
SQLTestUtils with BeforeAnd
     TestHive.setCacheTables(true)
     // Ensures that cross joins are enabled so that we can test them
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
+    TestHive.setConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP, true)

Review comment:
       how many tests fail without this config in this suite?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to