dongjoon-hyun commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1804998996
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1966,4 +2047,128 @@ class StreamingLeftSemiJoinSuite extends
StreamingJoinSuite {
assertNumStateRows(total = 9, updated = 4)
)
}
+
+ test("SPARK-49829 two chained stream-stream left outer joins among three
input streams") {
+ withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key ->
"false") {
+ val memoryStream1 = MemoryStream[(Long, Int)]
+ val memoryStream2 = MemoryStream[(Long, Int)]
+ val memoryStream3 = MemoryStream[(Long, Int)]
+
+ val data1 = memoryStream1.toDF()
+ .selectExpr("timestamp_seconds(_1) AS eventTime", "_2 AS v1")
+ .withWatermark("eventTime", "0 seconds")
+ val data2 = memoryStream2.toDF()
+ .selectExpr("timestamp_seconds(_1) AS eventTime", "_2 AS v2")
+ .withWatermark("eventTime", "0 seconds")
+ val data3 = memoryStream3.toDF()
+ .selectExpr("timestamp_seconds(_1) AS eventTime", "_2 AS v3")
+ .withWatermark("eventTime", "0 seconds")
+
+ val join = data1
+ .join(data2, Seq("eventTime"), "leftOuter")
+ .join(data3, Seq("eventTime"), "leftOuter")
+ .selectExpr("CAST(eventTime AS long) AS eventTime", "v1", "v2", "v3")
+
+ def assertLeftRowsFor1stJoin(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(1L, "left", expected) { df =>
+ df.selectExpr("CAST(value.eventTime AS long)", "value.v1")
+ }
+ }
+
+ def assertRightRowsFor1stJoin(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(1L, "right", expected) { df =>
+ df.selectExpr("CAST(value.eventTime AS long)", "value.v2")
+ }
+ }
+
+ def assertLeftRowsFor2ndJoin(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "left", expected) { df =>
+ df.selectExpr("CAST(value.eventTime AS long)", "value.v1",
"value.v2")
+ }
+ }
+
+ def assertRightRowsFor2ndJoin(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "right", expected) { df =>
+ df.selectExpr("CAST(value.eventTime AS long)", "value.v3")
+ }
+ }
+
+ testStream(join)(
+ // batch 0
+ // WM: late event = 0, eviction = 0
+ MultiAddData(
+ (memoryStream1, Seq((20L, 1))),
+ (memoryStream2, Seq((20L, 1))),
+ (memoryStream3, Seq((20L, 1)))
+ ),
+ CheckNewAnswer((20, 1, 1, 1)),
+ assertLeftRowsFor1stJoin(Seq(Row(20, 1))),
+ assertRightRowsFor1stJoin(Seq(Row(20, 1))),
+ assertLeftRowsFor2ndJoin(Seq(Row(20, 1, 1))),
+ assertRightRowsFor2ndJoin(Seq(Row(20, 1))),
+ // batch 1
+ // WM: late event = 0, eviction = 20
+ MultiAddData(
+ (memoryStream1, Seq((21L, 2))),
+ (memoryStream2, Seq((21L, 2)))
+ ),
+ CheckNewAnswer(),
+ assertLeftRowsFor1stJoin(Seq(Row(21, 2))),
+ assertRightRowsFor1stJoin(Seq(Row(21, 2))),
+ assertLeftRowsFor2ndJoin(Seq(Row(21, 2, 2))),
+ assertRightRowsFor2ndJoin(Seq()),
+ // batch 2
+ // WM: late event = 20, eviction = 20 (slowest: inputStream3)
+ MultiAddData(
+ (memoryStream1, Seq((22L, 3))),
+ (memoryStream3, Seq((22L, 3)))
+ ),
+ CheckNewAnswer(),
+ assertLeftRowsFor1stJoin(Seq(Row(21, 2), Row(22, 3))),
+ assertRightRowsFor1stJoin(Seq(Row(21, 2))),
+ assertLeftRowsFor2ndJoin(Seq(Row(21, 2, 2))),
+ assertRightRowsFor2ndJoin(Seq(Row(22, 3))),
+ // batch 3
+ // WM: late event = 20, eviction = 21 (slowest: inputStream2)
+ AddData(memoryStream1, (23L, 4)),
+ CheckNewAnswer(Row(21, 2, 2, null)),
+ assertLeftRowsFor1stJoin(Seq(Row(22, 3), Row(23, 4))),
+ assertRightRowsFor1stJoin(Seq()),
+ assertLeftRowsFor2ndJoin(Seq()),
+ assertRightRowsFor2ndJoin(Seq(Row(22, 3))),
+ // batch 4
+ // WM: late event = 21, eviction = 21 (slowest: inputStream2)
+ MultiAddData(
+ (memoryStream1, Seq((24L, 5))),
+ (memoryStream2, Seq((24L, 5))),
+ (memoryStream3, Seq((24L, 5)))
+ ),
+ CheckNewAnswer(Row(24, 5, 5, 5)),
+ assertLeftRowsFor1stJoin(Seq(Row(22, 3), Row(23, 4), Row(24, 5))),
+ assertRightRowsFor1stJoin(Seq(Row(24, 5))),
+ assertLeftRowsFor2ndJoin(Seq(Row(24, 5, 5))),
+ assertRightRowsFor2ndJoin(Seq(Row(22, 3), Row(24, 5))),
+ // batch 5
+ // WM: late event = 21, eviction = 24
+ // just trigger a new batch with arbitrary data as the original test
relies on no-data
+ // batch, and we need to check with remaining unmatched outputs
+ AddData(memoryStream1, (100L, 6)),
+ // Before SPARK-49829, the test fails because (23, 4, null, null)
wasn't produced.
+ // (The assertion of state for left inputs & right inputs weren't
included on the test
+ // before SPARK-49829.)
+ CheckNewAnswer(Row(22, 3, null, 3), Row(23, 4, null, null))
+ )
+
+ /*
+ // The collection of the above new answers is the same with below in
original test:
+ val expected = Array(
+ Row(Timestamp.valueOf("2024-02-10 10:20:00"), 1, 1, 1),
+ Row(Timestamp.valueOf("2024-02-10 10:21:00"), 2, 2, null),
+ Row(Timestamp.valueOf("2024-02-10 10:22:00"), 3, null, 3),
+ Row(Timestamp.valueOf("2024-02-10 10:23:00"), 4, null, null),
+ Row(Timestamp.valueOf("2024-02-10 10:24:00"), 5, 5, 5),
+ )
+ */
Review Comment:
Nit: please fix the indentation of this commented-out block so it matches the surrounding code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]