HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002572937
##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,64 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
validateWindowColumnInSchema(schema2, "window")
}
}
+
+ test("window_time function on raw window column") {
+ val df = Seq(
+ ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+ ).toDF("time")
+
+ checkAnswer(
+ df.select(window($"time", "10 seconds").as("window"))
+ .select(
+ $"window.end".cast("string"),
+ window_time($"window").cast("string")
+ ),
+ Seq(
+ Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+ )
+ )
+ }
+
+ test("2 window_time functions on raw window column") {
Review Comment:
The actual test code which fails due to the rule is the following:
```
test("2 window_time functions on raw window column") {
val df = Seq(
("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
).toDF("time")
val df2 = df
.withColumn("time2", expr("time - INTERVAL 5 minutes"))
    .select(window($"time", "10 seconds", "5 seconds").as("window1"), $"time2")
    .select($"window1", window($"time2", "10 seconds", "5 seconds").as("window2"))
/*
unresolved operator 'Project [window1#10.end AS end#19,
unresolvedalias(window_time(window1#10),
Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)),
window2#15.end AS end#20, unresolvedalias(window_time(window2#15),
Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))];
'Project [window1#10.end AS end#19,
unresolvedalias(window_time(window1#10),
Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)),
window2#15.end AS end#20, unresolvedalias(window_time(window2#15),
Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))]
+- Project [window1#10, window#16 AS window2#15]
+- Filter isnotnull(cast(time2#6 as timestamp))
+- Expand [[named_struct(start,
precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as
timestamp), TimestampType, LongType) -
(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType,
LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end,
precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as
timestamp), TimestampType, LongType) -
(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType,
LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType,
TimestampType)), window1#10, time2#6], [named_struct(start,
precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as
timestamp), TimestampType, LongType) -
(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType,
LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType),
end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as
timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType)
- 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)),
window1#10, time2#6]], [window#16, window1#10, time2#6]
+- Project [window#11 AS window1#10, time2#6]
+- Filter isnotnull(cast(time#4 as timestamp))
+- Expand [[named_struct(start,
precisetimestampconversion(((precisetimestampconversion(cast(time#4 as
timestamp), TimestampType, LongType) -
(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType,
LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end,
precisetimestampconversion((((precisetimestampconversion(cast(time#4 as
timestamp), TimestampType, LongType) -
(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType,
LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType,
TimestampType)), time#4, time2#6], [named_struct(start,
precisetimestampconversion(((precisetimestampconversion(cast(time#4 as
timestamp), TimestampType, LongType) -
(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType,
LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType),
end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as
timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) -
0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)),
time#4, time2#6]], [window#11, time#4, time2#6]
+- Project [time#4, cast(time#4 - INTERVAL '05'
MINUTE as string) AS time2#6]
+- Project [value#1 AS time#4]
+- LocalRelation [value#1]
*/
df2.select(
$"window1.end",
window_time($"window1"),
$"window2.end",
window_time($"window2")
)
}
```
The reason the above test case fails with an unresolved operator is that we do
not resolve two window_time calls that refer to different windows. If we fix
the rule to allow multiple window_time calls over different windows, it should
just work.
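For context on what each call should produce: window_time(w) evaluates to w.end minus one microsecond, i.e. the last instant that still falls inside the window, which is why the first test above pairs an end of 19:38:20 with 19:38:19.999999. A minimal plain-Scala sketch of that semantics, with no Spark dependency (the helper names here are illustrative, not Spark APIs):

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object WindowTimeSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // End bound of the tumbling window containing `ts`, mirroring
  // window($"time", s"$windowSeconds seconds") with a zero start offset.
  def tumblingEnd(ts: String, windowSeconds: Long): LocalDateTime = {
    val epochSec = LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)
    val start = epochSec - (epochSec % windowSeconds)
    LocalDateTime.ofEpochSecond(start + windowSeconds, 0, ZoneOffset.UTC)
  }

  // window_time(w) == w.end - 1 microsecond: the max event time in the window.
  def windowTime(end: LocalDateTime): LocalDateTime = end.minusNanos(1000)
}
```

For "2016-03-27 19:38:18" with a 10-second window this yields an end of 19:38:20 and a window_time of 19:38:19.999999, matching the expected rows in the first test.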
Btw, this code leads to a cartesian product of windows, but it passes the
unsupported operation checker, whereas you'll hit the unsupported operation
checker if you place both windows in a single select. Spark's unsupported
operation checker is rule-based and not smart enough to capture all
possibilities.
That said, Spark can handle the cartesian product of windows. The unsupported
operation checker is more restrictive than what Spark can actually do.
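To illustrate why the chained form slips through: a rule-based check matches one operator at a time, so two windows introduced in separate selects never co-occur in the single node it inspects. A toy sketch of that behavior (hypothetical types, not Spark's actual checker):

```scala
// Toy logical "plan": each Project node lists the window columns it introduces.
final case class Project(windowsIntroduced: Seq[String], child: Option[Project])

// A per-node rule, like Spark's unsupported operation checker in spirit:
// it rejects any single node introducing 2+ windows, but never sums
// windows across the whole plan.
def violatesPerNodeRule(p: Project): Boolean =
  p.windowsIntroduced.size > 1 || p.child.exists(violatesPerNodeRule)

// Single select with two windows: caught by the rule.
val singleSelect = Project(Seq("window1", "window2"), None)

// Chained selects, one window each: passes the rule, even though the
// resulting plan still produces a cartesian product of windows.
val chained = Project(Seq("window2"), Some(Project(Seq("window1"), None)))
```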
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]