cloud-fan commented on code in PR #46143:
URL: https://github.com/apache/spark/pull/46143#discussion_r2567036287


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2053,17 +2053,124 @@ object PushDownPredicates extends Rule[LogicalPlan] {
 object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
 
+  // Match the column ordering. It's not something we guarantee but seems like 
it
+  // could be useful especially for folks writing out to "raw" files.
+  private def matchColumnOrdering(original: Seq[NamedExpression], updated: 
Seq[NamedExpression]):
+      Seq[NamedExpression] = {
+    val nameToIndex = original.map(_.name).zipWithIndex.toMap
+    val response = updated.toArray
+    updated.foreach { e =>
+      val idx = nameToIndex(e.name)
+      response(idx) = e
+    }
+    response.toSeq
+  }
+
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
     // SPARK-13473: We can't push the predicate down when the underlying 
projection output non-
     // deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
     // implies that, for a given input row, the output are determined by the 
expression's initial
     // state and all the input rows processed before. In another word, the 
order of input rows
     // matters for non-deterministic expressions, while pushing down 
predicates changes the order.
     // This also applies to Aggregate.
-    case Filter(condition, project @ Project(fields, grandChild))
+    case f @ Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, 
condition) =>
       val aliasMap = getAliasMap(project)
-      project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
+      // Projection aliases that the filter references
+      val expensiveFilterAliasesBuf = mutable.ArrayBuffer.empty[Alias]
+      // Projection aliases that are not used in the filter, so we don't need 
to push
+      var leftBehindAliases: Map[ExprId, Alias] = aliasMap.baseMap.values.map {
+        kv => (kv._2.exprId, kv._2)
+      }.toMap
+      val (cheap, expensive) = splitConjunctivePredicates(condition).partition 
{ cond =>
+        if (!SQLConf.get.avoidDoubleFilterEvail) {
+          // If we are always pushing through short circuit the check.
+          true
+        } else {
+          val (replaced, usedAliases) = replaceAliasWhileTracking(cond, 
aliasMap)
+          if (cond == replaced) {
+          // If nothing changes then our alias is cheap
+            true
+          } else if (usedAliases.iterator.map(_._2.child.expensive).forall(_ 
== false)) {
+            // If it's cheap we can push it because it might eliminate more 
data quickly and
+            // it may also be something which could be evaluated at the 
storage layer.
+            // We may wish to improve this heuristic in the future.
+            true
+          } else {
+            // This is an expensive replacement so we see resolve it
+            usedAliases.iterator.foreach {
+              e: (Attribute, Alias) =>
+              if (leftBehindAliases.contains(e._2.exprId)) {
+                leftBehindAliases = leftBehindAliases.removed(e._2.exprId)
+                expensiveFilterAliasesBuf += e._2
+              }
+            }
+            false
+          }
+        }
+      }
+
+      val expensiveFilterAliases = expensiveFilterAliasesBuf.toArray.toSeq
+      val expensiveLeftBehindAliases: Map[ExprId, Alias] = 
leftBehindAliases.filter {
+        kv => kv._2.child.expensive
+      }
+
+      if (expensiveFilterAliases.isEmpty) {
+        // If the filter does not reference any expensive aliases then we
+        // just push the filter while resolving the non-expensive aliases.
+        project.copy(child = Filter(replaceAlias(condition, aliasMap), child = 
grandChild))
+      } else if (leftBehindAliases.isEmpty) {
+        // If there are no left behind aliases then we've used all of the 
aliases in our filter.
+        // We don't need to introduce any more projections since we already 
have the "minimal"
+        // projection for our expensive conditions, but we might be able to 
move the "cheap"
+        // conditions.
+        if (!cheap.isEmpty) {
+          val expensiveFilterCond = expensive.reduce(And)
+          // Replace the aliases in the cheap filter since we are pushing them 
down.
+          val cheapFilterCond = replaceAlias(cheap.reduce(And), aliasMap)
+          Filter(expensiveFilterCond, project.copy(child = 
Filter(cheapFilterCond, grandChild)))
+        } else {
+          f
+        }
+      } else if (expensiveLeftBehindAliases.isEmpty) {
+        // If there are no expensive left behind aliases then introducing
+        // a new projection might not be worth the overhead.
+        project.copy(child = Filter(replaceAlias(condition, aliasMap), child = 
grandChild))
+      } else {
+        // If we're here then we have a filter with an expensive alias along 
with
+        // some left behind aliases which are also expensive to compute. In 
this case we will
+        // add a new projection which computes the expensive aliases needed 
for the filter then
+        // filter and then the final top level projection with the remaining 
aliases.
+        val projectionReferences = project.references.toSeq
+
+        val expensiveCondition = expensive.reduce(And)
+
+        // If we don't have any cheap conditions we do:
+        // non-expensive-filter-used Project -> Filter -> Used Expensive 
Projections
+        // If we do we go:
+        // non-expensive-filter-used Project -> Filter -> Used Expensive 
Projections -> Cheap Filter
+        val filterChild = if (cheap.isEmpty) {
+          project.copy(
+            projectList = (projectionReferences ++ expensiveFilterAliases))
+        } else {
+          val cheapCondition = replaceAlias(cheap.reduce(And), aliasMap)
+          project.copy(
+            projectList = (projectionReferences ++ expensiveFilterAliases),
+            child = Filter(cheapCondition, grandChild))
+        }
+
+
+        // Our top level projection is going to be the existing projection 
minus the
+        // leftBehindAliases + all of the output fields which we've 
effectively pushed down.
+        val pushedAliasNames = project.output.filter(
+          a => !leftBehindAliases.contains(a.exprId)).toSeq
+        val remainingAliasesToEval = leftBehindAliases.iterator.map(_._2).toSeq
+        val topLevelProjection = project.copy(
+          projectList = matchColumnOrdering(fields, pushedAliasNames ++ 
remainingAliasesToEval),
+          child = Filter(expensiveCondition, child = filterChild)
+        )
+        topLevelProjection
+      }

Review Comment:
   It's really hard to review this new optimization. I have a few suggestions:
   1. optimizer rules should be orthogonal. I think "push down cheap filters 
only" and "split project" are two optimizations and should be put in two rules.
   2. add some code comments at the beginning to explain the algorithm. It's 
not easy for me to figure out the algorithm from the concrete code as it's a 
bit complicated.
   
   The algorithm in my mind:
   1. match plan pattern `Filter` -> `Project`
   2. find all expensive predicates (after `splitConjunctivePredicates`)
   3. for each expensive predicate, find the indexes of the referenced 
expensive expressions in the Project list.
   
   For example:
   ```
   Filter c1 + c3 > 0 AND c2 > 0
     Project expr1 AS c1, expr2 AS c2, expr3 as c3
   ```
   Assuming all expressions are expensive, the first predicate has indexes 
[1, 3] and the second has indexes [2].
   
   The goal is to filter out as much data as possible before evaluating 
expensive expressions, so a perfect solution should consider both predicate 
selectivity and expression eval cost, then decide how to split the `Project`. 
For simplicity, assume that all predicates are selective and all expressions 
are equally expensive. Then we should first run the predicates that reference 
fewer expensive expressions:
   ```
   Filter c1 + c3 > 0
     Project expr1 AS c1, c2, expr3 as c3
       Filter c2 > 0
         Project expr2 AS c2
   ```
   
   The Catalyst optimizer can run many iterations, so we don't need to finalize 
the plan in one shot. For this example:
   ```
   Filter c1 > 0 AND c2 > 0 AND c3 > 0
     Project expr1 AS c1, expr2 AS c2, expr3 as c3
   ```
   we can split the Project into 3
   ```
   Filter c3 > 0
     Project c1, c2, expr3 as c3
       Filter c2 > 0
         Project c1, expr2 AS c2
           Filter c1 > 0
             Project expr1 AS c1
   ```
   
   In each iteration, we only need to split a Project into two and push down 
the predicates that reference the same set of expressions (same indexes). 
Later iterations keep splitting until no further split is needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to