cloud-fan commented on a change in pull request #32602:
URL: https://github.com/apache/spark/pull/32602#discussion_r637849407
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##########
@@ -26,59 +26,48 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION,
TRUE_OR_FALSE_LITERAL}
/**
- * Collapse plans consisting empty local relations generated by
[[PruneFilters]].
- * 1. Binary(or Higher)-node Logical Plans
- * - Union with all empty children.
- * - Join with one or two empty children (including Intersect/Except).
- * 2. Unary-node Logical Plans
- * - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- * - Join with false condition.
- * - Aggregate with all empty children and at least one grouping expression.
- * - Generate(Explode) with all empty children. Others like Hive UDTF may
return results.
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ * 1. Binary-node Logical Plans
+ * - Join with one or two empty children (including Intersect/Except).
+ * - Join is single column NULL-aware anti join (NAAJ)
+ * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]].
Eliminate join to an
+ * empty [[LocalRelation]].
+ * - Left semi Join
+ * Right side is non-empty and condition is empty. Eliminate join to its
left side.
+ * - Left anti join
+ * Right side is non-empty and condition is empty. Eliminate join to an
empty
+ * [[LocalRelation]].
+ * 2. Unary-node Logical Plans
+ * - Limit/Repartition with all empty children.
+ * - Aggregate with all empty children and at least one grouping
expression.
+ * - Generate(Explode) with all empty children. Others like Hive UDTF may
return results.
*/
-object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper
with CastSupport {
- private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with
CastSupport {
+ protected def isEmpty(plan: LogicalPlan): Boolean = plan match {
case p: LocalRelation => p.data.isEmpty
case _ => false
}
- private def empty(plan: LogicalPlan) =
+ protected def nonEmpty(plan: LogicalPlan): Boolean = plan match {
+ case p: LocalRelation => p.data.nonEmpty
+ case _ => false
+ }
+
+ protected def empty(plan: LogicalPlan): LocalRelation =
LocalRelation(plan.output, data = Seq.empty, isStreaming =
plan.isStreaming)
// Construct a project list from plan's output, while the value is always
NULL.
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
plan.output.map{ a => Alias(cast(Literal(null), a.dataType),
a.name)(a.exprId) }
- def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
- _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
- case p: Union if p.children.exists(isEmptyLocalRelation) =>
- val newChildren = p.children.filterNot(isEmptyLocalRelation)
- if (newChildren.isEmpty) {
- empty(p)
- } else {
- val newPlan = if (newChildren.size > 1) Union(newChildren) else
newChildren.head
- val outputs = newPlan.output.zip(p.output)
- // the original Union may produce different output attributes than the
new one so we alias
- // them if needed
- if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId ==
oldAttr.exprId }) {
- newPlan
- } else {
- val outputAliases = outputs.map { case (newAttr, oldAttr) =>
- val newExplicitMetadata =
- if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata)
else None
- Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata =
newExplicitMetadata)
- }
- Project(outputAliases, newPlan)
- }
- }
-
+ protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
// Joins on empty LocalRelations generated from streaming sources are not
eliminated
// as stateful streaming joins need to perform other state management
operations other than
// just processing the input data.
case p @ Join(_, _, joinType, conditionOpt, _)
- if !p.children.exists(_.isStreaming) =>
Review comment:
The previous indentation is correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org