beliefer commented on code in PR #46263: URL: https://github.com/apache/spark/pull/46263#discussion_r1582095704
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala: ########## @@ -120,34 +132,49 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition), currentPlan, targetKey) - case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) => + case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) => // Runtime filters use one side of the [[Join]] to build a set of join key values and prune // the other side of the [[Join]]. It's also OK to use a superset of the join key values // (ignore null values) to do the pruning. // We assume other rules have already pushed predicates through join if possible. // So the predicate references won't pass on anymore. if (left.output.exists(_.semanticEquals(targetKey))) { - extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, - currentPlan = left, targetKey = targetKey).orElse { - // We can also extract from the right side if the join keys are transitive. 
+ val extractLeft = if (canExtractLeft(joinType)) { + extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, + currentPlan = left, targetKey = targetKey) + } else { + None + } + val extractRight = if (canExtractRight(joinType)) { lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2) .flatMap { newTargetKey => extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right, targetKey = newTargetKey) } + } else { + None } + extractLeft.orElse(extractRight) } else if (right.output.exists(_.semanticEquals(targetKey))) { - extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, - currentPlan = right, targetKey = targetKey).orElse { + val extractRight = if (canExtractRight(joinType)) { + extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, + currentPlan = right, targetKey = targetKey) + } else { + None + } + val extractLeft = if (canExtractLeft(joinType)) { // We can also extract from the left side if the join keys are transitive. 
rkeys.zip(lkeys).find(_._1.semanticEquals(targetKey)).map(_._2) .flatMap { newTargetKey => extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left, targetKey = newTargetKey) } + } else { + None } + extractRight.orElse(extractLeft) Review Comment: ditto (same suggestion as on the `extractLeft.orElse(extractRight)` line of the left-side branch, mirrored here: `extractRight.orElse { if (canExtractLeft(joinType)) { ... } else { None } }`) ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala: ########## @@ -86,6 +87,17 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J private def extractSelectiveFilterOverScan( plan: LogicalPlan, filterCreationSideKey: Expression): Option[(Expression, LogicalPlan)] = { + + def canExtractRight(joinType: JoinType): Boolean = joinType match { + case Inner | LeftSemi | RightOuter => true + case _ => false + } + + def canExtractLeft(joinType: JoinType): Boolean = joinType match { + case Inner | LeftSemi | LeftOuter | LeftAnti => true + case _ => false + } Review Comment: Shall we add the two methods into `joins.scala`? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala: ########## @@ -120,34 +132,49 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition), currentPlan, targetKey) - case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) => + case ExtractEquiJoinKeys(joinType, lkeys, rkeys, _, _, left, right, _) => // Runtime filters use one side of the [[Join]] to build a set of join key values and prune // the other side of the [[Join]]. It's also OK to use a superset of the join key values // (ignore null values) to do the pruning. // We assume other rules have already pushed predicates through join if possible. // So the predicate references won't pass on anymore. 
if (left.output.exists(_.semanticEquals(targetKey))) { - extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, - currentPlan = left, targetKey = targetKey).orElse { - // We can also extract from the right side if the join keys are transitive. + val extractLeft = if (canExtractLeft(joinType)) { + extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, + currentPlan = left, targetKey = targetKey) + } else { + None + } + val extractRight = if (canExtractRight(joinType)) { lkeys.zip(rkeys).find(_._1.semanticEquals(targetKey)).map(_._2) .flatMap { newTargetKey => extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right, targetKey = newTargetKey) } + } else { + None } + extractLeft.orElse(extractRight) Review Comment: How about ``` extractLeft.orElse { if (canExtractRight(joinType)) { ... } else { None } } ``` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org