ekoifman commented on a change in pull request #31653:
URL: https://github.com/apache/spark/pull/31653#discussion_r604308499



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -251,48 +253,129 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
       }
   }
 
+  /**
+   * A potential stage is from Exchange down.  Actual [[QueryStageExec]] nodes are created
+   * by [[AdaptiveSparkPlanExec.newQueryStage]] bounded by previously created [[QueryStageExec]]
+   * nodes below.
+   * Todo: need better way to identify which join the log msgs below refer to.  Tags?
+   */
+  private def handlePotentialQueryStage(plan: SparkPlan): SparkPlan = {
+    val shuffleStages = collectShuffleStages(plan)
+    val s = ExplainUtils.getAQELogPrefix(shuffleStages)
+
+    if (shuffleStages.length != 2 && !conf.adaptiveForceIfShuffle) {
+      /* Consider Case II.  Shuffle above SMJ1.  We should see 3 SQSE nodes but
+       with adaptiveForceIfShuffle() we should be able to add a new shuffle
+       above SMJ2 to enable skew mitigation of SMJ2.  W/o ability to add a new
+       shuffle skew mitigation is still possible in some cases - to be handled later.
+
+       Add a test for this.
+       See test("skew in deeply nested join - test ShuffleAddedException") and
+       add a similar test with just 2 joins */
+      logInfo(s"OptimizeSkewedJoin: rule is not applied since" +
+        s" shuffleStages.length=${shuffleStages.length} != 2 and " +
+        s"${SQLConf.ADAPTIVE_FORCE_IF_SHUFFLE.key}=false; $s")
+      return plan
+    }
+    val numShufflesBefore = plan.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+    val mitigatedPlan = optimizeSkewJoin(plan)
+    if (mitigatedPlan eq plan) {
+      return plan
+    }
+    val executedPlan = ensureRequirements.apply(mitigatedPlan)
+    val numNewShuffles = executedPlan.collect {
+      case e: ShuffleExchangeExec => e
+    }.length - numShufflesBefore
+    if(numNewShuffles > 0) {
+      if (conf.adaptiveForceIfShuffle) {
+        logInfo(s"OptimizeSkewedJoin: rule is applied. " +
+          s"$numNewShuffles additional shuffles will be introduced; $s")
+        executedPlan // make sure to return plan with new shuffles
+      } else {
+        logInfo(s"OptimizeSkewedJoin: rule is not applied due" +
+          s" to $numNewShuffles additional shuffles will be introduced; $s")
+        plan
+      }
+    } else {
+      executedPlan
+    }
+  }
+
+  def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
+    case stage: ShuffleQueryStageExec => Seq(stage)
+    case _ => plan.children.flatMap(collectShuffleStages)
+  }
+  /**
+   * Now this runs as part of queryStagePreparationRules() which means it runs over the whole plan
+   * which may have any number of ExchangeExec nodes, i.e. multiple "query stages"

Review comment:
   1. With this PR, if you have 
   ```
   SMJ1
    |-...
    |-SMJ2
       |-...
       |-...
   ```
   in a single stage, `OptimizeSkewedJoin` has to pay attention to it because it may decide to mitigate skew in SMJ2 and insert a shuffle above it, breaking up the stage.
   2. My comment in the code was referring to something different.  When `OptimizeSkewedJoin` runs as part of `queryStagePreparationRules`, it may run over a plan like this, for example:
   ```
   SMJ0
   |-Exch
    |-SMJ1
       |-...
       |-...
   |-Exch 
    |-SMJ2
       |-...
       |-...
   ```
   where all inputs to SMJ1/SMJ2 are materialized, and it has to make a decision about each SMJ separately.  Previously (when `OptimizeSkewedJoin` was part of `queryStageOptimizerRules`), any given run of `OptimizeSkewedJoin` would see either SMJ1 or SMJ2, but not both.
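
To make the difference in scope concrete, here is a minimal sketch using a toy plan model. `Node`, `Leaf`, `Exch`, `SMJ`, `PlanScopes` and `PlanScopesDemo` are hypothetical stand-ins invented for illustration; they are not Spark classes and this is not how the rule is implemented. It only contrasts a traversal that crosses exchange boundaries (roughly what a rule sees when it runs over the whole plan) with one that stops at them (roughly what a per-stage rule sees).

```scala
// Toy plan model. All types here are hypothetical, not Spark's SparkPlan nodes.
sealed trait Node { def children: Seq[Node] }
case class Leaf(name: String) extends Node { val children: Seq[Node] = Nil }
case class Exch(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
case class SMJ(name: String, left: Node, right: Node) extends Node {
  val children: Seq[Node] = Seq(left, right)
}

object PlanScopes {
  // Whole-plan view: collect every join, crossing Exch boundaries.
  def joinsInWholePlan(root: Node): Seq[String] = {
    val here = root match {
      case SMJ(n, _, _) => Seq(n)
      case _ => Nil
    }
    here ++ root.children.flatMap(joinsInWholePlan)
  }

  // Per-stage view: collect joins under `root` but stop at each Exch.
  def joinsInStage(root: Node): Seq[String] = root match {
    case Exch(_) => Nil
    case SMJ(n, l, r) => n +: (joinsInStage(l) ++ joinsInStage(r))
    case other => other.children.flatMap(joinsInStage)
  }

  // Stage roots in this toy model: the plan root plus the child of every Exch.
  def stageRoots(root: Node): Seq[Node] = {
    def below(n: Node): Seq[Node] = n match {
      case Exch(c) => c +: below(c)
      case other => other.children.flatMap(below)
    }
    root +: below(root)
  }
}

object PlanScopesDemo {
  // Plan from point 2: SMJ0 over two exchanges, each with its own join below.
  val multiStage: Node = SMJ(
    "SMJ0",
    Exch(SMJ("SMJ1", Leaf("a"), Leaf("b"))),
    Exch(SMJ("SMJ2", Leaf("c"), Leaf("d"))))

  // Plan from point 1: two joins nested inside a single stage.
  val singleStage: Node = SMJ("SMJ1", Leaf("a"), SMJ("SMJ2", Leaf("b"), Leaf("c")))
}
```

In this sketch, `joinsInWholePlan(multiStage)` returns all three joins, while mapping `joinsInStage` over `stageRoots(multiStage)` yields one join per stage. For `singleStage`, `joinsInStage` already returns both SMJ1 and SMJ2, which is the situation described in point 1.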
   



