This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c0982621f46b [SPARK-48267][SS] Regression e2e test with SPARK-47305
c0982621f46b is described below

commit c0982621f46b696c7c4c6a805ae0c7d101570929
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue May 14 15:40:51 2024 +0900

    [SPARK-48267][SS] Regression e2e test with SPARK-47305
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add a regression test (e2e) with SPARK-47305.
    
    As of commit cae2248bc13 (pre-Spark 4.0), the query in the new unit test is represented by the logical plans below:
    
    > Batch 0
    
    >> analyzed plan
    
    ```
    WriteToMicroBatchDataSource MemorySink, 5067923b-e1d0-484c-914c-b111c9e60aac, Append, 0
    +- Project [value#1]
       +- Join Inner, (cast(code#5 as bigint) = ref_code#14L)
          :- Union false, false
          :  :- Project [value#1, 1 AS code#5]
          :  :  +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
          :  +- Project [value#3, cast(code#9 as int) AS code#16]
          :     +- Project [value#3, null AS code#9]
          :        +- LocalRelation <empty>, [value#3]
          +- Project [id#12L AS ref_code#14L]
             +- Range (1, 5, step=1, splits=Some(2))
    ```
    
    >> optimized plan
    
    ```
    WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: ...]
    +- Join Inner
       :- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
       +- Project
          +- Filter (1 = id#12L)
             +- Range (1, 5, step=1, splits=Some(2))
    ```
    
    > Batch 1
    
    >> analyzed plan
    
    ```
    WriteToMicroBatchDataSource MemorySink, d1c8be66-88e7-437a-9f25-6b87db8efe17, Append, 1
    +- Project [value#1]
       +- Join Inner, (cast(code#5 as bigint) = ref_code#14L)
          :- Union false, false
          :  :- Project [value#1, 1 AS code#5]
          :  :  +- LocalRelation <empty>, [value#1]
          :  +- Project [value#3, cast(code#9 as int) AS code#16]
          :     +- Project [value#3, null AS code#9]
          :        +- StreamingDataSourceV2ScanRelation[value#3] MemoryStreamDataSource
          +- Project [id#12L AS ref_code#14L]
             +- Range (1, 5, step=1, splits=Some(2))
    ```
    
    >> optimized plan
    
    ```
    WriteToDataSourceV2 MicroBatchWrite[epoch: 1, writer: ...]
    +- Join Inner
       :- StreamingDataSourceV2ScanRelation[value#3] MemoryStreamDataSource
       +- LocalRelation <empty>
    ```
    
    Notice the difference in the optimized plans between batch 0 and batch 1. In the optimized plan for batch 1, the batch side is pruned out, which goes through the PruneFilters path. The optimization sequence is:
    
    1) the left stream side is collapsed into an empty local relation
    2) the union is replaced with the subtree for the right stream side, since the left stream side is simply an empty local relation
    3) the value of the 'code' column is now known to be 'null', and it is propagated to the join criteria (`null = ref_code`)
    4) the join criteria is extracted out of the join and pushed down to the batch side
    5) the value of the 'ref_code' column can never be null, hence the filter is optimized to `filter false`
    6) `filter false` triggers PruneFilters (where we fixed a bug in SPARK-47305)
    
    Before SPARK-47305, a new empty local relation was incorrectly marked as streaming.
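    The pruning in steps 5) and 6) can also be observed on a batch-only query. The sketch below is illustrative only (it assumes a local SparkSession, e.g. in spark-shell, and the names are hypothetical): a comparison against a null literal folds to null, which PruneFilters treats as always-false and replaces with an empty local relation, mirroring the batch side of the optimized plan for batch 1.
    
    ```scala
    // Sketch only: a predicate comparing a column against a null literal is
    // constant-folded to null; PruneFilters treats a null/false condition as
    // always-false and replaces the filter subtree with an empty LocalRelation.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit
    
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("prune-filters-sketch")
      .getOrCreate()
    import spark.implicits._
    
    // Same shape as the batch side of the test: a non-nullable 'ref_code' column.
    val batchDf = spark.range(1, 5).select($"id".as("ref_code"))
    val pruned = batchDf.filter(lit(null).cast("long") === $"ref_code")
    
    // The optimized plan collapses to an empty LocalRelation.
    println(pruned.queryExecution.optimizedPlan)
    ```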
    
    NOTE: I intentionally didn't include the details above as code comments, as the optimization result is subject to change across Spark versions.
    
    ### Why are the changes needed?
    
    In the PR for SPARK-47305 we only added a unit test to verify the fix, but it wasn't an e2e test of the workload in which we encountered the issue. Given the complexity of the query optimizer, it is worth including an e2e reproducer (albeit simplified) as a regression test.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New UT.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46569 from HeartSaVioR/SPARK-48267.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 ...treamingQueryOptimizationCorrectnessSuite.scala | 37 +++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala
index efc84c8e4c7c..d17da5d31edd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala
@@ -21,7 +21,7 @@ import java.sql.Timestamp
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.functions.{lit, window}
+import org.apache.spark.sql.functions.{expr, lit, window}
 
 /**
  * This test ensures that any optimizations done by Spark SQL optimizer are
@@ -416,4 +416,39 @@ class StreamingQueryOptimizationCorrectnessSuite extends StreamTest {
       )
     }
   }
+
+  test("SPARK-48267: regression test, stream-stream union followed by stream-batch join") {
+    withTempDir { dir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF().withColumn("code", lit(1))
+      val df2 = input2.toDF().withColumn("code", lit(null))
+
+      // NOTE: The column 'ref_code' is known to be non-nullable.
+      val batchDf = spark.range(1, 5).select($"id".as("ref_code"))
+
+      val unionDf = df1.union(df2)
+        .join(batchDf, expr("code = ref_code"))
+        .select("value")
+
+      testStream(unionDf)(
+        StartStream(checkpointLocation = dir.getAbsolutePath),
+
+        AddData(input1, 1, 2, 3),
+        CheckNewAnswer(1, 2, 3),
+
+        AddData(input2, 1, 2, 3),
+        // Before SPARK-47305, the test failed with the error message below:
+        // org.apache.spark.sql.streaming.StreamingQueryException: Stream-stream join without
+        // equality predicate is not supported.;
+        // Join Inner
+        // :- StreamingDataSourceV2ScanRelation[value#3] MemoryStreamDataSource
+        // +- LocalRelation <empty>
+        // Note that LocalRelation <empty> is actually a batch source (Range), but due to
+        // a bug it was incorrectly marked as streaming. SPARK-47305 fixed the bug.
+        CheckNewAnswer()
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
