This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d92e66643e8d [SPARK-47104][SQL] `TakeOrderedAndProjectExec` should initialize the unsafe projection
d92e66643e8d is described below

commit d92e66643e8dc4e5395a0b8c2f17a72afed1528e
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Wed Feb 21 12:53:17 2024 -0800

    [SPARK-47104][SQL] `TakeOrderedAndProjectExec` should initialize the unsafe projection
    
    ### What changes were proposed in this pull request?
    
    Change `TakeOrderedAndProjectExec#executeCollect` and `TakeOrderedAndProjectExec#doExecute` to initialize the unsafe projection before using it to produce output rows.
    
    ### Why are the changes needed?
    
    Because the unsafe projection is not initialized, non-deterministic expressions also don't get initialized. This results in errors when the projection contains non-deterministic expressions. For example:
    ```
    create or replace temp view v1(id, name) as values
    (1, "fred"),
    (2, "bob");
    
    cache table v1;
    
    select name, uuid() as _iid from (
      select * from v1 order by name
    )
    limit 20;
    ```
    This query produces the following error:
    ```
    java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.util.RandomUUIDGenerator.getNextUUIDUTF8String()" because "this.randomGen_0" is null
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
            at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$6(limit.scala:297)
            at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:934)
            at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$1(limit.scala:297)
    ...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45199 from bersprockets/take_ordered_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/execution/limit.scala     |  4 ++-
 .../sql/execution/TakeOrderedAndProjectSuite.scala | 36 +++++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 0dc4a69c0758..37fe2565d8f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -294,6 +294,7 @@ case class TakeOrderedAndProjectExec(
     val data = if (offset > 0) limited.drop(offset) else limited
     if (projectList != child.output) {
       val proj = UnsafeProjection.create(projectList, child.output)
+      proj.initialize(0)
       data.map(r => proj(r).copy())
     } else {
       data
@@ -335,11 +336,12 @@ case class TakeOrderedAndProjectExec(
             writeMetrics),
           readMetrics)
       }
-      singlePartitionRDD.mapPartitionsInternal { iter =>
+      singlePartitionRDD.mapPartitionsWithIndexInternal { (idx, iter) =>
         val limited = Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
         val topK = if (offset > 0) limited.drop(offset) else limited
         if (projectList != child.output) {
           val proj = UnsafeProjection.create(projectList, child.output)
+          proj.initialize(idx)
           topK.map(r => proj(r))
         } else {
           topK
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 647d46f8fbf9..c0ed9777e4e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -21,7 +21,7 @@ import scala.util.Random
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Rand}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -127,4 +127,38 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-47104: Non-deterministic expressions in projection") {
+    val expected = (input: SparkPlan) => {
+      GlobalLimitExec(limit,
+        LocalLimitExec(limit,
+          SortExec(sortOrder, true, input)))
+    }
+    val schema = StructType.fromDDL("a int, b int, c double")
+    val rdd = sparkContext.parallelize(
+      Seq(Row(1, 2, 0.0953472826424725d),
+        Row(2, 3, 0.5234194256885571d),
+        Row(3, 4, 0.7604953758285915d)), 1)
+    val df = spark.createDataFrame(rdd, schema)
+    val projection = df.queryExecution.sparkPlan.output.take(2) :+
+      Alias(Rand(Literal(0, IntegerType)), "_uuid")()
+
+    // test executeCollect
+    checkThatPlansAgree(
+      df,
+      input =>
+        TakeOrderedAndProjectExec(limit, sortOrder, projection,
+          SortExec(sortOrder, false, input)),
+      input => expected(input),
+      sortAnswers = false)
+
+    // test doExecute
+    checkThatPlansAgree(
+      df,
+      input =>
+        noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, projection,
+          SortExec(sortOrder, false, input))),
+      input => expected(input),
+      sortAnswers = false)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to