allisonwang-db commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310679860
##########
python/pyspark/sql/udtf.py:
##########
@@ -61,6 +61,26 @@ class AnalyzeArgument:
is_table: bool
+@dataclass(frozen=True)
+class PartitioningColumn:
+ """
+ Represents a UDTF column for purposes of returning metadata from the
'analyze' method.
+ """
+
+ name: str
+
+
+@dataclass(frozen=True)
+class OrderingColumn:
Review Comment:
The SortOrder class also includes `nullOrdering`. Do we want to add it here?
For example `ORDER BY t.c1 DESC NULLS FIRST`
##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2131,6 +2133,118 @@ def terminate(self):
[Row(count=40, total=60, last=2)],
)
+ def test_udtf_with_table_argument_with_single_partition_from_analyze(self):
+ @udtf
+ class TestUDTF:
+ def __init__(self):
+ self._count = 0
+ self._sum = 0
+ self._last = None
+
+ @staticmethod
+ def analyze(self):
+ return AnalyzeResult(
+ schema=StructType()
+ .add("count", IntegerType())
+ .add("total", IntegerType())
+ .add("last", IntegerType()),
+ with_single_partition=True,
+ order_by=[OrderingColumn("input"),
OrderingColumn("partition_col")],
+ )
+
+ def eval(self, row: Row):
+ # Make sure that the rows arrive in the expected order.
+ if self._last is not None and self._last > row["input"]:
+ raise Exception(
+ f"self._last was {self._last} but the row value was
{row['input']}"
+ )
+ self._count += 1
+ self._last = row["input"]
+ self._sum += row["input"]
+
+ def terminate(self):
+ yield self._count, self._sum, self._last
+
+ self.spark.udtf.register("test_udtf", TestUDTF)
+
+ self.assertEqual(
Review Comment:
Let's use the new PySpark test framework `assertDataFrameEqual` added by
@asl3
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
Review Comment:
nit: `a` -> `analyzeResult`
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
+ if (a.withSinglePartition &&
a.partitionByExpressions.nonEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'with_single_partition' field cannot be
assigned to true " +
+ "if the 'partition_by' list is non-empty")
+ } else if (a.orderByExpressions.nonEmpty &&
!a.withSinglePartition &&
+ a.partitionByExpressions.isEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'order_by' field cannot be non-empty
unless the " +
+ "'with_single_partition' field is set to true or the
'partition_by' list " +
+ "is non-empty")
+ } else if (a.hasRepartitioning && t.hasRepartitioning) {
+ throw QueryCompilationErrors
+ .tableValuedFunctionRequiredMetadataIncompatibleWithCall(
+ functionName = pythonUDTF.get.name,
+ requestedMetadata =
+ "specified its own required partitioning of the
input table",
+ invalidFunctionCallProperty =
+ "specified the WITH SINGLE PARTITION or PARTITION BY
clause; " +
+ "please remove these clauses and retry the query
again.")
+ }
+ var withSinglePartition = t.withSinglePartition
+ var partitionByExpressions = t.partitionByExpressions
+ var orderByExpressions = t.orderByExpressions
+ if (a.withSinglePartition) {
+ withSinglePartition = true
+ }
+ if (a.partitionByExpressions.nonEmpty) {
+ partitionByExpressions = a.partitionByExpressions
+ }
+ if (a.orderByExpressions.nonEmpty) {
+ orderByExpressions = a.orderByExpressions
+ }
Review Comment:
One option that makes the behavior easier to document (and easier for users to
understand) is to simply throw an exception if both the table subquery and the
UDTF's analysis result specify any of the partitioning/ordering metadata.
For example:
a UDTF whose AnalyzeResult sets `with_single_partition=True` is invoked with
`udtf(TABLE(t) ORDER BY t.c1)`.
We can throw an exception stating that the UDTF already has 'PARTITION BY'
or 'ORDER BY' defined in its analysis result, and thus it cannot be used in
conjunction with the SQL 'PARTITION BY' or 'ORDER BY' syntax.
Of course, this approach limits the expressiveness of UDTFs. Another option
is to retain the overwriting logic here, but we would need to clearly document
the behavior when a UDTF has both an 'ORDER BY' clause in the SQL query and
within the UDTF's AnalyzeResult.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
+ if (a.withSinglePartition &&
a.partitionByExpressions.nonEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'with_single_partition' field cannot be
assigned to true " +
+ "if the 'partition_by' list is non-empty")
+ } else if (a.orderByExpressions.nonEmpty &&
!a.withSinglePartition &&
Review Comment:
nit: `else if` -> `if`
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
Review Comment:
How about creating a helper function `resolvePolymorphicPythonUDTF` here?
Otherwise, the logic here to resolve a TVF becomes a bit hard to read.
if (pythonUDTFAnalyzeResult.isDefined) { resolvePolymorphicPythonUDTF(...) }
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -81,7 +84,7 @@ case class UserDefinedPythonTableFunction(
func: PythonFunction,
returnType: Option[StructType],
pythonEvalType: Int,
- udfDeterministic: Boolean) {
+ udfDeterministic: Boolean) extends Logging {
Review Comment:
this change is not needed?
##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
-- test non-deterministic input
SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
Review Comment:
It would be really great if we could add some comments for these UDTFs
(their analyze result)
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
Review Comment:
We can do
```
val (pythonUDTF, pythonUDTFAnalyzeResult) = resolvedFunc match {
case Generate...
(Some(p), p.analyzeResult)
case _ =>
assert(...)
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]