HeartSaVioR commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1796530144


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -878,6 +878,59 @@ class MultiStatefulOperatorsSuite
     testOutputWatermarkInJoin(join3, input1, -40L * 1000 - 1)
   }
 
+  test("SPARK-49829 time window agg per each source followed by stream-stream 
join") {
+    val inputStream1 = MemoryStream[Long]
+    val inputStream2 = MemoryStream[Long]
+
+    val df1 = inputStream1.toDF()
+      .selectExpr("value", "timestamp_seconds(value) AS ts")
+      .withWatermark("ts", "5 seconds")
+
+    val df2 = inputStream2.toDF()
+      .selectExpr("value", "timestamp_seconds(value) AS ts")
+      .withWatermark("ts", "5 seconds")
+
+    val df1Window = df1.groupBy(
+      window($"ts", "10 seconds")
+    ).agg(sum("value").as("sum_df1"))
+
+    val df2Window = df2.groupBy(
+      window($"ts", "10 seconds")
+    ).agg(sum("value").as("sum_df2"))
+
+    val joined = df1Window.join(df2Window, "window", "inner")
+      .selectExpr("CAST(window.end AS long) AS window_end", "sum_df1", 
"sum_df2")
+
+    // The test verifies the case where both sides produce input as time window (append mode)
+    // for stream-stream join having join condition for equality of time window.
+    // Inputs are produced into stream-stream join when the time windows are completed, meaning
+    // they will be evicted in this batch for stream-stream join as well. (NOTE: join condition
+    // does not delay the state watermark in stream-stream join).
+    // Before SPARK-49829, left side does not add the input to state store if it's going to evict
+    // in this batch, which breaks the match between input from left side and input from right
+    // side "for this batch".

Review Comment:
   I forgot to do that - I'll run the test against master and update the code comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to