Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9404#discussion_r43708751
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
    @@ -229,12 +230,33 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
         if (children.length > 1
             && requiredChildDistributions.toSet != Set(UnspecifiedDistribution)
             && !Partitioning.allCompatible(children.map(_.outputPartitioning))) {
    -      children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
    -        val targetPartitioning = canonicalPartitioning(distribution)
    -        if (child.outputPartitioning.guarantees(targetPartitioning)) {
    -          child
    -        } else {
    -          Exchange(targetPartitioning, child)
    +
    +      // First check if the existing partitions of the children all match. This means they are
    +      // partitioned by the same partitioning into the same number of partitions. In that case,
    +      // don't try to make them match `defaultPartitions`, just use the existing partitioning.
    +      // TODO: this should be a cost based descision. For example, a big relation should probably
    +      // maintain its existing number of partitions and smaller partitions should be shuffled.
    +      // defaultPartitions is arbitrary.
    +      val numPartitions = children.head.outputPartitioning.numPartitions
    +      val useExistingPartitioning = children.zip(requiredChildDistributions).forall {
    +        case (child, distribution) => {
    +          child.outputPartitioning.guarantees(
    +            createPartitioning(distribution, numPartitions))
    +        }
    +      }
    --- End diff --
    
    Just a note here. For a join, it is possible that the left table is 
range-partitioned and the right table is hash-partitioned, and that both 
satisfy the required distribution. However, `createPartitioning` only creates 
a `HashPartitioning` for a `ClusteredDistribution`, so the `guarantees` check 
here will fail for any child whose output partitioning is `RangePartitioning`.
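
    To make the concern concrete, here is a minimal, self-contained sketch. 
It does not use Spark's real classes: the types below are simplified stand-ins 
that share names with the ones in the diff, columns are plain strings, and the 
`satisfies`/`guarantees` rules are assumptions about how the real checks 
behave. The object name `PartitioningSketch` and the column `k` are 
hypothetical.

```scala
// Simplified, hypothetical model of the partitioning checks discussed above.
// These are NOT Spark's classes; they only mimic the assumed behavior.
object PartitioningSketch {
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  final case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

  sealed trait Partitioning {
    def numPartitions: Int
    // Does this partitioning meet the requirement of the distribution?
    def satisfies(required: Distribution): Boolean
    // Stricter check: is the data laid out exactly as `other` would lay it out?
    def guarantees(other: Partitioning): Boolean
  }

  final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)
      extends Partitioning {
    def satisfies(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case ClusteredDistribution(clustering) => expressions.forall(clustering.contains)
    }
    // Assumption: a hash partitioning only guarantees an equal hash partitioning.
    def guarantees(other: Partitioning): Boolean = other match {
      case h: HashPartitioning => this == h
      case _ => false
    }
  }

  final case class RangePartitioning(ordering: Seq[String], numPartitions: Int)
      extends Partitioning {
    def satisfies(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case ClusteredDistribution(clustering) => ordering.forall(clustering.contains)
    }
    // Assumption: a range partitioning never guarantees a hash partitioning.
    def guarantees(other: Partitioning): Boolean = other match {
      case r: RangePartitioning => this == r
      case _ => false
    }
  }

  // Mirrors the behavior described in the comment: a clustered requirement
  // always becomes a HashPartitioning.
  def createPartitioning(required: Distribution, numPartitions: Int): Partitioning =
    required match {
      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
      case UnspecifiedDistribution =>
        throw new IllegalArgumentException("no canonical partitioning for UnspecifiedDistribution")
    }

  // A join on column "k": left child range-partitioned, right child hash-partitioned.
  val required: Distribution = ClusteredDistribution(Seq("k"))
  val left: Partitioning = RangePartitioning(Seq("k"), 200)
  val right: Partitioning = HashPartitioning(Seq("k"), 200)

  // Same shape as the check in the diff, with the required distribution shared by both children.
  val useExistingPartitioning: Boolean =
    Seq(left, right).forall(_.guarantees(createPartitioning(required, left.numPartitions)))

  def main(args: Array[String]): Unit = {
    println(s"left satisfies required:  ${left.satisfies(required)}")
    println(s"right satisfies required: ${right.satisfies(required)}")
    println(s"useExistingPartitioning:  $useExistingPartitioning")
  }
}
```

    Under these assumptions both children satisfy the required distribution, 
yet `useExistingPartitioning` evaluates to `false`, because the range-partitioned 
child cannot guarantee the `HashPartitioning` that `createPartitioning` returns.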

