viirya commented on a change in pull request #29455: URL: https://github.com/apache/spark/pull/29455#discussion_r473280296
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ########## @@ -235,8 +235,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { .getOrElse(createJoinWithoutHint()) case j @ ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) => Review comment: We should also update the doc of `spark.sql.optimizeNullAwareAntiJoin`. It is not only `BroadcastHashJoinExec` now. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala ########## @@ -20,22 +20,38 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.exchange.{ExchangeStatisticsCollector, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys /** - * This optimization rule detects and convert a NAAJ to an Empty LocalRelation - * when buildSide is EmptyHashedRelationWithAllNullKeys. + * This optimization rule try to Eliminate NAAJ by following patterns. + * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys. + * 2. Convert a NAAJ to left plan when buildSide is empty. 
*/ object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] { - private def canEliminate(plan: LogicalPlan): Boolean = plan match { + private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true + case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined => + stage.shuffle.asInstanceOf[ExchangeStatisticsCollector] + .getNullPartitionKeyNumRows + .exists(_ > 0L) + case _ => false + } + + private def canRemoveAntiJoin(plan: LogicalPlan): Boolean = plan match { + case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined => + stage.shuffle.asInstanceOf[ExchangeStatisticsCollector] + .getOutputNumRows.contains(0L) + case _ => false } def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { - case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if canEliminate(j.right) => + case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) Review comment: This pattern can be disabled by the config `spark.sql.optimizeNullAwareAntiJoin`. However here we don't actually optimize it to null-aware `BroadcastHashJoinExec` or `ShuffleHashJoin`, but eliminate it. I think we don't need this `EliminateNullAwareAntiJoin` to be controlled by the config. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org