HeartSaVioR commented on a change in pull request #33691:
URL: https://github.com/apache/spark/pull/33691#discussion_r688192567



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##########
@@ -3661,6 +3661,36 @@ object functions {
     }.as("session_window")
   }
 
+  /**
+   * Generates session window given a timestamp specifying column.
+   *
+   * Session window is one of dynamic windows, which means the length of window is varying
+   * according to the given inputs. The length of session window is defined as "the timestamp

Review comment:
       Same here.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
##########
@@ -17,52 +17,38 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Represent the session window.
  *
  * @param timeColumn the start time of session window
  * @param gapDuration the duration of session gap, meaning the session will close if there is
- *                    no new element appeared within "the last element in session + gap".
+ *                    no new element appeared within "the last element in session + gap". Besides

Review comment:
       Let's update this as well, in line with the updates to the guide doc.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
##########
@@ -256,6 +256,102 @@ class StreamingSessionWindowSuite extends StreamTest
     )
   }
 
+  testWithAllOptions("SPARK-36465: dynamic gap duration") {
+    val inputData = MemoryStream[(String, Long)]
+
+    val udf = spark.udf.register("gapDuration", (s: String) => {
+      if (s == "hello") {
+        "1 second"
+      } else if (s == "structured") {
+        // zero gap duration will be filtered out from aggregation
+        "0 second"
+      } else if (s == "world") {
+        // negative gap duration will be filtered out from aggregation
+        "-10 seconds"
+      } else {
+        "10 seconds"
+      }
+    })
+
+    val sessionUpdates = sessionWindowQuery(inputData,
+      session_window($"eventTime", udf($"sessionId")))
+
+    testStream(sessionUpdates, OutputMode.Append())(
+      AddData(inputData,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)
+      ),
+
+      // watermark: 11
+      // current sessions
+      // ("hello", 40, 42, 2, 2),
+      // ("streaming", 40, 51, 11, 2),
+      // ("spark", 40, 50, 10, 1),
+      CheckNewAnswer(
+      ),
+
+      // placing new sessions "before" previous sessions
+      AddData(inputData, ("spark streaming", 25L)),
+      // watermark: 11
+      // current sessions
+      // ("spark", 25, 35, 10, 1),
+      // ("streaming", 25, 35, 10, 1),
+      // ("hello", 40, 42, 2, 2),
+      // ("streaming", 40, 51, 11, 2),
+      // ("spark", 40, 50, 10, 1),
+      CheckNewAnswer(
+      ),
+
+      // late event which session's end 10 would be later than watermark 11: should be dropped
+      AddData(inputData, ("spark streaming", 0L)),
+      // watermark: 11
+      // current sessions
+      // ("spark", 25, 35, 10, 1),
+      // ("streaming", 25, 35, 10, 1),
+      // ("hello", 40, 42, 2, 2),
+      // ("streaming", 40, 51, 11, 2),
+      // ("spark", 40, 50, 10, 1),
+      CheckNewAnswer(
+      ),
+
+      // concatenating multiple previous sessions into one
+      AddData(inputData, ("spark streaming", 30L)),
+      // watermark: 11
+      // current sessions
+      // ("spark", 25, 50, 25, 3),
+      // ("streaming", 25, 51, 26, 4),
+      // ("hello", 40, 51, 11, 2),

Review comment:
       This should be `("hello", 40, 42, 2, 2)`; the "hello" session is not affected by this input.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to