peter-toth commented on code in PR #46143:
URL: https://github.com/apache/spark/pull/46143#discussion_r2656430693


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2047,23 +2047,198 @@ object PushDownPredicates extends Rule[LogicalPlan] {
  * Pushes [[Filter]] operators through many operators iff:
  * 1) the operator is deterministic
  * 2) the predicate is deterministic and the operator will not change any of the rows.
+ * 3) we don't introduce double evaluation, or the double evaluation would be cheap, or we're
+ *    configured to allow it.
  *
- * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
 object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
 
+  // Match the column ordering; this is important because we shouldn't change the
+  // schema, even the positional one, during optimization.
+  private def matchColumnOrdering(original: Seq[NamedExpression], updated: Seq[NamedExpression]):
+      Seq[NamedExpression] = {
+    val nameToIndex = original.map(_.name).zipWithIndex.toMap
+    val response = updated.toArray
+    updated.foreach { e =>
+      val idx = nameToIndex(e.name)
+      response(idx) = e
+    }
+    response.toSeq
+  }
+
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+    // Projections are a special case because the filter _may_ contain references to fields added
+    // in the projection that we wish to copy. We shouldn't blindly copy everything, since double
+    // evaluating all operations can be expensive (unless the broken behavior is enabled by the
+    // user). The double filter eval regression was introduced in Spark 3 and fixed in 4.2.
+    // The _new_ default algorithm works as follows:
+    // Provided filters are broken up based on their &&s for separate evaluation.
+    // We track which components of the projection are used in the filters.
+    //
+    // 1) The filter does not reference anything in the projection: pushed
+    // 2) Filters which reference _inexpensive_ items in the projection: pushed with references
+    //    resolved, resulting in double evaluation, but only of inexpensive items -- worth it to
+    //    filter records sooner.
+    // (Cases 1 & 2 are treated as "cheap" predicates.)
+    // 3) When a filter references expensive-to-compute aliases, we check whether we should
+    //    split the projection into multiple parts with the filter in between. This allows us
+    //    to avoid double evaluating expensive references.
+    //  3a) If the filter references everything in the projection, we can't split it any
+    //      further, so we leave the filter and projection as is.
+    //  3b) If the filter does not reference the entire projection, then we can potentially
+    //      reduce the amount of computation by splitting the projection around the filter.
+    // Case 3 is treated as an "expensive" predicate.
+    // Note that a given filter may contain parts from all cases. We handle each part separately
+    // according to the logic above.
+    // Additional restriction:
     // SPARK-13473: We can't push the predicate down when the underlying projection outputs non-
     // deterministic field(s). Non-deterministic expressions are essentially stateful. This
     // implies that, for a given input row, the output is determined by the expression's initial
     // state and all the input rows processed before. In other words, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
     // This also applies to Aggregate.
-    case Filter(condition, project @ Project(fields, grandChild))
+    case f @ Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
+      // All of the aliases in the projection
       val aliasMap = getAliasMap(project)
-      project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+      // Break up the filter into its respective components by &&s.
+      val splitCondition = splitConjunctivePredicates(condition)
+      // Find the different aliases each component of the filter uses.
+      val usedAliasesForCondition = splitCondition.map { cond =>
+        // If the legacy double evaluation behavior is enabled we just say
+        // every filter is "free."
+        if (!SQLConf.get.avoidDoubleFilterEval) {
+          (cond, AttributeMap.empty[Alias])
+        } else {
+          val (replaced, usedAliases) = replaceAliasWhileTracking(cond, aliasMap)
+          // Didn't swap anything? Empty map :)
+          if (cond == replaced) {
+            (cond, AttributeMap.empty[Alias])
+          } else {
+            (cond, usedAliases)
+          }
+        }
+      }
+      // Split the filter's components into cheap and expensive while keeping track of
+      // what each references from the projection.
+      val (cheapWithUsed, expensiveWithUsed) = usedAliasesForCondition
+        .partition { case (cond, used) =>
+        if (!SQLConf.get.avoidDoubleFilterEval) {
+          // If we are always pushing through, short-circuit the check.
+          true
+        } else {
+          // Didn't use anything? We're good
+          if (used.isEmpty) {
+            true
+          } else if (used.iterator.map(_._2.child.expensive).forall(_ == false)) {
+            // If it's cheap we can push it because it might eliminate more data quickly and
+            // it may also be something which could be evaluated at the storage layer.
+            // We may wish to improve this heuristic in the future.
+            true
+          } else {
+            false
+          }
+        }
+      }
+      if (cheapWithUsed.isEmpty && expensiveWithUsed.isEmpty) {
+        // Short circuit, we don't have anything to push OR split
+        f
+      } else {
+        // Handle all of the cheap filters (cases 1 & 2).
+        val cheap: Seq[Expression] = cheapWithUsed.map(_._1)
+        // Make a base instance which has all of the cheap filters pushed down.
+        val baseChild: LogicalPlan = if (!cheap.isEmpty) {
+          // For all filters which do not reference any expensive aliases,
+          // just push the filter while resolving the non-expensive aliases.
+          val combinedCheapFilter = cheap.reduce(And)
+          val resolvedCheapFilter = replaceAlias(combinedCheapFilter, aliasMap)
+          Filter(resolvedCheapFilter, child = grandChild)
+        } else {
+          // If we don't have any inexpensive filters to push, it's "just" the grandchild.
+          grandChild
+        }
+        // Handle any Case 3 filters
+        // We group the expensive components by the aliases used, since if they use the same
+        // aliases we can introduce them together.
+        val grouped = expensiveWithUsed.groupBy(_._2)
+        // Sort by the name of the head expensive alias so we have
+        // a consistent order for testing. It's possible we could do something
+        // smarter here if we build a better cost based heuristic.
+        val expensiveByUsed = grouped.view
+          .mapValues(_.map(_._1).toList).toList.sortBy(_._1.head._2.name)
+        // For each group of expensive filters, figure out whether it's in case 3a or 3b.
+        val (toSplit, leaveAsIs) = expensiveByUsed.partition {
+          case (used, expensive) =>
+            // We can't split these filters from this projection since they
+            // have a 1:1 mapping with all of the aliases.
+            if (used == aliasMap) {
+              false
+            } else {
+              true
+            }
+        }
+        if (toSplit.isEmpty && cheap.isEmpty) {
+          // Nothing to push or split, short circuit (all filters are case 3a).
+          f
+        } else {
+          val expensiveFiltersDone = if (toSplit.isEmpty) {
+            // If we can't split any further (no case 3b filters), but we did have cheap
+            // filters pushed, we still need to add the expensive filters back on top.
+            baseChild
+          } else {
+            // We have at least one filter that we can split the projection around.
+            // We're now going to add projections one at a time for the expensive components of
+            // each group of filters. We'll keep track of what we added for the previous filter(s)
+            // so we don't double add anything.

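As a minimal standalone sketch of the cheap/expensive split described in the comment block above (a toy model with hypothetical `Conjunct` and `expensiveAliases` stand-ins, not Spark's actual `Expression`/`AttributeMap[Alias]` types):

```scala
// Toy model for illustration only; Conjunct and expensiveAliases are
// hypothetical stand-ins for Spark's Expression and AttributeMap[Alias].
object PredicateSplitSketch {
  // A predicate conjunct plus the names of the projection aliases it references.
  case class Conjunct(sql: String, usedAliases: Set[String])

  // Cases 1 & 2 ("cheap"): no aliases used, or only cheap ones -> push below the Project.
  // Case 3 ("expensive"): references an expensive alias -> candidate for splitting the Project.
  def partitionConjuncts(
      conjuncts: Seq[Conjunct],
      expensiveAliases: Set[String]): (Seq[Conjunct], Seq[Conjunct]) =
    conjuncts.partition(c => c.usedAliases.forall(a => !expensiveAliases.contains(a)))

  def main(args: Array[String]): Unit = {
    val conjuncts = Seq(
      Conjunct("id > 0", Set.empty),         // case 1: no projection references
      Conjunct("year = 2024", Set("year")),  // case 2: only a cheap alias, double eval is fine
      Conjunct("score > 0.5", Set("score"))) // case 3: an expensive alias (e.g. a UDF)
    val (cheap, expensive) = partitionConjuncts(conjuncts, expensiveAliases = Set("score"))
    println(s"pushed below Project: ${cheap.map(_.sql)}")    // id > 0, year = 2024
    println(s"kept for case 3:      ${expensive.map(_.sql)}") // score > 0.5
  }
}
```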
Review Comment:
   > let's group the conditions by the number of referenced named expressions, `group1` means only reference one named expression, and so on.
   > ...
   > For conditions in group1, we rank the named expressions by "how many conditions can be evaluated with it" and pick the best
   
   This seems like a good heuristic to me for splitting the projections around.
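   A rough sketch of the suggested grouping/ranking heuristic (the `Condition` type and `rankGroup1` helper are hypothetical, purely to illustrate the idea, not Spark internals):

```scala
// Hypothetical sketch of the suggested heuristic; not Spark's internals.
object RankingHeuristicSketch {
  case class Condition(sql: String, referenced: Set[String])

  // Rank the named expressions referenced by "group1" conditions (those
  // referencing exactly one named expression) by how many conditions each
  // expression enables, best first; ties broken by name for determinism.
  def rankGroup1(conditions: Seq[Condition]): Seq[(String, Int)] =
    conditions
      .filter(_.referenced.size == 1)
      .groupBy(_.referenced.head)
      .view.mapValues(_.size)
      .toSeq
      .sortBy { case (name, count) => (-count, name) }

  def main(args: Array[String]): Unit = {
    val conditions = Seq(
      Condition("a > 1", Set("a")),
      Condition("a < 10", Set("a")),
      Condition("b = 'x'", Set("b")),
      Condition("a + b > 3", Set("a", "b"))) // group2, not ranked here
    println(rankGroup1(conditions)) // List((a,2), (b,1)) -> split around "a" first
  }
}
```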



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

