ulysses-you commented on code in PR #41609:
URL: https://github.com/apache/spark/pull/41609#discussion_r1272916109


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeShuffleWithLocalRead.scala:
##########
@@ -139,7 +139,7 @@ object OptimizeShuffleWithLocalRead extends AQEShuffleReadRule {
   def canUseLocalShuffleRead(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
       s.mapStats.isDefined && isSupported(s.shuffle)
-    case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
+    case a @ AQEShuffleReadExec(s: ShuffleQueryStageExec, _) if !a.hasSkewedPartition =>

Review Comment:
   Enabling local shuffle read won't cause skew, so this guard only prevents the optimization.
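   The point can be illustrated with a minimal sketch over a simplified, hypothetical model of the two plan shapes that `canUseLocalShuffleRead` matches. `Plan`, `ShuffleStage`, `ShuffleRead`, and `LocalReadCheck` are illustrative names, not Spark's `ShuffleQueryStageExec`/`AQEShuffleReadExec`, and `hasMapStats`/`supported` stand in for the real eligibility checks. In this model the `!hasSkewedPartition` guard can only turn an eligible read from `true` to `false`. It never enables anything, so its only effect is to block local shuffle read.

```scala
// Simplified, hypothetical stand-ins for the two plan shapes matched by
// canUseLocalShuffleRead. These are NOT Spark's ShuffleQueryStageExec /
// AQEShuffleReadExec; `hasMapStats` and `supported` abstract over the real
// eligibility checks.
sealed trait Plan
final case class ShuffleStage(hasMapStats: Boolean, supported: Boolean) extends Plan
final case class ShuffleRead(stage: ShuffleStage, hasSkewedPartition: Boolean) extends Plan

object LocalReadCheck {
  private def eligible(s: ShuffleStage): Boolean = s.hasMapStats && s.supported

  // Before the change: any read over an eligible stage may use local shuffle read.
  def withoutGuard(plan: Plan): Boolean = plan match {
    case s: ShuffleStage   => eligible(s)
    case ShuffleRead(s, _) => eligible(s)
  }

  // With the proposed guard: reads containing skewed partitions are rejected.
  def withGuard(plan: Plan): Boolean = plan match {
    case s: ShuffleStage                                => eligible(s)
    case r @ ShuffleRead(s, _) if !r.hasSkewedPartition => eligible(s)
    case _                                              => false
  }
}

val stage = ShuffleStage(hasMapStats = true, supported = true)
val skewedRead = ShuffleRead(stage, hasSkewedPartition = true)
println(LocalReadCheck.withoutGuard(skewedRead)) // true: local read would be applied
println(LocalReadCheck.withGuard(skewedRead))    // false: the guard only disables it
```

   In this model `withGuard` implies `withoutGuard` for every input, which is the sense in which the guard removes an optimization without buying anything in return.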



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala:
##########
@@ -215,6 +237,18 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements)
         case (newLeft, newRight) =>
           shj.copy(left = newLeft, right = newRight, isSkewJoin = true)
       }.getOrElse(shj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, _, BuildRight, _,
+        ShuffleStage(left: ShuffleQueryStageExec), _, _, false) =>

Review Comment:
   It would be better to add a `LOCAL_SHUFFLE_READER_ENABLED` check here. The BHJ can only be skewed if local shuffle read is disabled, so with this check we do not need to touch the `OptimizeShuffleWithLocalRead` rule.
   
   Please add a comment explaining why this config check is needed.
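   A minimal sketch of the suggested alternative, over a simplified, hypothetical model rather than Spark's `BroadcastHashJoinExec`. `Bhj`, `Side`, and `SkewedBhjRule` are illustrative names. `localShuffleReaderEnabled` stands in for reading `LOCAL_SHUFFLE_READER_ENABLED` (`spark.sql.adaptive.localShuffleReader.enabled`) from the session conf, and setting `isSkewJoin` stands in for the real split done by `tryOptimizeBroadcastHashJoinStreamedPlan`. The skew rewrite is attempted only when local shuffle read is disabled, and the comment on the check records why. Both build sides are covered, since the streamed side is `left` for `BuildRight` and `right` for `BuildLeft`.

```scala
// Simplified, hypothetical model of a broadcast hash join; NOT Spark's
// BroadcastHashJoinExec. `Side` abstracts over one join child.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

final case class Side(isShuffleStage: Boolean, skewed: Boolean)
final case class Bhj(buildSide: BuildSide, left: Side, right: Side, isSkewJoin: Boolean = false)

object SkewedBhjRule {
  // The broadcast (build) side is never split; only the streamed side is a candidate.
  private def streamedSide(bhj: Bhj): Side = bhj.buildSide match {
    case BuildRight => bhj.left
    case BuildLeft  => bhj.right
  }

  def apply(bhj: Bhj, localShuffleReaderEnabled: Boolean): Bhj = {
    val streamed = streamedSide(bhj)
    // Config check suggested in review: a BHJ can only be skewed when local
    // shuffle read is disabled, so skip the rewrite otherwise. This keeps the
    // decision in this rule instead of changing OptimizeShuffleWithLocalRead.
    val canBeSkewed = !localShuffleReaderEnabled
    if (canBeSkewed && !bhj.isSkewJoin && streamed.isShuffleStage && streamed.skewed) {
      bhj.copy(isSkewJoin = true) // stand-in for splitting skewed partitions
    } else {
      bhj
    }
  }
}

val skewedStage = Side(isShuffleStage = true, skewed = true)
val broadcast = Side(isShuffleStage = false, skewed = false)
val buildRightJoin = Bhj(BuildRight, left = skewedStage, right = broadcast)
println(SkewedBhjRule(buildRightJoin, localShuffleReaderEnabled = false).isSkewJoin) // true
println(SkewedBhjRule(buildRightJoin, localShuffleReaderEnabled = true).isSkewJoin)  // false
```

   In the actual rule this would presumably become a pattern guard on each BHJ case that reads the config from `conf`. That exact wiring is an assumption, not something taken from the PR.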



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala:
##########
@@ -215,6 +237,18 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements)
         case (newLeft, newRight) =>
           shj.copy(left = newLeft, right = newRight, isSkewJoin = true)
       }.getOrElse(shj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, _, BuildRight, _,
+        ShuffleStage(left: ShuffleQueryStageExec), _, _, false) =>
+      tryOptimizeBroadcastHashJoinStreamedPlan(left).map {
+        case newLeft => bhj.copy(left = newLeft, isSkewJoin = true)
+      }.getOrElse(bhj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, _, BuildLeft, _, _,
+        ShuffleStage(right: ShuffleQueryStageExec), _, false) =>

Review Comment:
   Ditto: the same `LOCAL_SHUFFLE_READER_ENABLED` check (with a comment) applies to this `BuildLeft` case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
