cloud-fan commented on a change in pull request #28593:
URL: https://github.com/apache/spark/pull/28593#discussion_r436653299
##########
File path: python/pyspark/sql/functions.py
##########
@@ -1427,6 +1427,19 @@ def to_utc_timestamp(timestamp, tz):
return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
+@since(3.1)
+def timestamp_seconds(col):
Review comment:
I vaguely remember that pyspark can access scala SQL functions
automatically. cc @HyukjinKwon
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
##########
@@ -411,19 +411,48 @@ abstract class NumberToTimestampBase extends UnaryExpression
protected def upScaleFactor: Long
- override def inputTypes: Seq[AbstractDataType] = Seq(IntegralType)
+ override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
override def dataType: DataType = TimestampType
override def nullSafeEval(input: Any): Any = {
- Math.multiplyExact(input.asInstanceOf[Number].longValue(), upScaleFactor)
+ child.dataType match {
+ case ByteType | ShortType | IntegerType | LongType =>
+ Math.multiplyExact(input.asInstanceOf[Number].longValue(), upScaleFactor).longValue()
+ case DecimalType() =>
Review comment:
we shouldn't support these fractional types. Users should add a manual cast
if they want to convert fractional numbers to a timestamp.
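
A minimal sketch of the manual cast being suggested (the DataFrame and the `sec_double` column are made up for illustration; `timestamp_seconds` is the function this PR adds to `functions`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, timestamp_seconds}

object ManualCastExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("manual-cast").getOrCreate()
    import spark.implicits._

    // Hypothetical data: epoch seconds stored as a fractional double.
    val df = Seq(1.5, 1590000000.25).toDF("sec_double")

    // The explicit cast makes the caller, not the function, decide how the
    // fractional part is dropped before the value becomes a timestamp.
    df.select(timestamp_seconds(col("sec_double").cast("long")).as("ts")).show(false)

    spark.stop()
  }
}
```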
##########
File path: python/pyspark/sql/dataframe.py
##########
@@ -534,7 +534,8 @@ def withWatermark(self, eventTime, delayThreshold):
.. note:: Evolving
- >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
+ >>> from pyspark.sql.functions import timestamp_seconds
+ >>> sdf.select('name', timestamp_seconds($"time")).withWatermark('time', '10 minutes')
Review comment:
does `$"time"` work in pyspark?
##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
##########
@@ -376,29 +409,46 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.003f)
checkEvaluation(cast(ts, DoubleType), 15.003)
- checkEvaluation(cast(cast(tss, ShortType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
- checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
- checkEvaluation(cast(cast(tss, LongType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
- checkEvaluation(
- cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
- millis.toFloat / MILLIS_PER_SECOND)
- checkEvaluation(
- cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
- millis.toDouble / MILLIS_PER_SECOND)
- checkEvaluation(
- cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
- Decimal(1))
- // A test for higher precision than millis
- checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
+ withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
+ checkEvaluation(cast(cast(tss, ShortType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+ checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+ checkEvaluation(cast(cast(tss, LongType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+ checkEvaluation(
+ cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
+ millis.toFloat / MILLIS_PER_SECOND)
+ checkEvaluation(
+ cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
+ millis.toDouble / MILLIS_PER_SECOND)
+ checkEvaluation(
+ cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
+ Decimal(1))
+
+ // A test for higher precision than millis
+ checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
+
+ checkEvaluation(cast(Double.NaN, TimestampType), null)
+ checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
+ checkEvaluation(cast(Float.NaN, TimestampType), null)
+ checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+ }
- checkEvaluation(cast(Double.NaN, TimestampType), null)
- checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
- checkEvaluation(cast(Float.NaN, TimestampType), null)
- checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+ withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "false") {
+ assert(!cast(cast(tss, ShortType), TimestampType).resolved)
Review comment:
can we have a dedicated test case to explicitly test the new cast type
check? Then we don't need to repeat the existing test cases with the config set
to false.
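
A rough sketch of what such a dedicated test could look like (the test name and the value list are illustrative; it assumes the suite's existing `cast` helper, `withSQLConf`, and the `SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP` flag introduced in this PR):

```scala
  test("disallow casting numeric types to timestamp when the legacy flag is off") {
    // With the legacy flag off (the proposed default), the type check should
    // reject every numeric source type, leaving the Cast expression unresolved.
    withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "false") {
      Seq(1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0, Decimal(1)).foreach { v =>
        assert(!cast(v, TimestampType).resolved)
      }
    }
  }
```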
##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
##########
@@ -376,29 +409,46 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.003f)
checkEvaluation(cast(ts, DoubleType), 15.003)
- checkEvaluation(cast(cast(tss, ShortType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
- checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
- checkEvaluation(cast(cast(tss, LongType), TimestampType),
- DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
- checkEvaluation(
- cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
- millis.toFloat / MILLIS_PER_SECOND)
- checkEvaluation(
- cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
- millis.toDouble / MILLIS_PER_SECOND)
- checkEvaluation(
- cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
- Decimal(1))
- // A test for higher precision than millis
- checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
+ withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
+ checkEvaluation(cast(cast(tss, ShortType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+ checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+ checkEvaluation(cast(cast(tss, LongType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
+ checkEvaluation(
+ cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
+ millis.toFloat / MILLIS_PER_SECOND)
+ checkEvaluation(
+ cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
+ millis.toDouble / MILLIS_PER_SECOND)
+ checkEvaluation(
+ cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
+ Decimal(1))
+
+ // A test for higher precision than millis
+ checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
+
+ checkEvaluation(cast(Double.NaN, TimestampType), null)
+ checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
+ checkEvaluation(cast(Float.NaN, TimestampType), null)
+ checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+ }
- checkEvaluation(cast(Double.NaN, TimestampType), null)
- checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
- checkEvaluation(cast(Float.NaN, TimestampType), null)
- checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
+ withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "false") {
+ assert(!cast(cast(tss, ShortType), TimestampType).resolved)
Review comment:
ah, we already have one. Then let's just remove these duplicated test cases.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##########
@@ -3358,6 +3358,17 @@ object functions {
window(timeColumn, windowDuration, windowDuration, "0 second")
}
+ /**
Review comment:
can you follow the indentation of other methods in this file?
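
For comparison, this is roughly how the surrounding methods in `functions.scala` are laid out (two-space indentation inside `object functions`, plain scaladoc `@group`/`@since` tags); the doc wording below is only illustrative, not the required text:

```scala
  /**
   * Creates timestamp from the number of seconds since UTC epoch.
   *
   * @group datetime_funcs
   * @since 3.1.0
   */
  def timestamp_seconds(e: Column): Column = withExpr { SecondsToTimestamp(e.expr) }
```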
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##########
@@ -3358,6 +3358,17 @@ object functions {
window(timeColumn, windowDuration, windowDuration, "0 second")
}
+ /**
+ * Creates timestamp from the number of seconds since UTC epoch.",
+ * @group = "datetime_funcs",
+ * @since = "3.1.0")
+ */
+ def timestamp_seconds(e: Column): Column = withExpr { SecondsToTimestamp(e.expr) }
+
+ def array_contains1(column: Column, value: Any): Column = withExpr {
Review comment:
why do we add this?
##########
File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
##########
@@ -58,13 +60,16 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
TestHive.setCacheTables(true)
// Ensures that cross joins are enabled so that we can test them
TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
+ TestHive.setConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP, true)
Review comment:
how many tests fail without this config in this suite?
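
If only a few tests in this suite actually need the legacy behaviour, one alternative (a sketch, assuming `withSQLConf` from `SQLTestUtils` is usable here; the test name and query are hypothetical) is to scope the flag per test instead of setting it for the whole suite:

```scala
  test("numeric to timestamp cast, legacy behaviour (hypothetical example)") {
    // Enable the legacy cast only inside this test so the rest of the suite
    // keeps exercising the new, stricter type check.
    withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
      assert(TestHive.sql("SELECT CAST(0 AS TIMESTAMP)").count() == 1)
    }
  }
```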
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]