Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9453#discussion_r43959389
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
@@ -419,25 +419,48 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
// TODO: this should be a cost based decision. For example, a big relation should probably
// maintain its existing number of partitions and smaller partitions should be shuffled.
// defaultPartitions is arbitrary.
- val numPartitions = children.head.outputPartitioning.numPartitions
+ val maxChildrenNumPartitions = children.map(_.outputPartitioning.numPartitions).max
val useExistingPartitioning = children.zip(requiredChildDistributions).forall {
case (child, distribution) => {
child.outputPartitioning.guarantees(
- createPartitioning(distribution, numPartitions))
+ createPartitioning(distribution, maxChildrenNumPartitions))
}
}
children = if (useExistingPartitioning) {
+ // We do not need to shuffle any child's output.
children
} else {
+ // We need to shuffle at least one child's output.
+ // Now, we will determine the number of partitions that will be used by created
+ // partitioning schemes.
+ val numPartitions = {
+ // Let's see if we need to shuffle all child's outputs when we use
+ // maxChildrenNumPartitions.
+ val shufflesAllChildren = children.zip(requiredChildDistributions).forall {
+ case (child, distribution) => {
+ !child.outputPartitioning.guarantees(
+ createPartitioning(distribution, maxChildrenNumPartitions))
+ }
+ }
+ // If we need to shuffle all children, we use defaultNumPreShufflePartitions as the
+ // number of partitions. Otherwise, we use maxChildrenNumPartitions.
+ if (shufflesAllChildren) defaultNumPreShufflePartitions else maxChildrenNumPartitions
+ }
+
children.zip(requiredChildDistributions).map {
case (child, distribution) => {
val targetPartitioning =
- createPartitioning(distribution, defaultNumPreShufflePartitions)
+ createPartitioning(distribution, numPartitions)
if (child.outputPartitioning.guarantees(targetPartitioning)) {
child
} else {
- Exchange(targetPartitioning, child)
+ child match {
+ // If child is an exchange, we replace it with
+ // a new one having targetPartitioning.
+ case Exchange(_, c, _) => Exchange(targetPartitioning, c)
+ case _ => Exchange(targetPartitioning, child)
+ }
--- End diff --
@nongli Can you take a look here? If one side of the join is already shuffled,
I am trying to avoid shuffling that side a second time.
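
For reference, the partition-count choice in the diff above can be sketched in isolation. This is a simplified model and not Spark's code: `Child`, its `guarantees` method, and `chooseNumPartitions` are hypothetical stand-ins for the plan nodes and `outputPartitioning.guarantees(createPartitioning(...))`, and a child is assumed to satisfy the target only when it is already partitioned on the required keys with exactly the target number of partitions.

```scala
object PartitionCountSketch {
  // Hypothetical stand-in for a physical plan child; not a Spark class.
  final case class Child(name: String, numPartitions: Int, partitionedOnKeys: Boolean) {
    // Assumption: the child satisfies the target partitioning only if it is
    // already partitioned on the required keys with the same partition count.
    def guarantees(targetNumPartitions: Int): Boolean =
      partitionedOnKeys && numPartitions == targetNumPartitions
  }

  // Returns None when no child needs a shuffle, otherwise the partition
  // count to use for the new exchanges. Requires a non-empty `children`.
  def chooseNumPartitions(
      children: Seq[Child],
      defaultNumPreShufflePartitions: Int): Option[Int] = {
    val maxChildrenNumPartitions = children.map(_.numPartitions).max
    val useExistingPartitioning =
      children.forall(_.guarantees(maxChildrenNumPartitions))
    if (useExistingPartitioning) {
      None
    } else {
      // If every child must be shuffled anyway, fall back to the default;
      // otherwise reuse the largest existing count so that the children
      // already matching it keep their partitioning.
      val shufflesAllChildren =
        children.forall(c => !c.guarantees(maxChildrenNumPartitions))
      Some(
        if (shufflesAllChildren) defaultNumPreShufflePartitions
        else maxChildrenNumPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    val shuffled = Child("left", numPartitions = 10, partitionedOnKeys = true)
    val raw = Child("right", numPartitions = 5, partitionedOnKeys = false)
    // Only `raw` needs an exchange, so the existing count of 10 is reused.
    println(chooseNumPartitions(Seq(shuffled, raw), defaultNumPreShufflePartitions = 200))
  }
}
```

In this model, a side that already matches the largest partition count is left alone and only the other side gets an exchange, which is the case the question above is about.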