agrawaldevesh commented on a change in pull request #29455:
URL: https://github.com/apache/spark/pull/29455#discussion_r472579634



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys
+      val nullPartitionKeyMetrics = stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+        .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+      if (nullPartitionKeyMetrics.isDefined && nullPartitionKeyMetrics.get.value > 0L) {
+        true
+      } else {
+        false
+      }
+    case _ => false
+  }
+
+  private def canConvertToLeftPlan(plan: LogicalPlan): Boolean = plan match {
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelation

Review comment:
       same comment as above

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys
+      val nullPartitionKeyMetrics = stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+        .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+      if (nullPartitionKeyMetrics.isDefined && nullPartitionKeyMetrics.get.value > 0L) {

Review comment:
       nit: I think the if condition can be removed and replaced by simply returning `nullPartitionKeyMetrics.exists(_.value > 0L)` :-P
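
   i.e., a self-contained sketch of the `Option.exists` shape I have in mind (plain Scala, with a hypothetical `Metric` class standing in for Spark's `SQLMetric`):

```scala
// Stand-in for SQLMetric; only the value accessor matters for this sketch.
final case class Metric(value: Long)

object ExistsSketch {
  // Before: an explicit isDefined/get check plus an if/else returning boolean literals.
  def verbose(metric: Option[Metric]): Boolean =
    if (metric.isDefined && metric.get.value > 0L) true else false

  // After: Option.exists folds the None check and the predicate into one call.
  def concise(metric: Option[Metric]): Boolean = metric.exists(_.value > 0L)

  def main(args: Array[String]): Unit = {
    val cases = Seq(Some(Metric(3L)), Some(Metric(0L)), None)
    // The two forms agree on every input: true, false, false.
    cases.foreach(m => assert(verbose(m) == concise(m)))
  }
}
```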

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys

Review comment:
       nit: I am not sure if this comment is still very clear. It reflects the 
history of the code but not its current state. 
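
   For instance (illustrative wording only, not a prescribed comment), it could describe the check itself:

```scala
// At least one build-side record was written with a null partition key, so the
// build side must contain a null join key and the anti join can return no rows.
```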
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -275,8 +282,17 @@ object ShuffleExchangeExec {
          position
        }
      case h: HashPartitioning =>
-        val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
-        row => projection(row).getInt(0)
+        val extraExprs = if (isNullAwareAntiJoin) h.expressions.head :: Nil else Nil

Review comment:
       Can we name `extraExprs` more explicitly? I am wondering whether having this as an explicitly different case branch (i.e. `case h: HashPartitioning if isNullAwareAntiJoin`) would in fact be clearer?
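
   A self-contained sketch of that shape (plain Scala with stand-in types, not Spark's actual partitioning classes): the guard lifts the NAAJ special case into its own branch instead of branching inside the arm:

```scala
object GuardedBranchSketch {
  sealed trait Partitioning
  final case class HashPartitioning(numPartitions: Int) extends Partitioning
  final case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

  // Instead of one HashPartitioning arm containing an internal
  // `if (isNullAwareAntiJoin) ... else ...`, guard the match arm itself.
  def describe(p: Partitioning, isNullAwareAntiJoin: Boolean): String = p match {
    case h: HashPartitioning if isNullAwareAntiJoin =>
      s"hash(${h.numPartitions}) plus null-partition-key counting for NAAJ"
    case h: HashPartitioning =>
      s"hash(${h.numPartitions})"
    case r: RoundRobinPartitioning =>
      s"round-robin(${r.numPartitions})"
  }

  def main(args: Array[String]): Unit = {
    println(describe(HashPartitioning(200), isNullAwareAntiJoin = true))
    println(describe(HashPartitioning(200), isNullAwareAntiJoin = false))
  }
}
```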

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -83,15 +83,18 @@ trait ShuffleExchangeLike extends Exchange {
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+    noUserSpecifiedNumPartition: Boolean = true,
+    isNullAwareAntiJoin: Boolean = false) extends ShuffleExchangeLike {

Review comment:
       > Sorry, I think I should have been talking with data in the first place. I just checked and found that, in our case, less than 10% of joins use `BroadcastNestedLoopJoin`, and the amount of CPU is also a single-digit percentage. Our workload is by no means representative of the industry; I would like to hear more that is based on data.
   
   First, some of the interest in improving single-key NAAJ stems from TPCH-Q16: Benchmarketing. Without this optimization, that one query can easily be a huge portion of the total TPCH runtime and skew the results.
   
   Having said that, one of the chief reasons to optimize NOT-IN is avoiding customer surprises. Often, people will trip over this query after an innocuous change. This is particularly true when customers are submitting queries via a UI tool (where you don't have control over the query the tool generates) or when they innocently introduce this query via a notebook.
   
   It's not very interesting to optimize in the batch sense, because there you have some flexibility to change the pipeline to use NOT-EXISTS instead of NOT-IN (which is not planned as an NAAJ).
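
   To make that NOT-IN vs. NOT-EXISTS difference concrete, a small runnable sketch (table and column names are made up; the behavior is standard Spark SQL null semantics):

```scala
import org.apache.spark.sql.SparkSession

// NOT IN is planned as a null-aware anti join (NAAJ), while the NOT EXISTS
// rewrite is planned as a regular anti join; they differ once NULLs appear.
object NotInVsNotExists {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("naaj").getOrCreate()
    import spark.implicits._

    Seq(Some(1), Some(2), None).toDF("a").createOrReplaceTempView("t1")
    Seq(Some(2), None).toDF("b").createOrReplaceTempView("t2")

    // NOT IN is null-aware: the single NULL in t2.b makes the result empty.
    spark.sql("SELECT * FROM t1 WHERE a NOT IN (SELECT b FROM t2)").show()

    // NOT EXISTS is a plain anti join: NULL never satisfies t2.b = t1.a,
    // so the rows with a = 1 and a = NULL survive.
    spark.sql(
      "SELECT * FROM t1 WHERE NOT EXISTS (SELECT 1 FROM t2 WHERE t2.b = t1.a)").show()

    spark.stop()
  }
}
```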

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys
+      val nullPartitionKeyMetrics = stage.shuffle.asInstanceOf[ShuffleExchangeExec]

Review comment:
       How accurate are metrics in Spark? Can they be double-counted or under-counted?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -52,7 +54,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+
+        operator match {
+          case shj @ ShuffledHashJoinExec(_, _, LeftAnti, BuildRight, _, _, _, true)

Review comment:
       A one-line comment explaining this would be awesome.
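
   For example, something along these lines (illustrative wording only):

```scala
// For a single-column null-aware anti join (the `true` flag matched above), build
// the shuffle with isNullAwareAntiJoin = true so the exchange also counts records
// written with a null partition key; EliminateNullAwareAntiJoin reads that metric.
```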

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
##########
@@ -20,22 +20,50 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.EmptyHashedRelationWithAllNullKeys
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN
 
 /**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * This optimization rule try to Eliminate NAAJ by following patterns.
+ * 1. Convert a NAAJ to an Empty LocalRelation when buildSide is EmptyHashedRelationWithAllNullKeys.
+ * 2. Convert a NAAJ to left plan when buildSide is empty.
  */
 object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
+  private def canConvertToEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
       && stage.broadcast.relationFuture.get().value == EmptyHashedRelationWithAllNullKeys => true
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined =>
+      // Equivalent to EmptyHashedRelationWithAllNullKeys
+      val nullPartitionKeyMetrics = stage.shuffle.asInstanceOf[ShuffleExchangeExec]
+        .metrics.get(ShuffleExchangeExec.NULL_PARTITION_KEY_RECORDS_WRITTEN)
+      if (nullPartitionKeyMetrics.isDefined && nullPartitionKeyMetrics.get.value > 0L) {
+        true
+      } else {
+        false
+      }
+    case _ => false
+  }
+
+  private def canConvertToLeftPlan(plan: LogicalPlan): Boolean = plan match {

Review comment:
       nit: I somehow expect to see a `canConvertToRightPlan` when I encounter a method called `canConvertToLeftPlan`. I am wondering if this can be renamed to `canRemoveAntiJoin`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -225,7 +231,8 @@ object ShuffleExchangeExec {
       outputAttributes: Seq[Attribute],
       newPartitioning: Partitioning,
       serializer: Serializer,
-      writeMetrics: Map[String, SQLMetric])
+      writeMetrics: Map[String, SQLMetric],
+      isNullAwareAntiJoin: Boolean = false)

Review comment:
       I am wondering if this should be renamed to something like `countRowsWithNullPartitionKey`? In the context of an exchange, having a variable that is very join-specific is a bit weird, because a join is but one of the things that can cause an exchange.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -317,4 +318,41 @@ case class ShuffledHashJoinExec(
       v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = true)
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
+
+  /**
+   * Generates the code for anti join.
+   * Handles NULL-aware anti join (NAAJ) separately here.
+   */
+  protected override def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+    if (isNullAwareAntiJoin) {
+      val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+      val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+      val (matched, _, _) = getJoinCondition(ctx, input)
+      val numOutput = metricTerm(ctx, "numOutputRows")
+      val found = ctx.freshName("found")
+
+      // Skip code check on EmtpyHashedRelation and EmptyHashedRelationWithAllNullKeys,

Review comment:
       Did you mean to skip code generation for the `EmptyHashedRelation` and `EmptyHashedRelationWithAllNullKeys` cases?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
