carsonwang commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r265885287
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ##########
 @@ -36,107 +35,12 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
-        // Right now, ExchangeCoordinator only support HashPartitionings.
-        children.forall {
-          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one children because data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
-        }
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchangeExec, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we can try to avoid this Exchange. However, when we reach here,
-            // there are at least two children operators (because if there is a single child
-            // and we can avoid Exchange, supportsCoordinator will be false and we
-            // will not reach here.). Although we can make two children have the same number of
-            // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
-            // For example, let's say we have the following plan
-            //         Join
-            //         /  \
-            //       Agg  Exchange
-            //       /      \
-            //    Exchange  t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
-            // after shuffle. So, even we can use the child Exchange operator of the Join to
-            // have a number of post-shuffle partitions that matches the number of partitions of
-            // Agg, we cannot say these two children are partitioned in the same way.
-            // Here is another case
-            //         Join
-            //         /  \
-            //       Agg1  Agg2
-            //       /      \
-            //   Exchange1  Exchange2
-            //       /       \
-            //      t1       t2
-            // In this case, two Aggs shuffle data with the same column of the join condition.
-            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
-            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
-            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
-            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
-            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It will be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
-            val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
+  private def defaultNumPreShufflePartitions: Int =
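
To make the co-partitioning hazard in the removed comment concrete, here is a minimal sketch of how partitionStartIndices can be derived from per-partition sizes and a target post-shuffle input size, assuming map output statistics are available. PartitionStartIndicesSketch and its helper are hypothetical names for illustration, not Spark's actual ExchangeCoordinator code:

```scala
// A minimal, self-contained sketch (illustrative only, not the actual
// ExchangeCoordinator implementation) of deriving partitionStartIndices:
// walk the per-partition byte sizes and start a new post-shuffle partition
// whenever adding the next pre-shuffle partition would exceed the target size.
object PartitionStartIndicesSketch {
  def partitionStartIndices(preShuffleSizes: Array[Long], targetSize: Long): Array[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer(0)
    var running = 0L
    for (i <- preShuffleSizes.indices) {
      if (running > 0 && running + preShuffleSizes(i) > targetSize) {
        starts += i   // close the current post-shuffle partition, open a new one at i
        running = 0L
      }
      running += preShuffleSizes(i)
    }
    starts.toArray
  }

  def main(args: Array[String]): Unit = {
    // Both children have 5 pre-shuffle partitions and coalesce to 2 post-shuffle
    // partitions, yet their start indices differ ([0, 3] vs. [0, 4]), so they
    // are not co-partitioned -- exactly the hazard described in the comment above.
    val agg1 = partitionStartIndices(Array[Long](10, 10, 10, 40, 10), targetSize = 50)
    val agg2 = partitionStartIndices(Array[Long](10, 10, 10, 10, 40), targetSize = 50)
    println(agg1.mkString("[", ", ", "]"))  // [0, 3]
    println(agg2.mkString("[", ", ", "]"))  // [0, 4]
  }
}
```

Because the indices are computed independently per child from that child's own size statistics, equal post-shuffle partition counts do not imply equal partition boundaries.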
 
 Review comment:
   It is a good point that we should not break anything when AE is enabled by default at the cluster level. Currently it is enabled only for testing purposes, but it is possible that we will enable it by default in the future. What about setting maxNumPostShufflePartitions to spark.sql.shuffle.partitions by default? We currently use maxNumPostShufflePartitions as the initial partition number; in the future, if we can find a better initial partition number between minNumPostShufflePartitions and maxNumPostShufflePartitions at runtime, maxNumPostShufflePartitions will be treated as a hard upper limit.
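
   A rough sketch of the defaulting rule proposed here; AeConf, effectiveMaxNumPostShufflePartitions, and initialNumPartitions are illustrative names, not Spark's actual SQLConf entries:

```scala
// Hypothetical sketch of the proposed defaulting rule; this is not the real
// SQLConf API, just an illustration of the behavior described above.
final case class AeConf(
    numShufflePartitions: Int,                 // spark.sql.shuffle.partitions
    minNumPostShufflePartitions: Int,
    maxNumPostShufflePartitions: Option[Int]) {

  // Default the max to spark.sql.shuffle.partitions when it is not set
  // explicitly, so enabling AE by default does not change existing behavior.
  def effectiveMaxNumPostShufflePartitions: Int =
    maxNumPostShufflePartitions.getOrElse(numShufflePartitions)

  // Today the max doubles as the initial partition number; a future heuristic
  // could pick any value in [min, max], at which point the max becomes a hard
  // upper limit rather than the starting point.
  def initialNumPartitions: Int =
    math.max(minNumPostShufflePartitions, effectiveMaxNumPostShufflePartitions)
}

object AeConfDemo extends App {
  val conf = AeConf(numShufflePartitions = 200, minNumPostShufflePartitions = 1,
    maxNumPostShufflePartitions = None)
  assert(conf.initialNumPartitions == 200)  // same as plain spark.sql.shuffle.partitions
}
```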
