cloud-fan commented on code in PR #45146:
URL: https://github.com/apache/spark/pull/45146#discussion_r1503508797


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1776,26 +1776,57 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
 
     case filter @ Filter(condition, union: Union) =>
       // Union could change the rows, so non-deterministic predicate can't be pushed down
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition(_.deterministic)
+      // We should also only push down filters which are equal (either ref or semantic) to an
+      // output of the union. We check referential equality since semantic equality of a named field
+      // may be false as the data type may have changed to include nullable during the union.
+      val output = union.output
+      def semanticOrRefEqual(e1: Expression, e2: Expression): Boolean = {
+        val e1c = e1.canonicalized
+        val e2c = e2.canonicalized
+        if (e1c.semanticEquals(e2c)) {
+          true
+        } else if (e1c.isInstanceOf[NamedExpression] && e2c.isInstanceOf[NamedExpression]) {
+          val named1 = e1c.asInstanceOf[NamedExpression]
+          val named2 = e2c.asInstanceOf[NamedExpression]
+          if (named1.exprId == named2.exprId) {
+            true
+          } else {
+            false
+          }
+        } else {
+          false
+        }
+      }
+      def eligibleForPushdown(e: Expression): Boolean = {
+        e.deterministic && e.references.forall { ref =>
+          output.exists(e2 => semanticOrRefEqual(ref, e2))
+        }
+      }
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition(eligibleForPushdown)
 
       if (pushDown.nonEmpty) {
         val pushDownCond = pushDown.reduceLeft(And)
-        val output = union.output
+        // The union is the child of the filter so its children are grandchildren.
+        // Moves filters down to the grandchild if there is an element in the grandchild's
+        // output which is semantically equal to the filter being evaluated.
         val newGrandChildren = union.children.map { grandchild =>
           val newCond = pushDownCond transform {
-            case e if output.exists(_.semanticEquals(e)) =>

Review Comment:
   `output` is a `Seq[Attribute]`, so we can simplify the fix here by matching attributes on `exprId`:
   ```
   case a: Attribute if output.exists(_.exprId == a.exprId) =>
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to