neilramaswamy commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1796403387


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
       private val iteratorNotEmpty: Boolean = super.hasNext
 
       override def completion(): Unit = {
-        val isLeftSemiWithMatch =
-          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
-        // Add to state store only if both removal predicates do not match,
-        // and the row is not matched for left side of left semi join.
-        val shouldAddToState =
-          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
-          !isLeftSemiWithMatch
+        // The criteria of whether the input has to be added into state store or not:
+        // - Left side: input can be skipped to be added to the state store if it's already matched
+        //   and the join type is left semi.
+        //   For other cases, the input should be added, including the case it's going to be evicted
+        //   in this batch. It hasn't yet evaluated with inputs from right side "for this batch".

Review Comment:
   super nit: "for this batch" doesn't need to be in quotes. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
       private val iteratorNotEmpty: Boolean = super.hasNext
 
       override def completion(): Unit = {
-        val isLeftSemiWithMatch =
-          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
-        // Add to state store only if both removal predicates do not match,
-        // and the row is not matched for left side of left semi join.
-        val shouldAddToState =
-          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
-          !isLeftSemiWithMatch
+        // The criteria of whether the input has to be added into state store or not:
+        // - Left side: input can be skipped to be added to the state store if it's already matched
+        //   and the join type is left semi.
+        //   For other cases, the input should be added, including the case it's going to be evicted
+        //   in this batch. It hasn't yet evaluated with inputs from right side "for this batch".
+        //   Refer to the classdoc of StreamingSymmetricHashJoinExec about how stream-stream join
+        //   works.
+        // - Right side: for this side, the evaluation with inputs from left side "for this batch"

Review Comment:
   ditto
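
For readers following the two bullets above, a minimal self-contained sketch of the side-dependent decision they describe. All names here are stand-ins for the operator's fields (`joinSide`, `iteratorNotEmpty`, `stateKeyWatermarkPredicateFunc`, `stateValueWatermarkPredicateFunc`), and the exact expression is an illustration rather than the PR's final code:

```scala
sealed trait SideModel
case object LeftModel extends SideModel
case object RightModel extends SideModel

// Toy model of the add-to-state decision described in the new comments.
def shouldAddToState(
    side: SideModel,
    isLeftSemi: Boolean,
    matchedThisBatch: Boolean,        // iteratorNotEmpty in the real operator
    keyPastStateWatermark: Boolean,   // stateKeyWatermarkPredicateFunc(key)
    valuePastStateWatermark: Boolean  // stateValueWatermarkPredicateFunc(thisRow)
): Boolean = side match {
  case LeftModel =>
    // Add even rows that will be evicted in this batch; they have not yet been
    // evaluated against the right side's input for this batch. Skip only a
    // left semi row that already matched.
    !(isLeftSemi && matchedThisBatch)
  case RightModel =>
    // Evaluation against the left side's input for this batch is already done,
    // so rows hit by either removal predicate need not be stored.
    !keyPastStateWatermark && !valuePastStateWatermark
}
```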



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -878,6 +878,59 @@ class MultiStatefulOperatorsSuite
     testOutputWatermarkInJoin(join3, input1, -40L * 1000 - 1)
   }
 
+  test("SPARK-49829 time window agg per each source followed by stream-stream 
join") {
+    val inputStream1 = MemoryStream[Long]
+    val inputStream2 = MemoryStream[Long]
+
+    val df1 = inputStream1.toDF()
+      .selectExpr("value", "timestamp_seconds(value) AS ts")
+      .withWatermark("ts", "5 seconds")
+
+    val df2 = inputStream2.toDF()
+      .selectExpr("value", "timestamp_seconds(value) AS ts")
+      .withWatermark("ts", "5 seconds")
+
+    val df1Window = df1.groupBy(
+      window($"ts", "10 seconds")
+    ).agg(sum("value").as("sum_df1"))
+
+    val df2Window = df2.groupBy(
+      window($"ts", "10 seconds")
+    ).agg(sum("value").as("sum_df2"))
+
+    val joined = df1Window.join(df2Window, "window", "inner")
+      .selectExpr("CAST(window.end AS long) AS window_end", "sum_df1", 
"sum_df2")
+
+    // The test verifies the case where both sides produce input as time window (append mode)
+    // for stream-stream join having join condition for equality of time window.
+    // Inputs are produced into stream-stream join when the time windows are completed, meaning
+    // they will be evicted in this batch for stream-stream join as well. (NOTE: join condition
+    // does not delay the state watermark in stream-stream join).
+    // Before SPARK-49829, left side does not add the input to state store if it's going to evict
+    // in this batch, which breaks the match between input from left side and input from right
+    // side "for this batch".

Review Comment:
   Can you comment below which records specifically were getting dropped _before_ your changes?
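
To make the dropped records concrete, a hypothetical continuation of the test above using the suite's `testStream`/`MultiAddData` helpers; the values and the expected answer are illustrative, chosen so both aggregations close the [0, 10) window in the same batch:

```scala
testStream(joined)(
  // Both sides get data for window [0, 10); the watermark (5s delay) stays below 10.
  MultiAddData(inputStream1, 5L)(inputStream2, 3L),
  CheckNewAnswer(),
  // Advancing both sides past 15 moves the watermark to 16 and closes [0, 10).
  // The aggregated rows reach the stream-stream join and are evicted in that
  // same batch. Before SPARK-49829 the left row was never added to the join
  // state, so the right row found no match and this joined row was dropped.
  MultiAddData(inputStream1, 21L)(inputStream2, 22L),
  CheckNewAnswer((10L, 5L, 3L))
)
```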



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1966,4 +2046,125 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
       assertNumStateRows(total = 9, updated = 4)
     )
   }
+
+  test("SPARK-49829 two chained stream-stream left outer joins among three 
input streams") {

Review Comment:
   Can you also add a comment to this test noting which record was previously being dropped?
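
For context, one hypothetical shape such a pipeline could take (stream and column names are illustrative, not the test's actual code):

```scala
// Three watermarked inputs; a row that the first join emits into the second join
// can be stored and evicted there in the same batch. Before SPARK-49829 such a
// row was skipped on the left side, so it produced neither a match nor the
// null-padded unmatched row expected from a left outer join.
def withEventTime(ms: MemoryStream[(String, Int)]): DataFrame = ms.toDF()
  .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
  .withWatermark("eventTime", "0 seconds")

val s1 = MemoryStream[(String, Int)]
val s2 = MemoryStream[(String, Int)]
val s3 = MemoryStream[(String, Int)]

val joined = withEventTime(s1)
  .join(withEventTime(s2), Seq("key", "eventTime"), "left_outer")
  .join(withEventTime(s3), Seq("key", "eventTime"), "left_outer")
```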



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1559,6 +1581,64 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
       )
     }
   }
+
+  test("SPARK-49829 left-outer join, input being unmatched is between WM for 
late event and " +
+    "WM for eviction") {
+
+    withTempDir { checkpoint =>
+      // This config needs to be set, otherwise no-data batch will be triggered and after
+      // no-data batch, WM for late event and WM for eviction would be same.
+      withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false") {
+        val memoryStream1 = MemoryStream[(String, Int)]
+        val memoryStream2 = MemoryStream[(String, Int)]
+
+        val data1 = memoryStream1.toDF()
+          .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+          .withWatermark("eventTime", "0 seconds")
+        val data2 = memoryStream2.toDF()
+          .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+          .withWatermark("eventTime", "0 seconds")
+
+        val joinedDf = data1.join(data2, Seq("key", "eventTime"), "leftOuter")
+          .selectExpr("key", "CAST(eventTime AS long) AS eventTime")
+
+        def assertLeftRows(expected: Seq[Row]): AssertOnQuery = {
+          assertStateStoreRows(0L, "left", expected) { df =>
+            df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+          }
+        }
+
+        def assertRightRows(expected: Seq[Row]): AssertOnQuery = {
+          assertStateStoreRows(0L, "right", expected) { df =>
+            df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+          }
+        }
+
+        testStream(joinedDf)(
+          StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+          // batch 0
+          // WM: late record = 0, eviction = 0
+          MultiAddData(
+            (memoryStream1, Seq(("a", 1), ("b", 2))),
+            (memoryStream2, Seq(("b", 2), ("c", 1)))
+          ),
+          CheckNewAnswer(("b", 2)),
+          assertLeftRows(Seq(Row("a", 1), Row("b", 2))),
+          assertRightRows(Seq(Row("b", 2), Row("c", 1))),
+          // batch 1
+          // WM: late record = 0, eviction = 2
+          // Before Spark introduces multiple stateful operator, WM for late record was same as
+          // WM for eviction, hence ("d", 1) was treated as late record.
+          // With the multiple state operator, ("d", 1) is added in batch 1 but also evicted in
+          // batch 1. Before SPARK-49829, this wasn't producing unmatched row, and it is fixed.

Review Comment:
   The key thing here is that 1 is less than the state watermark. The state watermark is 2, since we are joining on event-time directly and the watermark delay is 0 seconds. Thus, state watermark = eviction watermark - 0 = eviction watermark.
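
Spelled out with assumed variable names for batch 1:

```scala
// Batch 0's max event time was 2 and the watermark delay is 0 seconds.
val evictionWatermark = 2L
val watermarkDelay = 0L  // withWatermark("eventTime", "0 seconds")
// Joining on eventTime directly: state watermark = eviction watermark - delay.
val stateWatermark = evictionWatermark - watermarkDelay  // = 2
// ("d", 1): event time 1 <= state watermark 2, so the row is evicted in the
// same batch it is added; the fix makes it still yield the unmatched row.
assert(1L <= stateWatermark)
```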


