cloud-fan commented on code in PR #46143:
URL: https://github.com/apache/spark/pull/46143#discussion_r2525714854


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2053,17 +2053,117 @@ object PushDownPredicates extends Rule[LogicalPlan] {
 object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
 
+  // Match the column ordering. It's not something we guarantee but seems like 
it
+  // could be useful especially for folks writing out to "raw" files.
+  private def matchColumnOrdering(original: Seq[NamedExpression], updated: 
Seq[NamedExpression]):
+      Seq[NamedExpression] = {
+    val nameToIndex = original.map(_.name).zipWithIndex.toMap
+    val response = updated.toArray
+    updated.foreach { e =>
+      val idx = nameToIndex(e.name)
+      response(idx) = e
+    }
+    response.toSeq
+  }
+
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
     // SPARK-13473: We can't push the predicate down when the underlying 
projection output non-
     // deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
     // implies that, for a given input row, the output are determined by the 
expression's initial
state and all the input rows processed before. In other words, the 
order of input rows
     // matters for non-deterministic expressions, while pushing down 
predicates changes the order.
     // This also applies to Aggregate.
-    case Filter(condition, project @ Project(fields, grandChild))
+    case f @ Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, 
condition) =>
       val aliasMap = getAliasMap(project)
-      project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))

Review Comment:
   The new implementation looks overly complicated. I think we only need to split the 
predicates and push down the part that won't trigger double evaluation, e.g.
   ```
   val (stayUp, pushDown) = splitConjunctivePredicates(condition).partition { 
predicate =>
     replaceAlias(predicate, aliasMap).expensive
   }
   (stayUp, pushDown) match {
   case (_, Nil) =>
       // Nothing to push down, keep the same filter.
       f
     case (Nil, _) =>
       // Push all
       project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
     case _ =>
       Filter(
         stayUp.reduce(And),
         project.copy(
           child = Filter(replaceAlias(pushDown.reduce(And), aliasMap), 
grandChild)
         )
       )
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to