Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9276#discussion_r43334997
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
    @@ -202,12 +286,103 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
       private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = {
         requiredDistribution match {
           case AllTuples => SinglePartition
    -      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
    -      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
    +      case ClusteredDistribution(clustering) =>
    +        HashPartitioning(clustering, numPreShufflePartitions)
    +      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPreShufflePartitions)
           case dist => sys.error(s"Do not know how to satisfy distribution $dist")
         }
       }
     
    +  private def withExchangeCoordinator(
    +      children: Seq[SparkPlan],
    +      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
    +    val needsCoordinator =
    +      if (children.exists(_.isInstanceOf[Exchange])) {
    +        // Right now, ExchangeCoordinator only supports HashPartitionings.
    +        children.forall {
    +          case e @ Exchange(hash: HashPartitioning, _, _) => true
    +          case child =>
    +            child.outputPartitioning match {
    +              case hash: HashPartitioning => true
    +              case collection: PartitioningCollection =>
    +                collection.partitionings.exists(_.isInstanceOf[HashPartitioning])
    +              case _ => false
    +            }
    +        }
    +      } else {
    +        // In this case, although we do not have Exchange operators, we may still need to
    +        // shuffle data when we have more than one child because the data generated by
    +        // these children may not be partitioned in the same way.
    +        // Please see the comment in withCoordinator for more details.
    +        val supportsDistribution =
    +          requiredChildDistributions.forall(_.isInstanceOf[ClusteredDistribution])
    +        children.length > 1 && supportsDistribution
    +      }
    +
    +    val withCoordinator =
    +      if (adaptiveExecutionEnabled && needsCoordinator) {
    +        val coordinator =
    +          new ExchangeCoordinator(
    +            children.length,
    +            targetPostShuffleInputSize,
    +            minNumPostShufflePartitions)
    +        children.zip(requiredChildDistributions).map {
    +          case (e: Exchange, _) =>
    +            // This child is an Exchange, we need to add the coordinator.
    +            e.copy(coordinator = Some(coordinator))
    +          case (child, distribution) =>
    +            // If this child is not an Exchange, we need to add an Exchange for now.
    +            // Ideally, we could try to avoid this Exchange. However, when we reach here,
    +            // there are at least two child operators (because if there is a single child
    +            // and we can avoid the Exchange, this method will not be called). Although we can
    +            // make two children have the same number of post-shuffle partitions, their
    +            // numbers of pre-shuffle partitions may be different. For example, let's say
    +            // we have the following plan
    +            //         Join
    +            //         /  \
    +            //       Agg  Exchange
    +            //       /      \
    +            //    Exchange  t2
    +            //      /
    +            //     t1
    +            // In this case, because a post-shuffle partition can include multiple pre-shuffle
    +            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
    +            // after the shuffle. So, even if we can use the child Exchange operator of the Join to
    +            // have a number of post-shuffle partitions that matches the number of partitions of
    +            // Agg, we cannot say these two children are partitioned in the same way.
    +            // Here is another case
    +            //         Join
    +            //         /  \
    +            //       Agg1  Agg2
    +            //       /      \
    +            //   Exchange1  Exchange2
    +            //       /       \
    +            //      t1       t2
    +            // In this case, the two Aggs shuffle data on the same column of the join condition.
    +            // After we use the ExchangeCoordinator, these two Aggs may not be partitioned in the
    +            // same way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
    +            // post-shuffle partitions. However, Agg1 fetches its pre-shuffle partitions by
    +            // using the partitionStartIndices [0, 3], while Agg2 fetches its pre-shuffle
    +            // partitions by using another partitionStartIndices, [0, 4]. So, Agg1 and Agg2
    +            // are actually not partitioned in the same way, and we need to add Exchanges here.
    +            //
    +            // It would be great to introduce a new Partitioning to represent the post-shuffle
    +            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
    +
    +            // Because originally we do not have an Exchange operator, we can just use this child
    +            // operator's outputPartitioning to shuffle data.
    +            val targetPartitioning = canonicalPartitioning(distribution)
    +            assert(targetPartitioning.isInstanceOf[HashPartitioning])
    +            Exchange(targetPartitioning, child, Some(coordinator))
    --- End diff --
    
    Are you sure this is a good idea? Adding exchanges seems very expensive.
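
    For context, here is a minimal standalone Scala sketch of how a partitionStartIndices array groups pre-shuffle partitions into post-shuffle partitions. PartitionStartIndicesSketch and postShuffleGroups are hypothetical names for illustration only, not the API in this patch; the sketch just shows why the [0, 3] vs. [0, 4] case described in the comment above is not co-partitioned even though both sides end up with two post-shuffle partitions.

        // Hypothetical illustration only; not code from this patch.
        object PartitionStartIndicesSketch {
          // Group pre-shuffle partition ids into post-shuffle partitions, given the
          // start index of each post-shuffle partition.
          def postShuffleGroups(
              numPreShufflePartitions: Int,
              partitionStartIndices: Seq[Int]): Seq[Range] = {
            (partitionStartIndices :+ numPreShufflePartitions)
              .sliding(2)
              .map { case Seq(start, end) => start until end }
              .toSeq
          }

          def main(args: Array[String]): Unit = {
            // The example from the comment: 5 pre-shuffle partitions, 2 post-shuffle partitions.
            val agg1 = postShuffleGroups(5, Seq(0, 3)) // partitions {0, 1, 2} and {3, 4}
            val agg2 = postShuffleGroups(5, Seq(0, 4)) // partitions {0, 1, 2, 3} and {4}
            // Both sides have 2 post-shuffle partitions, but they group the pre-shuffle
            // partitions differently, so rows with the same key can end up in different
            // post-shuffle partitions on each side; a join would still need a shuffle.
            println(agg1 == agg2) // false
          }
        }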

