HeartSaVioR commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1796532095
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1559,6 +1581,64 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite
{
)
}
}
+
+ test("SPARK-49829 left-outer join, input being unmatched is between WM for
late event and " +
+ "WM for eviction") {
+
+ withTempDir { checkpoint =>
+ // This config needs to be set, otherwise no-data batch will be
triggered and after
+ // no-data batch, WM for late event and WM for eviction would be same.
+ withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key ->
"false") {
+ val memoryStream1 = MemoryStream[(String, Int)]
+ val memoryStream2 = MemoryStream[(String, Int)]
+
+ val data1 = memoryStream1.toDF()
+ .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+ .withWatermark("eventTime", "0 seconds")
+ val data2 = memoryStream2.toDF()
+ .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+ .withWatermark("eventTime", "0 seconds")
+
+ val joinedDf = data1.join(data2, Seq("key", "eventTime"), "leftOuter")
+ .selectExpr("key", "CAST(eventTime AS long) AS eventTime")
+
+ def assertLeftRows(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "left", expected) { df =>
+ df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+ }
+ }
+
+ def assertRightRows(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "right", expected) { df =>
+ df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+ }
+ }
+
+ testStream(joinedDf)(
+ StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+ // batch 0
+ // WM: late record = 0, eviction = 0
+ MultiAddData(
+ (memoryStream1, Seq(("a", 1), ("b", 2))),
+ (memoryStream2, Seq(("b", 2), ("c", 1)))
+ ),
+ CheckNewAnswer(("b", 2)),
+ assertLeftRows(Seq(Row("a", 1), Row("b", 2))),
+ assertRightRows(Seq(Row("b", 2), Row("c", 1))),
+ // batch 1
+ // WM: late record = 0, eviction = 2
+ // Before Spark introduces multiple stateful operator, WM for late
record was same as
+ // WM for eviction, hence ("d", 1) was treated as late record.
+ // With the multiple state operator, ("d", 1) is added in batch 1
but also evicted in
+ // batch 1. Before SPARK-49829, this wasn't producing unmatched row,
and it is fixed.
Review Comment:
Yes. Do you want me to update the code comment? I assumed that people reading
this test would already be familiar with the essential behavior of stream-stream
join, but it is probably also worth elaborating, since stream-stream join is not
an easy operator to understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]