HeartSaVioR commented on a change in pull request #35680:
URL: https://github.com/apache/spark/pull/35680#discussion_r837181916



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
##########
@@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest 
with SharedSparkSession
       validateWindowColumnInSchema(schema2, "session")
     }
   }
+
+  test("SPARK-38349: No need to filter events when gapDuration greater than 
0") {
+    // negative gap duration
+    check("-5 seconds", true, "Need to filter events when gap duration less 
than 0")
+
+    // positive gap duration
+    check("5 seconds", false, "No need to filter events when gap duration 
greater than 0")
+
+    // invalid gap duration
+    check("x seconds", true, "Need to filter events when gap duration invalid")
+
+    // dynamic gap duration
+    check(when(col("time").equalTo("1"), "5 seconds")
+      .when(col("time").equalTo("2"), "10 seconds")
+      .otherwise("10 seconds"), true, "Need to filter events when gap duration 
dynamically")
+
+    def check(
+               gapDuration: Any,
+               expectTimeRange: Boolean,
+               assertHintMsg: String): Unit = {
+      val data = Seq(
+        ("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")
+      val df = if (gapDuration.isInstanceOf[String]) {

Review comment:
       Seems OK, since this is simple test code.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
##########
@@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest 
with SharedSparkSession
       validateWindowColumnInSchema(schema2, "session")
     }
   }
+
+  test("SPARK-38349: No need to filter events when gapDuration greater than 
0") {
+    // negative gap duration
+    check("-5 seconds", true, "Need to filter events when gap duration less 
than 0")
+
+    // positive gap duration
+    check("5 seconds", false, "No need to filter events when gap duration 
greater than 0")
+
+    // invalid gap duration
+    check("x seconds", true, "Need to filter events when gap duration invalid")
+
+    // dynamic gap duration
+    check(when(col("time").equalTo("1"), "5 seconds")
+      .when(col("time").equalTo("2"), "10 seconds")
+      .otherwise("10 seconds"), true, "Need to filter events when gap duration 
dynamically")
+
+    def check(
+               gapDuration: Any,
+               expectTimeRange: Boolean,
+               assertHintMsg: String): Unit = {
+      val data = Seq(
+        ("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")
+      val df = if (gapDuration.isInstanceOf[String]) {
+        data.groupBy(session_window($"time", gapDuration.asInstanceOf[String]))
+      } else {
+        data.groupBy(session_window($"time", gapDuration.asInstanceOf[Column]))
+      }
+      val aggregate = df.agg(count("*").as("counts"))
+        .select($"session_window.start".cast("string"), 
$"session_window.end".cast("string"),
+          $"counts")
+
+      checkFilterCondition(aggregate.queryExecution.logical, expectTimeRange, 
assertHintMsg)
+    }
+
+    def checkFilterCondition(
+                              logicalPlan: LogicalPlan,

Review comment:
       nit: indentation is off (multi-line parameters should be indented 4 spaces)

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
##########
@@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest 
with SharedSparkSession
       validateWindowColumnInSchema(schema2, "session")
     }
   }
+
+  test("SPARK-38349: No need to filter events when gapDuration greater than 
0") {
+    // negative gap duration
+    check("-5 seconds", true, "Need to filter events when gap duration less 
than 0")
+
+    // positive gap duration
+    check("5 seconds", false, "No need to filter events when gap duration 
greater than 0")
+
+    // invalid gap duration
+    check("x seconds", true, "Need to filter events when gap duration invalid")
+
+    // dynamic gap duration
+    check(when(col("time").equalTo("1"), "5 seconds")
+      .when(col("time").equalTo("2"), "10 seconds")
+      .otherwise("10 seconds"), true, "Need to filter events when gap duration 
dynamically")
+
+    def check(
+               gapDuration: Any,

Review comment:
       nit: indentation is off (multi-line parameters should be indented 4 spaces)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to