dongjoon-hyun commented on code in PR #52835:
URL: https://github.com/apache/spark/pull/52835#discussion_r2484971747


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project}
+
+/**
+ * Result of attempting to merge a plan via [[PlanMerger.merge]].
+ *
+ * @param mergedPlan The resulting plan, either:
+ *                   - An existing cached plan (if identical match found)
+ *                   - A newly merged plan combining the input with a cached plan
+ *                   - The original input plan (if no merge was possible)
+ * @param mergedPlanIndex The index of this plan in the PlanMerger's cache.
+ * @param merged Whether the plan was matched with an existing cached plan, either reused as
+ *               identical or merged with it (true), or is a new entry (false).
+ * @param outputMap Maps attributes from the input plan to corresponding attributes in
+ *                  `mergedPlan`. Used to rewrite expressions referencing the original plan
+ *                  to reference the merged plan instead.
+ */
+case class MergeResult(
+    mergedPlan: LogicalPlan,
+    mergedPlanIndex: Int,
+    merged: Boolean,
+    outputMap: AttributeMap[Attribute])
+
+/**
+ * Represents a plan in the PlanMerger's cache.
+ *
+ * @param plan The logical plan, which may have been merged from multiple original plans.
+ * @param merged Whether this plan is the result of merging two or more plans (true), or
+ *               is an original unmerged plan (false). Merged plans typically require special
+ *               handling such as wrapping in CTEs.
+ */
+case class MergedPlan(plan: LogicalPlan, merged: Boolean)
+
+/**
+ * A stateful utility for merging identical or similar logical plans to enable query plan reuse.
+ *
+ * `PlanMerger` maintains a cache of previously seen plans and attempts to either:
+ * 1. Reuse an identical plan already in the cache
+ * 2. Merge a new plan with a cached plan by combining their outputs
+ *
+ * The merging process preserves semantic equivalence while combining outputs from multiple
+ * plans into a single plan. This is primarily used by [[MergeScalarSubqueries]] to deduplicate
+ * scalar subquery execution.
+ *
+ * Supported plan types for merging:
+ * - [[Project]]: Merges project lists
+ * - [[Aggregate]]: Merges aggregate expressions with identical grouping
+ * - [[Filter]]: Requires identical filter conditions
+ * - [[Join]]: Requires identical join type, hints, and conditions
+ *
+ * @example
+ * {{{
+ *   val merger = PlanMerger()
+ *   val result1 = merger.merge(plan1)  // Adds plan1 to cache
+ *   val result2 = merger.merge(plan2)  // Merges with plan1 if compatible
+ *   // result2.merged == true if plans were merged
+ *   // result2.outputMap maps plan2's attributes to the merged plan's attributes
+ * }}}
+ */
+class PlanMerger {
+  val cache = ArrayBuffer.empty[MergedPlan]
+
+  /**
+   * Attempts to merge the given plan with cached plans, or adds it to the cache.
+   *
+   * The cache is scanned in order and, for each cached plan, the method tries to:
+   * 1. Check if the cached plan is identical (using canonicalized comparison)
+   * 2. Otherwise merge with the cached plan using [[tryMergePlans]]
+   *
+   * The scan stops at the first cached plan for which either step succeeds. If no cached
+   * plan matches, the input plan is added as a new cache entry.
+   *
+   * @param plan The logical plan to merge or cache.
+   * @return A [[MergeResult]] containing:
+   *         - The merged/cached plan to use
+   *         - Its index in the cache
+   *         - Whether it was matched with an existing plan
+   *         - An attribute mapping for rewriting expressions
+   */
+  def merge(plan: LogicalPlan): MergeResult = {
+    // `Function.unlift` turns the `Option`-returning function into a partial function, so
+    // `collectFirst` stops at the first cache entry that yields a result.
+    cache.zipWithIndex.collectFirst(Function.unlift {
+      case (mp, i) =>
+        checkIdenticalPlans(plan, mp.plan).map { outputMap =>
+          // Identical plan: reuse the cached entry as is, the cache is left untouched.
+          MergeResult(mp.plan, i, merged = true, outputMap)
+        }.orElse {
+          tryMergePlans(plan, mp.plan).map {
+            case (mergedPlan, outputMap) =>
+              // Replace the cached entry in place so that its index does not change.
+              cache(i) = MergedPlan(mergedPlan, merged = true)
+              MergeResult(mergedPlan, i, merged = true, outputMap)
+          }
+        }
+    }).getOrElse {
+      cache += MergedPlan(plan, merged = false)
+      // A new entry is returned unchanged, so every output attribute maps to itself.
+      val outputMap = AttributeMap(plan.output.map(a => a -> a))
+      MergeResult(plan, cache.length - 1, merged = false, outputMap)
+    }
+  }
+
+  /**
+   * Returns all plans currently in the cache as an immutable indexed sequence.
+   *
+   * @return An indexed sequence of [[MergedPlan]]s in cache order. The index of each plan
+   *         corresponds to the `mergedPlanIndex` returned by [[merge]].
+   */
+  def mergedPlans(): IndexedSeq[MergedPlan] = cache.toIndexedSeq
+
+  /**
+   * Compares the canonicalized forms of the two plans.
+   *
+   * @param newPlan The plan being merged.
+   * @param cachedPlan The plan already in the cache.
+   * @return Some(outputMap) if the plans are identical, where outputMap pairs each output
+   *         attribute of `newPlan` with the attribute at the same position in the output of
+   *         `cachedPlan`. Returns None otherwise.
+   */
+  private def checkIdenticalPlans(
+      newPlan: LogicalPlan,
+      cachedPlan: LogicalPlan): Option[AttributeMap[Attribute]] = {
+    val identical = newPlan.canonicalized == cachedPlan.canonicalized
+    if (!identical) {
+      None
+    } else {
+      val newToCached = newPlan.output.zip(cachedPlan.output)
+      Some(AttributeMap(newToCached))
+    }
+  }
+
+  /**
+   * Recursively attempts to merge two plans by traversing their tree structures.
+   *
+   * Two plans can be merged if:
+   * - They are identical (canonicalized forms match), OR
+   * - They have compatible root nodes with mergeable children
+   *
+   * Supported merge patterns:
+   * - Project nodes: Combines project lists from both plans
+   * - Aggregate nodes: Combines aggregate expressions if grouping is identical and both
+   *   support the same aggregate implementation (hash/object-hash/sort-based)
+   * - Filter nodes: Only if filter conditions are identical
+   * - Join nodes: Only if join type, hints, and conditions are identical
+   *
+   * @param newPlan The plan to merge into the cached plan.
+   * @param cachedPlan The cached plan to merge with.
+   * @return Some((mergedPlan, outputMap)) if merge succeeds, where:
+   *         - mergedPlan is the combined plan
+   *         - outputMap maps newPlan's attributes to mergedPlan's attributes
+   *         Returns None if plans cannot be merged.
+   */
+  private def tryMergePlans(

Review Comment:
   At first glance, this looks identical in terms of the code. Am I right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to