c21 commented on a change in pull request #29455:
URL: https://github.com/apache/spark/pull/29455#discussion_r473377778



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -317,4 +318,41 @@ case class ShuffledHashJoinExec(
       v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = 
true)
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
+
+  /**
+   * Generates the code for anti join.
+   * Handles NULL-aware anti join (NAAJ) separately here.
+   */
+  protected override def codegenAnti(ctx: CodegenContext, input: 
Seq[ExprCode]): String = {
+    if (isNullAwareAntiJoin) {
+      val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+      val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+      val (matched, _, _) = getJoinCondition(ctx, input)
+      val numOutput = metricTerm(ctx, "numOutputRows")
+      val found = ctx.freshName("found")
+
+      // Skip code generation for EmtpyHashedRelation and 
EmptyHashedRelationWithAllNullKeys cases,
+      // since it is already handled by AQE rewrite.
+      s"""
+         |boolean $found = false;
+         |// generate join key for stream side
+         |${keyEv.code}
+         |if ($anyNull) {
+         |  $found = true;
+         |} else {
+         |  UnsafeRow $matched = 
(UnsafeRow)$relationTerm.getValue(${keyEv.value});

Review comment:
       `getValue()` is for retrieving of unique key only. how about changing 
this into `get()` to be consistent with existing non-codegen code path. Though 
there's no issue to retrieve a single value for key, when key is not unique. 
But it can become a bug later when people enforce some check inside 
`HashedRelation`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -317,4 +318,41 @@ case class ShuffledHashJoinExec(
       v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = 
true)
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
+
+  /**
+   * Generates the code for anti join.
+   * Handles NULL-aware anti join (NAAJ) separately here.
+   */
+  protected override def codegenAnti(ctx: CodegenContext, input: 
Seq[ExprCode]): String = {

Review comment:
       given we refactored non-code-gen `nullAwareAntiJoin` to `HashJoin`, I 
think we can also refactor code-gen `codegenNullAwareAntiJoin` from 
`BroadcastHashJoinExec` and `ShuffledHashJoinExec` to `HashJoin` to avoid code 
duplication.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -277,6 +286,34 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     }
   }
 
+  private def nullAwareAntiJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation): Iterator[InternalRow] = {
+    if (hashedRelation == EmptyHashedRelation) {
+      streamIter
+    } else if (hashedRelation == EmptyHashedRelationWithAllNullKeys) {
+      // This code branch should only be used in 
BroadcastHashJoinExec(Non-AQE).

Review comment:
       can we enforce this assumption better than comment? e.g. we can override 
`nullAwareAntiJoin` in `BroadcastHashJoinExec` with:
   
   ```
     protected override def nullAwareAntiJoin(
         streamIter: Iterator[InternalRow],
         hashedRelation: HashedRelation): Iterator[InternalRow] = {
       if (hashedRelation == EmptyHashedRelationWithAllNullKeys &&
           sqlContext.conf.adaptiveExecutionEnabled) {
         Iterator.empty
       } else {
         super.nullAwareAntiJoin(streamIter, hashedRelation)
       }
     }
   ```
   
   and we can make better assertion at `nullAwareAntiJoin ` in `HashJoin` here:
   
   ```
     protected def nullAwareAntiJoin(
         streamIter: Iterator[InternalRow],
         hashedRelation: HashedRelation): Iterator[InternalRow] = {
       if (hashedRelation == EmptyHashedRelation) {
         streamIter
       } else {
         require(hashedRelation != EmptyHashedRelationWithAllNullKeys)
         ...
       }
     }
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -277,6 +286,34 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     }
   }
 
+  private def nullAwareAntiJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation): Iterator[InternalRow] = {
+    if (hashedRelation == EmptyHashedRelation) {
+      streamIter
+    } else if (hashedRelation == EmptyHashedRelationWithAllNullKeys) {
+      // This code branch should only be used in 
BroadcastHashJoinExec(Non-AQE).
+      // In BroadcastHashJoinExec(AQE) and ShuffledHashJoinExec(AQE),
+      // the JoinExec would be rewritten into a Empty LocalRelation.
+      Iterator.empty
+    } else {
+      val keyGenerator = UnsafeProjection.create(
+        BindReferences.bindReferences[Expression](
+          leftKeys,
+          AttributeSeq(left.output))
+      )

Review comment:
       why we can't reuse same method call for `streamSideKeyGenerator()`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -52,7 +54,15 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), 
child)
+
+        operator match {
+          // Ensure collectNullPartitionKeyMetrics is passed through via 
instance creation
+          case shj @ ShuffledHashJoinExec(_, _, LeftAnti, BuildRight, _, _, _, 
true)
+              if shj.right.fastEquals(child) =>
+            ShuffleExchangeExec(distribution.createPartitioning(numPartitions),
+              child, statSpecs = Seq(NullPartitionKeyStatisticsSpec))
+          case _ => 
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        }

Review comment:
       I feel this is not enough for passing `isNullAwareAntiJoin` information 
from `ShuffledHashJoinExec` to `ShuffleExchangeExec`. This only works when we 
need to insert a shuffle exchange at build side, but seems not cover when we 
don't need to insert a shuffle exchange (i.e. build side already partitioned on 
join keys).
   
   Examples:
   
   ```
   SELECT *
   FROM t1
   WHERE t1.key NOT IN (
     SELECT t2.key
     FROM t2
     DISTRIBUTE BY t2
   )
   ```
   
   or 
   
   ```
   SELECT *
   FROM t1
   WHERE t1.key NOT IN (
     SELECT t2.key
     FROM t2 // t2 is bucketed table on column key
   )
   ```
   
   I feel we probably need a separate physical plan rule after 
`EnsureRequirements` to pass the null-aware anti join information from shuffled 
hash join to its child shuffle exchange operator.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to