cloud-fan commented on a change in pull request #32602:
URL: https://github.com/apache/spark/pull/32602#discussion_r636705888



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##########
@@ -72,11 +65,72 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] 
with PredicateHelper wit
         }
       }
 
+    case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
+      case _: Project => empty(p)
+      case _: Filter => empty(p)
+      case _: Sample => empty(p)
+      case _ => p
+    }
+  }
+}
+
+/**
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ *  1. Binary-node Logical Plans
+ *     - Join with one or two empty children (including Intersect/Except).
+ *     - Join is single column NULL-aware anti join (NAAJ)
+ *       Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
+ *       empty [[LocalRelation]].
+ *     - Left semi Join
+ *       Right side is non-empty and condition is empty. Eliminate join to its left side.
+ *     - Left anti join
+ *       Right side is non-empty and condition is empty. Eliminate join to an empty
+ *       [[LocalRelation]].
+ *  2. Unary-node Logical Plans
+ *     - Limit/Repartition with all empty children.
+ *     - Aggregate with all empty children and at least one grouping expression.
+ *     - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ *
+ * @param checkRowCount At AQE side, we use the query stage stats to check the check.
+ * @param isRelationWithAllNullKeys At AQE side, we use the broadcast query stage to do the check.
+ */
+case class PropagateEmptyRelationAdvanced(
+    checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None,
+    isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = {
+    val defaultEmptyRelation: Boolean = plan match {
+      case p: LocalRelation => p.data.isEmpty
+      case _ => false
+    }
+
+    if (checkRowCount.isDefined) {
+      checkRowCount.get.apply(plan, false) || defaultEmptyRelation
+    } else {
+      defaultEmptyRelation
+    }
+  }
+
+  private def empty(plan: LogicalPlan) =
+    LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
+
+  // Construct a project list from plan's output, while the value is always NULL.
+  private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
+    plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }
+
+  // We can not use transformUpWithPruning here since this rule is used by both normal Optimizer
+  // and AQE Optimizer. And this may only effective at AQE side.

Review comment:
   Ah, good point. I think there is a way to overcome it:
   1. Create an abstract class `PropagateEmptyRelationBase` that contains the util functions and optimizes expensive operators such as join, aggregate, etc.
   2. Create a rule `PropagateEmptyRelation extends PropagateEmptyRelationBase` that additionally optimizes project, filter, etc.
   3. Create a rule `AQEPropagateEmptyRelation extends PropagateEmptyRelationBase` that overrides some util functions like `isEmptyPlan`.

   Then these two rules can define their transformation pruning separately.
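
   To make the shape of this proposal concrete, here is a minimal, self-contained sketch. It deliberately uses toy plan nodes (`Plan`, `Relation`, `Stage`, `Project`, `Filter`, `Join`) rather than the real Catalyst classes, and the helper names `commonApplyFunc`, `cheapOps`, and `transformUp` are assumptions made for illustration only. The toy `Join` is treated as an inner join, and the "pruning" is just an early exit when no statically empty relation exists, which stands in for whatever pruning condition each real rule would choose.

```scala
// Toy stand-ins for Catalyst plan nodes; these are NOT Spark classes.
sealed trait Plan {
  def children: Seq[Plan] = Nil
  def exists(f: Plan => Boolean): Boolean = f(this) || children.exists(_.exists(f))
}
case class Relation(rows: Seq[Int]) extends Plan
case class Stage(rowCount: Option[Long]) extends Plan // hypothetical AQE query stage with runtime stats
case class Project(child: Plan) extends Plan { override def children = Seq(child) }
case class Filter(child: Plan) extends Plan { override def children = Seq(child) }
case class Join(left: Plan, right: Plan) extends Plan { override def children = Seq(left, right) }

// Step 1: shared utilities plus the handling of expensive operators.
abstract class PropagateEmptyRelationBase {
  // Utility that subclasses may override.
  protected def isEmptyPlan(plan: Plan): Boolean = plan match {
    case Relation(rows) => rows.isEmpty
    case _ => false
  }

  protected def empty: Plan = Relation(Nil)

  // Inner-join semantics assumed: an empty side makes the whole join empty.
  protected def commonApplyFunc: PartialFunction[Plan, Plan] = {
    case Join(l, r) if isEmptyPlan(l) || isEmptyPlan(r) => empty
  }

  // Simple bottom-up rewrite: children first, then the node itself.
  protected def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten = plan match {
      case Project(c) => Project(transformUp(c)(rule))
      case Filter(c) => Filter(transformUp(c)(rule))
      case Join(l, r) => Join(transformUp(l)(rule), transformUp(r)(rule))
      case leaf => leaf
    }
    rule.applyOrElse(rewritten, (p: Plan) => p)
  }

  def apply(plan: Plan): Plan
}

// Step 2: the normal optimizer rule also collapses cheap unary operators
// and can skip plans that contain no statically empty relation.
object PropagateEmptyRelation extends PropagateEmptyRelationBase {
  private def cheapOps: PartialFunction[Plan, Plan] = {
    case Project(c) if isEmptyPlan(c) => empty
    case Filter(c) if isEmptyPlan(c) => empty
  }

  def apply(plan: Plan): Plan = {
    val hasEmptyRelation = plan.exists {
      case Relation(rows) => rows.isEmpty
      case _ => false
    }
    if (!hasEmptyRelation) plan // its own pruning condition
    else transformUp(plan)(cheapOps.orElse(commonApplyFunc))
  }
}

// Step 3: the AQE rule overrides isEmptyPlan to consult runtime row counts.
// It cannot use the static pruning above, so it always traverses.
object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
  override protected def isEmptyPlan(plan: Plan): Boolean = plan match {
    case Stage(Some(0L)) => true
    case other => super.isEmptyPlan(other)
  }

  def apply(plan: Plan): Plan = transformUp(plan)(commonApplyFunc)
}
```

   In this sketch, `PropagateEmptyRelation(Project(Filter(Relation(Nil))))` collapses to an empty `Relation`, while only `AQEPropagateEmptyRelation` eliminates `Join(Stage(Some(0L)), Relation(Seq(1)))`, because only it treats a zero-row stage as empty.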


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
