dongjoon-hyun commented on code in PR #55628:
URL: https://github.com/apache/spark/pull/55628#discussion_r3170416338


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala:
##########
@@ -416,18 +425,37 @@ class PlanMerger(
           }
 
         case (np: Join, cp: Join) if np.joinType == cp.joinType && np.hint == cp.hint =>
-          // Filter propagation across joins is not yet supported.
-          tryMergePlans(np.left, cp.left, false).flatMap {
-            case TryMergeResult(mergedLeft, leftNPMapping, None, None) =>
-              tryMergePlans(np.right, cp.right, false).flatMap {
-                case TryMergeResult(mergedRight, rightNPMapping, None, None) =>
+          tryMergePlans(np.left, cp.left, filterPropagationSupported).flatMap {
+            case TryMergeResult(mergedLeft, leftNPMapping, leftNPFilter, leftCPFilter) =>
+              tryMergePlans(np.right, cp.right, filterPropagationSupported).flatMap {
+                case TryMergeResult(mergedRight, rightNPMapping, rightNPFilter, rightCPFilter)
+                    // If both children independently propagate filter attributes we would need to
+                    // AND them into a new alias above the join, which is not yet supported.
+                    if !(leftNPFilter.isDefined && rightNPFilter.isDefined) &&
+                       !(leftCPFilter.isDefined && rightCPFilter.isDefined) &&
+                       // Gate join-crossing filter propagation behind its own config flag.
+                       // When no filter attributes are in play the merge is unconditionally safe.
+                       (leftNPFilter.isEmpty && leftCPFilter.isEmpty &&
+                           rightNPFilter.isEmpty && rightCPFilter.isEmpty ||
+                           filterPropagationThroughJoinEnabled) &&
+                       // A filter attribute is only safe to propagate through a join if it comes
+                       // from the "preserved" (non-nullable) side. On the nullable side, unmatched
+                       // rows are NULL-padded so f=NULL, causing FILTER (WHERE f) to incorrectly
+                       // exclude rows that should contribute to the aggregate. Right-side
+                       // attributes are also absent from semi/anti join output.
+                       (leftNPFilter.isEmpty  && leftCPFilter.isEmpty  ||

Review Comment:
   `isEmpty  &&` -> `isEmpty &&` because the Apache Spark community doesn't use 
vertical alignment.
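
For reference, the rule described in the code comment above (a filter attribute may cross a join only from the preserved side) could be modeled roughly as below. This is a minimal sketch, not the code in this PR: `JoinKind`, `FilterPropagationSketch`, `leftPreserved`, `rightPreserved`, and `canPropagate` are hypothetical names and do not correspond to Spark's catalyst classes. Treating both sides of an inner join as preserved is my reading of the comment. The guard is written with single spaces and no vertical alignment.

```scala
// Hypothetical, simplified model; these are NOT Spark's catalyst join types.
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind
case object LeftSemi extends JoinKind
case object LeftAnti extends JoinKind

object FilterPropagationSketch {
  // True when rows coming from the left child are never NULL-padded by the join.
  def leftPreserved(kind: JoinKind): Boolean = kind match {
    case Inner | LeftOuter | LeftSemi | LeftAnti => true
    case RightOuter | FullOuter => false
  }

  // True when rows coming from the right child are never NULL-padded and the
  // right child's attributes are part of the join output (not so for semi/anti).
  def rightPreserved(kind: JoinKind): Boolean = kind match {
    case Inner | RightOuter => true
    case LeftOuter | FullOuter | LeftSemi | LeftAnti => false
  }

  // A side that carries a filter attribute must be preserved by the join.
  def canPropagate(kind: JoinKind, leftHasFilter: Boolean, rightHasFilter: Boolean): Boolean =
    (!leftHasFilter || leftPreserved(kind)) &&
      (!rightHasFilter || rightPreserved(kind))
}
```

Under these assumptions, a left outer join lets a left-side filter attribute through but rejects a right-side one, and a full outer join rejects both.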



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

