Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9276#discussion_r43347860
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
@@ -202,12 +286,103 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
   private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = {
     requiredDistribution match {
       case AllTuples => SinglePartition
-      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
-      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
+      case ClusteredDistribution(clustering) =>
+        HashPartitioning(clustering, numPreShufflePartitions)
+      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPreShufflePartitions)
       case dist => sys.error(s"Do not know how to satisfy distribution $dist")
     }
   }
+  private def withExchangeCoordinator(
+      children: Seq[SparkPlan],
+      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
+    val needsCoordinator =
+      if (children.exists(_.isInstanceOf[Exchange])) {
+        // Right now, ExchangeCoordinator only supports HashPartitionings.
+        children.forall {
+          case e @ Exchange(hash: HashPartitioning, _, _) => true
+          case child =>
+            child.outputPartitioning match {
+              case hash: HashPartitioning => true
+              case collection: PartitioningCollection =>
+                collection.partitionings.exists(_.isInstanceOf[HashPartitioning])
+              case _ => false
+            }
+        }
+      } else {
+        // In this case, although we do not have Exchange operators, we may still need to
+        // shuffle data when we have more than one child because data generated by
+        // these children may not be partitioned in the same way.
+        // Please see the comment in withCoordinator for more details.
+        val supportsDistribution =
+          requiredChildDistributions.forall(_.isInstanceOf[ClusteredDistribution])
+        children.length > 1 && supportsDistribution
+      }
+
+    val withCoordinator =
+      if (adaptiveExecutionEnabled && needsCoordinator) {
+        val coordinator =
+          new ExchangeCoordinator(
+            children.length,
+            targetPostShuffleInputSize,
+            minNumPostShufflePartitions)
+        children.zip(requiredChildDistributions).map {
+          case (e: Exchange, _) =>
+            // This child is an Exchange, we need to add the coordinator.
+            e.copy(coordinator = Some(coordinator))
+          case (child, distribution) =>
+            // If this child is not an Exchange, we need to add an Exchange for now.
+            // Ideally, we can try to avoid this Exchange. However, when we reach here,
+            // there are at least two child operators (because if there is a single child
+            // and we can avoid Exchange, this method will not be called). Although we can
+            // make two children have the same number of post-shuffle partitions, their
+            // numbers of pre-shuffle partitions may be different. For example, let's say
+            // we have the following plan
+            //         Join
+            //         /  \
+            //       Agg  Exchange
+            //       /        \
+            //    Exchange    t2
+            //      /
+            //     t1
+            // In this case, because a post-shuffle partition can include multiple pre-shuffle
+            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
+            // after shuffle. So, even if we can use the child Exchange operator of the Join to
+            // have a number of post-shuffle partitions that matches the number of partitions of
+            // Agg, we cannot say these two children are partitioned in the same way.
+            // Here is another case
+            //         Join
+            //         /  \
+            //     Agg1    Agg2
+            //     /          \
+            // Exchange1    Exchange2
+            //    /             \
+            //   t1             t2
+            // In this case, the two Aggs shuffle data with the same column of the join condition.
+            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
+            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
+            // post-shuffle partitions. However, Agg1 fetches those pre-shuffle partitions by
+            // using partitionStartIndices [0, 3], but Agg2 fetches those pre-shuffle
+            // partitions by using another partitionStartIndices [0, 4]. So, Agg1 and Agg2
+            // are actually not partitioned in the same way, and we need to add Exchanges here.
+            //
+            // It would be great to introduce a new Partitioning to represent the post-shuffle
+            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
+
+            // Because originally we do not have an Exchange operator, we can just use this child
+            // operator's outputPartitioning to shuffle data.
+            val targetPartitioning = canonicalPartitioning(distribution)
+            assert(targetPartitioning.isInstanceOf[HashPartitioning])
+            Exchange(targetPartitioning, child, Some(coordinator))
--- End diff --
It is not a good idea. It will shuffle data more times and make certain queries with multiple joins inefficient. Right now, it is mainly to make sure we can generate correct results. Once we can generate a DAG of query fragments (based on https://github.com/apache/spark/pull/9039), I think this problem can be addressed.
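
To make the partitionStartIndices scenario from the code comment above concrete, here is a minimal standalone sketch (the object and the postShuffleRanges helper are made up for illustration, not the actual ExchangeCoordinator or Exchange API) of how a list of start indices groups pre-shuffle partitions into post-shuffle partitions:

// Hypothetical sketch, not Spark code: shows why partitionStartIndices
// [0, 3] and [0, 4] over the same 5 pre-shuffle partitions are not
// the same partitioning.
object PartitionStartIndicesSketch {
  // For each post-shuffle partition, return the range of pre-shuffle
  // partition ids it reads.
  def postShuffleRanges(startIndices: Seq[Int], numPreShufflePartitions: Int): Seq[Range] =
    startIndices.zipAll(startIndices.drop(1), 0, numPreShufflePartitions).map {
      case (start, end) => start until end
    }

  def main(args: Array[String]): Unit = {
    val numPreShufflePartitions = 5
    // The scenario from the comment: both sides have 5 pre-shuffle partitions
    // and 2 post-shuffle partitions, but different start indices.
    val agg1 = postShuffleRanges(Seq(0, 3), numPreShufflePartitions) // {0,1,2} and {3,4}
    val agg2 = postShuffleRanges(Seq(0, 4), numPreShufflePartitions) // {0,1,2,3} and {4}
    // A row hashed to pre-shuffle partition 3 ends up in post-shuffle
    // partition 1 on the Agg1 side but in post-shuffle partition 0 on the
    // Agg2 side, so the two children are not co-partitioned and a Join on
    // top of them needs an extra Exchange to produce correct results.
    println(s"Agg1 post-shuffle partitions: $agg1")
    println(s"Agg2 post-shuffle partitions: $agg2")
    println(s"Co-partitioned? ${agg1 == agg2}") // false
  }
}

This is only meant to illustrate why the patch adds an Exchange (with the coordinator) under each non-Exchange child in that branch, rather than reusing the children's existing output partitioning.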