dtenedor commented on code in PR #42351:
URL: https://github.com/apache/spark/pull/42351#discussion_r1287783174
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,80 @@ case class FunctionTableSubqueryArgumentExpression(
// the query plan.
var subquery = plan
if (partitionByExpressions.nonEmpty) {
- subquery = RepartitionByExpression(
- partitionExpressions = partitionByExpressions,
- child = subquery,
- optNumPartitions = None)
+ // Add a projection to project each of the partitioning expressions that
it is not a simple
+ // attribute that is already present in the plan output. Then add a sort
operation by the
+ // partition keys (plus any explicit ORDER BY items) since after the
hash-based shuffle
+ // operation, the rows from several partitions may arrive interleaved.
In this way, the Python
+ // UDTF evaluator is able to inspect the values of the partitioning
expressions for adjacent
+ // rows in order to determine when each partition ends and the next one
begins.
+ subquery = Project(
+ projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+ child = subquery)
+ val partitioningAttributes = partitioningExpressionIndexes.map(i =>
subquery.output(i))
+ subquery = Sort(
+ order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++
orderByExpressions,
+ global = false,
+ child = RepartitionByExpression(
+ partitionExpressions = partitioningAttributes,
+ optNumPartitions = None,
+ child = subquery))
}
if (withSinglePartition) {
subquery = Repartition(
numPartitions = 1,
shuffle = true,
child = subquery)
- }
- if (orderByExpressions.nonEmpty) {
- subquery = Sort(
- order = orderByExpressions,
- global = false,
- child = subquery)
+ if (orderByExpressions.nonEmpty) {
+ subquery = Sort(
+ order = orderByExpressions,
+ global = false,
+ child = subquery)
+ }
}
Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
}
+
+ /**
+ * These are the indexes of the PARTITION BY expressions within the
concatenation of the child's
+ * output attributes and the [[extraProjectedPartitioningExpressions]]. We
send these indexes to
+ * the Python UDTF evaluator so it knows which expressions to compare on
adjacent rows to know
+ * when the partition has changed.
Review Comment:
Good Q, we need this change because the `PARTITION BY` clause can include
any expressions, not just simple column references, e.g. `PARTITION BY
SUBSTR(string_col, 6)`. So this PR creates a projection to compute these
expressions before the repartitioning and sorting operations. We repartition by
attribute references to these expressions, then sort by the concatenation of
these attribute references + any explicit `ORDER BY` expressions that the
`TABLE` argument includes as well.
We then need to send the indexes to the UDTF evaluator (along with the
projected expression results themselves) so that it may inspect these values
between each call to `eval`. If it detects that any of these values change,
then this indicates the boundary between two partitions, and we need to call
`terminate` on the UDTF class instance and then destroy it, and create a new
one for the next partition.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,80 @@ case class FunctionTableSubqueryArgumentExpression(
// the query plan.
var subquery = plan
if (partitionByExpressions.nonEmpty) {
- subquery = RepartitionByExpression(
- partitionExpressions = partitionByExpressions,
- child = subquery,
- optNumPartitions = None)
+ // Add a projection to project each of the partitioning expressions that
it is not a simple
+ // attribute that is already present in the plan output. Then add a sort
operation by the
+ // partition keys (plus any explicit ORDER BY items) since after the
hash-based shuffle
+ // operation, the rows from several partitions may arrive interleaved.
In this way, the Python
+ // UDTF evaluator is able to inspect the values of the partitioning
expressions for adjacent
+ // rows in order to determine when each partition ends and the next one
begins.
+ subquery = Project(
+ projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+ child = subquery)
+ val partitioningAttributes = partitioningExpressionIndexes.map(i =>
subquery.output(i))
+ subquery = Sort(
+ order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++
orderByExpressions,
+ global = false,
+ child = RepartitionByExpression(
+ partitionExpressions = partitioningAttributes,
+ optNumPartitions = None,
+ child = subquery))
}
if (withSinglePartition) {
subquery = Repartition(
numPartitions = 1,
shuffle = true,
child = subquery)
- }
- if (orderByExpressions.nonEmpty) {
- subquery = Sort(
- order = orderByExpressions,
- global = false,
- child = subquery)
+ if (orderByExpressions.nonEmpty) {
+ subquery = Sort(
+ order = orderByExpressions,
+ global = false,
+ child = subquery)
+ }
}
Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
}
+
+ /**
+ * These are the indexes of the PARTITION BY expressions within the
concatenation of the child's
+ * output attributes and the [[extraProjectedPartitioningExpressions]]. We
send these indexes to
+ * the Python UDTF evaluator so it knows which expressions to compare on
adjacent rows to know
+ * when the partition has changed.
+ */
+ lazy val partitioningExpressionIndexes: Seq[Int] = {
+ val extraPartitionByExpressionsToIndexes: Map[Expression, Int] =
+ extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap
+ partitionByExpressions.map { e =>
+ subqueryOutputs.get(e).getOrElse {
+ extraPartitionByExpressionsToIndexes.get(e).get + plan.output.length
+ }
+ }
+ }
+
+ private lazy val extraProjectedPartitioningExpressions: Seq[Alias] = {
Review Comment:
We do currently sort by the attributes of these projected partitioning
expressions:
```
val partitioningAttributes = partitioningExpressionIndexes.map(i =>
subquery.output(i))
subquery = Sort(
order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++
orderByExpressions,
...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]