ulysses-you commented on a change in pull request #32602:
URL: https://github.com/apache/spark/pull/32602#discussion_r637891208



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
##########
@@ -26,59 +26,48 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL}
 
 /**
- * Collapse plans consisting empty local relations generated by [[PruneFilters]].
- * 1. Binary(or Higher)-node Logical Plans
- *    - Union with all empty children.
- *    - Join with one or two empty children (including Intersect/Except).
- * 2. Unary-node Logical Plans
- *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Join with false condition.
- *    - Aggregate with all empty children and at least one grouping expression.
- *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ * The rule used by both normal Optimizer and AQE Optimizer for:
+ *  1. Binary-node Logical Plans
+ *     - Join with one or two empty children (including Intersect/Except).
+ *     - Join is single column NULL-aware anti join (NAAJ)
+ *       Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
+ *       empty [[LocalRelation]].
+ *     - Left semi Join
+ *       Right side is non-empty and condition is empty. Eliminate join to its left side.
+ *     - Left anti join
+ *       Right side is non-empty and condition is empty. Eliminate join to an empty
+ *       [[LocalRelation]].
+ *  2. Unary-node Logical Plans
+ *     - Limit/Repartition with all empty children.
+ *     - Aggregate with all empty children and at least one grouping expression.
+ *     - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
-object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
-  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport {
+  protected def isEmpty(plan: LogicalPlan): Boolean = plan match {
     case p: LocalRelation => p.data.isEmpty
     case _ => false
   }
 
-  private def empty(plan: LogicalPlan) =
+  protected def nonEmpty(plan: LogicalPlan): Boolean = plan match {
+    case p: LocalRelation => p.data.nonEmpty
+    case _ => false
+  }
+
+  protected def empty(plan: LogicalPlan): LocalRelation =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
 
   // Construct a project list from plan's output, while the value is always NULL.
   private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
     plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
-    _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
-    case p: Union if p.children.exists(isEmptyLocalRelation) =>
-      val newChildren = p.children.filterNot(isEmptyLocalRelation)
-      if (newChildren.isEmpty) {
-        empty(p)
-      } else {
-        val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head
-        val outputs = newPlan.output.zip(p.output)
-        // the original Union may produce different output attributes than the new one so we alias
-        // them if needed
-        if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) {
-          newPlan
-        } else {
-          val outputAliases = outputs.map { case (newAttr, oldAttr) =>
-            val newExplicitMetadata =
-              if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None
-            Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata)
-          }
-          Project(outputAliases, newPlan)
-        }
-      }
-
+  protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = {
     // Joins on empty LocalRelations generated from streaming sources are not eliminated
     // as stateful streaming joins need to perform other state management operations other than
     // just processing the input data.
     case p @ Join(_, _, joinType, conditionOpt, _)
-        if !p.children.exists(_.isStreaming) =>

Review comment:
       reverted




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to