This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d92e66643e8d [SPARK-47104][SQL] `TakeOrderedAndProjectExec` should initialize the unsafe projection
d92e66643e8d is described below

commit d92e66643e8dc4e5395a0b8c2f17a72afed1528e
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Wed Feb 21 12:53:17 2024 -0800

[SPARK-47104][SQL] `TakeOrderedAndProjectExec` should initialize the unsafe projection

### What changes were proposed in this pull request?

Change `TakeOrderedAndProjectExec#executeCollect` and `TakeOrderedAndProjectExec#doExecute` to initialize the unsafe projection before using it to produce output rows.

### Why are the changes needed?

Because the unsafe projection is not initialized, non-deterministic expressions also don't get initialized. This results in errors when the projection contains non-deterministic expressions. For example:
```
create or replace temp view v1(id, name) as values
(1, "fred"),
(2, "bob");

cache table v1;

select name, uuid() as _iid from (
  select * from v1 order by name
)
limit 20;
```
This query produces the following error:
```
java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.util.RandomUUIDGenerator.getNextUUIDUTF8String()" because "this.randomGen_0" is null
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$6(limit.scala:297)
    at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:934)
    at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$1(limit.scala:297)
    ...
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45199 from bersprockets/take_ordered_issue.

Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/sql/execution/limit.scala | 4 ++- .../sql/execution/TakeOrderedAndProjectSuite.scala | 36 +++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 0dc4a69c0758..37fe2565d8f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -294,6 +294,7 @@ case class TakeOrderedAndProjectExec( val data = if (offset > 0) limited.drop(offset) else limited if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) + proj.initialize(0) data.map(r => proj(r).copy()) } else { data @@ -335,11 +336,12 @@ case class TakeOrderedAndProjectExec( writeMetrics), readMetrics) } - singlePartitionRDD.mapPartitionsInternal { iter => + singlePartitionRDD.mapPartitionsWithIndexInternal { (idx, iter) => val limited = Utils.takeOrdered(iter.map(_.copy()), limit)(ord) val topK = if (offset > 0) limited.drop(offset) else limited if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) + proj.initialize(idx) topK.map(r => proj(r)) } else { topK diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala index 647d46f8fbf9..c0ed9777e4e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala @@ -21,7 +21,7 @@ import scala.util.Random import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ -import 
org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Rand} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -127,4 +127,38 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSparkSession { } } } + + test("SPARK-47104: Non-deterministic expressions in projection") { + val expected = (input: SparkPlan) => { + GlobalLimitExec(limit, + LocalLimitExec(limit, + SortExec(sortOrder, true, input))) + } + val schema = StructType.fromDDL("a int, b int, c double") + val rdd = sparkContext.parallelize( + Seq(Row(1, 2, 0.0953472826424725d), + Row(2, 3, 0.5234194256885571d), + Row(3, 4, 0.7604953758285915d)), 1) + val df = spark.createDataFrame(rdd, schema) + val projection = df.queryExecution.sparkPlan.output.take(2) :+ + Alias(Rand(Literal(0, IntegerType)), "_uuid")() + + // test executeCollect + checkThatPlansAgree( + df, + input => + TakeOrderedAndProjectExec(limit, sortOrder, projection, + SortExec(sortOrder, false, input)), + input => expected(input), + sortAnswers = false) + + // test doExecute + checkThatPlansAgree( + df, + input => + noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, projection, + SortExec(sortOrder, false, input))), + input => expected(input), + sortAnswers = false) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org