cloud-fan commented on a change in pull request #27226: [SPARK-30524] [SQL] 
Disable OptimizeSkewedJoin rule when introducing additional shuffle
URL: https://github.com/apache/spark/pull/27226#discussion_r367227838
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##########
 @@ -114,25 +120,56 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
     stage.shuffle.shuffleDependency.rdd.partitions.length
   }
 
-  def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
-        s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _),
-        s2 @ SortExec(_, _, right: ShuffleQueryStageExec, _))
-      if supportedJoinTypes.contains(joinType) =>
+  private def containShuffleQueryStage(plan : SparkPlan): (Boolean, 
ShuffleQueryStageExec) =
+    plan match {
+      case stage: ShuffleQueryStageExec => (true, stage)
+      case SortExec(_, _, s: ShuffleQueryStageExec, _) =>
+        (true, s)
+      case _ => (false, null)
+  }
+
+  private def reOptimizeChild(
+      skewedReader: SkewedPartitionReaderExec,
+      child: SparkPlan): SparkPlan = child match {
+    case sort @ SortExec(_, _, s: ShuffleQueryStageExec, _) =>
+      sort.copy(child = skewedReader)
+    case _ => child
+  }
+
+  private def getSizeInfo(medianSize: Long, maxSize: Long): String = {
+    s"median size: $medianSize, max size: ${maxSize}"
+  }
+
+  /*
+   * This method aims to optimize the skewed join with the following steps:
+   * 1. Check whether the shuffle partition is skewed based on the median size
+   *    and the skewed partition threshold in origin smj.
+   * 2. Assuming partition0 is skewed in left side, and it has 5 mappers 
(Map0, Map1...Map4).
+   *    And we will split the 5 Mappers into 3 mapper ranges [(Map0, Map1), 
(Map2, Map3), (Map4)]
+   *    based on the map size and the max split number.
+   * 3. Create the 3 smjs with separately reading the above mapper ranges and 
then join with
+   *    the Partition0 in right side.
+   * 4. Finally union the above 3 split smjs and the origin smj.
+   */
+  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case smj @ SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, 
leftPlan, rightPlan)
+      if (containShuffleQueryStage(leftPlan)._1 && 
containShuffleQueryStage(rightPlan)._1) &&
+        supportedJoinTypes.contains(joinType) =>
+      val left = containShuffleQueryStage(leftPlan)._2
+      val right = containShuffleQueryStage(rightPlan)._2
       val leftStats = getStatistics(left)
       val rightStats = getStatistics(right)
       val numPartitions = leftStats.bytesByPartitionId.length
 
       val leftMedSize = medianSize(leftStats)
       val rightMedSize = medianSize(rightStats)
-      val leftSizeInfo = s"median size: $leftMedSize, max size: 
${leftStats.bytesByPartitionId.max}"
-      val rightSizeInfo = s"median size: $rightMedSize," +
-        s" max size: ${rightStats.bytesByPartitionId.max}"
       logDebug(
         s"""
-          |Try to optimize skewed join.
-          |Left side partition size: $leftSizeInfo
-          |Right side partition size: $rightSizeInfo
+           |Try to optimize skewed join.
 
 Review comment:
   The previous indentation seems correct.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to