dtenedor commented on code in PR #42351:
URL: https://github.com/apache/spark/pull/42351#discussion_r1287783174


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,80 @@ case class FunctionTableSubqueryArgumentExpression(
     // the query plan.
     var subquery = plan
     if (partitionByExpressions.nonEmpty) {
-      subquery = RepartitionByExpression(
-        partitionExpressions = partitionByExpressions,
-        child = subquery,
-        optNumPartitions = None)
+      // Add a projection to project each of the partitioning expressions that 
it is not a simple
+      // attribute that is already present in the plan output. Then add a sort 
operation by the
+      // partition keys (plus any explicit ORDER BY items) since after the 
hash-based shuffle
+      // operation, the rows from several partitions may arrive interleaved. 
In this way, the Python
+      // UDTF evaluator is able to inspect the values of the partitioning 
expressions for adjacent
+      // rows in order to determine when each partition ends and the next one 
begins.
+      subquery = Project(
+        projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+        child = subquery)
+      val partitioningAttributes = partitioningExpressionIndexes.map(i => 
subquery.output(i))
+      subquery = Sort(
+        order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ 
orderByExpressions,
+        global = false,
+        child = RepartitionByExpression(
+          partitionExpressions = partitioningAttributes,
+          optNumPartitions = None,
+          child = subquery))
     }
     if (withSinglePartition) {
       subquery = Repartition(
         numPartitions = 1,
         shuffle = true,
         child = subquery)
-    }
-    if (orderByExpressions.nonEmpty) {
-      subquery = Sort(
-        order = orderByExpressions,
-        global = false,
-        child = subquery)
+      if (orderByExpressions.nonEmpty) {
+        subquery = Sort(
+          order = orderByExpressions,
+          global = false,
+          child = subquery)
+      }
     }
     Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
+
+  /**
+   * These are the indexes of the PARTITION BY expressions within the 
concatenation of the child's
+   * output attributes and the [[extraProjectedPartitioningExpressions]]. We 
send these indexes to
+   * the Python UDTF evaluator so it knows which expressions to compare on 
adjacent rows to know
+   * when the partition has changed.

Review Comment:
   Good Q, we need this change because the `PARTITION BY` clause can include 
any expressions, not just simple column references, e.g. `PARTITION BY 
SUBSTR(string_col, 6)`. So this PR creates a projection to compute these 
expressions before the repartitioning and sorting operations. We repartition by 
attribute references to these expressions, then sort by the concatenation of 
these attribute references + any explicit `ORDER BY` expressions that the 
`TABLE` argument includes as well.
   
   We then need to send the indexes to the UDTF evaluator (along with the 
projected expression results themselves) so that it may inspect these values 
between each call to `eval`. If it detects that any of these values change, 
then this indicates the boundary between two partitions, and we need to call 
`terminate` on the UDTF class instance and then destroy it, and create a new 
one for the next partition.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,80 @@ case class FunctionTableSubqueryArgumentExpression(
     // the query plan.
     var subquery = plan
     if (partitionByExpressions.nonEmpty) {
-      subquery = RepartitionByExpression(
-        partitionExpressions = partitionByExpressions,
-        child = subquery,
-        optNumPartitions = None)
+      // Add a projection to project each of the partitioning expressions that 
it is not a simple
+      // attribute that is already present in the plan output. Then add a sort 
operation by the
+      // partition keys (plus any explicit ORDER BY items) since after the 
hash-based shuffle
+      // operation, the rows from several partitions may arrive interleaved. 
In this way, the Python
+      // UDTF evaluator is able to inspect the values of the partitioning 
expressions for adjacent
+      // rows in order to determine when each partition ends and the next one 
begins.
+      subquery = Project(
+        projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+        child = subquery)
+      val partitioningAttributes = partitioningExpressionIndexes.map(i => 
subquery.output(i))
+      subquery = Sort(
+        order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ 
orderByExpressions,
+        global = false,
+        child = RepartitionByExpression(
+          partitionExpressions = partitioningAttributes,
+          optNumPartitions = None,
+          child = subquery))
     }
     if (withSinglePartition) {
       subquery = Repartition(
         numPartitions = 1,
         shuffle = true,
         child = subquery)
-    }
-    if (orderByExpressions.nonEmpty) {
-      subquery = Sort(
-        order = orderByExpressions,
-        global = false,
-        child = subquery)
+      if (orderByExpressions.nonEmpty) {
+        subquery = Sort(
+          order = orderByExpressions,
+          global = false,
+          child = subquery)
+      }
     }
     Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
+
+  /**
+   * These are the indexes of the PARTITION BY expressions within the 
concatenation of the child's
+   * output attributes and the [[extraProjectedPartitioningExpressions]]. We 
send these indexes to
+   * the Python UDTF evaluator so it knows which expressions to compare on 
adjacent rows to know
+   * when the partition has changed.
+   */
+  lazy val partitioningExpressionIndexes: Seq[Int] = {
+    val extraPartitionByExpressionsToIndexes: Map[Expression, Int] =
+      extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap
+    partitionByExpressions.map { e =>
+      subqueryOutputs.get(e).getOrElse {
+        extraPartitionByExpressionsToIndexes.get(e).get + plan.output.length
+      }
+    }
+  }
+
+  private lazy val extraProjectedPartitioningExpressions: Seq[Alias] = {

Review Comment:
   We do currently sort by the attributes of these projected partitioning 
expressions:
   
   ```
         val partitioningAttributes = partitioningExpressionIndexes.map(i => 
subquery.output(i))
         subquery = Sort(
           order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ 
orderByExpressions,
           ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to