neilramaswamy commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1796553152
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1559,6 +1581,64 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite
{
)
}
}
+
+ test("SPARK-49829 left-outer join, input being unmatched is between WM for
late event and " +
+ "WM for eviction") {
+
+ withTempDir { checkpoint =>
+ // This config needs to be set, otherwise no-data batch will be
triggered and after
+ // no-data batch, WM for late event and WM for eviction would be same.
+ withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key ->
"false") {
+ val memoryStream1 = MemoryStream[(String, Int)]
+ val memoryStream2 = MemoryStream[(String, Int)]
+
+ val data1 = memoryStream1.toDF()
+ .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+ .withWatermark("eventTime", "0 seconds")
+ val data2 = memoryStream2.toDF()
+ .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+ .withWatermark("eventTime", "0 seconds")
+
+ val joinedDf = data1.join(data2, Seq("key", "eventTime"), "leftOuter")
+ .selectExpr("key", "CAST(eventTime AS long) AS eventTime")
+
+ def assertLeftRows(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "left", expected) { df =>
+ df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+ }
+ }
+
+ def assertRightRows(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "right", expected) { df =>
+ df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+ }
+ }
+
+ testStream(joinedDf)(
+ StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+ // batch 0
+ // WM: late record = 0, eviction = 0
+ MultiAddData(
+ (memoryStream1, Seq(("a", 1), ("b", 2))),
+ (memoryStream2, Seq(("b", 2), ("c", 1)))
+ ),
+ CheckNewAnswer(("b", 2)),
+ assertLeftRows(Seq(Row("a", 1), Row("b", 2))),
+ assertRightRows(Seq(Row("b", 2), Row("c", 1))),
+ // batch 1
+ // WM: late record = 0, eviction = 2
+ // Before Spark introduces multiple stateful operator, WM for late
record was same as
+ // WM for eviction, hence ("d", 1) was treated as late record.
+ // With the multiple state operator, ("d", 1) is added in batch 1
but also evicted in
+ // batch 1. Before SPARK-49829, this wasn't producing unmatched row,
and it is fixed.
Review Comment:
Yeah, it's probably a good thing to mention the state watermark. In the new
test that I will add, I will mention the state watermark. It should be simple
here, as watermark = 0 and there's no time-interval condition. It's just the
watermark for eviction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]