cloud-fan commented on code in PR #46143:
URL: https://github.com/apache/spark/pull/46143#discussion_r2704971473


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2048,23 +2048,116 @@ object PushDownPredicates extends Rule[LogicalPlan] {
  * Pushes [[Filter]] operators through many operators iff:
  * 1) the operator is deterministic
  * 2) the predicate is deterministic and the operator will not change any rows.
+ * 3) we don't add double evaluation, OR double evaluation would be cheap, OR we are
+ *    explicitly configured to allow it.
  *
- * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
 object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
 
+  // Match the original column ordering; this is important because optimization must not
+  // change the schema, even its positional order.
+  private def matchColumnOrdering(
+      original: Seq[NamedExpression],
+      updated: Seq[NamedExpression]): Seq[NamedExpression] = {
+    val nameToIndex = original.map(_.name).zipWithIndex.toMap
+    // Place every updated expression at the index its name occupied in the original output.
+    val response = updated.toArray
+    updated.foreach { e =>
+      response(nameToIndex(e.name)) = e
+    }
+    response.toSeq
+  }
+
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+    // Projections are a special case because the filter _may_ contain references to fields
+    // added in the projection that we wish to copy. We shouldn't blindly copy everything,
+    // since double evaluation of all operations can be expensive (unless the legacy
+    // behavior is explicitly enabled by the user). The double-filter-evaluation regression
+    // was introduced in Spark 3 and fixed in 4.2.
+    // The _new_ default algorithm works as follows:
+    // Filters are broken up on their &&s so that each conjunct can be evaluated separately.
+    // We track which components of the projection are used in the filters.
+    //
+    // 1) The conjunct does not reference anything in the projection: it is pushed.
+    // 2) The conjunct references only _inexpensive_ items in the projection: it is pushed
+    //    with the references resolved, resulting in double evaluation, but only of
+    //    inexpensive items -- worth it to filter records sooner.
+    //    (Cases 1 & 2 are treated as "cheap" predicates.)
+    // 3) The conjunct references expensive-to-compute items in the projection: it is not
+    //    pushed.
+    // Note that a given filter may contain conjuncts (separated by logical ANDs) from all
+    // three cases; we handle each conjunct separately according to the logic above.
+    // Additional restriction:
     // SPARK-13473: We can't push the predicate down when the underlying projection outputs
     // non-deterministic field(s). Non-deterministic expressions are essentially stateful. This
     // implies that, for a given input row, the output is determined by the expression's initial
     // state and all the input rows processed before. In other words, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
     // This also applies to Aggregate.
-    case Filter(condition, project @ Project(fields, grandChild))
+    case f @ Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
+      // All of the aliases in the projection
       val aliasMap = getAliasMap(project)
-      project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+      // Break up the filter into its respective components by &&s.
+      val splitCondition = splitConjunctivePredicates(condition)
+      // Find the aliases that each conjunct of the filter uses.
+      val usedAliasesForCondition = splitCondition.map { cond =>
+        // If the legacy double-evaluation behavior is enabled we just say
+        // every filter is "free."
+        if (!SQLConf.get.avoidDoubleFilterEval) {
+          (cond, AttributeMap.empty[Alias])
+        } else {
+          val (_, usedAliases) = replaceAliasWhileTracking(cond, aliasMap)
+          (cond, usedAliases)
+        }
+      }
+      // Split the filter's conjuncts into cheap and expensive while keeping track of
+      // what each references from the projection.
+      val (cheapWithUsed, expensiveWithUsed) = usedAliasesForCondition
+        .partition { case (_, used) =>
+          if (!SQLConf.get.avoidDoubleFilterEval) {
+            // If we are always pushing through, short-circuit the check.
+            true
+          } else if (used.isEmpty) {
+            // Doesn't reference the projection at all? We're good.
+            true
+          } else if (!used.exists(_._2.child.expensive)) {
+            // If everything it references is cheap we can push it, because it might
+            // eliminate more data quickly and it may also be something which could be
+            // evaluated at the storage layer. We may wish to improve this heuristic
+            // in the future.
+            true
+          } else {
+            false
+          }
+        }
+      // Short-circuit: if we do not have any cheap filters, return the original Filter as-is.
+      if (cheapWithUsed.isEmpty) {
+        f
+      } else {
+        val cheap: Seq[Expression] = cheapWithUsed.map(_._1)
+        // Build a base plan with all of the cheap filters pushed down: each conjunct that
+        // does not reference any expensive aliases is pushed, with its non-expensive
+        // aliases resolved.
+        val combinedCheapFilter = cheap.reduce(And)
+        val resolvedCheapFilter = replaceAlias(combinedCheapFilter, aliasMap)
+        val baseChild: LogicalPlan = Filter(resolvedCheapFilter, child = grandChild)
+        // Insert a final projection to match the desired column ordering, evaluate any
+        // stragglers, and select the already-computed columns.
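
A minimal, self-contained sketch of the conjunct-splitting strategy the diff's comments describe. Every name below (the toy Expr tree, splitConjuncts, the alias names) is a hypothetical stand-in for illustration, not Catalyst's actual classes:

object SplitSketch {
  // Toy expression tree standing in for Catalyst expressions.
  sealed trait Expr { def refs: Set[String] }
  final case class Attr(name: String) extends Expr { def refs: Set[String] = Set(name) }
  final case class Lit(v: Int) extends Expr { def refs: Set[String] = Set.empty }
  final case class Gt(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
  final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

  // Break a filter condition into its AND-ed conjuncts, mirroring what
  // splitConjunctivePredicates does in the rule above.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  def main(args: Array[String]): Unit = {
    // Pretend the projection defines cheap_col from an inexpensive expression
    // and heavy_col from an expensive one (say, a UDF call).
    val expensiveAliases = Set("heavy_col")

    // Filter condition: a > 1 AND heavy_col > 2 AND cheap_col > 3
    val condition = And(
      And(Gt(Attr("a"), Lit(1)), Gt(Attr("heavy_col"), Lit(2))),
      Gt(Attr("cheap_col"), Lit(3)))

    // Cases 1 and 2 ("cheap"): conjuncts that reference no expensive aliases.
    // Case 3 ("expensive"): everything else.
    val (cheap, expensive) =
      splitConjuncts(condition).partition(c => c.refs.intersect(expensiveAliases).isEmpty)

    println(s"pushed below the Project: $cheap")    // a > 1, cheap_col > 3
    println(s"kept above the Project:  $expensive") // heavy_col > 2
  }
}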

Review Comment:
   hmm why is this needed if we only push a Filter through a Project?
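
For context on the line in question: per the diff's own comments, the rewrite keeps the expensive conjuncts above a rebuilt projection, and the trailing Project appears intended to restore the original positional schema while selecting the already-computed columns. A toy illustration of that reordering follows; the names are hypothetical and matchColumnOrdering is reimplemented in a simplified form (assuming both lists carry exactly the same column names), not copied from the PR:

object ColumnOrderSketch {
  // Stand-in for a named projection column: its name and a pretty-printed expression.
  final case class Col(name: String, expr: String)

  // Reorder updated columns so each lands at the position its name occupied
  // in the original output.
  def matchColumnOrdering(original: Seq[Col], updated: Seq[Col]): Seq[Col] = {
    val byName = updated.map(c => c.name -> c).toMap
    original.map(c => byName(c.name))
  }

  def main(args: Array[String]): Unit = {
    val original = Seq(Col("a", "a"), Col("heavy_col", "heavy_udf(b)"), Col("c", "c"))
    // Suppose the rewrite regrouped the columns while pushing the cheap conjuncts down.
    val updated = Seq(Col("heavy_col", "heavy_udf(b)"), Col("c", "c"), Col("a", "a"))
    // Restores the positional schema: a, heavy_col, c.
    println(matchColumnOrdering(original, updated))
  }
}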



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

