dtenedor commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310587796


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
                   catalog, "table-valued functions")
               }
             }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, 
_, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, 
u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), 
ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = 
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = 
Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because 
it included the " +
                         "PARTITION BY clause, but only Python table functions 
support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table 
function whose polymorphic
+                // 'analyze' method returned metadata indicated requested 
partitioning and/or
+                // ordering properties of the input relation. In that event, 
make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER 
BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE 
argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>
+                  if (a.withSinglePartition && 
a.partitionByExpressions.nonEmpty) {
+                    throw 
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'with_single_partition' field cannot be 
assigned to true " +
+                        "if the 'partition_by' list is non-empty")
+                  } else if (a.orderByExpressions.nonEmpty && 
!a.withSinglePartition &&
+                    a.partitionByExpressions.isEmpty) {
+                    throw 
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'order_by' field cannot be non-empty 
unless the " +
+                        "'with_single_partition' field is set to true or the 
'partition_by' list " +
+                        "is non-empty")
+                  } else if (a.hasRepartitioning && t.hasRepartitioning) {
+                    throw QueryCompilationErrors
+                      .tableValuedFunctionRequiredMetadataIncompatibleWithCall(
+                        functionName = pythonUDTF.get.name,
+                        requestedMetadata =
+                          "specified its own required partitioning of the 
input table",
+                        invalidFunctionCallProperty =
+                          "specified the WITH SINGLE PARTITION or PARTITION BY 
clause; " +
+                            "please remove these clauses and retry the query 
again.")
+                  }
+                  var withSinglePartition = t.withSinglePartition
+                  var partitionByExpressions = t.partitionByExpressions
+                  var orderByExpressions = t.orderByExpressions
+                  if (a.withSinglePartition) {
+                    withSinglePartition = true
+                  }
+                  if (a.partitionByExpressions.nonEmpty) {
+                    partitionByExpressions = a.partitionByExpressions
+                  }
+                  if (a.orderByExpressions.nonEmpty) {
+                    orderByExpressions = a.orderByExpressions
+                  }

Review Comment:
   Good question. I originally tried simply overwriting the UDTF call details 
with the fields of the AnalyzeResult, but found it very easy to introduce bugs 
by accidentally overwriting details of the UDTF call with values from the 
AnalyzeResult when that was not intended. 
   
   Therefore, it seems prudent to explicitly write out every case in the code, 
starting with the UDTF call details (from `t`) and then deciding when to 
overwrite them with the corresponding details of the AnalyzeResult (from `a`). 
Even if this results in the same behavior for some fields (e.g. these 
`orderByExpressions`), it's probably safer to write them all out explicitly 
like this, both to make the intent of the logic clear and to invite consistency 
later when we add more metadata to the UDTF AnalyzeResult.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to