dtenedor commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280970732


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2073,6 +2073,13 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), 
ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = 
SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                resolvedFunc match {
+                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case _ if t.hasRepartitioning =>
+                    throw 
QueryCompilationErrors.tableValuedFunctionPartitionByClauseNotSupported(

Review Comment:
   I tried this, but got a different data type mismatch error instead. Since 
I'm unable to exercise this error message, I changed it to an assert for now 
and kept the unit test that exercises using a `TABLE` argument type 
with EXPLODE.



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -14,9 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import os
+import re
 import shutil
 import tempfile
+import textwrap

Review Comment:
   Apologies for missing this; I have reverted this file now.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2445,8 +2452,12 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           InSubquery(values, expr.asInstanceOf[ListQuery])
         case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
           resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
-        case a @ FunctionTableSubqueryArgumentExpression(sub, _, exprId) if 
!sub.resolved =>
-          resolveSubQuery(a, outer)(FunctionTableSubqueryArgumentExpression(_, 
_, exprId))
+        case a @ FunctionTableSubqueryArgumentExpression(
+            sub, _, exprId, partitionByExpressions, withSinglePartition, 
orderByExpressions)

Review Comment:
   Done, thanks, this is better.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -32,11 +32,40 @@ import org.apache.spark.sql.types.DataType
  * table in the catalog. In the latter case, the relation argument comprises
  * a table subquery that may itself refer to one or more tables in its own
  * FROM clause.
+ *
+ * Each TABLE argument may also optionally include a PARTITION BY clause. If 
present, these indicate
+ * how to logically split up the input relation such that the table-valued 
function evaluates
+ * exactly once for each partition, and returns the union of all results. If 
no partitioning list is
+ * present, this splitting of the input relation is undefined. Furthermore, if 
the PARTITION BY
+ * clause includes a following ORDER BY clause, Catalyst will sort the rows in 
each partition such
+ * that the table-valued function receives them one-by-one in the requested 
order. Otherwise, if no
+ * such ordering is specified, the ordering of rows within each partition is 
undefined.
+ *
+ * @param plan the logical plan provided as input for the table argument as 
either a logical
+ *             relation or as a more complex logical plan in the event of a 
table subquery.
+ * @param outerAttrs outer references of this subquery plan, generally empty 
since these table
+ *                   arguments do not allow correlated references currently
+ * @param exprId expression ID of this subquery expression, generally 
generated afresh each time
+ * @param partitionByExpressions if non-empty, the TABLE argument included the 
PARTITION BY clause
+ *                               to indicate that the input relation should be 
repartitioned by the
+ *                               hash of the provided expressions, such that 
all the rows with each
+ *                               unique combination of values of the 
partitioning expressions will
+ *                               be consumed by exactly one instance of the 
table function class.
+ * @param withSinglePartition if true, the TABLE argument included the WITH 
SINGLE PARTITION clause
+ *                            to indicate that the entire input relation 
should be repartitioned to
+ *                            one worker for consumption by exactly one 
instance of the table
+ *                            function class.
+ * @param orderByExpressions if non-empty, the TABLE argument included the 
ORDER BY clause to
+ *                           indicate that the rows within each partition of 
the table function are
+ *                           to arrive in the provided order.
  */
 case class FunctionTableSubqueryArgumentExpression(
     plan: LogicalPlan,
     outerAttrs: Seq[Expression] = Seq.empty,
-    exprId: ExprId = NamedExpression.newExprId)
+    exprId: ExprId = NamedExpression.newExprId,
+    partitionByExpressions: Seq[Expression] = Seq.empty,
+    withSinglePartition: Boolean = false,
+    orderByExpressions: Seq[SortOrder] = Seq.empty)
   extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with 
Unevaluable {
 

Review Comment:
   Good idea, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1564,14 +1564,43 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     }.getOrElse {
       plan(ctx.query)
     }
-    val partitioning = Option(ctx.tableArgumentPartitioning)
-    if (partitioning.isDefined) {
-      // The PARTITION BY clause is not implemented yet for TABLE arguments to 
table valued function
-      // calls.
-      operationNotAllowed(
-        "Specifying the PARTITION BY clause for TABLE arguments is not 
implemented yet", ctx)
-    }
-    FunctionTableSubqueryArgumentExpression(p)
+    var withSinglePartition = false
+    var partitionByExpressions = Seq.empty[Expression]
+    var orderByExpressions = Seq.empty[SortOrder]
+    Option(ctx.tableArgumentPartitioning)
+      .foreach { p =>
+        if (p.SINGLE != null) {
+          withSinglePartition = true
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)
+      .map(_.partition.asScala.map(expression))
+      .foreach { expressions =>
+        if (expressions.nonEmpty) {
+          partitionByExpressions = expressions
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)

Review Comment:
   Thanks, this is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to