Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9453#discussion_r43979999
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
    @@ -419,25 +419,48 @@ private[sql] case class 
EnsureRequirements(sqlContext: SQLContext) extends Rule[
           // TODO: this should be a cost based decision. For example, a big relation should probably
           // maintain its existing number of partitions and smaller partitions should be shuffled.
           // defaultPartitions is arbitrary.
    -      val numPartitions = children.head.outputPartitioning.numPartitions
    +      val maxChildrenNumPartitions = 
children.map(_.outputPartitioning.numPartitions).max
           val useExistingPartitioning = 
children.zip(requiredChildDistributions).forall {
             case (child, distribution) => {
               child.outputPartitioning.guarantees(
    -            createPartitioning(distribution, numPartitions))
    +            createPartitioning(distribution, maxChildrenNumPartitions))
             }
           }
     
           children = if (useExistingPartitioning) {
    +        // We do not need to shuffle any child's output.
             children
           } else {
    +        // We need to shuffle at least one child's output.
    +        // Now, we will determine the number of partitions that will be used by created
    +        // partitioning schemes.
    +        val numPartitions = {
    +          // Let's see if we need to shuffle all child's outputs when we use
    +          // maxChildrenNumPartitions.
    +          val shufflesAllChildren = 
children.zip(requiredChildDistributions).forall {
    +            case (child, distribution) => {
    +              !child.outputPartitioning.guarantees(
    +                createPartitioning(distribution, maxChildrenNumPartitions))
    +            }
    +          }
    +          // If we need to shuffle all children, we use defaultNumPreShufflePartitions as the
    +          // number of partitions. Otherwise, we use maxChildrenNumPartitions.
    +          if (shufflesAllChildren) defaultNumPreShufflePartitions else 
maxChildrenNumPartitions
    +        }
    +
             children.zip(requiredChildDistributions).map {
               case (child, distribution) => {
                 val targetPartitioning =
    -              createPartitioning(distribution, 
defaultNumPreShufflePartitions)
    +              createPartitioning(distribution, numPartitions)
                 if (child.outputPartitioning.guarantees(targetPartitioning)) {
                   child
                 } else {
    -              Exchange(targetPartitioning, child)
    +              child match {
    +                // If child is an exchange, we replace it with
    +                // a new one having targetPartitioning.
    +                case Exchange(_, c, _) => Exchange(targetPartitioning, c)
    +                case _ => Exchange(targetPartitioning, child)
    +              }
    --- End diff --
    
    Looks good to me. Can you update/remove the TODO on line 419?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to