dtenedor commented on code in PR #42351:
URL: https://github.com/apache/spark/pull/42351#discussion_r1287706651


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,77 @@ case class FunctionTableSubqueryArgumentExpression(
     // the query plan.
     var subquery = plan
     if (partitionByExpressions.nonEmpty) {
-      subquery = RepartitionByExpression(
-        partitionExpressions = partitionByExpressions,
-        child = subquery,
-        optNumPartitions = None)
+      // Add a projection to project each of the partitioning expressions that 
it is not a simple
+      // attribute that is already present in the plan output. Then add a sort 
operation by the
+      // partition keys (plus any explicit ORDER BY items) since after the 
hash-based shuffle
+      // operation, the rows from several partitions may arrive interleaved. 
In this way, the Python
+      // UDTF evaluator is able to inspect the values of the partitioning 
expressions for adjacent
+      // rows in order to determine when each partition ends and the next one 
begins.
+      subquery = Project(
+        projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+        child = subquery)
+      val partitioningAttributes = partitioningExpressionIndexes.map(i => 
subquery.output(i))
+      subquery = Sort(
+        order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ 
orderByExpressions,
+        global = false,
+        child = RepartitionByExpression(
+          partitionExpressions = partitioningAttributes,
+          optNumPartitions = None,
+          child = subquery))
     }
     if (withSinglePartition) {
       subquery = Repartition(
         numPartitions = 1,
         shuffle = true,
         child = subquery)
-    }
-    if (orderByExpressions.nonEmpty) {
-      subquery = Sort(
-        order = orderByExpressions,
-        global = false,
-        child = subquery)
+      if (orderByExpressions.nonEmpty) {
+        subquery = Sort(
+          order = orderByExpressions,
+          global = false,
+          child = subquery)
+      }
     }
     Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
+
+  /**
+   * These are the indexes of the PARTITION BY expressions within the 
concatenation of the child's
+   * output attributes and the [[extraProjectedPartitioningExpressions]]. We 
send these indexes to
+   * the Python UDTF evaluator so it knows which expressions to compare on 
adjacent rows to know
+   * when the partition has changed.
+   */
+  lazy val partitioningExpressionIndexes: Seq[Int] = 
partitionByExpressions.map { e =>
+    subqueryOutputs.get(e).getOrElse {
+      lazy val extraIndexes = 
extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap

Review Comment:
   Good idea, make this change and renamed `extraIndexes` to hopefully clarify 
a bit.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,77 @@ case class FunctionTableSubqueryArgumentExpression(
     // the query plan.
     var subquery = plan
     if (partitionByExpressions.nonEmpty) {
-      subquery = RepartitionByExpression(
-        partitionExpressions = partitionByExpressions,
-        child = subquery,
-        optNumPartitions = None)
+      // Add a projection to project each of the partitioning expressions that 
it is not a simple
+      // attribute that is already present in the plan output. Then add a sort 
operation by the
+      // partition keys (plus any explicit ORDER BY items) since after the 
hash-based shuffle
+      // operation, the rows from several partitions may arrive interleaved. 
In this way, the Python
+      // UDTF evaluator is able to inspect the values of the partitioning 
expressions for adjacent
+      // rows in order to determine when each partition ends and the next one 
begins.
+      subquery = Project(
+        projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+        child = subquery)
+      val partitioningAttributes = partitioningExpressionIndexes.map(i => 
subquery.output(i))
+      subquery = Sort(
+        order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ 
orderByExpressions,
+        global = false,
+        child = RepartitionByExpression(
+          partitionExpressions = partitioningAttributes,
+          optNumPartitions = None,
+          child = subquery))
     }
     if (withSinglePartition) {
       subquery = Repartition(
         numPartitions = 1,
         shuffle = true,
         child = subquery)
-    }
-    if (orderByExpressions.nonEmpty) {
-      subquery = Sort(
-        order = orderByExpressions,
-        global = false,
-        child = subquery)
+      if (orderByExpressions.nonEmpty) {
+        subquery = Sort(
+          order = orderByExpressions,
+          global = false,
+          child = subquery)
+      }
     }
     Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
+
+  /**
+   * These are the indexes of the PARTITION BY expressions within the 
concatenation of the child's
+   * output attributes and the [[extraProjectedPartitioningExpressions]]. We 
send these indexes to
+   * the Python UDTF evaluator so it knows which expressions to compare on 
adjacent rows to know
+   * when the partition has changed.
+   */
+  lazy val partitioningExpressionIndexes: Seq[Int] = 
partitionByExpressions.map { e =>
+    subqueryOutputs.get(e).getOrElse {
+      lazy val extraIndexes = 
extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap

Review Comment:
   Good idea, made this change and renamed `extraIndexes` to hopefully clarify 
a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to