This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9756bbe8026c [SPARK-46289][SQL] Support ordering UDTs in interpreted mode 9756bbe8026c is described below commit 9756bbe8026c2f9863046f01949a8d6e6d647f2f Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Mon Dec 18 11:55:24 2023 -0800 [SPARK-46289][SQL] Support ordering UDTs in interpreted mode ### What changes were proposed in this pull request? When comparing two UDT values in interpreted mode, treat each value as an instance of the UDT's underlying type. ### Why are the changes needed? Consider the following code: ``` import org.apache.spark.ml.linalg.{DenseVector, Vector} val df = Seq.tabulate(30) { x => (x, x + 1, x + 2, new DenseVector(Array((x/100.0).toDouble, ((x + 1)/100.0).toDouble, ((x + 3)/100.0).toDouble))) }.toDF("id", "c1", "c2", "c3") df.createOrReplaceTempView("df") // this works sql("select * from df order by c3").collect sql("set spark.sql.codegen.wholeStage=false") sql("set spark.sql.codegen.factoryMode=NO_CODEGEN") // this gets an error sql("select * from df order by c3").collect ``` The first collect action works. However, the second collect action, which runs in interpreted mode, gets the following exception: ``` org.apache.spark.SparkIllegalArgumentException: Type UninitializedPhysicalType does not support ordered operations. at org.apache.spark.sql.errors.QueryExecutionErrors$.orderedOperationUnsupportedByDataTypeError(QueryExecutionErrors.scala:348) at org.apache.spark.sql.catalyst.types.UninitializedPhysicalType$.ordering(PhysicalDataType.scala:332) at org.apache.spark.sql.catalyst.types.UninitializedPhysicalType$.ordering(PhysicalDataType.scala:329) at org.apache.spark.sql.catalyst.expressions.InterpretedOrdering.compare(ordering.scala:60) at org.apache.spark.sql.catalyst.expressions.InterpretedOrdering.compare(ordering.scala:39) at org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:254) ``` The code generator creates code that compares UDTs based on their underlying type. See [here](https://github.com/apache/spark/blob/c045a425bf0c472f164e3ef75a8a2c68d72d61d3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L721). On the other hand, the interpreted mode code tries to compare the values as UDTs, not as their underlying types. This PR brings interpreted mode code in line with the generated code. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44361 from bersprockets/udt_order_issue. Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/catalyst/expressions/ordering.scala | 8 +++++++- .../org/apache/spark/sql/UserDefinedTypeSuite.scala | 19 ++++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala index 41c68d439a28..47de7a26affc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala @@ -37,7 +37,13 @@ class BaseOrdering extends Ordering[InternalRow] { * An interpreted row ordering comparator. */ class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering { - private lazy val physicalDataTypes = ordering.map(order => PhysicalDataType(order.dataType)) + private lazy val physicalDataTypes = ordering.map { order => + val dt = order.dataType match { + case udt: UserDefinedType[_] => udt.sqlType + case _ => order.dataType + } + PhysicalDataType(dt) + } def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = this(bindReferences(ordering, inputSchema)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 65aa5ae6a055..24175ea8ed94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -22,9 +22,10 @@ import java.util.Arrays import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, Literal} +import org.apache.spark.sql.catalyst.expressions.{Cast, CodegenObjectFactoryMode, ExpressionEvalHelper, Literal} import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -282,4 +283,20 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque java.util.Arrays.equals(unwrappedFeaturesArrays(0), Array(0.1, 1.0)) java.util.Arrays.equals(unwrappedFeaturesArrays(1), Array(0.2, 2.0)) } + + test("SPARK-46289: UDT ordering") { + val settings = Seq( + ("true", CodegenObjectFactoryMode.CODEGEN_ONLY.toString), + ("false", CodegenObjectFactoryMode.NO_CODEGEN.toString)) + withTempView("v1") { + pointsRDD.createOrReplaceTempView("v1") + for ((wsSetting, cgSetting) <- settings) { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wsSetting, + SQLConf.CODEGEN_FACTORY_MODE.key -> cgSetting) { + val df = sql("select label from v1 order by features") + checkAnswer(df, Row(1.0) :: Row(0.0) :: Nil) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org