peter-toth commented on code in PR #37630:
URL: https://github.com/apache/spark/pull/37630#discussion_r1427838550


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -223,87 +224,383 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
     }
   }
 
-  // Recursively traverse down and try merging 2 plans. If merge is possible then return the merged
-  // plan with the attribute mapping from the new to the merged version.
-  // Please note that merging arbitrary plans can be complicated, the current version supports only
-  // some of the most important nodes.
+  /**
+   * Recursively traverse down and try merging 2 plans.
+   *
+   * Please note that merging arbitrary plans can be complicated, the current version supports only
+   * some of the most important nodes.
+   *
+   * @param newPlan a new plan that we want to merge to an already processed plan
+   * @param cachedPlan a plan that we already processed, it can be either an original plan or a
+   *                   merged version of 2 or more plans
+   * @param filterPropagationSupported a boolean flag that we propagate down to signal we have seen
+   *                                   an `Aggregate` node where propagated filters can be merged
+   * @return A tuple of:
+   *         - the merged plan,
+   *         - the attribute mapping from the new to the merged version,
+   *         - the 2 optional filters of both plans that we need to propagate up and merge in
+   *           an ancestor `Aggregate` node if possible,
+   *         - the optional accumulated extra cost of merge that we need to propagate up and
+   *           check in the ancestor `Aggregate` node.
+   *         The cost is optional to signal if the cost needs to be taken into account up in the
+   *         `Aggregate` node to decide about merge.
+   */
   private def tryMergePlans(
       newPlan: LogicalPlan,
-      cachedPlan: LogicalPlan): Option[(LogicalPlan, AttributeMap[Attribute])] = {
-    checkIdenticalPlans(newPlan, cachedPlan).map(cachedPlan -> _).orElse(
+      cachedPlan: LogicalPlan,
+      filterPropagationSupported: Boolean):
+    Option[(LogicalPlan, AttributeMap[Attribute], Option[Expression], Option[Expression],
+        Option[Double])] = {
+    checkIdenticalPlans(newPlan, cachedPlan).map { outputMap =>
+      // Currently the cost is always propagated up when `filterPropagationSupported` is true but
+      // later we can address cases when we don't need to take cost into account. Please find the
+      // details at the `Filter` node handling.
+      val mergeCost = if (filterPropagationSupported) Some(0d) else None
+
+      (cachedPlan, outputMap, None, None, mergeCost)
+    }.orElse(
       (newPlan, cachedPlan) match {

Review Comment:
   This is a logical plan optimization rule. In the previous version of this PR I tried to peek into the physical plan: I moved this rule to the end of the optimization phase and generated the physical plan of each scan together with the adjacent projects/filters above it.
   I did this to see whether any of those projects/filters get pushed down to the physical scan (as column pruning, or as pushed partition or data filters). I prevented merging if the 2 physical scans differed, to avoid cases where merging non-overlapping scans could degrade performance. (There was a PLAN_MERGE_IGNORE_PUSHED_DATA_FILTERS config to still allow merging if only the pushed data filters differed.)
   The problems with this approach were:
   - The code was pretty complicated.
   - Most physical scans (e.g. Parquet/ORC) allow pushing down data filters, so the default of PLAN_MERGE_IGNORE_PUSHED_DATA_FILTERS was true. But even a data filter difference could cause non-overlapping scans in some physical scans.
   - This approach didn't work well with DSv2, as DSv2 physical scans can't be compared (they don't have comparable partition and data filters). To solve this I suggested a new `SupportsMerge` interface that DSv2 scans could implement to decide if merging makes sense. This was in a separate PR: https://github.com/apache/spark/pull/37711 and I implemented the interface for DSv2 Parquet only.
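
To make the earlier (now dropped) check concrete, here is a minimal sketch in plain Scala. It is not the code from the previous version of this PR: `ScanInfo`, `ScanCompareSketch` and `canMerge` are hypothetical names, filters are modelled as plain strings, and treating a column-pruning difference as "the scans differ" is an assumption.

```scala
// Hypothetical stand-in for what a physical scan exposes; not a Spark class.
final case class ScanInfo(
    columns: Set[String],          // columns left after column pruning
    partitionFilters: Set[String], // pushed partition filters
    dataFilters: Set[String])      // pushed data filters

object ScanCompareSketch {
  // Merge only if the two scans are the same. When `ignorePushedDataFilters`
  // is true (modelled after PLAN_MERGE_IGNORE_PUSHED_DATA_FILTERS), a
  // difference in pushed data filters alone does not block the merge.
  def canMerge(a: ScanInfo, b: ScanInfo, ignorePushedDataFilters: Boolean): Boolean = {
    val sameExceptDataFilters =
      a.columns == b.columns && a.partitionFilters == b.partitionFilters
    if (ignorePushedDataFilters) sameExceptDataFilters
    else sameExceptDataFilters && a.dataFilters == b.dataFilters
  }
}
```

With the flag set to true, two scans that read disjoint data because of different data filters would still be merged, which is the weakness described in the second bullet.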
   
   The new version of this PR dropped the physical plan comparison, as mentioned here: https://github.com/apache/spark/pull/37630#issuecomment-1688626745 and decides about merging based on costs. If the sum of the cost differences between the original plans and the merged plan is lower than PLAN_MERGE_FILTER_PROPAGATION_MAX_COST then merging is enabled. The cost function might need some refinement: https://github.com/apache/spark/pull/37630/files#diff-5096416449daefcb91637508ae3e98a11c8ac66cae5b146b0937370115c1cbb1R734-R742 to support more expressions, but it already works for TPCDS q9.
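
A minimal sketch of that cost check in plain Scala, only to illustrate the arithmetic. `MergeCostSketch`, `extraCost`, `shouldMerge` and the threshold value are hypothetical; the real rule derives the costs from the plans' expressions and reads the limit from PLAN_MERGE_FILTER_PROPAGATION_MAX_COST.

```scala
// Hypothetical, simplified model of the cost check; not the rule's actual code.
object MergeCostSketch {
  // Assumed placeholder value standing in for PLAN_MERGE_FILTER_PROPAGATION_MAX_COST.
  val maxCost: Double = 100.0

  // Extra cost of merging: the sum, over the original plans, of
  // (cost of the merged plan - cost of that original plan).
  def extraCost(originalCosts: Seq[Double], mergedCost: Double): Double =
    originalCosts.map(mergedCost - _).sum

  // Merging is enabled only if the extra cost stays below the threshold.
  def shouldMerge(
      originalCosts: Seq[Double],
      mergedCost: Double,
      threshold: Double = maxCost): Boolean =
    extraCost(originalCosts, mergedCost) < threshold
}
```

For example, with original costs 10 and 12 and a merged cost of 15, the extra cost is (15 - 10) + (15 - 12) = 8, so the merge is allowed for any threshold above 8.
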
   This new cost-based approach might also need some follow-up changes to make it work with DSv2, but it definitely doesn't require huge changes in the DSv2 scans (like the earlier `SupportsMerge`).
   This PR targets Spark 4.0, as new features are not backported to already released versions, but it could work with Spark 3.5 too.




