ueshin commented on code in PR #42351:
URL: https://github.com/apache/spark/pull/42351#discussion_r1287537496
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,77 @@ case class FunctionTableSubqueryArgumentExpression(
// the query plan.
var subquery = plan
if (partitionByExpressions.nonEmpty) {
- subquery = RepartitionByExpression(
- partitionExpressions = partitionByExpressions,
- child = subquery,
- optNumPartitions = None)
+ // Add a projection to project each of the partitioning expressions that
it is not a simple
+ // attribute that is already present in the plan output. Then add a sort
operation by the
+ // partition keys (plus any explicit ORDER BY items) since after the
hash-based shuffle
+ // operation, the rows from several partitions may arrive interleaved.
In this way, the Python
+ // UDTF evaluator is able to inspect the values of the partitioning
expressions for adjacent
+ // rows in order to determine when each partition ends and the next one
begins.
+ subquery = Project(
+ projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+ child = subquery)
+ val partitioningAttributes = partitioningExpressionIndexes.map(i =>
subquery.output(i))
+ subquery = Sort(
+ order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++
orderByExpressions,
+ global = false,
+ child = RepartitionByExpression(
+ partitionExpressions = partitioningAttributes,
+ optNumPartitions = None,
+ child = subquery))
}
if (withSinglePartition) {
subquery = Repartition(
numPartitions = 1,
shuffle = true,
child = subquery)
- }
- if (orderByExpressions.nonEmpty) {
- subquery = Sort(
- order = orderByExpressions,
- global = false,
- child = subquery)
+ if (orderByExpressions.nonEmpty) {
+ subquery = Sort(
+ order = orderByExpressions,
+ global = false,
+ child = subquery)
+ }
}
Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
}
+
+ /**
+ * These are the indexes of the PARTITION BY expressions within the
concatenation of the child's
+ * output attributes and the [[extraProjectedPartitioningExpressions]]. We
send these indexes to
+ * the Python UDTF evaluator so it knows which expressions to compare on
adjacent rows to know
+ * when the partition has changed.
+ */
+ lazy val partitioningExpressionIndexes: Seq[Int] =
partitionByExpressions.map { e =>
+ subqueryOutputs.get(e).getOrElse {
+ lazy val extraIndexes =
extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap
Review Comment:
nit: I'm wondering this is done only once?
might want to extract this out of `partitionByExpressions.map { e => ... }`
to be readable?
```scala
lazy val partitioningExpressionIndexes: Seq[Int] = {
val extraIndexes =
extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap
partitionByExpressions.map { e =>
subqueryOutputs.get(e).getOrElse {
extraIndexes.get(e).get + plan.output.length
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]