beliefer commented on code in PR #46263:
URL: https://github.com/apache/spark/pull/46263#discussion_r1582095704


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -120,34 +132,49 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
           hasHitSelectiveFilter = hasHitSelectiveFilter || 
isLikelySelective(condition),
           currentPlan,
           targetKey)
-      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
+      case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join 
key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of 
the join key values
         // (ignore null values) to do the pruning.
         // We assume other rules have already pushed predicates through join 
if possible.
         // So the predicate references won't pass on anymore.
         if (left.output.exists(_.semanticEquals(targetKey))) {
-          extract(left, AttributeSet.empty, hasHitFilter = false, 
hasHitSelectiveFilter = false,
-            currentPlan = left, targetKey = targetKey).orElse {
-            // We can also extract from the right side if the join keys are 
transitive.
+          val extractLeft = if (canExtractLeft(joinType)) {
+            extract(left, AttributeSet.empty, hasHitFilter = false, 
hasHitSelectiveFilter = false,
+              currentPlan = left, targetKey = targetKey)
+          } else {
+            None
+          }
+          val extractRight = if (canExtractRight(joinType)) {
             lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
               .flatMap { newTargetKey =>
                 extract(right, AttributeSet.empty,
                   hasHitFilter = false, hasHitSelectiveFilter = false, 
currentPlan = right,
                   targetKey = newTargetKey)
               }
+          } else {
+            None
           }
+          extractLeft.orElse(extractRight)
         } else if (right.output.exists(_.semanticEquals(targetKey))) {
-          extract(right, AttributeSet.empty, hasHitFilter = false, 
hasHitSelectiveFilter = false,
-            currentPlan = right, targetKey = targetKey).orElse {
+          val extractRight = if (canExtractRight(joinType)) {
+            extract(right, AttributeSet.empty, hasHitFilter = false, 
hasHitSelectiveFilter = false,
+              currentPlan = right, targetKey = targetKey)
+          } else {
+            None
+          }
+          val extractLeft = if (canExtractLeft(joinType)) {
             // We can also extract from the left side if the join keys are 
transitive.
             rkeys.zip(lkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
               .flatMap { newTargetKey =>
                 extract(left, AttributeSet.empty,
                   hasHitFilter = false, hasHitSelectiveFilter = false, 
currentPlan = left,
                   targetKey = newTargetKey)
               }
+          } else {
+            None
           }
+          extractRight.orElse(extractLeft)

Review Comment:
   ditto



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -86,6 +87,17 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
       filterCreationSideKey: Expression): Option[(Expression, LogicalPlan)] = {
+
+    def canExtractRight(joinType: JoinType): Boolean = joinType match {
+      case Inner | LeftSemi | RightOuter => true
+      case _ => false
+    }
+
+    def canExtractLeft(joinType: JoinType): Boolean = joinType match {
+      case Inner | LeftSemi | LeftOuter | LeftAnti => true
+      case _ => false
+    }

Review Comment:
   Shall we add the two methods into `joins.scala`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -120,34 +132,49 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
           hasHitSelectiveFilter = hasHitSelectiveFilter || 
isLikelySelective(condition),
           currentPlan,
           targetKey)
-      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
+      case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join 
key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of 
the join key values
         // (ignore null values) to do the pruning.
         // We assume other rules have already pushed predicates through join 
if possible.
         // So the predicate references won't pass on anymore.
         if (left.output.exists(_.semanticEquals(targetKey))) {
-          extract(left, AttributeSet.empty, hasHitFilter = false, 
hasHitSelectiveFilter = false,
-            currentPlan = left, targetKey = targetKey).orElse {
-            // We can also extract from the right side if the join keys are 
transitive.
+          val extractLeft = if (canExtractLeft(joinType)) {
+            extract(left, AttributeSet.empty, hasHitFilter = false, 
hasHitSelectiveFilter = false,
+              currentPlan = left, targetKey = targetKey)
+          } else {
+            None
+          }
+          val extractRight = if (canExtractRight(joinType)) {
             lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2)
               .flatMap { newTargetKey =>
                 extract(right, AttributeSet.empty,
                   hasHitFilter = false, hasHitSelectiveFilter = false, 
currentPlan = right,
                   targetKey = newTargetKey)
               }
+          } else {
+            None
           }
+          extractLeft.orElse(extractRight)

Review Comment:
   How about
   ```
   extractLeft.orElse {
     if (canExtractRight(joinType)) {
       ...
     } else {
       None
     }
   }
   ``` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to