dtenedor commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310950704


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -81,7 +84,7 @@ case class UserDefinedPythonTableFunction(
     func: PythonFunction,
     returnType: Option[StructType],
     pythonEvalType: Int,
-    udfDeterministic: Boolean) {
+    udfDeterministic: Boolean) extends Logging {

Review Comment:
   Done.



##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
 
 -- test non-deterministic input
 SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY input);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY input DESC);
+
+-- test UDTF calls that take input TABLE arguments and the 'analyze' method returns required
+-- partitioning and/or ordering properties for Catalyst to enforce for the input table
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2));
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2));
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) PARTITION BY partition_col);

Review Comment:
   👍  I added a lateral join test for each UDTF type.



##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
 
 -- test non-deterministic input
 SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);

Review Comment:
   Sounds good, I ported the `AnalyzeResult` to the comments here for each UDTF.



##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -441,6 +443,255 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Regular Python UDTF"
   }
 
+  object TestPythonUDTFCountSumLast extends TestUDTF {
+    val name: String = "UDTFCountSumLast"
+    val pythonScript: String =
+      s"""
+         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+         |from pyspark.sql.types import IntegerType, Row, StructType
+         |class $name:
+         |    def __init__(self):
+         |        self._count = 0
+         |        self._sum = 0
+         |        self._last = None
+         |
+         |    @staticmethod
+         |    def analyze(self):

Review Comment:
   Yes, it is required. Ostensibly we could put this in the static registration, but most use cases would probably want it to work with a polymorphic output schema, so we can just leave it here. In general, the `AnalyzeResult` can become a collecting place for metadata like this; the UDTF can implement `analyze` if it wants to specify this extra information as needed (even if the output schema is always the same).
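   The shape this takes on the Python side can be sketched as follows. `StructType` and `AnalyzeResult` here are simplified stand-ins for the real pyspark classes (so the sketch is self-contained), just to show that `analyze` can return extra metadata even when the output schema is fixed:

   ```python
   from dataclasses import dataclass, field

   # Simplified stand-ins for the pyspark classes, for illustration only.
   @dataclass
   class StructType:
       fields: list = field(default_factory=list)

       def add(self, name, dtype):
           self.fields.append((name, dtype))
           return self

   @dataclass
   class AnalyzeResult:
       schema: StructType
       with_single_partition: bool = False
       order_by: list = field(default_factory=list)

   class CountSumLast:
       @staticmethod
       def analyze(*args):
           # The schema is always the same, but 'analyze' is still the place to
           # attach required partitioning/ordering metadata for the input table.
           return AnalyzeResult(
               schema=StructType()
               .add("count", "int")
               .add("total", "int")
               .add("last", "int"),
               with_single_partition=True,
           )

   result = CountSumLast.analyze()
   ```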



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                  catalog, "table-valued functions")
              }
            }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicating requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>
+                  if (a.withSinglePartition && a.partitionByExpressions.nonEmpty) {
+                    throw QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'with_single_partition' field cannot be assigned to true " +
+                        "if the 'partition_by' list is non-empty")
+                  } else if (a.orderByExpressions.nonEmpty && !a.withSinglePartition &&
+                    a.partitionByExpressions.isEmpty) {
+                    throw QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'order_by' field cannot be non-empty unless the " +
+                        "'with_single_partition' field is set to true or the 'partition_by' list " +
+                        "is non-empty")
+                  } else if (a.hasRepartitioning && t.hasRepartitioning) {
+                    throw QueryCompilationErrors
+                      .tableValuedFunctionRequiredMetadataIncompatibleWithCall(
+                        functionName = pythonUDTF.get.name,
+                        requestedMetadata =
+                          "specified its own required partitioning of the input table",
+                        invalidFunctionCallProperty =
+                          "specified the WITH SINGLE PARTITION or PARTITION BY clause; " +
+                            "please remove these clauses and retry the query again.")
+                  }
+                  var withSinglePartition = t.withSinglePartition
+                  var partitionByExpressions = t.partitionByExpressions
+                  var orderByExpressions = t.orderByExpressions
+                  if (a.withSinglePartition) {
+                    withSinglePartition = true
+                  }
+                  if (a.partitionByExpressions.nonEmpty) {
+                    partitionByExpressions = a.partitionByExpressions
+                  }
+                  if (a.orderByExpressions.nonEmpty) {
+                    orderByExpressions = a.orderByExpressions
+                  }

Review Comment:
   Yes, that's what this check is for:
   
   ```
       if (hasRepartitioning && t.hasRepartitioning) {
         throw QueryCompilationErrors
           .tableValuedFunctionRequiredMetadataIncompatibleWithCall(...)
   ```
   
   Only one of the (UDTF call, `analyze` result) can specify an explicit input table partitioning property.
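   
   In Python pseudocode the rule reads as follows; the names are illustrative stand-ins for the Catalyst fields, not the actual API:
   
   ```python
   # Illustrative sketch of the mutual-exclusion rule: the UDTF call site and the
   # 'analyze' result may not both request a partitioning of the input table.
   def has_repartitioning(with_single_partition, partition_by):
       return with_single_partition or len(partition_by) > 0

   def check_compatible(call, analyze_result):
       """Raise if both the call and the 'analyze' result specify partitioning."""
       call_side = has_repartitioning(
           call["with_single_partition"], call["partition_by"])
       analyze_side = has_repartitioning(
           analyze_result["with_single_partition"], analyze_result["partition_by"])
       if call_side and analyze_side:
           raise ValueError(
               "the function specified its own required partitioning of the input "
               "table, but the call also specified WITH SINGLE PARTITION or "
               "PARTITION BY")

   # The call requests no partitioning, so the 'analyze' result may: accepted.
   check_compatible(
       {"with_single_partition": False, "partition_by": []},
       {"with_single_partition": True, "partition_by": []})
   ```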
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -207,6 +208,19 @@ case class UnresolvedPolymorphicPythonUDTF(
     copy(children = newChildren)
 }
 
+/**
+ * Represents the result of invoking the polymorphic 'analyze' method on a Python user-defined table
+ * function. This returns the table function's output schema in addition to other optional metadata.
+ * @param schema

Review Comment:
   Sounds good, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                  catalog, "table-valued functions")
              }
            }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult

Review Comment:
   Good idea, done. I also realized that we only need the Python UDTF name, so I simplified by returning that in the `val` instead (along with the `AnalyzeResult`).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -165,7 +165,8 @@ case class PythonUDTF(
     evalType: Int,
     udfDeterministic: Boolean,
     resultId: ExprId = NamedExpression.newExprId,
-    pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes] = None)
+    pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes] = None,
+    analyzeResult: Option[PythonUDTFAnalyzeResult] = None)

Review Comment:
   Sounds good, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                  catalog, "table-valued functions")
              }
            }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicating requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>

Review Comment:
   Good idea, I refactored this logic into a new method `applyToTableArgument` in the `PythonUDTFAnalyzeResult` object.
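   
   The overlay the diff above performs (the `analyze` result's requested properties taking precedence over the TABLE argument's defaults) can be sketched in Python like this; the real method is Scala, and the names here are simplified stand-ins:
   
   ```python
   from dataclasses import dataclass, field, replace

   # Simplified stand-in for the partitioning/ordering properties carried by a
   # TABLE argument and by a UDTF 'analyze' result, for illustration only.
   @dataclass
   class TableProps:
       with_single_partition: bool = False
       partition_by: list = field(default_factory=list)
       order_by: list = field(default_factory=list)

   def apply_analyze_result(arg: TableProps, a: TableProps) -> TableProps:
       """Overlay the 'analyze' result's requested properties onto the TABLE argument."""
       return replace(
           arg,
           # A single-partition request from either side sticks.
           with_single_partition=arg.with_single_partition or a.with_single_partition,
           # Non-empty lists from the 'analyze' result take precedence.
           partition_by=a.partition_by or arg.partition_by,
           order_by=a.order_by or arg.order_by,
       )

   merged = apply_analyze_result(
       TableProps(),
       TableProps(with_single_partition=True, order_by=["input"]))
   ```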



##########
python/pyspark/sql/udtf.py:
##########
@@ -61,6 +61,26 @@ class AnalyzeArgument:
     is_table: bool
 
 
+@dataclass(frozen=True)
+class PartitioningColumn:
+    """
+    Represents a UDTF column for purposes of returning metadata from the 'analyze' method.
+    """
+
+    name: str
+
+
+@dataclass(frozen=True)
+class OrderingColumn:

Review Comment:
   Sure, I added it here.
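   
   For context, the frozen dataclasses added here can be used like this; the definitions are reproduced as stand-ins so the example is self-contained (the real classes live in `pyspark.sql.udtf` and may carry extra fields):
   
   ```python
   from dataclasses import dataclass

   # Stand-in definitions mirroring the dataclasses added in udtf.py.
   @dataclass(frozen=True)
   class PartitioningColumn:
       """A column the UDTF requires the input table to be partitioned by."""
       name: str

   @dataclass(frozen=True)
   class OrderingColumn:
       """A column the UDTF requires each partition to be ordered by."""
       name: str

   # An 'analyze' method would return these inside its AnalyzeResult, e.g.:
   partition_by = [PartitioningColumn("partition_col")]
   order_by = [OrderingColumn("input")]
   ```
   
   Making the dataclasses frozen keeps the metadata immutable once `analyze` has returned it.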



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2131,6 +2133,118 @@ def terminate(self):
             [Row(count=40, total=60, last=2)],
         )
 
+    def test_udtf_with_table_argument_with_single_partition_from_analyze(self):
+        @udtf
+        class TestUDTF:
+            def __init__(self):
+                self._count = 0
+                self._sum = 0
+                self._last = None
+
+            @staticmethod
+            def analyze(self):
+                return AnalyzeResult(
+                    schema=StructType()
+                    .add("count", IntegerType())
+                    .add("total", IntegerType())
+                    .add("last", IntegerType()),
+                    with_single_partition=True,
+                    order_by=[OrderingColumn("input"), OrderingColumn("partition_col")],
+                )
+
+            def eval(self, row: Row):
+                # Make sure that the rows arrive in the expected order.
+                if self._last is not None and self._last > row["input"]:
+                    raise Exception(
+                        f"self._last was {self._last} but the row value was {row['input']}"
+                    )
+                self._count += 1
+                self._last = row["input"]
+                self._sum += row["input"]
+
+            def terminate(self):
+                yield self._count, self._sum, self._last
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+
+        self.assertEqual(

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                  catalog, "table-valued functions")
              }
            }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicating requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>
+                  if (a.withSinglePartition && a.partitionByExpressions.nonEmpty) {
+                    throw QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'with_single_partition' field cannot be assigned to true " +
+                        "if the 'partition_by' list is non-empty")
+                  } else if (a.orderByExpressions.nonEmpty && !a.withSinglePartition &&

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                  catalog, "table-valued functions")
              }
            }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicating requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -207,6 +208,19 @@ case class UnresolvedPolymorphicPythonUDTF(
     copy(children = newChildren)
 }
 
+/**
+ * Represents the result of invoking the polymorphic 'analyze' method on a Python user-defined table
+ * function. This returns the table function's output schema in addition to other optional metadata.
+ * @param schema
+ */
+case class PythonUDTFAnalyzeResult(
+    schema: StructType,
+    withSinglePartition: Boolean,
+    partitionByExpressions: Seq[Expression],
+    orderByExpressions: Seq[SortOrder]) {
+  def hasRepartitioning: Boolean = withSinglePartition || partitionByExpressions.nonEmpty

Review Comment:
   It turned out there was only one call site for this, so I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

