allisonwang-db commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310650141


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -441,6 +443,255 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Regular Python UDTF"
   }
 
+  object TestPythonUDTFCountSumLast extends TestUDTF {
+    val name: String = "UDTFCountSumLast"
+    val pythonScript: String =
+      s"""
+         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+         |from pyspark.sql.types import IntegerType, Row, StructType
+         |class $name:
+         |    def __init__(self):
+         |        self._count = 0
+         |        self._sum = 0
+         |        self._last = None
+         |
+         |    @staticmethod
+         |    def analyze(self):

Review Comment:
   It looks like we can make this a non-polymorphic function.
   Is it required to implement this `analyze` method to work with PARTITION BY 
and ORDER BY?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -165,7 +165,8 @@ case class PythonUDTF(
     evalType: Int,
     udfDeterministic: Boolean,
     resultId: ExprId = NamedExpression.newExprId,
-    pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes] = None)
+    pythonUDTFPartitionColumnIndexes: Option[PythonUDTFPartitionColumnIndexes] = None,
+    analyzeResult: Option[PythonUDTFAnalyzeResult] = None)

Review Comment:
   The PythonUDTF case class has many arguments now. It would be great to add
   a docstring that describes each of the parameters :)
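
To make the request concrete, here is a rough sketch of what per-parameter Scaladoc could look like. It uses a trimmed-down, hypothetical `PythonUDTFSketch` with stand-in types so that it compiles without Spark; only the five parameters visible in the hunk above are mirrored. The descriptions are a reading of the parameter names, not wording confirmed by the PR.

```scala
// Stand-in types (hypothetical). In the PR these are Catalyst's ExprId,
// PythonUDTFPartitionColumnIndexes and PythonUDTFAnalyzeResult.
final case class ExprIdStub(id: Long)
final case class PartitionColumnIndexesStub(indexes: Seq[Int])
final case class AnalyzeResultStub(withSinglePartition: Boolean)

/**
 * Hypothetical, trimmed-down mirror of the trailing PythonUDTF parameters.
 *
 * @param evalType integer code identifying how the Python function is evaluated
 * @param udfDeterministic whether the function returns the same output for the same input
 * @param resultId expression ID assigned to the function's result
 * @param pythonUDTFPartitionColumnIndexes indexes of the partitioning columns, when the
 *                                         call has a partitioned TABLE argument
 * @param analyzeResult result of the Python 'analyze' method, when one was invoked
 */
final case class PythonUDTFSketch(
    evalType: Int,
    udfDeterministic: Boolean,
    resultId: ExprIdStub,
    pythonUDTFPartitionColumnIndexes: Option[PartitionColumnIndexesStub] = None,
    analyzeResult: Option[AnalyzeResultStub] = None)
```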



##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
 
 -- test non-deterministic input
 SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY input);
+SELECT * FROM UDTFCountSumLast(TABLE(t2) PARTITION BY partition_col ORDER BY input DESC);
+
+-- test UDTF calls that take input TABLE arguments and the 'analyze' method returns required
+-- partitioning and/or ordering properties for Catalyst to enforce for the input table
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2));
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2));
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2) PARTITION BY partition_col);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2));
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) WITH SINGLE PARTITION);
+SELECT * FROM UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) PARTITION BY partition_col);

Review Comment:
   Shall we add a couple of test cases with LATERAL JOIN?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -207,6 +208,19 @@ case class UnresolvedPolymorphicPythonUDTF(
     copy(children = newChildren)
 }
 
+/**
+ * Represents the result of invoking the polymorphic 'analyze' method on a Python user-defined table
+ * function. This returns the table function's output schema in addition to other optional metadata.
+ * @param schema
+ */
+case class PythonUDTFAnalyzeResult(
+    schema: StructType,
+    withSinglePartition: Boolean,
+    partitionByExpressions: Seq[Expression],
+    orderByExpressions: Seq[SortOrder]) {
+  def hasRepartitioning: Boolean = withSinglePartition || partitionByExpressions.nonEmpty

Review Comment:
   `lazy val`?
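
For context, a minimal sketch of the difference this suggestion refers to. `AnalyzeResultSketch` is a hypothetical stand-in for the case class in the hunk above: the real fields use Catalyst types, which are replaced with plain Scala types here so the snippet compiles without Spark.

```scala
// Hypothetical stand-in for PythonUDTFAnalyzeResult; Seq[String] replaces
// Seq[Expression] only to keep the example self-contained.
final case class AnalyzeResultSketch(
    withSinglePartition: Boolean,
    partitionByExpressions: Seq[String]) {

  // As a `def`: the expression is re-evaluated on every call.
  def hasRepartitioningDef: Boolean =
    withSinglePartition || partitionByExpressions.nonEmpty

  // As a `lazy val`: evaluated on first access and cached afterwards.
  // Caching is safe here because both inputs are immutable constructor fields.
  lazy val hasRepartitioning: Boolean =
    withSinglePartition || partitionByExpressions.nonEmpty
}
```

Both forms return the same value. For an expression this cheap the difference is mostly stylistic; a `lazy val` adds a cached field per instance in exchange for skipping the recomputation.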



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

