dtenedor commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280970732
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2073,6 +2073,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId) {
       case t: FunctionTableSubqueryArgumentExpression =>
         val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+        resolvedFunc match {
+          case Generate(_: PythonUDTF, _, _, _, _, _) =>
+          case _ if t.hasRepartitioning =>
+            throw QueryCompilationErrors.tableValuedFunctionPartitionByClauseNotSupported(
Review Comment:
I tried this, but got a different data type mismatch error instead. Since
I'm unable to exercise this error message, I changed it to an assert for now,
and left in the unit test that exercises using a `TABLE` argument with
EXPLODE.
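For illustration, here is a simplified sketch of the throw-versus-assert distinction at play; the types below are stand-ins for this sketch, not the real Catalyst classes:

```scala
// Stand-ins for the real Catalyst plan nodes, for illustration only.
sealed trait ResolvedFunc
case object PythonUdtfGenerate extends ResolvedFunc
case object OtherFunction extends ResolvedFunc

object RepartitioningCheck {
  def check(resolvedFunc: ResolvedFunc, hasRepartitioning: Boolean): Unit = {
    resolvedFunc match {
      // Python UDTFs are the one case that supports PARTITION BY, so no check.
      case PythonUdtfGenerate =>
      // Any other function should never reach this point with a PARTITION BY
      // clause, so an unreachable user-facing error becomes an internal assert.
      case OtherFunction =>
        assert(!hasRepartitioning,
          "PARTITION BY on a TABLE argument to a non-Python table function")
    }
  }
}
```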
##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -14,9 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import os
+import re
import shutil
import tempfile
+import textwrap
Review Comment:
Apologies for missing this; reverted the file now.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2445,8 +2452,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         InSubquery(values, expr.asInstanceOf[ListQuery])
       case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
         resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
-      case a @ FunctionTableSubqueryArgumentExpression(sub, _, exprId) if !sub.resolved =>
-        resolveSubQuery(a, outer)(FunctionTableSubqueryArgumentExpression(_, _, exprId))
+      case a @ FunctionTableSubqueryArgumentExpression(
+          sub, _, exprId, partitionByExpressions, withSinglePartition, orderByExpressions)
Review Comment:
Done, thanks, this is better.
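For anyone reading along, the reason to destructure all of the fields here is that rebuilding the expression from only the first three would silently drop the new ones. A minimal, self-contained sketch with a stand-in case class (not the real Catalyst one) shows the difference:

```scala
// Stand-in for FunctionTableSubqueryArgumentExpression, illustration only.
case class TableArg(
    plan: String,
    outerAttrs: Seq[String] = Seq.empty,
    exprId: Long = 0L,
    partitionByExpressions: Seq[String] = Seq.empty,
    withSinglePartition: Boolean = false,
    orderByExpressions: Seq[String] = Seq.empty)

object Demo extends App {
  val a = TableArg("unresolved", Seq.empty, 7L, partitionByExpressions = Seq("x"))
  // Rebuilding from only (plan, outerAttrs, exprId) loses the PARTITION BY info:
  val dropped = TableArg("resolved", a.outerAttrs, a.exprId)
  // Threading every field through (as the new pattern match does) keeps it:
  val kept = a.copy(plan = "resolved")
  assert(dropped.partitionByExpressions.isEmpty)
  assert(kept.partitionByExpressions == Seq("x"))
}
```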
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -32,11 +32,40 @@ import org.apache.spark.sql.types.DataType
  * table in the catalog. In the latter case, the relation argument comprises
  * a table subquery that may itself refer to one or more tables in its own
  * FROM clause.
+ *
+ * Each TABLE argument may also optionally include a PARTITION BY clause. If present, these
+ * indicate how to logically split up the input relation such that the table-valued function
+ * evaluates exactly once for each partition, and returns the union of all results. If no
+ * partitioning list is present, this splitting of the input relation is undefined. Furthermore,
+ * if the PARTITION BY clause includes a following ORDER BY clause, Catalyst will sort the rows
+ * in each partition such that the table-valued function receives them one-by-one in the
+ * requested order. Otherwise, if no such ordering is specified, the ordering of rows within
+ * each partition is undefined.
+ *
+ * @param plan the logical plan provided as input for the table argument as either a logical
+ *             relation or as a more complex logical plan in the event of a table subquery.
+ * @param outerAttrs outer references of this subquery plan, generally empty since these table
+ *                   arguments do not allow correlated references currently
+ * @param exprId expression ID of this subquery expression, generally generated afresh each time
+ * @param partitionByExpressions if non-empty, the TABLE argument included the PARTITION BY clause
+ *                               to indicate that the input relation should be repartitioned by the
+ *                               hash of the provided expressions, such that all the rows with each
+ *                               unique combination of values of the partitioning expressions will
+ *                               be consumed by exactly one instance of the table function class.
+ * @param withSinglePartition if true, the TABLE argument included the WITH SINGLE PARTITION clause
+ *                            to indicate that the entire input relation should be repartitioned to
+ *                            one worker for consumption by exactly one instance of the table
+ *                            function class.
+ * @param orderByExpressions if non-empty, the TABLE argument included the ORDER BY clause to
+ *                           indicate that the rows within each partition of the table function are
+ *                           to arrive in the provided order.
  */
 case class FunctionTableSubqueryArgumentExpression(
     plan: LogicalPlan,
     outerAttrs: Seq[Expression] = Seq.empty,
-    exprId: ExprId = NamedExpression.newExprId)
+    exprId: ExprId = NamedExpression.newExprId,
+    partitionByExpressions: Seq[Expression] = Seq.empty,
+    withSinglePartition: Boolean = false,
+    orderByExpressions: Seq[SortOrder] = Seq.empty)
   extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with Unevaluable {
Review Comment:
Good idea, done.
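As a usage note, the new fields correspond to TABLE-argument syntax along these lines; `my_udtf` and table `t` are hypothetical names used only for illustration, not part of the PR:

```scala
import org.apache.spark.sql.SparkSession

object PartitioningSyntaxExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // PARTITION BY hashes rows of t by col1 so that each distinct value lands
    // in one partition, and ORDER BY sorts rows within each partition before
    // the table function consumes them. `my_udtf` and `t` are hypothetical.
    spark.sql("SELECT * FROM my_udtf(TABLE(t) PARTITION BY col1 ORDER BY col2)")
    // WITH SINGLE PARTITION routes the entire input relation to a single
    // instance of the table function.
    spark.sql("SELECT * FROM my_udtf(TABLE(t) WITH SINGLE PARTITION)")
  }
}
```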
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1564,14 +1564,43 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }.getOrElse {
       plan(ctx.query)
     }
-    val partitioning = Option(ctx.tableArgumentPartitioning)
-    if (partitioning.isDefined) {
-      // The PARTITION BY clause is not implemented yet for TABLE arguments to table valued
-      // function calls.
-      operationNotAllowed(
-        "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet", ctx)
-    }
-    FunctionTableSubqueryArgumentExpression(p)
+    var withSinglePartition = false
+    var partitionByExpressions = Seq.empty[Expression]
+    var orderByExpressions = Seq.empty[SortOrder]
+    Option(ctx.tableArgumentPartitioning)
+      .foreach { p =>
+        if (p.SINGLE != null) {
+          withSinglePartition = true
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)
+      .map(_.partition.asScala.map(expression))
+      .foreach { expressions =>
+        if (expressions.nonEmpty) {
+          partitionByExpressions = expressions
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)
Review Comment:
Thanks, this is better.
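For completeness, the repeated lookups of `ctx.tableArgumentPartitioning` could also be collapsed into a single pass. A simplified sketch, using a stand-in context class rather than the real ANTLR-generated one:

```scala
// Stand-in for the parsed PARTITION BY / ORDER BY context, illustration only.
case class PartitioningCtx(
    single: Boolean,
    partitionExprs: Seq[String],
    orderExprs: Seq[String])

object ParseSketch {
  // Returns (withSinglePartition, partitionByExpressions, orderByExpressions),
  // defaulting everything when no partitioning clause was parsed.
  def parse(ctx: Option[PartitioningCtx]): (Boolean, Seq[String], Seq[String]) =
    ctx.map { p =>
      (p.single, p.partitionExprs, p.orderExprs)
    }.getOrElse((false, Seq.empty, Seq.empty))
}
```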
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.