Github user nongli commented on a diff in the pull request:
https://github.com/apache/spark/pull/9453#discussion_r43979999
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
@@ -419,25 +419,48 @@ private[sql] case class
EnsureRequirements(sqlContext: SQLContext) extends Rule[
// TODO: this should be a cost based decision. For example, a big
relation should probably
// maintain its existing number of partitions and smaller partitions
should be shuffled.
// defaultPartitions is arbitrary.
- val numPartitions = children.head.outputPartitioning.numPartitions
+ val maxChildrenNumPartitions =
children.map(_.outputPartitioning.numPartitions).max
val useExistingPartitioning =
children.zip(requiredChildDistributions).forall {
case (child, distribution) => {
child.outputPartitioning.guarantees(
- createPartitioning(distribution, numPartitions))
+ createPartitioning(distribution, maxChildrenNumPartitions))
}
}
children = if (useExistingPartitioning) {
+ // We do not need to shuffle any child's output.
children
} else {
+ // We need to shuffle at least one child's output.
+ // Now, we will determine the number of partitions that will be
used by created
+ // partitioning schemes.
+ val numPartitions = {
+ // Let's see if we need to shuffle all child's outputs when we
use
+ // maxChildrenNumPartitions.
+ val shufflesAllChildren =
children.zip(requiredChildDistributions).forall {
+ case (child, distribution) => {
+ !child.outputPartitioning.guarantees(
+ createPartitioning(distribution, maxChildrenNumPartitions))
+ }
+ }
+ // If we need to shuffle all children, we use
defaultNumPreShufflePartitions as the
+ // number of partitions. Otherwise, we use
maxChildrenNumPartitions.
+ if (shufflesAllChildren) defaultNumPreShufflePartitions else
maxChildrenNumPartitions
+ }
+
children.zip(requiredChildDistributions).map {
case (child, distribution) => {
val targetPartitioning =
- createPartitioning(distribution,
defaultNumPreShufflePartitions)
+ createPartitioning(distribution, numPartitions)
if (child.outputPartitioning.guarantees(targetPartitioning)) {
child
} else {
- Exchange(targetPartitioning, child)
+ child match {
+ // If child is an exchange, we replace it with
+ // a new one having targetPartitioning.
+ case Exchange(_, c, _) => Exchange(targetPartitioning, c)
+ case _ => Exchange(targetPartitioning, child)
+ }
--- End diff --
Looks good to me. Can you update/remove the TODO on line 419?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org