peter-toth commented on code in PR #46143:
URL: https://github.com/apache/spark/pull/46143#discussion_r2656426813


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2047,23 +2047,190 @@ object PushDownPredicates extends Rule[LogicalPlan] {
  * Pushes [[Filter]] operators through many operators iff:
  * 1) the operator is deterministic
  * 2) the predicate is deterministic and the operator will not change any of 
rows.
+ * 3) We don't add double evaluation OR double evaluation would be cheap OR 
we're configured to.
  *
- * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
 object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
 
+  // Match the column ordering, this is important as we shouldn't change the 
schema
+  // , even the positional one, during optimization.
+  private def matchColumnOrdering(original: Seq[NamedExpression], updated: 
Seq[NamedExpression]):
+      Seq[NamedExpression] = {
+    val nameToIndex = original.map(_.name).zipWithIndex.toMap
+    val response = updated.toArray
+    updated.foreach { e =>
+      val idx = nameToIndex(e.name)
+      response(idx) = e
+    }
+    response.toSeq
+  }
+
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+    // Projections are a special case because the filter _may_ contain 
references to fields added in
+    // the projection that we wish to copy. We shouldn't blindly copy 
everything
+    // since double evaluation all operations can be expensive (unless the 
broken behavior is
+    // enabled by the user). The double filter eval regression was added in 
Spark 3 fixed in 4.2.
+    // The _new_ default algorithm works as follows:
+    // Provided filters are broken up based on their &&s for separate 
evaluation.
+    // We track which components of the projection are used in the filters.
+    //
+    // 1) The filter does not reference anything in the projection: pushed
+    // 2) Filter which reference _inexpensive_ items in projection: pushed and 
reference resolved
+    //    resulting in double evaluation, but only of inexpensive items -- 
worth it to filter
+    //    records sooner.
+    // (Case 1 & 2 are treated as "cheap" predicates)
+    // 3) When an a filter references expensive to compute references we
+    //    check to see if we should split the projection into multiple parts 
w/ the filter
+    //    in between. This allows us to avoid double evaluating expensive 
references.
+    //  3a) If the filter references everything in the projection we can't 
split it any further.
+    //      so we leave the filter and projection as is.
+    //  3b) If the filter does not reference the entire projection then we can 
potentially reduce
+    //     the amount of computation but splitting the projection around the 
filter.
+    // Case 3 is treated as an "expensive" predicate.
+    // Note that a given filter may contain parts from all cases. We handle 
each part separately
+    // according to the logic above.
+    // Additional restriction:
     // SPARK-13473: We can't push the predicate down when the underlying 
projection output non-
     // deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
     // implies that, for a given input row, the output are determined by the 
expression's initial
     // state and all the input rows processed before. In another word, the 
order of input rows
     // matters for non-deterministic expressions, while pushing down 
predicates changes the order.
     // This also applies to Aggregate.
-    case Filter(condition, project @ Project(fields, grandChild))
+    case f @ Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, 
condition) =>
+      // All of the aliases in the projection
       val aliasMap = getAliasMap(project)
-      project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
+      // Break up the filter into its respective components by &&s.
+      val splitCondition = splitConjunctivePredicates(condition)
+      // Find the different aliases each component of the filter uses.
+      val usedAliasesForCondition = splitCondition.map { cond =>
+        // If the legacy double evaluation behavior is enabled we just say
+        // every filter is "free."
+        if (!SQLConf.get.avoidDoubleFilterEval) {
+          (cond, AttributeMap.empty[Alias])
+        } else {
+          val (replaced, usedAliases) = replaceAliasWhileTracking(cond, 
aliasMap)

Review Comment:
   Seems like you don't use `replaced` anywere or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to