sunchao commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r763261221
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -70,61 +70,98 @@ case class EnsureRequirements(
val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
case (UnspecifiedDistribution, _) => false
case (_: BroadcastDistribution, _) => false
+ case (AllTuples, _) => false
case _ => true
}.map(_._2)
- val childrenNumPartitions =
- childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
-
- if (childrenNumPartitions.size > 1) {
- // Get the number of partitions which is explicitly required by the
distributions.
- val requiredNumPartitions = {
- val numPartitionsSet = childrenIndexes.flatMap {
- index => requiredChildDistributions(index).requiredNumPartitions
- }.toSet
- assert(numPartitionsSet.size <= 1,
- s"$requiredChildDistributions have incompatible requirements of the
number of partitions")
- numPartitionsSet.headOption
+ // If there are more than one children, we'll need to check partitioning &
distribution of them
+ // and see if extra shuffles are necessary.
+ if (childrenIndexes.length > 1) {
+ childrenIndexes.map(requiredChildDistributions(_)).foreach { d =>
+ if (!d.isInstanceOf[ClusteredDistribution]) {
+ throw new IllegalStateException(s"Expected ClusteredDistribution but
found " +
+ s"${d.getClass.getSimpleName}")
+ }
}
+ val specs = childrenIndexes.map(i =>
+ i -> children(i).outputPartitioning.createShuffleSpec(
+ requiredChildDistributions(i).asInstanceOf[ClusteredDistribution])
+ ).toMap
Review comment:
👍 . It's a map since later on we need to lookup a given index for its
spec.
```scala
case ((child, dist), idx) =>
if (bestSpec.isDefined &&
bestSpec.get.isCompatibleWith(specs(idx))) {
child
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]