HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r837377919



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1353,6 +1353,67 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite 
{
     ).select(Symbol("leftKey1"), Symbol("rightKey1"), Symbol("leftKey2"), 
Symbol("rightKey2"),
       $"leftWindow.end".cast("long"), Symbol("leftValue"), 
Symbol("rightValue"))
   }
+
+  test("SPARK-38684: outer join works correctly even if processing input rows 
and " +
+    "evicting state rows for same grouping key happens in the same 
micro-batch") {
+
+    // The test is to demonstrate the correctness issue in outer join before 
SPARK-38684.
+    withSQLConf(
+      SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+
+      val input1 = MemoryStream[(Timestamp, String, String)]
+      val df1 = input1.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val input2 = MemoryStream[(Timestamp, String, String)]
+      val df2 = input2.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val joined = df1.as("left")
+        .join(df2.as("right"),
+          expr("""
+                 |left.id = right.id AND left.eventTime BETWEEN
+                 |  right.eventTime - INTERVAL 30 seconds AND
+                 |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+          joinType = "leftOuter")
+
+      testStream(joined)(
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left 
in batch 1"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 00:01:00"), "abc", 
"right in batch 1")))
+        ),
+        CheckNewAnswer(),
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left 
in batch 2"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 01:01:00"), "abc", 
"right in batch 2")))
+        ),
+        // watermark advanced to "2020-01-02 00:00:00"
+        CheckNewAnswer(),
+        AddData(input1, (Timestamp.valueOf("2020-01-02 01:30:00"), "abc", 
"left in batch 3")),
+        // watermark advanced to "2020-01-02 01:00:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1", 
null, null, null)
+        ),
+        // left side state should still contain "left in batch 2" and "left in 
batch 3"
+        // we should see both rows in the left side since
+        // - "left in batch 2" is going to be evicted in this batch
+        // - "left in batch 3" is going to be matched with new row in right 
side
+        AddData(input2,
+          (Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left 
in batch 3")),
+        // watermark advanced to "2020-01-02 01:01:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2",

Review comment:
       Without this fix, Spark produces only one of the two expected rows in this micro-batch (the rows for "left in batch 2" and "left in batch 3").




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to