This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a02e5d9701 [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions
91a02e5d9701 is described below

commit 91a02e5d97011f5f9b620c07b1c2f7d85291448b
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Mon Dec 4 16:51:56 2023 -0800

    [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions
    
    ### What changes were proposed in this pull request?
    
    This PR updates the Python user-defined table function (UDTF) API for the `analyze` method to support general expressions for the `partitionBy` and `orderBy` fields of the `AnalyzeResult` class.
    
    For example, the following UDTF specifies to partition by `partition_col / 10`, so that all rows with values of this column between 0 and 9 arrive in the same partition, all rows with values between 10 and 19 in the next partition, and so on.
    
    ```
    from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn, udtf
    from pyspark.sql.types import IntegerType, Row, StructType

    @udtf
    class TestUDTF:
        def __init__(self):
            self._partition_col = None
            self._count = 0
            self._sum = 0
            self._last = None

        @staticmethod
        def analyze(*args, **kwargs):
            return AnalyzeResult(
                schema=StructType()
                .add("partition_col", IntegerType())
                .add("count", IntegerType())
                .add("total", IntegerType())
                .add("last", IntegerType()),
                partitionBy=[PartitioningColumn("partition_col / 10")],
                orderBy=[
                    OrderingColumn(name="input", ascending=True, overrideNullsFirst=False)
                ],
            )

        def eval(self, row: Row):
            self._partition_col = row["partition_col"]
            self._count += 1
            self._last = row["input"]
            if row["input"] is not None:
                self._sum += row["input"]

        def terminate(self):
            yield self._partition_col, self._count, self._sum, self._last
    ```
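
    As a usage sketch (not part of this change itself), the class above could be registered and invoked as follows; the session variable `spark` and a view `t2` with integer columns `partition_col` and `input` are assumed for illustration:

    ```
    # Hypothetical usage, assuming an active SparkSession `spark` and a view
    # `t2` with integer columns `partition_col` and `input`. The call omits
    # PARTITION BY because 'analyze' already requests the partitioning.
    spark.udtf.register("TestUDTF", TestUDTF)
    spark.sql("SELECT * FROM TestUDTF(TABLE(t2))").show()
    ```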
    
    ### Why are the changes needed?
    
    This lets the UDTF partition by simple references to the columns of the input table just like before, but also gives the option to partition by general expressions based on those columns (just like the explicit `PARTITION BY` and `ORDER BY` clauses in the UDTF call in SQL).
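
    For comparison, a minimal sketch of those explicit call-site clauses; `OtherUDTF` is a hypothetical registered UDTF whose `analyze` method does not itself request partitioning, since a call may not combine both:

    ```
    # Hypothetical call-site equivalent of the 'analyze' metadata above,
    # assuming a registered UDTF `OtherUDTF` that requests no partitioning of
    # its own, and a view `t2` with columns `partition_col` and `input`.
    spark.sql("""
        SELECT * FROM OtherUDTF(
            TABLE(t2) PARTITION BY partition_col / 10 ORDER BY input
        )
    """).show()
    ```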
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR includes test coverage.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43946 from dtenedor/udtf-order-partition-by-exprs.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |  6 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  4 +-
 docs/sql-error-conditions.md                       |  6 +++
 python/docs/source/user_guide/sql/python_udtf.rst  |  2 +-
 python/pyspark/sql/udtf.py                         | 21 +++++----
 .../spark/sql/errors/QueryCompilationErrors.scala  |  7 +++
 .../org/apache/spark/sql/UDTFRegistration.scala    |  4 +-
 .../python/UserDefinedPythonFunction.scala         | 53 ++++++++++++---------
 .../sql-tests/analyzer-results/udtf/udtf.sql.out   | 47 +++++++++++++++++++
 .../test/resources/sql-tests/inputs/udtf/udtf.sql  |  3 ++
 .../resources/sql-tests/results/udtf/udtf.sql.out  | 54 ++++++++++++++++++++++
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 39 +++++++++++++---
 12 files changed, 206 insertions(+), 40 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 9e0019b34728..6795ebcb0bd0 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3110,6 +3110,12 @@
     ],
     "sqlState" : "42802"
   },
+  "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD" : {
+    "message" : [
+      "Failed to evaluate the user-defined table function because its 
'analyze' method returned a requested OrderingColumn whose column name 
expression included an unnecessary alias <aliasName>; please remove this alias 
and then try the query again."
+    ],
+    "sqlState" : "42802"
+  },
   "UNABLE_TO_ACQUIRE_MEMORY" : {
     "message" : [
       "Unable to acquire <requestedBytes> bytes of memory, got 
<receivedBytes>."
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b64fecafa311..dc1730c78267 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -951,7 +951,9 @@ class SparkConnectPlanner(
     fun.getFunctionCase match {
       case proto.CommonInlineUserDefinedTableFunction.FunctionCase.PYTHON_UDTF =>
         val function = createPythonUserDefinedTableFunction(fun)
-        function.builder(fun.getArgumentsList.asScala.map(transformExpression).toSeq)
+        function.builder(
+          fun.getArgumentsList.asScala.map(transformExpression).toSeq,
+          session.sessionState.sqlParser)
       case _ =>
         throw InvalidPlanInput(
           s"Function with ID: ${fun.getFunctionCase.getNumber} is not 
supported")
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 71abf10da328..1943f8359572 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1994,6 +1994,12 @@ The number of aliases supplied in the AS clause does not match the number of col
 Expected `<aliasesSize>` aliases, but got `<aliasesNames>`.
 Please ensure that the number of aliases provided matches the number of columns output by the UDTF.
 
+### UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to evaluate the user-defined table function because its 'analyze' method returned a requested OrderingColumn whose column name expression included an unnecessary alias `<aliasName>`; please remove this alias and then try the query again.
+
 ### UNABLE_TO_ACQUIRE_MEMORY
 
 [SQLSTATE: 53200](sql-error-conditions-sqlstates.html#class-53-insufficient-resources)
diff --git a/python/docs/source/user_guide/sql/python_udtf.rst b/python/docs/source/user_guide/sql/python_udtf.rst
index 3e3c76344383..2a5c12b75c62 100644
--- a/python/docs/source/user_guide/sql/python_udtf.rst
+++ b/python/docs/source/user_guide/sql/python_udtf.rst
@@ -92,7 +92,7 @@ To implement a Python UDTF, you first need to define a class implementing the me
             that all rows of the input table are consumed by the `eval` method from exactly one
             instance of the UDTF class. On the other hand, if the `partitionBy` list is non-empty,
             the query planner will arrange a repartitioning such that all rows with each unique
-            combination of values of the partitioning columns are consumed by a separate unique
+            combination of values of the partitioning expressions are consumed by a separate unique
             instance of the UDTF class. If `orderBy` is non-empty, this specifies the requested
             ordering of rows within each partition.
 
diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py
index 7d97a6a565c9..fe3b0dc54314 100644
--- a/python/pyspark/sql/udtf.py
+++ b/python/pyspark/sql/udtf.py
@@ -71,12 +71,14 @@ class AnalyzeArgument:
 @dataclass(frozen=True)
 class PartitioningColumn:
     """
-    Represents a UDTF column for purposes of returning metadata from the 'analyze' method.
+    Represents an expression that the UDTF is specifying for Catalyst to partition the input table
+    by. This can be either the name of a single column from the input table (such as "columnA"), or
+    a SQL expression based on the column names of the input table (such as "columnA + columnB").
 
     Parameters
     ----------
     name : str
-        The name of the partitioning column.
+        The partitioning column name or expression, represented as a SQL string.
     """
 
     name: str
@@ -85,19 +87,20 @@ class PartitioningColumn:
 @dataclass(frozen=True)
 class OrderingColumn:
     """
-    Represents a single ordering column name for purposes of returning metadata from the 'analyze'
-    method.
+    Represents an expression that the UDTF is specifying for Catalyst to order the input partition
+    by. This can be either the name of a single column from the input table (such as "columnA"),
+    or a SQL expression based on the column names of the input table (such as "columnA + columnB").
 
     Parameters
     ----------
     name : str
-        The name of the partitioning column.
+        The ordering column name or expression, represented as a SQL string.
     ascending : bool, default True
-        If this column is in an ascending order or not.
+        Whether this expression specifies an ascending sort order.
     overrideNullsFirst : bool, optional
         If this is None, use the default behavior to sort NULL values first when sorting in
         ascending order, or last when sorting in descending order. Otherwise, if this is
-        True or False, override the default behavior accordingly.
+        True or False, we override the default behavior accordingly.
     """
 
     name: str
@@ -122,12 +125,12 @@ class AnalyzeResult:
         argument to one collection for consumption by exactly one instance of the corresponding
         UDTF class.
     partitionBy : sequence of :class:`PartitioningColumn`
-        If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
+        If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         partition the input TABLE argument by. In this case, calls to the UDTF must not include any
         explicit PARTITION BY clause; otherwise Catalyst will return an error. This option is
         mutually exclusive with 'withSinglePartition'.
     orderBy: sequence of :class:`OrderingColumn`
-        If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
+        If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
     """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 0c0b940a5880..bbc51e5cbd67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -658,6 +658,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "aliasesNames" -> aliasesNames))
   }
 
+  def invalidSortOrderInUDTFOrderingColumnFromAnalyzeMethodHasAlias(
+      aliasName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD",
+      messageParameters = Map("aliasName" -> aliasName))
+  }
+
   def windowAggregateFunctionWithFilterNotSupportedError(): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1030",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
index 3330597cb763..b1666f247581 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
@@ -44,6 +44,8 @@ class UDTFRegistration private[sql] (tableFunctionRegistry: TableFunctionRegistr
          | udfDeterministic: ${udtf.udfDeterministic}
       """.stripMargin)
 
-    tableFunctionRegistry.createOrReplaceTempFunction(name, udtf.builder, "python_udtf")
+    tableFunctionRegistry.createOrReplaceTempFunction(
+      name, udtf.builder(_, SparkSession.getActiveSession.get.sessionState.sqlParser),
+      source = "python_udtf")
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
index 202159907af8..6bb1992a064a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
@@ -25,8 +25,8 @@ import net.razorvine.pickle.Pickler
 
 import org.apache.spark.api.python.{PythonEvalType, PythonFunction, PythonWorkerUtils, SpecialLengths}
 import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, FunctionTableSubqueryArgumentExpression, NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF, PythonUDTF, PythonUDTFAnalyzeResult, SortOrder, UnresolvedPolymorphicPythonUDTF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Expression, FunctionTableSubqueryArgumentExpression, NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF, PythonUDTF, PythonUDTFAnalyzeResult, SortOrder, UnresolvedPolymorphicPythonUDTF}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, NamedParametersSupport, OneRowRelation}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -106,7 +106,7 @@ case class UserDefinedPythonTableFunction(
     this(name, func, None, pythonEvalType, udfDeterministic)
   }
 
-  def builder(exprs: Seq[Expression]): LogicalPlan = {
+  def builder(exprs: Seq[Expression], parser: => ParserInterface): LogicalPlan = {
     /*
      * Check if the named arguments:
      * - don't have duplicated names
@@ -133,7 +133,8 @@ case class UserDefinedPythonTableFunction(
           case _ => false
         }
         val runAnalyzeInPython = (func: PythonFunction, exprs: Seq[Expression]) => {
-          val runner = new UserDefinedPythonTableFunctionAnalyzeRunner(name, func, exprs, tableArgs)
+          val runner =
+            new UserDefinedPythonTableFunctionAnalyzeRunner(name, func, exprs, tableArgs, parser)
           runner.runInPython()
         }
         UnresolvedPolymorphicPythonUDTF(
@@ -156,7 +157,7 @@ case class UserDefinedPythonTableFunction(
 
   /** Returns a [[DataFrame]] that will evaluate to calling this UDTF with the given input. */
   def apply(session: SparkSession, exprs: Column*): DataFrame = {
-    val udtf = builder(exprs.map(_.expr))
+    val udtf = builder(exprs.map(_.expr), session.sessionState.sqlParser)
     Dataset.ofRows(session, udtf)
   }
 }
@@ -187,7 +188,9 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
     name: String,
     func: PythonFunction,
     exprs: Seq[Expression],
-    tableArgs: Seq[Boolean]) extends PythonPlannerRunner[PythonUDTFAnalyzeResult](func) {
+    tableArgs: Seq[Boolean],
+    parser: ParserInterface)
+  extends PythonPlannerRunner[PythonUDTFAnalyzeResult](func) {
 
   override val workerModule = "pyspark.sql.worker.analyze_udtf"
 
@@ -242,33 +245,41 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
 
     // Receive whether the "with single partition" property is requested.
     val withSinglePartition = dataIn.readInt() == 1
-    // Receive the list of requested partitioning columns, if any.
-    val partitionByColumns = ArrayBuffer.empty[Expression]
-    val numPartitionByColumns = dataIn.readInt()
-    for (_ <- 0 until numPartitionByColumns) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
-      partitionByColumns.append(UnresolvedAttribute(columnName))
+    // Receive the list of requested partitioning expressions, if any.
+    val partitionByExpressions = ArrayBuffer.empty[Expression]
+    val numPartitionByExpressions = dataIn.readInt()
+    for (_ <- 0 until numPartitionByExpressions) {
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
+      partitionByExpressions.append(parsed)
     }
-    // Receive the list of requested ordering columns, if any.
+    // Receive the list of requested ordering expressions, if any.
     val orderBy = ArrayBuffer.empty[SortOrder]
     val numOrderByItems = dataIn.readInt()
     for (_ <- 0 until numOrderByItems) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
+      // Perform a basic check that the requested ordering column string does not include an alias,
+      // since it is possible to accidentally try to specify a sort order like ASC or DESC or NULLS
+      // FIRST/LAST in this manner leading to confusing results.
+      parsed match {
+        case a: Alias =>
+          throw QueryCompilationErrors
+            .invalidSortOrderInUDTFOrderingColumnFromAnalyzeMethodHasAlias(aliasName = a.name)
+        case _ =>
+      }
       val direction = if (dataIn.readInt() == 1) Ascending else Descending
       val overrideNullsFirst = dataIn.readInt()
       overrideNullsFirst match {
-        case 0 =>
-          orderBy.append(SortOrder(UnresolvedAttribute(columnName), direction))
-        case 1 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsFirst, 
Seq.empty))
-        case 2 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsLast, 
Seq.empty))
+        case 0 => orderBy.append(SortOrder(parsed, direction))
+        case 1 => orderBy.append(SortOrder(parsed, direction, NullsFirst, 
Seq.empty))
+        case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast, 
Seq.empty))
       }
     }
     PythonUDTFAnalyzeResult(
       schema = schema,
       withSinglePartition = withSinglePartition,
-      partitionByExpressions = partitionByColumns.toSeq,
+      partitionByExpressions = partitionByExpressions.toSeq,
       orderByExpressions = orderBy.toSeq,
       pickledAnalyzeResult = pickledAnalyzeResult)
   }
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
index 3a9dfc26bcc9..efd6f563e7a1 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
@@ -278,6 +278,32 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD",
+  "sqlState" : "42802",
+  "messageParameters" : {
+    "aliasName" : "ASC"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 53,
+    "fragment" : "UDTFInvalidOrderByAscKeyword(TABLE(t2))"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2))
 -- !query analysis
@@ -758,6 +784,27 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFInvalidPartitionByOrderByParseError(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`unparsable`",
+    "proposal" : "`input`, `partition_col`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 10,
+    "fragment" : "unparsable"
+  } ]
+}
+
+
 -- !query
 DROP VIEW t1
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
index 68885923e9f7..304ee1168e48 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
@@ -75,6 +75,8 @@ SELECT * FROM
     VALUES (0), (1) AS t(col)
     JOIN LATERAL
     UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2));
+SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2));
 -- As a reminder, UDTFInvalidPartitionByAndWithSinglePartition returns this analyze result:
 --     AnalyzeResult(
 --         schema=StructType()
@@ -133,6 +135,7 @@ SELECT * FROM UDTFWithSinglePartition(1, 2, 3);
 SELECT * FROM UDTFWithSinglePartition(1, invalid_arg_name => 2);
 SELECT * FROM UDTFWithSinglePartition(1, initial_count => 2);
 SELECT * FROM UDTFWithSinglePartition(initial_count => 1, initial_count => 2);
+SELECT * FROM UDTFInvalidPartitionByOrderByParseError(TABLE(t2));
 
 -- cleanup
 DROP VIEW t1;
diff --git a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
index 1ed7726dde8e..6dc2a3e860e7 100644
--- a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
@@ -335,6 +335,37 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0      1       1       1
+1      2       5       3
+
+
+-- !query
+SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD",
+  "sqlState" : "42802",
+  "messageParameters" : {
+    "aliasName" : "ASC"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 53,
+    "fragment" : "UDTFInvalidOrderByAscKeyword(TABLE(t2))"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2))
 -- !query schema
@@ -895,6 +926,29 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFInvalidPartitionByOrderByParseError(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`unparsable`",
+    "proposal" : "`input`, `partition_col`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 10,
+    "fragment" : "unparsable"
+  } ]
+}
+
+
 -- !query
 DROP VIEW t1
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 99045ffd8637..1e691f71c511 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -487,7 +487,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
   object UDTFCountSumLast extends TestUDTF {
     val pythonScript: String =
       s"""
-         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+         |from pyspark.sql.functions import AnalyzeResult
          |from pyspark.sql.types import IntegerType, Row, StructType
          |class $name:
          |    def __init__(self):
@@ -541,7 +541,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
       s"""
         |import json
         |from dataclasses import dataclass
-        |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+        |from pyspark.sql.functions import AnalyzeResult, OrderingColumn
         |from pyspark.sql.types import IntegerType, Row, StructType
         |
         |@dataclass
@@ -583,7 +583,9 @@ object IntegratedUDFTestUtils extends SQLHelper {
         |""".stripMargin
   }
 
-  object UDTFPartitionByOrderBy extends TestUDTF {
+  abstract class TestPythonUDTFPartitionByOrderByBase(
+      partitionBy: String,
+      orderBy: String) extends TestUDTF {
     val pythonScript: String =
       s"""
         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
@@ -604,10 +606,10 @@ object IntegratedUDFTestUtils extends SQLHelper {
         |                .add("total", IntegerType())
         |                .add("last", IntegerType()),
         |            partitionBy=[
-        |                PartitioningColumn("partition_col")
+        |                PartitioningColumn("$partitionBy")
         |            ],
         |            orderBy=[
-        |                OrderingColumn("input")
+        |                OrderingColumn("$orderBy")
         |            ])
         |
         |    def eval(self, row: Row):
@@ -621,10 +623,30 @@ object IntegratedUDFTestUtils extends SQLHelper {
         |""".stripMargin
   }
 
+  object UDTFPartitionByOrderBy
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input")
+
+  object UDTFPartitionByOrderByComplexExpr
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col + 1",
+      orderBy = "RANDOM(42)")
+
+  object UDTFInvalidPartitionByOrderByParseError
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "unparsable",
+      orderBy = "input")
+
+  object UDTFInvalidOrderByAscKeyword
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "partition_col ASC")
+
   object UDTFInvalidPartitionByAndWithSinglePartition extends TestUDTF {
     val pythonScript: String =
       s"""
-         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+         |from pyspark.sql.functions import AnalyzeResult, PartitioningColumn
          |from pyspark.sql.types import IntegerType, Row, StructType
          |class $name:
          |    def __init__(self):
@@ -651,7 +673,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
   object UDTFInvalidOrderByWithoutPartitionBy extends TestUDTF {
     val pythonScript: String =
       s"""
-         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn
          |from pyspark.sql.types import IntegerType, Row, StructType
          |class $name:
          |    def __init__(self):
@@ -1127,10 +1149,13 @@ object IntegratedUDFTestUtils extends SQLHelper {
     UDTFLastString,
     UDTFWithSinglePartition,
     UDTFPartitionByOrderBy,
+    UDTFInvalidOrderByAscKeyword,
     UDTFInvalidPartitionByAndWithSinglePartition,
+    UDTFInvalidPartitionByOrderByParseError,
     UDTFInvalidOrderByWithoutPartitionBy,
     UDTFForwardStateFromAnalyze,
     UDTFForwardStateFromAnalyzeWithKwargs,
+    UDTFPartitionByOrderByComplexExpr,
     InvalidAnalyzeMethodReturnsNonStructTypeSchema,
     InvalidAnalyzeMethodWithSinglePartitionNoInputTable,
     InvalidAnalyzeMethodWithPartitionByNoInputTable,

