This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8911578020f [SPARK-44132][SQL] Materialize `Stream` of join column 
names to avoid codegen failure
8911578020f is described below

commit 8911578020f8a2428b12dd72cb0ed4b7d747d835
Author: Steven Aerts <steven.ae...@gmail.com>
AuthorDate: Tue Aug 8 08:09:05 2023 +0900

    [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid 
codegen failure
    
    ### What changes were proposed in this pull request?
    
    Materialize the passed join columns as an `IndexedSeq` before passing them to the 
lower layers.
    
    ### Why are the changes needed?
    
    When nesting multiple full outer joins using column names that are passed as a 
`Stream`, the code generator produces faulty code, resulting in an NPE or a 
bad `UnsafeRow` access at runtime. See the 2 added test cases.
    
    ### Why are the changes needed?
    Otherwise the code will crash; see the 2 added test cases, which show an 
NPE and a bad `UnsafeRow` access.
    
    ### Does this PR introduce _any_ user-facing change?
    No, only bug fix.
    
    ### How was this patch tested?
    A reproduction scenario was created and added to the code base.
    
    Closes #41712 from steven-aerts/SPARK-44132-fix.
    
    Authored-by: Steven Aerts <steven.ae...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 .../sql/execution/joins/JoinCodegenSupport.scala     |  2 +-
 .../test/scala/org/apache/spark/sql/JoinSuite.scala  | 20 ++++++++++++++++++++
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 61c83829d20..eda017937d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1092,7 +1092,7 @@ class Dataset[T] private[sql](
       Join(
         joined.left,
         joined.right,
-        UsingJoin(JoinType(joinType), usingColumns),
+        UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq),
         None,
         JoinHint.NONE)
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
index a7d1edefcd6..6496f9a0006 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
@@ -79,7 +79,7 @@ trait JoinCodegenSupport extends CodegenSupport with 
BaseJoinExec {
       setDefaultValue: Boolean): Seq[ExprCode] = {
     ctx.currentVars = null
     ctx.INPUT_ROW = row
-    plan.output.zipWithIndex.map { case (a, i) =>
+    plan.output.toIndexedSeq.zipWithIndex.map { case (a, i) =>
       val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
       if (setDefaultValue) {
         // the variables are needed even there is no matched rows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7f358723eeb..14f1fb27906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1709,4 +1709,24 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
       checkAnswer(sql(query), expected)
     }
   }
+
+  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with NPE") {
+    val dsA = Seq((1, "a")).toDF("id", "c1")
+    val dsB = Seq((2, "b")).toDF("id", "c2")
+    val dsC = Seq((3, "c")).toDF("id", "c3")
+    val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, 
Stream("id"), "full_outer")
+
+    val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), 
Row(3, null, null, "c"))
+
+    checkAnswer(joined, expected)
+  }
+
+  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with 
invalid access") {
+    val ds = Seq((1, "a")).toDF("id", "c1")
+    val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, 
Stream("id"), "full_outer")
+
+    val expected = Seq(Row(1, "a", "a", "a"))
+
+    checkAnswer(joined, expected)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to