dongjoon-hyun commented on code in PR #52835:
URL: https://github.com/apache/spark/pull/52835#discussion_r2484972241


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -117,297 +116,126 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
     }
   }
 
-  /**
-   * An item in the cache of merged scalar subqueries.
-   *
-   * @param plan The plan of a merged scalar subquery.
-   * @param merged A flag to identify if this item is the result of merging subqueries.
-   *               Please note that `attributes.size == 1` doesn't always mean that the plan is not
-   *               merged as there can be subqueries that are different ([[checkIdenticalPlans]] is
-   *               false) due to an extra [[Project]] node in one of them. In that case
-   *               `attributes.size` remains 1 after merging, but the merged flag becomes true.
-   * @param references A set of subquery indexes in the cache to track all (including transitive)
-   *                   nested subqueries.
-   */
-  case class Header(
-      plan: LogicalPlan,
-      merged: Boolean,
-      references: Set[Int])
-
   private def extractCommonScalarSubqueries(plan: LogicalPlan) = {
-    val cache = ArrayBuffer.empty[Header]
-    val planWithReferences = insertReferences(plan, cache)
-    cache.zipWithIndex.foreach { case (header, i) =>
-      cache(i) = cache(i).copy(plan =
-        if (header.merged) {
+    // Collect `ScalarSubquery` plans by level into `PlanMerger`s and insert references in place of
+    // `ScalarSubquery`s.
+    val planMergers = ArrayBuffer.empty[PlanMerger]
+    val planWithReferences = insertReferences(plan, planMergers)._1
+
+    // Traverse level by level and convert merged plans to `CTERelationDef`s and keep non-merged
+    // ones. While traversing replace references in plans back to `CTERelationRef`s or to original
+    // `ScalarSubquery`s. This is safe as a subquery plan at a level can reference only lower level
+    // other subqueries.
+    val subqueryPlansByLevel = ArrayBuffer.empty[IndexedSeq[LogicalPlan]]
+    planMergers.foreach { planMerger =>
+      val mergedPlans = planMerger.mergedPlans()
+      subqueryPlansByLevel += mergedPlans.map { mergedPlan =>
+        val planWithoutReferences = if (subqueryPlansByLevel.isEmpty) {
+          // Level 0 plans can't contain references
+          mergedPlan.plan
+        } else {
+          removeReferences(mergedPlan.plan, subqueryPlansByLevel)
+        }
+        if (mergedPlan.merged) {
           CTERelationDef(
-            createProject(header.plan.output, removeReferences(header.plan, cache)),
+            Project(

Review Comment:
   May I ask why this PR removes the existing `createProject` method and uses
`Project(...)` directly here? Is this better in some way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

