HeartSaVioR commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1805716763


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1966,4 +2047,128 @@ class StreamingLeftSemiJoinSuite extends 
StreamingJoinSuite {
       assertNumStateRows(total = 9, updated = 4)
     )
   }
+
+  test("SPARK-49829 two chained stream-stream left outer joins among three 
input streams") {
+    withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> 
"false") {
+      val memoryStream1 = MemoryStream[(Long, Int)]
+      val memoryStream2 = MemoryStream[(Long, Int)]
+      val memoryStream3 = MemoryStream[(Long, Int)]
+
+      val data1 = memoryStream1.toDF()
+        .selectExpr("timestamp_seconds(_1) AS eventTime", "_2 AS v1")
+        .withWatermark("eventTime", "0 seconds")
+      val data2 = memoryStream2.toDF()
+        .selectExpr("timestamp_seconds(_1) AS eventTime", "_2 AS v2")
+        .withWatermark("eventTime", "0 seconds")
+      val data3 = memoryStream3.toDF()
+        .selectExpr("timestamp_seconds(_1) AS eventTime", "_2 AS v3")
+        .withWatermark("eventTime", "0 seconds")
+
+      val join = data1
+        .join(data2, Seq("eventTime"), "leftOuter")
+        .join(data3, Seq("eventTime"), "leftOuter")
+        .selectExpr("CAST(eventTime AS long) AS eventTime", "v1", "v2", "v3")
+
+      def assertLeftRowsFor1stJoin(expected: Seq[Row]): AssertOnQuery = {
+        assertStateStoreRows(1L, "left", expected) { df =>
+          df.selectExpr("CAST(value.eventTime AS long)", "value.v1")
+        }
+      }
+
+      def assertRightRowsFor1stJoin(expected: Seq[Row]): AssertOnQuery = {
+        assertStateStoreRows(1L, "right", expected) { df =>
+          df.selectExpr("CAST(value.eventTime AS long)", "value.v2")
+        }
+      }
+
+      def assertLeftRowsFor2ndJoin(expected: Seq[Row]): AssertOnQuery = {
+        assertStateStoreRows(0L, "left", expected) { df =>
+          df.selectExpr("CAST(value.eventTime AS long)", "value.v1", 
"value.v2")
+        }
+      }
+
+      def assertRightRowsFor2ndJoin(expected: Seq[Row]): AssertOnQuery = {
+        assertStateStoreRows(0L, "right", expected) { df =>
+          df.selectExpr("CAST(value.eventTime AS long)", "value.v3")
+        }
+      }
+
+      testStream(join)(
+        // batch 0
+        // WM: late event = 0, eviction = 0
+        MultiAddData(
+          (memoryStream1, Seq((20L, 1))),
+          (memoryStream2, Seq((20L, 1))),
+          (memoryStream3, Seq((20L, 1)))
+        ),
+        CheckNewAnswer((20, 1, 1, 1)),
+        assertLeftRowsFor1stJoin(Seq(Row(20, 1))),
+        assertRightRowsFor1stJoin(Seq(Row(20, 1))),
+        assertLeftRowsFor2ndJoin(Seq(Row(20, 1, 1))),
+        assertRightRowsFor2ndJoin(Seq(Row(20, 1))),
+        // batch 1
+        // WM: late event = 0, eviction = 20
+        MultiAddData(
+          (memoryStream1, Seq((21L, 2))),
+          (memoryStream2, Seq((21L, 2)))
+        ),
+        CheckNewAnswer(),
+        assertLeftRowsFor1stJoin(Seq(Row(21, 2))),
+        assertRightRowsFor1stJoin(Seq(Row(21, 2))),
+        assertLeftRowsFor2ndJoin(Seq(Row(21, 2, 2))),
+        assertRightRowsFor2ndJoin(Seq()),
+        // batch 2
+        // WM: late event = 20, eviction = 20 (slowest: inputStream3)
+        MultiAddData(
+          (memoryStream1, Seq((22L, 3))),
+          (memoryStream3, Seq((22L, 3)))
+        ),
+        CheckNewAnswer(),
+        assertLeftRowsFor1stJoin(Seq(Row(21, 2), Row(22, 3))),
+        assertRightRowsFor1stJoin(Seq(Row(21, 2))),
+        assertLeftRowsFor2ndJoin(Seq(Row(21, 2, 2))),
+        assertRightRowsFor2ndJoin(Seq(Row(22, 3))),
+        // batch 3
+        // WM: late event = 20, eviction = 21 (slowest: inputStream2)
+        AddData(memoryStream1, (23L, 4)),
+        CheckNewAnswer(Row(21, 2, 2, null)),
+        assertLeftRowsFor1stJoin(Seq(Row(22, 3), Row(23, 4))),
+        assertRightRowsFor1stJoin(Seq()),
+        assertLeftRowsFor2ndJoin(Seq()),
+        assertRightRowsFor2ndJoin(Seq(Row(22, 3))),
+        // batch 4
+        // WM: late event = 21, eviction = 21 (slowest: inputStream2)
+        MultiAddData(
+          (memoryStream1, Seq((24L, 5))),
+          (memoryStream2, Seq((24L, 5))),
+          (memoryStream3, Seq((24L, 5)))
+        ),
+        CheckNewAnswer(Row(24, 5, 5, 5)),
+        assertLeftRowsFor1stJoin(Seq(Row(22, 3), Row(23, 4), Row(24, 5))),
+        assertRightRowsFor1stJoin(Seq(Row(24, 5))),
+        assertLeftRowsFor2ndJoin(Seq(Row(24, 5, 5))),
+        assertRightRowsFor2ndJoin(Seq(Row(22, 3), Row(24, 5))),
+        // batch 5
+        // WM: late event = 21, eviction = 24
+        // just trigger a new batch with arbitrary data as the original test 
relies on no-data
+        // batch, and we need to check with remaining unmatched outputs
+        AddData(memoryStream1, (100L, 6)),
+        // Before SPARK-49829, the test fails because (23, 4, null, null) 
wasn't produced.
+        // (The assertion of state for left inputs & right inputs weren't 
included on the test
+        // before SPARK-49829.)
+        CheckNewAnswer(Row(22, 3, null, 3), Row(23, 4, null, null))
+      )
+
+      /*
+      // The collection of the above new answers is the same with below in 
original test:
+      val expected = Array(
+        Row(Timestamp.valueOf("2024-02-10 10:20:00"), 1, 1, 1),
+        Row(Timestamp.valueOf("2024-02-10 10:21:00"), 2, 2, null),
+        Row(Timestamp.valueOf("2024-02-10 10:22:00"), 3, null, 3),
+        Row(Timestamp.valueOf("2024-02-10 10:23:00"), 4, null, null),
+        Row(Timestamp.valueOf("2024-02-10 10:24:00"), 5, 5, 5),
+      )
+       */

Review Comment:
   The indentation is correct: both `*` characters are in the same column.
   
   The Spark Scala style guide doesn't cover multi-line code comments, only
   "doc" comments (class/method/etc.), but this is its guidance for doc
   comments:
   
   ```
   /**
    * This is correct multi-line JavaDoc comment. And
    * this is my second line, and if I keep typing, this would be
    * my third line.
    */
   ```
   
   and every starting `*` is placed at the same column.
   
   Please let me know if I'm missing something. Let's not block the PR for
   this - if I am missing something, I can address it as a post-review comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to