cloud-fan commented on a change in pull request #32602:
URL: https://github.com/apache/spark/pull/32602#discussion_r636705888
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##########
@@ -72,11 +65,72 @@ object PropagateEmptyRelation extends Rule[LogicalPlan]
with PredicateHelper wit
}
}
+ case p: UnaryNode if p.children.nonEmpty &&
p.children.forall(isEmptyLocalRelation) => p match {
+ case _: Project => empty(p)
+ case _: Filter => empty(p)
+ case _: Sample => empty(p)
+ case _ => p
+ }
+ }
+}
+
+/**
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ * 1. Binary-node Logical Plans
+ * - Join with one or two empty children (including Intersect/Except).
+ * - Join is single column NULL-aware anti join (NAAJ)
+ * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]].
Eliminate join to an
+ * empty [[LocalRelation]].
+ * - Left semi Join
+ * Right side is non-empty and condition is empty. Eliminate join to its
left side.
+ * - Left anti join
+ * Right side is non-empty and condition is empty. Eliminate join to an
empty
+ * [[LocalRelation]].
+ * 2. Unary-node Logical Plans
+ * - Limit/Repartition with all empty children.
+ * - Aggregate with all empty children and at least one grouping
expression.
+ * - Generate(Explode) with all empty children. Others like Hive UDTF may
return results.
+ *
+ * @param checkRowCount At AQE side, we use the query stage stats to check the
check.
+ * @param isRelationWithAllNullKeys At AQE side, we use the broadcast query
stage to do the check.
+ */
+case class PropagateEmptyRelationAdvanced(
+ checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None,
+ isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None)
+ extends Rule[LogicalPlan] with CastSupport {
+
+ private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = {
+ val defaultEmptyRelation: Boolean = plan match {
+ case p: LocalRelation => p.data.isEmpty
+ case _ => false
+ }
+
+ if (checkRowCount.isDefined) {
+ checkRowCount.get.apply(plan, false) || defaultEmptyRelation
+ } else {
+ defaultEmptyRelation
+ }
+ }
+
+ private def empty(plan: LogicalPlan) =
+ LocalRelation(plan.output, data = Seq.empty, isStreaming =
plan.isStreaming)
+
+ // Construct a project list from plan's output, while the value is always
NULL.
+ private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
+ plan.output.map{ a => Alias(cast(Literal(null), a.dataType),
a.name)(a.exprId) }
+
+ // We can not use transformUpWithPruning here since this rule is used by
both normal Optimizer
+ // and AQE Optimizer. And this may only effective at AQE side.
Review comment:
ah good point. I think there is a way to overcome it:
1. Create an abstract class `PropagateEmptyRelationBase` that contains util
functions and optimizes expensive operators such as join, aggregate, etc.
2. Create a rule `PropagateEmptyRelation extends PropagateEmptyRelationBase`
that additionally optimizes project, filter, etc.
3. Create a rule `AQEPropagateEmptyRelation extends
PropagateEmptyRelationBase` that overrides some util functions like
`isEmptyPlan`.
Then these two rules can define their transformation pruning separately.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##########
@@ -72,11 +65,72 @@ object PropagateEmptyRelation extends Rule[LogicalPlan]
with PredicateHelper wit
}
}
+ case p: UnaryNode if p.children.nonEmpty &&
p.children.forall(isEmptyLocalRelation) => p match {
+ case _: Project => empty(p)
+ case _: Filter => empty(p)
+ case _: Sample => empty(p)
+ case _ => p
+ }
+ }
+}
+
+/**
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ * 1. Binary-node Logical Plans
+ * - Join with one or two empty children (including Intersect/Except).
+ * - Join is single column NULL-aware anti join (NAAJ)
+ * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]].
Eliminate join to an
+ * empty [[LocalRelation]].
+ * - Left semi Join
+ * Right side is non-empty and condition is empty. Eliminate join to its
left side.
+ * - Left anti join
+ * Right side is non-empty and condition is empty. Eliminate join to an
empty
+ * [[LocalRelation]].
+ * 2. Unary-node Logical Plans
+ * - Limit/Repartition with all empty children.
+ * - Aggregate with all empty children and at least one grouping
expression.
+ * - Generate(Explode) with all empty children. Others like Hive UDTF may
return results.
+ *
+ * @param checkRowCount At AQE side, we use the query stage stats to check the
check.
+ * @param isRelationWithAllNullKeys At AQE side, we use the broadcast query
stage to do the check.
+ */
+case class PropagateEmptyRelationAdvanced(
+ checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None,
+ isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None)
+ extends Rule[LogicalPlan] with CastSupport {
+
+ private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = {
+ val defaultEmptyRelation: Boolean = plan match {
+ case p: LocalRelation => p.data.isEmpty
+ case _ => false
+ }
+
+ if (checkRowCount.isDefined) {
+ checkRowCount.get.apply(plan, false) || defaultEmptyRelation
+ } else {
+ defaultEmptyRelation
+ }
+ }
+
+ private def empty(plan: LogicalPlan) =
+ LocalRelation(plan.output, data = Seq.empty, isStreaming =
plan.isStreaming)
+
+ // Construct a project list from plan's output, while the value is always
NULL.
+ private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
+ plan.output.map{ a => Alias(cast(Literal(null), a.dataType),
a.name)(a.exprId) }
+
+ // We can not use transformUpWithPruning here since this rule is used by
both normal Optimizer
+ // and AQE Optimizer. And this may only effective at AQE side.
Review comment:
ah good point. I think there is a way to overcome it:
1. Create an abstract class `PropagateEmptyRelationBase` that contains util
functions and optimizes expensive operators such as join, aggregate, etc.
2. Create a rule `PropagateEmptyRelation extends PropagateEmptyRelationBase`
that additionally optimizes project, filter, etc.
3. Create a rule `AQEPropagateEmptyRelation extends
PropagateEmptyRelationBase` that overrides some util functions like
`isEmptyPlan`.
Then these two rules can define their transformation pruning separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]