cloud-fan commented on a change in pull request #29021:
URL: https://github.com/apache/spark/pull/29021#discussion_r459836104
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,96 +183,106 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends
Rule[SparkPlan] {
* 3 tasks separately.
*/
def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
- case smj @ SortMergeJoinExec(_, _, joinType, _,
- s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
- s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
+ case smj @ SortMergeJoinExec(_, _, joinType, _, s1: SortExec, s2:
SortExec, _)
if supportedJoinTypes.contains(joinType) =>
- assert(left.partitionsWithSizes.length ==
right.partitionsWithSizes.length)
- val numPartitions = left.partitionsWithSizes.length
- // We use the median size of the original shuffle partitions to detect
skewed partitions.
- val leftMedSize = medianSize(left.mapStats)
- val rightMedSize = medianSize(right.mapStats)
- logDebug(
- s"""
- |Optimizing skewed join.
- |Left side partitions size info:
- |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
- |Right side partitions size info:
- |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
- """.stripMargin)
- val canSplitLeft = canSplitLeftSide(joinType)
- val canSplitRight = canSplitRightSide(joinType)
- // We use the actual partition sizes (may be coalesced) to calculate
target size, so that
- // the final data distribution is even (coalesced partitions + split
partitions).
- val leftActualSizes = left.partitionsWithSizes.map(_._2)
- val rightActualSizes = right.partitionsWithSizes.map(_._2)
- val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
- val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
- val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
- val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
- var numSkewedLeft = 0
- var numSkewedRight = 0
- for (partitionIndex <- 0 until numPartitions) {
- val leftActualSize = leftActualSizes(partitionIndex)
- val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
- val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
- val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 <
leftPartSpec.endReducerIndex
-
- val rightActualSize = rightActualSizes(partitionIndex)
- val isRightSkew = isSkewed(rightActualSize, rightMedSize) &&
canSplitRight
- val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
- val isRightCoalesced = rightPartSpec.startReducerIndex + 1 <
rightPartSpec.endReducerIndex
-
- // A skewed partition should never be coalesced, but skip it here just
to be safe.
- val leftParts = if (isLeftSkew && !isLeftCoalesced) {
- val reducerId = leftPartSpec.startReducerIndex
- val skewSpecs = createSkewPartitionSpecs(
- left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId,
leftTargetSize)
- if (skewSpecs.isDefined) {
- logDebug(s"Left side partition $partitionIndex " +
- s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is
skewed, " +
- s"split it into ${skewSpecs.get.length} parts.")
- numSkewedLeft += 1
+ // find the shuffleStage from the plan tree
+ val leftOpt = findShuffleStage(s1)
+ val rightOpt = findShuffleStage(s2)
+ if (leftOpt.isEmpty || rightOpt.isEmpty) {
+ smj
+ } else {
+ val left = leftOpt.get
+ val right = rightOpt.get
+ assert(left.partitionsWithSizes.length ==
right.partitionsWithSizes.length)
+ val numPartitions = left.partitionsWithSizes.length
+ // We use the median size of the original shuffle partitions to detect
skewed partitions.
Review comment:
This PR is very hard to reason about. We need to clearly define:
1. What nodes can appear between the shuffle stage and the SMJ. As we discussed
before, an Agg can't appear on the skewed side.
2. How to estimate the size. Since there are nodes in the middle, the stats
of the shuffle stage may not be accurate for the final join child (e.g. a Filter
in the middle).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]