This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 91a02e5d9701 [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions 91a02e5d9701 is described below commit 91a02e5d97011f5f9b620c07b1c2f7d85291448b Author: Daniel Tenedorio <daniel.tenedo...@databricks.com> AuthorDate: Mon Dec 4 16:51:56 2023 -0800 [SPARK-46040][SQL][PYTHON] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions ### What changes were proposed in this pull request? This PR updates the Python user-defined table function (UDTF) API for the `analyze` method to support general expressions for the `partitionBy` and `orderBy` fields of the `AnalyzeResult` class. For example, the following UDTF requests partitioning by `partition_col DIV 10` (integral division; plain `/` in Spark SQL always performs fractional division and would give each distinct value its own partition), so that all rows with values of this column between 0-9 arrive in the same partition, then all rows with values between 10-19 in the next partition, and so on. ``` @udtf class TestUDTF: def __init__(self): self._partition_col = None self._count = 0 self._sum = 0 self._last = None @staticmethod def analyze(*args, **kwargs): return AnalyzeResult( schema=StructType() .add("partition_col", IntegerType()) .add("count", IntegerType()) .add("total", IntegerType()) .add("last", IntegerType()), partitionBy=[PartitioningColumn("partition_col DIV 10")], orderBy=[ OrderingColumn(name="input", ascending=True, overrideNullsFirst=False) ], ) def eval(self, row: Row): self._partition_col = row["partition_col"] self._count += 1 self._last = row["input"] if row["input"] is not None: self._sum += row["input"] def terminate(self): yield self._partition_col, self._count, self._sum, self._last ``` ### Why are the changes needed?
This lets the UDTF partition by simple references to the columns of the input table just like before, but also gives the option to partition by general expressions based on those columns (just like the explicit `PARTITION BY` and `ORDER BY` clauses in the UDTF call in SQL). ### Does this PR introduce _any_ user-facing change? Yes, see above. ### How was this patch tested? This PR includes test coverage. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43946 from dtenedor/udtf-order-partition-by-exprs. Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com> Signed-off-by: Takuya UESHIN <ues...@databricks.com> --- .../src/main/resources/error/error-classes.json | 6 +++ .../sql/connect/planner/SparkConnectPlanner.scala | 4 +- docs/sql-error-conditions.md | 6 +++ python/docs/source/user_guide/sql/python_udtf.rst | 2 +- python/pyspark/sql/udtf.py | 21 +++++---- .../spark/sql/errors/QueryCompilationErrors.scala | 7 +++ .../org/apache/spark/sql/UDTFRegistration.scala | 4 +- .../python/UserDefinedPythonFunction.scala | 53 ++++++++++++--------- .../sql-tests/analyzer-results/udtf/udtf.sql.out | 47 +++++++++++++++++++ .../test/resources/sql-tests/inputs/udtf/udtf.sql | 3 ++ .../resources/sql-tests/results/udtf/udtf.sql.out | 54 ++++++++++++++++++++++ .../apache/spark/sql/IntegratedUDFTestUtils.scala | 39 +++++++++++++--- 12 files changed, 206 insertions(+), 40 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 9e0019b34728..6795ebcb0bd0 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3110,6 +3110,12 @@ ], "sqlState" : "42802" }, + "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD" : { + "message" : [ + "Failed to evaluate the user-defined table function because its 'analyze' method returned a requested OrderingColumn whose column 
name expression included an unnecessary alias <aliasName>; please remove this alias and then try the query again." + ], + "sqlState" : "42802" + }, "UNABLE_TO_ACQUIRE_MEMORY" : { "message" : [ "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>." diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index b64fecafa311..dc1730c78267 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -951,7 +951,9 @@ class SparkConnectPlanner( fun.getFunctionCase match { case proto.CommonInlineUserDefinedTableFunction.FunctionCase.PYTHON_UDTF => val function = createPythonUserDefinedTableFunction(fun) - function.builder(fun.getArgumentsList.asScala.map(transformExpression).toSeq) + function.builder( + fun.getArgumentsList.asScala.map(transformExpression).toSeq, + session.sessionState.sqlParser) case _ => throw InvalidPlanInput( s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported") diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 71abf10da328..1943f8359572 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -1994,6 +1994,12 @@ The number of aliases supplied in the AS clause does not match the number of col Expected `<aliasesSize>` aliases, but got `<aliasesNames>`. Please ensure that the number of aliases provided matches the number of columns output by the UDTF. 
+### UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD + +[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to evaluate the user-defined table function because its 'analyze' method returned a requested OrderingColumn whose column name expression included an unnecessary alias `<aliasName>`; please remove this alias and then try the query again. + ### UNABLE_TO_ACQUIRE_MEMORY [SQLSTATE: 53200](sql-error-conditions-sqlstates.html#class-53-insufficient-resources) diff --git a/python/docs/source/user_guide/sql/python_udtf.rst b/python/docs/source/user_guide/sql/python_udtf.rst index 3e3c76344383..2a5c12b75c62 100644 --- a/python/docs/source/user_guide/sql/python_udtf.rst +++ b/python/docs/source/user_guide/sql/python_udtf.rst @@ -92,7 +92,7 @@ To implement a Python UDTF, you first need to define a class implementing the me that all rows of the input table are consumed by the `eval` method from exactly one instance of the UDTF class. On the other hand, if the `partitionBy` list is non-empty, the query planner will arrange a repartitioning such that all rows with each unique - combination of values of the partitioning columns are consumed by a separate unique + combination of values of the partitioning expressions are consumed by a separate unique instance of the UDTF class. If `orderBy` is non-empty, this specifies the requested ordering of rows within each partition. diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py index 7d97a6a565c9..fe3b0dc54314 100644 --- a/python/pyspark/sql/udtf.py +++ b/python/pyspark/sql/udtf.py @@ -71,12 +71,14 @@ class AnalyzeArgument: @dataclass(frozen=True) class PartitioningColumn: """ - Represents a UDTF column for purposes of returning metadata from the 'analyze' method. + Represents an expression that the UDTF is specifying for Catalyst to partition the input table + by. 
This can be either the name of a single column from the input table (such as "columnA"), or + a SQL expression based on the column names of the input table (such as "columnA + columnB"). Parameters ---------- name : str - The name of the partitioning column. + The contents of the partitioning column name or expression represented as a SQL string. """ name: str @@ -85,19 +87,20 @@ class PartitioningColumn: @dataclass(frozen=True) class OrderingColumn: """ - Represents a single ordering column name for purposes of returning metadata from the 'analyze' - method. + Represents an expression that the UDTF is specifying for Catalyst to order the input partition + by. This can be either the name of a single column from the input table (such as "columnA"), + or a SQL expression based on the column names of the input table (such as "columnA + columnB"). Parameters ---------- name : str - The name of the partitioning column. + The contents of the ordering column name or expression represented as a SQL string. ascending : bool, default True - If this column is in an ascending order or not. + This is if this expression specifies an ascending sorting order. overrideNullsFirst : str, optional If this is None, use the default behavior to sort NULL values first when sorting in ascending order, or last when sorting in descending order. Otherwise, if this is - True or False, override the default behavior accordingly. + True or False, we override the default behavior accordingly. """ name: str @@ -122,12 +125,12 @@ class AnalyzeResult: argument to one collection for consumption by exactly one instance of the correpsonding UDTF class. partitionBy : sequence of :class:`PartitioningColumn` - If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to + If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to partition the input TABLE argument by. 
In this case, calls to the UDTF may not include any explicit PARTITION BY clause, in which case Catalyst will return an error. This option is mutually exclusive with 'withSinglePartition'. orderBy: sequence of :class:`OrderingColumn` - If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to + If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty in this case. """ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 0c0b940a5880..bbc51e5cbd67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -658,6 +658,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "aliasesNames" -> aliasesNames)) } + def invalidSortOrderInUDTFOrderingColumnFromAnalyzeMethodHasAlias( + aliasName: String): Throwable = { + new AnalysisException( + errorClass = "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD", + messageParameters = Map("aliasName" -> aliasName)) + } + def windowAggregateFunctionWithFilterNotSupportedError(): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1030", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala index 3330597cb763..b1666f247581 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala @@ -44,6 +44,8 @@ class UDTFRegistration private[sql] (tableFunctionRegistry: TableFunctionRegistr | udfDeterministic: ${udtf.udfDeterministic} """.stripMargin) - 
tableFunctionRegistry.createOrReplaceTempFunction(name, udtf.builder, "python_udtf") + tableFunctionRegistry.createOrReplaceTempFunction( + name, udtf.builder(_, SparkSession.getActiveSession.get.sessionState.sqlParser), + source = "python_udtf") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala index 202159907af8..6bb1992a064a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala @@ -25,8 +25,8 @@ import net.razorvine.pickle.Pickler import org.apache.spark.api.python.{PythonEvalType, PythonFunction, PythonWorkerUtils, SpecialLengths} import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, FunctionTableSubqueryArgumentExpression, NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF, PythonUDTF, PythonUDTFAnalyzeResult, SortOrder, UnresolvedPolymorphicPythonUDTF} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Expression, FunctionTableSubqueryArgumentExpression, NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF, PythonUDTF, PythonUDTFAnalyzeResult, SortOrder, UnresolvedPolymorphicPythonUDTF} +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, NamedParametersSupport, OneRowRelation} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{DataType, StructType} @@ -106,7 +106,7 @@ case class UserDefinedPythonTableFunction( this(name, func, None, pythonEvalType, udfDeterministic) } - def builder(exprs: 
Seq[Expression]): LogicalPlan = { + def builder(exprs: Seq[Expression], parser: => ParserInterface): LogicalPlan = { /* * Check if the named arguments: * - don't have duplicated names @@ -133,7 +133,8 @@ case class UserDefinedPythonTableFunction( case _ => false } val runAnalyzeInPython = (func: PythonFunction, exprs: Seq[Expression]) => { - val runner = new UserDefinedPythonTableFunctionAnalyzeRunner(name, func, exprs, tableArgs) + val runner = + new UserDefinedPythonTableFunctionAnalyzeRunner(name, func, exprs, tableArgs, parser) runner.runInPython() } UnresolvedPolymorphicPythonUDTF( @@ -156,7 +157,7 @@ case class UserDefinedPythonTableFunction( /** Returns a [[DataFrame]] that will evaluate to calling this UDTF with the given input. */ def apply(session: SparkSession, exprs: Column*): DataFrame = { - val udtf = builder(exprs.map(_.expr)) + val udtf = builder(exprs.map(_.expr), session.sessionState.sqlParser) Dataset.ofRows(session, udtf) } } @@ -187,7 +188,9 @@ class UserDefinedPythonTableFunctionAnalyzeRunner( name: String, func: PythonFunction, exprs: Seq[Expression], - tableArgs: Seq[Boolean]) extends PythonPlannerRunner[PythonUDTFAnalyzeResult](func) { + tableArgs: Seq[Boolean], + parser: ParserInterface) + extends PythonPlannerRunner[PythonUDTFAnalyzeResult](func) { override val workerModule = "pyspark.sql.worker.analyze_udtf" @@ -242,33 +245,41 @@ class UserDefinedPythonTableFunctionAnalyzeRunner( // Receive whether the "with single partition" property is requested. val withSinglePartition = dataIn.readInt() == 1 - // Receive the list of requested partitioning columns, if any. - val partitionByColumns = ArrayBuffer.empty[Expression] - val numPartitionByColumns = dataIn.readInt() - for (_ <- 0 until numPartitionByColumns) { - val columnName = PythonWorkerUtils.readUTF(dataIn) - partitionByColumns.append(UnresolvedAttribute(columnName)) + // Receive the list of requested partitioning expressions, if any. 
+ val partitionByExpressions = ArrayBuffer.empty[Expression] + val numPartitionByExpressions = dataIn.readInt() + for (_ <- 0 until numPartitionByExpressions) { + val expressionSql: String = PythonWorkerUtils.readUTF(dataIn) + val parsed: Expression = parser.parseExpression(expressionSql) + partitionByExpressions.append(parsed) } - // Receive the list of requested ordering columns, if any. + // Receive the list of requested ordering expressions, if any. val orderBy = ArrayBuffer.empty[SortOrder] val numOrderByItems = dataIn.readInt() for (_ <- 0 until numOrderByItems) { - val columnName = PythonWorkerUtils.readUTF(dataIn) + val expressionSql: String = PythonWorkerUtils.readUTF(dataIn) + val parsed: Expression = parser.parseExpression(expressionSql) + // Perform a basic check that the requested ordering column string does not include an alias, + // since it is possible to accidentally try to specify a sort order like ASC or DESC or NULLS + // FIRST/LAST in this manner leading to confusing results. 
+ parsed match { + case a: Alias => + throw QueryCompilationErrors + .invalidSortOrderInUDTFOrderingColumnFromAnalyzeMethodHasAlias(aliasName = a.name) + case _ => + } val direction = if (dataIn.readInt() == 1) Ascending else Descending val overrideNullsFirst = dataIn.readInt() overrideNullsFirst match { - case 0 => - orderBy.append(SortOrder(UnresolvedAttribute(columnName), direction)) - case 1 => orderBy.append( - SortOrder(UnresolvedAttribute(columnName), direction, NullsFirst, Seq.empty)) - case 2 => orderBy.append( - SortOrder(UnresolvedAttribute(columnName), direction, NullsLast, Seq.empty)) + case 0 => orderBy.append(SortOrder(parsed, direction)) + case 1 => orderBy.append(SortOrder(parsed, direction, NullsFirst, Seq.empty)) + case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast, Seq.empty)) } } PythonUDTFAnalyzeResult( schema = schema, withSinglePartition = withSinglePartition, - partitionByExpressions = partitionByColumns.toSeq, + partitionByExpressions = partitionByExpressions.toSeq, orderByExpressions = orderBy.toSeq, pickledAnalyzeResult = pickledAnalyzeResult) } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out index 3a9dfc26bcc9..efd6f563e7a1 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out @@ -278,6 +278,32 @@ org.apache.spark.sql.AnalysisException } +-- !query +SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2)) +-- !query analysis +[Analyzer test output redacted due to nondeterminism] + + +-- !query +SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2)) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD", + "sqlState" : "42802", + "messageParameters" : { + "aliasName" : "ASC" + }, + "queryContext" : [ { + 
"objectType" : "", + "objectName" : "", + "startIndex" : 15, + "stopIndex" : 53, + "fragment" : "UDTFInvalidOrderByAscKeyword(TABLE(t2))" + } ] +} + + -- !query SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2)) -- !query analysis @@ -758,6 +784,27 @@ org.apache.spark.sql.AnalysisException } +-- !query +SELECT * FROM UDTFInvalidPartitionByOrderByParseError(TABLE(t2)) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`unparsable`", + "proposal" : "`input`, `partition_col`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 10, + "fragment" : "unparsable" + } ] +} + + -- !query DROP VIEW t1 -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql index 68885923e9f7..304ee1168e48 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql @@ -75,6 +75,8 @@ SELECT * FROM VALUES (0), (1) AS t(col) JOIN LATERAL UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col); +SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2)); +SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2)); -- As a reminder, UDTFInvalidPartitionByAndWithSinglePartition returns this analyze result: -- AnalyzeResult( -- schema=StructType() @@ -133,6 +135,7 @@ SELECT * FROM UDTFWithSinglePartition(1, 2, 3); SELECT * FROM UDTFWithSinglePartition(1, invalid_arg_name => 2); SELECT * FROM UDTFWithSinglePartition(1, initial_count => 2); SELECT * FROM UDTFWithSinglePartition(initial_count => 1, initial_count => 2); +SELECT * FROM UDTFInvalidPartitionByOrderByParseError(TABLE(t2)); -- cleanup DROP VIEW t1; diff --git a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out index 1ed7726dde8e..6dc2a3e860e7 100644 --- a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out @@ -335,6 +335,37 @@ org.apache.spark.sql.AnalysisException } +-- !query +SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2)) +-- !query schema +struct<partition_col:int,count:int,total:int,last:int> +-- !query output +0 1 1 1 +1 2 5 3 + + +-- !query +SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2)) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UDTF_INVALID_ALIAS_IN_REQUESTED_ORDERING_STRING_FROM_ANALYZE_METHOD", + "sqlState" : "42802", + "messageParameters" : { + "aliasName" : "ASC" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 15, + "stopIndex" : 53, + "fragment" : "UDTFInvalidOrderByAscKeyword(TABLE(t2))" + } ] +} + + -- !query SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2)) -- !query schema @@ -895,6 +926,29 @@ org.apache.spark.sql.AnalysisException } +-- !query +SELECT * FROM UDTFInvalidPartitionByOrderByParseError(TABLE(t2)) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`unparsable`", + "proposal" : "`input`, `partition_col`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 10, + "fragment" : "unparsable" + } ] +} + + -- !query DROP VIEW t1 -- !query schema diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala index 99045ffd8637..1e691f71c511 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala @@ -487,7 +487,7 @@ object IntegratedUDFTestUtils extends SQLHelper { object UDTFCountSumLast extends TestUDTF { val pythonScript: String = s""" - |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn + |from pyspark.sql.functions import AnalyzeResult |from pyspark.sql.types import IntegerType, Row, StructType |class $name: | def __init__(self): @@ -541,7 +541,7 @@ object IntegratedUDFTestUtils extends SQLHelper { s""" |import json |from dataclasses import dataclass - |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn + |from pyspark.sql.functions import AnalyzeResult, OrderingColumn |from pyspark.sql.types import IntegerType, Row, StructType | |@dataclass @@ -583,7 +583,9 @@ object IntegratedUDFTestUtils extends SQLHelper { |""".stripMargin } - object UDTFPartitionByOrderBy extends TestUDTF { + abstract class TestPythonUDTFPartitionByOrderByBase( + partitionBy: String, + orderBy: String) extends TestUDTF { val pythonScript: String = s""" |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn @@ -604,10 +606,10 @@ object IntegratedUDFTestUtils extends SQLHelper { | .add("total", IntegerType()) | .add("last", IntegerType()), | partitionBy=[ - | PartitioningColumn("partition_col") + | PartitioningColumn("$partitionBy") | ], | orderBy=[ - | OrderingColumn("input") + | OrderingColumn("$orderBy") | ]) | | def eval(self, row: Row): @@ -621,10 +623,30 @@ object IntegratedUDFTestUtils extends SQLHelper { |""".stripMargin } + object UDTFPartitionByOrderBy + extends TestPythonUDTFPartitionByOrderByBase( + partitionBy = "partition_col", + orderBy = "input") + + object UDTFPartitionByOrderByComplexExpr + extends TestPythonUDTFPartitionByOrderByBase( + partitionBy = "partition_col + 1", + orderBy = "RANDOM(42)") + + object UDTFInvalidPartitionByOrderByParseError + extends TestPythonUDTFPartitionByOrderByBase( 
+ partitionBy = "unparsable", + orderBy = "input") + + object UDTFInvalidOrderByAscKeyword + extends TestPythonUDTFPartitionByOrderByBase( + partitionBy = "partition_col", + orderBy = "partition_col ASC") + object UDTFInvalidPartitionByAndWithSinglePartition extends TestUDTF { val pythonScript: String = s""" - |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn + |from pyspark.sql.functions import AnalyzeResult, PartitioningColumn |from pyspark.sql.types import IntegerType, Row, StructType |class $name: | def __init__(self): @@ -651,7 +673,7 @@ object IntegratedUDFTestUtils extends SQLHelper { object UDTFInvalidOrderByWithoutPartitionBy extends TestUDTF { val pythonScript: String = s""" - |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn + |from pyspark.sql.functions import AnalyzeResult, OrderingColumn |from pyspark.sql.types import IntegerType, Row, StructType |class $name: | def __init__(self): @@ -1127,10 +1149,13 @@ object IntegratedUDFTestUtils extends SQLHelper { UDTFLastString, UDTFWithSinglePartition, UDTFPartitionByOrderBy, + UDTFInvalidOrderByAscKeyword, UDTFInvalidPartitionByAndWithSinglePartition, + UDTFInvalidPartitionByOrderByParseError, UDTFInvalidOrderByWithoutPartitionBy, UDTFForwardStateFromAnalyze, UDTFForwardStateFromAnalyzeWithKwargs, + UDTFPartitionByOrderByComplexExpr, InvalidAnalyzeMethodReturnsNonStructTypeSchema, InvalidAnalyzeMethodWithSinglePartitionNoInputTable, InvalidAnalyzeMethodWithPartitionByNoInputTable, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org