HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1002572937


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala:
##########
@@ -575,4 +575,64 @@ class DataFrameTimeWindowingSuite extends QueryTest with 
SharedSparkSession {
       validateWindowColumnInSchema(schema2, "window")
     }
   }
+
+  test("window_time function on raw window column") {
+    val df = Seq(
+      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
+    ).toDF("time")
+
+    checkAnswer(
+      df.select(window($"time", "10 seconds").as("window"))
+        .select(
+          $"window.end".cast("string"),
+          window_time($"window").cast("string")
+        ),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999")
+      )
+    )
+  }
+
+  test("2 window_time functions on raw window column") {

Review Comment:
   The actual test code which fails due to the rule is the following:
   
   ```scala
     test("2 window_time functions on raw window column") {
       val df = Seq(
         ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
       ).toDF("time")

       val df2 = df
         .withColumn("time2", expr("time - INTERVAL 5 minutes"))
         .select(window($"time", "10 seconds", "5 seconds").as("window1"), $"time2")
         .select($"window1", window($"time2", "10 seconds", "5 seconds").as("window2"))

       /*
         unresolved operator 'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))];
         'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))]
         +- Project [window1#10, window#16 AS window2#15]
            +- Filter isnotnull(cast(time2#6 as timestamp))
               +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), window1#10, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), window1#10, time2#6]], [window#16, window1#10, time2#6]
                  +- Project [window#11 AS window1#10, time2#6]
                     +- Filter isnotnull(cast(time#4 as timestamp))
                        +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), time#4, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), time#4, time2#6]], [window#11, time#4, time2#6]
                           +- Project [time#4, cast(time#4 - INTERVAL '05' MINUTE as string) AS time2#6]
                              +- Project [value#1 AS time#4]
                                 +- LocalRelation [value#1]
        */
       df2.select(
         $"window1.end",
         window_time($"window1"),
         $"window2.end",
         window_time($"window2")
       )
     }
   ```
   
   The reason the above test case fails with an unresolved operator is that we do not resolve the two window_time calls when they refer to different window columns. If we fix the rule to allow multiple window_time calls with different windows, it should just work.
   
   Btw, this code leads to a cartesian product of windows, but it passes the unsupported operation checker, whereas you'll hit the unsupported operation checker if you place both window calls in a single select. Spark's unsupported operation checker is rule-based and not smart enough to capture all possibilities.
   
   That said, Spark can handle the cartesian product of windows. The unsupported operation checker is more restrictive than what Spark can actually do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

