sunchao commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r766032121
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -70,61 +70,98 @@ case class EnsureRequirements(
val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
case (UnspecifiedDistribution, _) => false
case (_: BroadcastDistribution, _) => false
+ case (AllTuples, _) => false
case _ => true
}.map(_._2)
- val childrenNumPartitions =
- childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
-
- if (childrenNumPartitions.size > 1) {
- // Get the number of partitions which is explicitly required by the distributions.
- val requiredNumPartitions = {
- val numPartitionsSet = childrenIndexes.flatMap {
- index => requiredChildDistributions(index).requiredNumPartitions
- }.toSet
- assert(numPartitionsSet.size <= 1,
- s"$requiredChildDistributions have incompatible requirements of the
number of partitions")
- numPartitionsSet.headOption
+ // If there is more than one child, we'll need to check their partitioning & distribution
+ // and see if extra shuffles are necessary.
+ if (childrenIndexes.length > 1) {
+ childrenIndexes.map(requiredChildDistributions(_)).foreach { d =>
+ if (!d.isInstanceOf[ClusteredDistribution]) {
+ throw new IllegalStateException(s"Expected ClusteredDistribution but found " +
+ s"${d.getClass.getSimpleName}")
+ }
}
+ val specs = childrenIndexes.map(i =>
+ i -> children(i).outputPartitioning.createShuffleSpec(
+ requiredChildDistributions(i).asInstanceOf[ClusteredDistribution])
+ ).toMap
+
+ // Find out the shuffle spec that gives better parallelism.
+ //
+ // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+ //   (10, 10, 11)
+ // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+ // shuffle two sides if we pick 11.
+ //
+ // However, this should be sufficient for now since in Spark, nodes with multiple children
+ // always have exactly 2 children.
- // If there are non-shuffle children that satisfy the required distribution, we have
- // some tradeoffs when picking the expected number of shuffle partitions:
- // 1. We should avoid shuffling these children.
- // 2. We should have a reasonable parallelism.
- val nonShuffleChildrenNumPartitions =
- childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
- .map(_.outputPartitioning.numPartitions)
- val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
- if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
- // Here we pick the max number of partitions among these non-shuffle children.
- nonShuffleChildrenNumPartitions.max
+ // Whether we should consider `spark.sql.shuffle.partitions` and ensure enough parallelism
+ // during the shuffle. To achieve a good trade-off between parallelism and shuffle cost, we
+ // only consider the minimum parallelism if:
+ // 1. Some child can't create partitioning, i.e., it needs to be shuffled.
+ // 2. Some child already needs to be shuffled with `ShuffleExchangeExec` being present.
+ // In either of the above cases, we'll apply `spark.sql.shuffle.partitions` in case there
+ // is not enough parallelism.
+ //
+ // On the other hand, if we have:
+ //   HashPartitioning(5) <-> HashPartitioning(6)
+ // while `spark.sql.shuffle.partitions` is 10, we'll only re-shuffle the left side and make it
Review comment:
Updated the code. @cloud-fan please take another look. Thanks!
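
To make the parallelism trade-off in the diff's NOTE concrete, here is a minimal standalone Scala sketch. `SpecSketch` and `bestSpec` are illustrative names, not Spark's API; the real code compares `ShuffleSpec`s produced by `createShuffleSpec`, but the selection idea modeled here is the same:

```scala
// Minimal sketch, assuming each child's shuffle spec is reduced to just its
// partition count. SpecSketch/bestSpec are hypothetical, not Spark internals.
object BestSpecSketch {
  final case class SpecSketch(numPartitions: Int)

  // Pick the spec with the highest parallelism (ties resolve to the first
  // occurrence, since Seq.maxBy keeps the first maximal element).
  def bestSpec(specs: Seq[SpecSketch]): SpecSketch =
    specs.maxBy(_.numPartitions)

  def main(args: Array[String]): Unit = {
    // Prints SpecSketch(11): both 10-partition children get re-shuffled to 11,
    // even though picking 10 would shuffle only one side - the sub-optimal
    // 3-child case the NOTE in the diff calls out.
    println(bestSpec(Seq(SpecSketch(10), SpecSketch(10), SpecSketch(11))))
  }
}
```

As the NOTE says, this greedy choice is acceptable because Spark nodes with multiple children always have exactly 2 children.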
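Similarly, a hedged sketch of the minimum-parallelism rule described in the new comment: the default shuffle parallelism (`spark.sql.shuffle.partitions`, modeled below as a plain parameter) is only enforced when some child can't create the required partitioning or is already behind a shuffle. All names below are assumptions for illustration, not the PR's code:

```scala
// Minimal model of the comment's two conditions; field names are assumptions.
object MinParallelismSketch {
  final case class ChildSketch(
      numPartitions: Int,
      canCreatePartitioning: Boolean, // condition 1: false => must be shuffled
      isExistingShuffle: Boolean)     // condition 2: already a shuffle exchange

  // Target partition count: max child parallelism, bumped up to the default
  // shuffle parallelism only when one of the two conditions holds.
  def targetPartitions(children: Seq[ChildSketch], defaultShufflePartitions: Int): Int = {
    val best = children.map(_.numPartitions).max
    val considerMinParallelism =
      children.exists(c => !c.canCreatePartitioning || c.isExistingShuffle)
    if (considerMinParallelism) math.max(best, defaultShufflePartitions) else best
  }

  def main(args: Array[String]): Unit = {
    // HashPartitioning(5) <-> HashPartitioning(6) with a default of 10:
    // neither condition holds, so the target stays 6 and only the left side
    // is re-shuffled, matching the example at the end of the diff's comment.
    val children = Seq(
      ChildSketch(5, canCreatePartitioning = true, isExistingShuffle = false),
      ChildSketch(6, canCreatePartitioning = true, isExistingShuffle = false))
    println(targetPartitions(children, defaultShufflePartitions = 10)) // prints 6
  }
}
```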
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]