dtenedor commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310950704
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -81,7 +84,7 @@ case class UserDefinedPythonTableFunction(
func: PythonFunction,
returnType: Option[StructType],
pythonEvalType: Int,
- udfDeterministic: Boolean) {
+ udfDeterministic: Boolean) extends Logging {
Review Comment:
Done.
##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
-- test non-deterministic input
SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY
input);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY
input DESC);
+
+-- test UDTF calls that take input TABLE arguments and the 'analyze' method
returns required
+-- partitioning and/or ordering properties for Catalyst to enforce for the
input table
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2));
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) WITH
SINGLE PARTITION);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) PARTITION
BY partition_col);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2));
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) WITH SINGLE
PARTITION);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) PARTITION BY
partition_col);
Review Comment:
👍 I added a lateral join test for each UDTF type.
##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
-- test non-deterministic input
SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
Review Comment:
Sounds good, I ported the `AnalyzeResult` to the comments here for each UDTF.
##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
-- test non-deterministic input
SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY
input);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY
input DESC);
+
+-- test UDTF calls that take input TABLE arguments and the 'analyze' method
returns required
+-- partitioning and/or ordering properties for Catalyst to enforce for the
input table
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2));
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) WITH
SINGLE PARTITION);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) PARTITION
BY partition_col);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2));
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) WITH SINGLE
PARTITION);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) PARTITION BY
partition_col);
Review Comment:
👍 I added a lateral join test for each UDTF type.
##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -441,6 +443,255 @@ object IntegratedUDFTestUtils extends SQLHelper {
val prettyName: String = "Regular Python UDTF"
}
+ object TestPythonUDTFCountSumLast extends TestUDTF {
+ val name: String = "UDTFCountSumLast"
+ val pythonScript: String =
+ s"""
+ |from pyspark.sql.functions import AnalyzeResult, OrderingColumn,
PartitioningColumn
+ |from pyspark.sql.types import IntegerType, Row, StructType
+ |class $name:
+ | def __init__(self):
+ | self._count = 0
+ | self._sum = 0
+ | self._last = None
+ |
+ | @staticmethod
+ | def analyze(self):
Review Comment:
Yes, it is required. Ostensibly we could put this in the static
registration, but most use cases would probably want it to work with a
polymorphic output schema, so we can just leave it here. In general the
`AnalyzeResult` can become a collecting place for metadata like this, the UDTF
can implement `analyze` if it wants to specify this extra information as needed
(even if the output schema is always the same).
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
+ if (a.withSinglePartition &&
a.partitionByExpressions.nonEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'with_single_partition' field cannot be
assigned to true " +
+ "if the 'partition_by' list is non-empty")
+ } else if (a.orderByExpressions.nonEmpty &&
!a.withSinglePartition &&
+ a.partitionByExpressions.isEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'order_by' field cannot be non-empty
unless the " +
+ "'with_single_partition' field is set to true or the
'partition_by' list " +
+ "is non-empty")
+ } else if (a.hasRepartitioning && t.hasRepartitioning) {
+ throw QueryCompilationErrors
+ .tableValuedFunctionRequiredMetadataIncompatibleWithCall(
+ functionName = pythonUDTF.get.name,
+ requestedMetadata =
+ "specified its own required partitioning of the
input table",
+ invalidFunctionCallProperty =
+ "specified the WITH SINGLE PARTITION or PARTITION BY
clause; " +
+ "please remove these clauses and retry the query
again.")
+ }
+ var withSinglePartition = t.withSinglePartition
+ var partitionByExpressions = t.partitionByExpressions
+ var orderByExpressions = t.orderByExpressions
+ if (a.withSinglePartition) {
+ withSinglePartition = true
+ }
+ if (a.partitionByExpressions.nonEmpty) {
+ partitionByExpressions = a.partitionByExpressions
+ }
+ if (a.orderByExpressions.nonEmpty) {
+ orderByExpressions = a.orderByExpressions
+ }
Review Comment:
Yes, that's what this check is for:
```
if (hasRepartitioning && t.hasRepartitioning) {
throw QueryCompilationErrors
.tableValuedFunctionRequiredMetadataIncompatibleWithCall(...)
```
Only one of the (UDTF call, `analyze` result) can specify an explicit input
table partitioning property.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -207,6 +208,19 @@ case class UnresolvedPolymorphicPythonUDTF(
copy(children = newChildren)
}
+/**
+ * Represents the result of invoking the polymorphic 'analyze' method on a
Python user-defined table
+ * function. This returns the table function's output schema in addition to
other optional metadata.
+ * @param schema
Review Comment:
Sounds good, done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
Review Comment:
Good idea, done. I also realized that we only need the Python UDTF name, so
I simplified it by returning that in the `val` instead (along with the
`AnalyzeResult`).
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -165,7 +165,8 @@ case class PythonUDTF(
evalType: Int,
udfDeterministic: Boolean,
resultId: ExprId = NamedExpression.newExprId,
- pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes]
= None)
+ pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes]
= None,
+ analyzeResult: Option[PythonUDTFAnalyzeResult] = None)
Review Comment:
Sounds good, done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
Review Comment:
Good idea, I refactored this logic into a new method `applyToTableArgument`
in the `PythonUDTFAnalyzeResult` object.
##########
python/pyspark/sql/udtf.py:
##########
@@ -61,6 +61,26 @@ class AnalyzeArgument:
is_table: bool
+@dataclass(frozen=True)
+class PartitioningColumn:
+ """
+ Represents a UDTF column for purposes of returning metadata from the
'analyze' method.
+ """
+
+ name: str
+
+
+@dataclass(frozen=True)
+class OrderingColumn:
Review Comment:
Sure, I added it here.
##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2131,6 +2133,118 @@ def terminate(self):
[Row(count=40, total=60, last=2)],
)
+ def test_udtf_with_table_argument_with_single_partition_from_analyze(self):
+ @udtf
+ class TestUDTF:
+ def __init__(self):
+ self._count = 0
+ self._sum = 0
+ self._last = None
+
+ @staticmethod
+ def analyze(self):
+ return AnalyzeResult(
+ schema=StructType()
+ .add("count", IntegerType())
+ .add("total", IntegerType())
+ .add("last", IntegerType()),
+ with_single_partition=True,
+ order_by=[OrderingColumn("input"),
OrderingColumn("partition_col")],
+ )
+
+ def eval(self, row: Row):
+ # Make sure that the rows arrive in the expected order.
+ if self._last is not None and self._last > row["input"]:
+ raise Exception(
+ f"self._last was {self._last} but the row value was
{row['input']}"
+ )
+ self._count += 1
+ self._last = row["input"]
+ self._sum += row["input"]
+
+ def terminate(self):
+ yield self._count, self._sum, self._last
+
+ self.spark.udtf.register("test_udtf", TestUDTF)
+
+ self.assertEqual(
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
+ if (a.withSinglePartition &&
a.partitionByExpressions.nonEmpty) {
+ throw
QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+ functionName = pythonUDTF.get.name,
+ reason = "the 'with_single_partition' field cannot be
assigned to true " +
+ "if the 'partition_by' list is non-empty")
+ } else if (a.orderByExpressions.nonEmpty &&
!a.withSinglePartition &&
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
-
+ // Resolve Python UDTF calls if needed.
+ val resolvedFunc = resolvedTvf match {
+ case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _,
_, _) =>
+ val analyzeResult: PythonUDTFAnalyzeResult =
+ u.resolveElementMetadata(u.func, u.children)
+ g.copy(generator =
+ PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+ u.evalType, u.udfDeterministic, u.resultId,
u.pythonUDTFPartitionColumnIndexes,
+ analyzeResult = Some(analyzeResult)))
+ case other =>
+ other
+ }
val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
val functionTableSubqueryArgs =
mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
val tvf = resolvedFunc.transformAllExpressionsWithPruning(
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION),
ruleId) {
case t: FunctionTableSubqueryArgumentExpression =>
val alias =
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+ var pythonUDTF = Option.empty[PythonUDTF]
+ var pythonUDTFAnalyzeResult =
Option.empty[PythonUDTFAnalyzeResult]
resolvedFunc match {
- case Generate(_: PythonUDTF, _, _, _, _, _) =>
+ case Generate(p: PythonUDTF, _, _, _, _, _) =>
+ pythonUDTF = Some(p)
+ pythonUDTFAnalyzeResult = p.analyzeResult
case _ =>
assert(!t.hasRepartitioning,
"Cannot evaluate the table-valued function call because
it included the " +
"PARTITION BY clause, but only Python table functions
support this clause")
}
- tableArgs.append(SubqueryAlias(alias, t.evaluable))
- functionTableSubqueryArgs.append(t)
+ // Check if this is a call to a Python user-defined table
function whose polymorphic
+ // 'analyze' method returned metadata indicated requested
partitioning and/or
+ // ordering properties of the input relation. In that event,
make sure that the UDTF
+ // call did not include any explicit PARTITION BY and/or ORDER
BY clauses for the
+ // corresponding TABLE argument, and then update the TABLE
argument representation
+ // to apply the requested partitioning and/or ordering.
+ pythonUDTFAnalyzeResult.map { a =>
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -207,6 +208,19 @@ case class UnresolvedPolymorphicPythonUDTF(
copy(children = newChildren)
}
+/**
+ * Represents the result of invoking the polymorphic 'analyze' method on a
Python user-defined table
+ * function. This returns the table function's output schema in addition to
other optional metadata.
+ * @param schema
+ */
+case class PythonUDTFAnalyzeResult(
+ schema: StructType,
+ withSinglePartition: Boolean,
+ partitionByExpressions: Seq[Expression],
+ orderByExpressions: Seq[SortOrder]) {
+ def hasRepartitioning: Boolean = withSinglePartition ||
partitionByExpressions.nonEmpty
Review Comment:
It turned out there was only one call site for this, so I removed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]