cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1164131408


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,44 +114,63 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is 
likely selective
-   * Also check if the plan only has simple expressions (attribute reference, 
literals) so that we
-   * do not add a subquery that might have an expensive computation
+   * Extracts a sub-plan which is a simple filter over scan from the input 
plan. The simple
+   * filter should be selective and the filter condition (including 
expressions in the child
+   * plan referenced by the filter condition) should be a simple expression, 
so that we do
+   * not add a subquery that might have an expensive computation.
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
-    def isSelective(
+    def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
-        hasHitSelectiveFilter: Boolean): Boolean = p match {
-      case Project(projectList, child) =>
-        if (hasHitFilter) {
-          // We need to make sure all expressions referenced by filter 
predicates are simple
-          // expressions.
-          val referencedExprs = projectList.filter(predicateReference.contains)
-          referencedExprs.forall(isSimpleExpression) &&
-            isSelective(
-              child,
-              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ 
++ _),
-              hasHitFilter,
-              hasHitSelectiveFilter)
+        hasHitSelectiveFilter: Boolean): Option[LogicalPlan] = p match {
+      case Project(projectList, child) if hasHitFilter =>
+        // We need to make sure all expressions referenced by filter 
predicates are simple
+        // expressions.
+        val referencedExprs = projectList.filter(predicateReference.contains)
+        if (referencedExprs.forall(isSimpleExpression)) {
+          extract(
+            child,
+            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ 
++ _),
+            hasHitFilter,
+            hasHitSelectiveFilter)
         } else {
-          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-          isSelective(child, predicateReference, hasHitFilter, 
hasHitSelectiveFilter)
+          None
         }
-      case Filter(condition, child) =>
-        isSimpleExpression(condition) && isSelective(
+      case Project(_, child) =>
+        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+      case Filter(condition, child) if isSimpleExpression(condition) =>
+        extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || 
isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
-      case _ => false
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join 
key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of 
the join key values
+        // (ignore null values) to do the pruning.
+        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
+          extractSelectiveFilterOverScan(left, filterCreationSideExp)

Review Comment:
   to make `def extract` still a tail recursion, we should call `extract(left, 
AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false)` here



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,44 +114,63 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is 
likely selective
-   * Also check if the plan only has simple expressions (attribute reference, 
literals) so that we
-   * do not add a subquery that might have an expensive computation
+   * Extracts a sub-plan which is a simple filter over scan from the input 
plan. The simple
+   * filter should be selective and the filter condition (including 
expressions in the child
+   * plan referenced by the filter condition) should be a simple expression, 
so that we do
+   * not add a subquery that might have an expensive computation.
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
-    def isSelective(
+    def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
-        hasHitSelectiveFilter: Boolean): Boolean = p match {
-      case Project(projectList, child) =>
-        if (hasHitFilter) {
-          // We need to make sure all expressions referenced by filter 
predicates are simple
-          // expressions.
-          val referencedExprs = projectList.filter(predicateReference.contains)
-          referencedExprs.forall(isSimpleExpression) &&
-            isSelective(
-              child,
-              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ 
++ _),
-              hasHitFilter,
-              hasHitSelectiveFilter)
+        hasHitSelectiveFilter: Boolean): Option[LogicalPlan] = p match {
+      case Project(projectList, child) if hasHitFilter =>
+        // We need to make sure all expressions referenced by filter 
predicates are simple
+        // expressions.
+        val referencedExprs = projectList.filter(predicateReference.contains)
+        if (referencedExprs.forall(isSimpleExpression)) {
+          extract(
+            child,
+            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ 
++ _),
+            hasHitFilter,
+            hasHitSelectiveFilter)
         } else {
-          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-          isSelective(child, predicateReference, hasHitFilter, 
hasHitSelectiveFilter)
+          None
         }
-      case Filter(condition, child) =>
-        isSimpleExpression(condition) && isSelective(
+      case Project(_, child) =>
+        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+      case Filter(condition, child) if isSimpleExpression(condition) =>
+        extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || 
isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
-      case _ => false
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join 
key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of 
the join key values
+        // (ignore null values) to do the pruning.
+        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
+          extractSelectiveFilterOverScan(left, filterCreationSideExp)
+        } else if 
(right.output.exists(_.semanticEquals(filterCreationSideExp))) {
+          extractSelectiveFilterOverScan(right, filterCreationSideExp)

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to