sunchao commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r753389528
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -70,13 +70,63 @@ case class EnsureRequirements(
val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
case (UnspecifiedDistribution, _) => false
case (_: BroadcastDistribution, _) => false
+ case (AllTuples, _) => false
case _ => true
}.map(_._2)
- val childrenNumPartitions =
- childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+ // If there is more than one child, we'll need to check partitioning & distribution of them
+ // and see if extra shuffles are necessary.
+ if (childrenIndexes.length > 1) {
+ childrenIndexes.map(requiredChildDistributions(_)).foreach { d =>
+ if (!d.isInstanceOf[ClusteredDistribution]) {
+ throw new IllegalStateException(s"Expected ClusteredDistribution but found " +
+ s"${d.getClass.getSimpleName}")
+ }
+ }
+ val specs = childrenIndexes.map(i =>
+ i -> children(i).outputPartitioning.createShuffleSpec(
+ requiredChildDistributions(i).requiredNumPartitions.getOrElse(conf.numShufflePartitions),
+ requiredChildDistributions(i).asInstanceOf[ClusteredDistribution])
+ ).toMap
+
+ // Find out the shuffle spec that gives better parallelism.
+ //
+ // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+ // (10, 10, 11)
+ // it's better to pick 10 in this case since we only need to shuffle one child; we'd need to
+ // shuffle two children if we pick 11.
+ //
+ // However, this should be sufficient for now since in Spark, nodes with multiple children
+ // always have exactly 2 children.
+ //
+ // Also, when choosing the spec, we should consider those children with no `Exchange` node
+ // first. For instance, if we have:
+ // A: (No_Exchange, 100) <---> B: (Exchange, 120)
+ // it's better to pick A and change B to (Exchange, 100) instead of picking B and inserting a
+ // new shuffle for A.
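The selection rule described in the comment above (prefer a child with no `Exchange`, then the one with more partitions) can be sketched on a simplified model. This is a hypothetical illustration, not Spark's `ShuffleSpec` API: `ChildInfo` and `PickSpec` are made-up names, and each child is reduced to whether it already has an `Exchange` and how many partitions it produces.

```scala
// Hypothetical, simplified view of a child plan: whether it already ends in an
// `Exchange`, and how many partitions it produces. Not Spark's ShuffleSpec API.
final case class ChildInfo(index: Int, hasExchange: Boolean, numPartitions: Int)

object PickSpec {
  // Pick the child whose partitioning the other children should be aligned to.
  // The tuple key is compared element by element:
  //   1. children without an Exchange win (Ordering[Boolean] puts true above false),
  //      because reusing their partitioning avoids inserting a new shuffle;
  //   2. among those, the child with more partitions wins (better parallelism).
  def pick(children: Seq[ChildInfo]): ChildInfo = {
    require(children.nonEmpty, "need at least one child")
    children.maxBy(c => (!c.hasExchange, c.numPartitions))
  }
}
```

With A = (No_Exchange, 100) and B = (Exchange, 120) this picks A. For (10, 10, 11) with no exchanges it picks 11, which is exactly the non-optimal case the NOTE calls out.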
Review comment:
Thanks for the detailed description. Yeah, I tried an approach similar to the one
you mentioned above, but the implementation turned out to be more complex than
I expected, so I went with this simpler approach, which is tailored to the
case of exactly two children.
Let me try it again and see if I can make it simple, general, and easy to
understand.
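One way to generalize beyond two children, sketched under the same simplified, hypothetical model (`Child` and `ChooseTarget` are illustrative names, not Spark code), is to try each child's partition count as the target and count how many brand-new shuffles it would require. The sketch assumes that a child which already has an `Exchange` can simply have that `Exchange` re-targeted, so only mismatched children without one cost a new shuffle. Ties go to the larger partition count for parallelism.

```scala
// Hypothetical, simplified view of a child plan (not Spark's ShuffleSpec API).
final case class Child(hasExchange: Boolean, numPartitions: Int)

object ChooseTarget {
  // Number of brand-new shuffles needed if every child must end up with `target`
  // partitions: a child already at `target` needs nothing, a child with an existing
  // Exchange is assumed to be re-targetable, and any other child needs a new shuffle.
  def newShuffles(children: Seq[Child], target: Int): Int =
    children.count(c => c.numPartitions != target && !c.hasExchange)

  // Candidate targets are the children's own partition counts. Prefer the fewest
  // new shuffles; among ties, prefer the larger partition count (negated for minBy).
  def choose(children: Seq[Child]): Int = {
    require(children.nonEmpty, "need at least one child")
    val candidates = children.map(_.numPartitions).distinct
    candidates.minBy(t => (newShuffles(children, t), -t))
  }
}
```

For (10, 10, 11) this returns 10 (one new shuffle instead of two), and for A: (No_Exchange, 100) vs. B: (Exchange, 120) it returns 100. The cost model deliberately ignores the expense of re-targeting an existing `Exchange`; a real implementation might weigh that as well.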
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]