neilramaswamy commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1796553152


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1559,6 +1581,64 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite 
{
       )
     }
   }
+
+  test("SPARK-49829 left-outer join, input being unmatched is between WM for 
late event and " +
+    "WM for eviction") {
+
+    withTempDir { checkpoint =>
+      // This config needs to be set; otherwise a no-data batch will be 
triggered, and after
+      // the no-data batch, WM for late event and WM for eviction would be the same.
+      withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> 
"false") {
+        val memoryStream1 = MemoryStream[(String, Int)]
+        val memoryStream2 = MemoryStream[(String, Int)]
+
+        val data1 = memoryStream1.toDF()
+          .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+          .withWatermark("eventTime", "0 seconds")
+        val data2 = memoryStream2.toDF()
+          .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+          .withWatermark("eventTime", "0 seconds")
+
+        val joinedDf = data1.join(data2, Seq("key", "eventTime"), "leftOuter")
+          .selectExpr("key", "CAST(eventTime AS long) AS eventTime")
+
+        def assertLeftRows(expected: Seq[Row]): AssertOnQuery = {
+          assertStateStoreRows(0L, "left", expected) { df =>
+            df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+          }
+        }
+
+        def assertRightRows(expected: Seq[Row]): AssertOnQuery = {
+          assertStateStoreRows(0L, "right", expected) { df =>
+            df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+          }
+        }
+
+        testStream(joinedDf)(
+          StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+          // batch 0
+          // WM: late record = 0, eviction = 0
+          MultiAddData(
+            (memoryStream1, Seq(("a", 1), ("b", 2))),
+            (memoryStream2, Seq(("b", 2), ("c", 1)))
+          ),
+          CheckNewAnswer(("b", 2)),
+          assertLeftRows(Seq(Row("a", 1), Row("b", 2))),
+          assertRightRows(Seq(Row("b", 2), Row("c", 1))),
+          // batch 1
+          // WM: late record = 0, eviction = 2
+          // Before Spark introduced multiple stateful operators, WM for late 
record was the same as
+          // WM for eviction, hence ("d", 1) was treated as a late record.
+          // With multiple stateful operators, ("d", 1) is added in batch 1 
but also evicted in
+          // batch 1. Before SPARK-49829, this wasn't producing an unmatched row, 
and it is now fixed.

Review Comment:
   Yeah, it's probably a good thing to mention the state watermark. In the new test that I 
will add, I will mention the state watermark. It should be simple here, as 
watermark = 0 and there's no time interval condition. It's just the watermark 
for eviction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to