agrawaldevesh commented on a change in pull request #29455:
URL: https://github.com/apache/spark/pull/29455#discussion_r472772355



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -83,15 +83,18 @@ trait ShuffleExchangeLike extends Exchange {
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+    noUserSpecifiedNumPartition: Boolean = true,
+    isNullAwareAntiJoin: Boolean = false) extends ShuffleExchangeLike {

Review comment:
       @c21, yeah, @leanken proposed the name `collectNullPartitionKeyMetrics` below. I made a similar observation below: we shouldn't have join-specific naming cropping up in shuffle code.
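
    To make that concrete, a sketch of the join-agnostic spelling using the proposed name (this is the idea under discussion, not merged code; it mirrors the hunk above):

```scala
// Sketch only: the shuffle needs to know *whether* to track null partition
// keys, not that its consumer happens to be a null-aware anti join.
case class ShuffleExchangeExec(
    override val outputPartitioning: Partitioning,
    child: SparkPlan,
    noUserSpecifiedNumPartition: Boolean = true,
    collectNullPartitionKeyMetrics: Boolean = false) extends ShuffleExchangeLike
```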

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule tries to eliminate NAAJ with the following patterns:
+ * 1. Convert a NAAJ to an empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to its left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys
+      val nullPartitionKeyMetrics = stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+        .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+      if (nullPartitionKeyMetrics.isDefined && nullPartitionKeyMetrics.get.value > 0L) {
+        true
+      } else {
+        false
+      }
+    case _ => false
+  }
+
+  private def canConvertToLeftPlan(plan: LogicalPlan): Boolean = plan match {

Review comment:
       Sure, let's leave the existing naming then. Thanks for checking!
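
    For pattern 2, a hedged guess at what `canConvertToLeftPlan` checks (the hunk cuts off before the body; the `SHUFFLE_RECORDS_WRITTEN` import above suggests this shape):

```scala
// Guess, not the PR's actual code: a finished build-side shuffle stage that
// wrote zero records can never produce a match, so the null-aware anti join
// degenerates to its left child.
private def canConvertToLeftPlan(plan: LogicalPlan): Boolean = plan match {
  case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
    stage.shuffle.asInstanceOf[ShuffleExchangeExec]
      .metrics.get(SHUFFLE_RECORDS_WRITTEN).exists(_.value == 0L)
  case _ => false
}
```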

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule tries to eliminate NAAJ with the following patterns:
+ * 1. Convert a NAAJ to an empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to its left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys
+      val nullPartitionKeyMetrics = stage.shuffle.asInstanceOf[ShuffleExchangeExec]

Review comment:
       Yeah, the concern is that we could get data corruption when, say, Spark tasks fail and the counters end up invalid. I don't think counters in Spark are meant to be that reliable.
   
    One approach we could use (I haven't thought too deeply about it) is to run some Spark computation on the shuffle outputs. It's like Spark using itself, just like how Delta uses Spark for metadata computation. I think it should be doable given the machinery of Encoders etc., but I haven't done it :-P
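
    A rough sketch of that idea (everything named here is hypothetical: `buildDf` stands in for the materialized build side, and the check runs as an ordinary Spark job instead of reading write-side counters):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical: re-derive "build side contains a null join key" by querying
// the shuffle output with Spark itself, instead of trusting write-side
// counters that can over-count when tasks are retried.
def buildSideHasNullKey(buildDf: DataFrame, keyColumn: String): Boolean = {
  // limit(1) lets the job short-circuit after the first null key it finds
  !buildDf.filter(col(keyColumn).isNull).limit(1).isEmpty
}
```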

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -225,7 +231,8 @@ object ShuffleExchangeExec {
       outputAttributes: Seq[Attribute],
       newPartitioning: Partitioning,
       serializer: Serializer,
-      writeMetrics: Map[String, SQLMetric])
+      writeMetrics: Map[String, SQLMetric],
+      isNullAwareAntiJoin: Boolean = false)

Review comment:
       Works for me!
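
    For reference, a hedged sketch of how a flag like this might be consumed when preparing the shuffle dependency (the iterator wiring and helper name are illustrative, not the PR's code):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical helper: wrap the record iterator so rows with a null
// partition key are counted, but only when the caller opted in.
def maybeCountNullKeys(
    records: Iterator[(Any, Any)],        // (partition key, row) pairs
    nullKeyMetric: Option[SQLMetric]): Iterator[(Any, Any)] = {
  nullKeyMetric match {
    case Some(metric) => records.map { kv =>
      if (kv._1 == null) metric.add(1L)   // one more null partition key seen
      kv
    }
    case None => records                  // zero overhead when the flag is off
  }
}
```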



