This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d5390dd03 [SPARK-39496][SQL] Handle null struct in `Inline.eval`
c4d5390dd03 is described below

commit c4d5390dd032d17a40ad50e38f0ed7bd9bbd4698
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Sat Jun 18 09:25:11 2022 +0900

    [SPARK-39496][SQL] Handle null struct in `Inline.eval`
    
    ### What changes were proposed in this pull request?
    
    Change `Inline.eval` to return a row of null values rather than a null row 
in the case of a null input struct.
    
    ### Why are the changes needed?
    
    Consider the following query:
    ```
    set spark.sql.codegen.wholeStage=false;
    select inline(array(named_struct('a', 1, 'b', 2), null));
    ```
    This query fails with a `NullPointerException`:
    ```
    22/06/16 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.NullPointerException
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
            at 
org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$11(GenerateExec.scala:122)
    ```
    (In Spark 3.1.3, you don't need to set `spark.sql.codegen.wholeStage` to 
false to reproduce the error, since Spark 3.1.3 has no codegen path for 
`Inline`).
    
    This query fails regardless of the setting of 
`spark.sql.codegen.wholeStage`:
    ```
    val dfWide = (Seq((1))
      .toDF("col0")
      .selectExpr(Seq.tabulate(99)(x => s"$x as col${x + 1}"): _*))
    
    val df = (dfWide
      .selectExpr("*", "array(named_struct('a', 1, 'b', 2), null) as 
struct_array"))
    
    df.selectExpr("*", "inline(struct_array)").collect
    ```
    It fails with
    ```
    22/06/16 15:18:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 
0)/ 1]
    java.lang.NullPointerException
            at 
org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_8$(Unknown
 Source)
    ```
    When `Inline.eval` returns a null row in the collection, GenerateExec gets 
a NullPointerException either when joining the null row with required child 
output, or projecting the null row.
    
    This PR avoids producing the null row and produces a row of null values 
instead:
    ```
    spark-sql> set spark.sql.codegen.wholeStage=false;
    spark.sql.codegen.wholeStage    false
    Time taken: 3.095 seconds, Fetched 1 row(s)
    spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null));
    1       2
    NULL    NULL
    Time taken: 1.214 seconds, Fetched 2 row(s)
    spark-sql>
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #36903 from bersprockets/inline_eval_null_struct_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/catalyst/expressions/generators.scala  |  8 ++++++--
 .../scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 13 ++++++++++++-
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 1079f0a333d..d305b4d3700 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -452,13 +452,17 @@ case class Inline(child: Expression) extends 
UnaryExpression with CollectionGene
 
   private lazy val numFields = elementSchema.fields.length
 
+  private lazy val generatorNullRow = new 
GenericInternalRow(elementSchema.length)
+
   override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
     val inputArray = child.eval(input).asInstanceOf[ArrayData]
     if (inputArray == null) {
       Nil
     } else {
-      for (i <- 0 until inputArray.numElements())
-        yield inputArray.getStruct(i, numFields)
+      for (i <- 0 until inputArray.numElements()) yield {
+        val s = inputArray.getStruct(i, numFields)
+        if (s == null) generatorNullRow else s
+      }
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 20d6588dd6d..648f9cac216 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -23,6 +23,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.trees.LeafLike
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
@@ -390,7 +391,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSparkSession {
     }
   }
 
-  test("SPARK-39061: inline should handle null struct") {
+  def testNullStruct(): Unit = {
     val df = sql(
       """select * from values
         |(
@@ -414,6 +415,16 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSparkSession {
       sql("select a, inline(b) from t1"),
       Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) 
:: Nil)
   }
+
+  test("SPARK-39061: inline should handle null struct") {
+    testNullStruct
+  }
+
+  test("SPARK-39496: inline eval path should handle null struct") {
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      testNullStruct
+    }
+  }
 }
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to