Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7988#discussion_r36479575
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
    @@ -197,66 +197,108 @@ case class Exchange(newPartitioning: Partitioning, 
child: SparkPlan) extends Una
      * of input data meets the
      * [[org.apache.spark.sql.catalyst.plans.physical.Distribution 
Distribution]] requirements for
      * each operator by inserting [[Exchange]] Operators where required.  Also 
ensure that the
    - * required input partition ordering requirements are met.
    + * input partition ordering requirements are met.
      */
     private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends 
Rule[SparkPlan] {
       // TODO: Determine the number of partitions.
    -  def numPartitions: Int = sqlContext.conf.numShufflePartitions
    +  private def numPartitions: Int = sqlContext.conf.numShufflePartitions
     
    -  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    -    case operator: SparkPlan =>
    -      // Adds Exchange or Sort operators as required
    -      def addOperatorsIfNecessary(
    -          partitioning: Partitioning,
    -          rowOrdering: Seq[SortOrder],
    -          child: SparkPlan): SparkPlan = {
    -
    -        def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
    -          if (!child.outputPartitioning.guarantees(partitioning)) {
    -            Exchange(partitioning, child)
    -          } else {
    -            child
    -          }
    -        }
    +  /**
    +   * Given a required distribution, returns a partitioning that satisfies 
that distribution.
    +   */
    +  private def canonicalPartitioning(requiredDistribution: Distribution): 
Partitioning = {
    +    requiredDistribution match {
    +      case AllTuples => SinglePartition
    +      case ClusteredDistribution(clustering) => 
HashPartitioning(clustering, numPartitions)
    +      case OrderedDistribution(ordering) => RangePartitioning(ordering, 
numPartitions)
    +      case dist => sys.error(s"Do not know how to satisfy distribution 
$dist")
    +    }
    +  }
     
    -        def addSortIfNecessary(child: SparkPlan): SparkPlan = {
    +  /**
    +   * Return true if all of the operator's children satisfy their output 
distribution requirements.
    +   */
    +  private def childPartitioningsSatisfyDistributionRequirements(operator: 
SparkPlan): Boolean = {
    +    operator.children.zip(operator.requiredChildDistribution).forall {
    +      case (child, distribution) => 
child.outputPartitioning.satisfies(distribution)
    +    }
    +  }
     
    -          if (rowOrdering.nonEmpty) {
    -            // If child.outputOrdering is [a, b] and rowOrdering is [a], 
we do not need to sort.
    -            val minSize = Seq(rowOrdering.size, 
child.outputOrdering.size).min
    -            if (minSize == 0 || rowOrdering.take(minSize) != 
child.outputOrdering.take(minSize)) {
    -              
sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, 
child)
    -            } else {
    +  /**
    +   * Given an operator, check whether the operator requires its children 
to have compatible
    +   * output partitionings and add Exchanges to fix any detected 
incompatibilities.
    +   */
    +  private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): 
SparkPlan = {
    +    if (operator.requiresChildPartitioningsToBeCompatible) {
    --- End diff --
    
    The default return value of `requiredDistribution` is 
`Seq.fill(children.size)(UnspecifiedDistribution)`. Union and those broadcast 
joins should be fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to