c21 commented on a change in pull request #29455:
URL: https://github.com/apache/spark/pull/29455#discussion_r472503907



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
##########
@@ -83,15 +83,18 @@ trait ShuffleExchangeLike extends Exchange {
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
+    noUserSpecifiedNumPartition: Boolean = true,
+    isNullAwareAntiJoin: Boolean = false) extends ShuffleExchangeLike {

Review comment:
       Just personal opinion - I feel NULL-aware anti join is not very popular 
query patterns, is there a way to avoid adding parameter here for popular class 
`ShuffleExchangeExec`? It's kind of weird to read that shuffle exchange 
depending on a config for NULL-aware anti join.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -235,8 +235,13 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           .getOrElse(createJoinWithoutHint())
 
       case j @ ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
-        Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, 
BuildRight,
-          None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = 
true))
+        if (!canBroadcastBySize(j.right, conf) && 
conf.adaptiveExecutionEnabled) {

Review comment:
       Just a high level question - if we are worried about broadcast join OOM 
on driver side here, shouldn't we be also worried about shuffled hash join OOM 
on task side as well?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -318,7 +355,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
         outerJoin(streamedIter, hashed)
       case LeftSemi =>
         semiJoin(streamedIter, hashed)
-      case LeftAnti =>
+      case LeftAnti if isNullAwareAntiJoin =>
+        nullAwareAntiJoin(streamedIter, hashed)
+      case LeftAnti if !isNullAwareAntiJoin =>

Review comment:
       nit: could this just be
   
   ```
   case LeftAnti =>
     if (isNullAwareAntiJoin) {
       nullAwareAntiJoin(streamedIter, hashed)
     } else {
       antiJoin(streamedIter, hashed)
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to