allisonwang-db commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310650141
##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -441,6 +443,255 @@ object IntegratedUDFTestUtils extends SQLHelper {
val prettyName: String = "Regular Python UDTF"
}
+ object TestPythonUDTFCountSumLast extends TestUDTF {
+ val name: String = "UDTFCountSumLast"
+ val pythonScript: String =
+ s"""
+ |from pyspark.sql.functions import AnalyzeResult, OrderingColumn,
PartitioningColumn
+ |from pyspark.sql.types import IntegerType, Row, StructType
+ |class $name:
+ | def __init__(self):
+ | self._count = 0
+ | self._sum = 0
+ | self._last = None
+ |
+ | @staticmethod
+ | def analyze(self):
Review Comment:
It looks like we can make this a non-polymorphic function.
Is it required to implement this `analyze` method to work with PARTITION BY
and ORDER BY?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -165,7 +165,8 @@ case class PythonUDTF(
evalType: Int,
udfDeterministic: Boolean,
resultId: ExprId = NamedExpression.newExprId,
- pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes]
= None)
+ pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes]
= None,
+ analyzeResult: Option[PythonUDTFAnalyzeResult] = None)
Review Comment:
We have many arguments for the PythonUDTF case class now. It would be great
to add some docstring for each of the parameters :)
##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
-- test non-deterministic input
SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY
input);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY
input DESC);
+
+-- test UDTF calls that take input TABLE arguments and the 'analyze' method
returns required
+-- partitioning and/or ordering properties for Catalyst to enforce for the
input table
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2));
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) WITH
SINGLE PARTITION);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) PARTITION
BY partition_col);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2));
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) WITH SINGLE
PARTITION);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) PARTITION BY
partition_col);
Review Comment:
Shall we add a couple of test cases with LATERAL JOIN?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -207,6 +208,19 @@ case class UnresolvedPolymorphicPythonUDTF(
copy(children = newChildren)
}
+/**
+ * Represents the result of invoking the polymorphic 'analyze' method on a
Python user-defined table
+ * function. This returns the table function's output schema in addition to
other optional metadata.
+ * @param schema
+ */
+case class PythonUDTFAnalyzeResult(
+ schema: StructType,
+ withSinglePartition: Boolean,
+ partitionByExpressions: Seq[Expression],
+ orderByExpressions: Seq[SortOrder]) {
+ def hasRepartitioning: Boolean = withSinglePartition ||
partitionByExpressions.nonEmpty
Review Comment:
`lazy val`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]