ueshin commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1309357128
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicating requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
+ if (a.withSinglePartition &&
a.partitionByExpressions.nonEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'with_single_partition' field cannot be
assigned to true " +
+ "if the 'partition_by' list is non-empty")
+ } else if (a.orderByExpressions.nonEmpty &&
!a.withSinglePartition &&
+ a.partitionByExpressions.isEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'order_by' field cannot be non-empty
unless the " +
+ "'with_single_partition' field is set to true or the
'partition_by' list " +
+ "is non-empty")
+ } else if (a.hasRepartitioning && t.hasRepartitioning) {
+ throw QueryCompilationErrors
+ .tableValuedFunctionRequiredMetadataIncompatibleWithCall(
+ functionName = pythonUDTF.get.name,
+ requestedMetadata =
+ "specified its own required partitioning of the
input table",
+ invalidFunctionCallProperty =
+ "specified the WITH SINGLE PARTITION or PARTITION BY
clause; " +
+ "please remove these clauses and retry the query
again.")
+ }
+ var withSinglePartition = t.withSinglePartition
+ var partitionByExpressions = t.partitionByExpressions
+ var orderByExpressions = t.orderByExpressions
+ if (a.withSinglePartition) {
+ withSinglePartition = true
+ }
+ if (a.partitionByExpressions.nonEmpty) {
+ partitionByExpressions = a.partitionByExpressions
+ }
+ if (a.orderByExpressions.nonEmpty) {
+ orderByExpressions = a.orderByExpressions
+ }
Review Comment:
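I'm wondering whether it's OK to just overwrite `orderByExpressions` without
any checks. The partitioning case above raises an error when both the `analyze`
result and the call specify partitioning, but there is no equivalent explicit
check for ORDER BY, even though the code comment says the call should not
include one. Is that case already covered by the `hasRepartitioning` check, or
should an explicit ORDER BY on the TABLE argument be rejected here as well?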
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]