singhpk234 commented on a change in pull request #34464:
URL: https://github.com/apache/spark/pull/34464#discussion_r740839355



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -651,6 +651,23 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+  test("Allow changing outer join to broadcast join even if too many empty 
partitions" +
+    " on build plan") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
+      // `testData` is small enough to be broadcast but has empty partition 
ratio over the config.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+          "SELECT * FROM (select * from testData where value = '1') td" +
+            " right outer join testData2 ON key = a")

Review comment:
       [optional] how about adding all outerJoinTypes here and making it more 
rigid
   can be done with minimal changes 
   ```
           Seq("right outer", "left outer", "full outer").foreach { joinType =>
             val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
               "SELECT * FROM (select * from testData where value = '1') td" +
                 s" $joinType join testData2 ON key = a")
             val smj = findTopLevelSortMergeJoin(plan)
             assert(smj.size == 1)
             val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
             assert(bhj.size == 1)
           }
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -50,10 +51,16 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
       mapStats.bytesByPartitionId.forall(_ <= 
maxShuffledHashJoinLocalMapThreshold)
   }
 
-  private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] 
= plan match {
+  private def selectJoinStrategy(plan: LogicalPlan,
+                                 joinType: JoinType): Option[JoinStrategyHint] 
= plan match {

Review comment:
       [style] indentation is a bit off 
   can ref this : [scala style 
guide](https://github.com/databricks/scala-style-guide#spacing-and-indentation)
   
   quoting the piece which suggests what should be done in this case : 
   
   **_For method declarations, use 4 space indentation for their parameters and 
put each in each line when the parameters don't fit in two lines. Return types 
can be either on the same line as the last parameter, or start a new line with 
2 space indent._**

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -50,10 +51,16 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
       mapStats.bytesByPartitionId.forall(_ <= 
maxShuffledHashJoinLocalMapThreshold)
   }
 
-  private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] 
= plan match {
+  private def selectJoinStrategy(plan: LogicalPlan,
+                                 joinType: JoinType): Option[JoinStrategyHint] 
= plan match {
     case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if 
stage.isMaterialized
       && stage.mapStats.isDefined =>
-      val demoteBroadcastHash = 
shouldDemoteBroadcastHashJoin(stage.mapStats.get)
+      val demoteBroadcastHash = joinType match {
+        // doesn't make sense for outer joins since one side is preserved and 
join is not
+        // short circuited if the other side is empty
+        case Inner | LeftSemi | LeftAnti => shouldDemoteBroadcastHashJoin 
(stage.mapStats.get)

Review comment:
       We can also include Existence Joins as well.
   You can use LeftExistence pattern to match all LeftSemi / LeftAnti / 
Existence in one shot

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -69,16 +76,16 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
-    case j @ ExtractEquiJoinKeys(_, _, _, _, _, left, right, hint) =>
+    case j@ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>

Review comment:
       [nit] `j @ Ex...` space after j got removed

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
##########
@@ -50,10 +51,16 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
       mapStats.bytesByPartitionId.forall(_ <= 
maxShuffledHashJoinLocalMapThreshold)
   }
 
-  private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] 
= plan match {
+  private def selectJoinStrategy(plan: LogicalPlan,
+                                 joinType: JoinType): Option[JoinStrategyHint] 
= plan match {
     case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if 
stage.isMaterialized
       && stage.mapStats.isDefined =>
-      val demoteBroadcastHash = 
shouldDemoteBroadcastHashJoin(stage.mapStats.get)
+      val demoteBroadcastHash = joinType match {
+        // doesn't make sense for outer joins since one side is preserved and 
join is not
+        // short circuited if the other side is empty
+        case Inner | LeftSemi | LeftAnti => shouldDemoteBroadcastHashJoin 
(stage.mapStats.get)

Review comment:
       Can we also include Existence Joins as well
    -  LeftExistence pattern to match all LeftSemi / LeftAnti / Existence in 
one shot




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to