Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/7988#discussion_r36479427
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
@@ -197,66 +197,108 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  * of input data meets the
  * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
  * each operator by inserting [[Exchange]] Operators where required. Also ensure that the
- * required input partition ordering requirements are met.
+ * input partition ordering requirements are met.
  */
 private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  def numPartitions: Int = sqlContext.conf.numShufflePartitions
+  private def numPartitions: Int = sqlContext.conf.numShufflePartitions
-  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator: SparkPlan =>
-      // Adds Exchange or Sort operators as required
-      def addOperatorsIfNecessary(
-          partitioning: Partitioning,
-          rowOrdering: Seq[SortOrder],
-          child: SparkPlan): SparkPlan = {
-
-        def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
-          if (!child.outputPartitioning.guarantees(partitioning)) {
-            Exchange(partitioning, child)
-          } else {
-            child
-          }
-        }
+  /**
+   * Given a required distribution, returns a partitioning that satisfies that distribution.
+   */
+  private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = {
+    requiredDistribution match {
+      case AllTuples => SinglePartition
+      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
+      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
+      case dist => sys.error(s"Do not know how to satisfy distribution $dist")
+    }
+  }
-        def addSortIfNecessary(child: SparkPlan): SparkPlan = {
+  /**
+   * Return true if all of the operator's children satisfy their output distribution requirements.
+   */
+  private def childPartitioningsSatisfyDistributionRequirements(operator: SparkPlan): Boolean = {
+    operator.children.zip(operator.requiredChildDistribution).forall {
+      case (child, distribution) => child.outputPartitioning.satisfies(distribution)
+    }
+  }
-          if (rowOrdering.nonEmpty) {
-            // If child.outputOrdering is [a, b] and rowOrdering is [a], we do not need to sort.
-            val minSize = Seq(rowOrdering.size, child.outputOrdering.size).min
-            if (minSize == 0 || rowOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
-              sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
-            } else {
+  /**
+   * Given an operator, check whether the operator requires its children to have compatible
+   * output partitionings and add Exchanges to fix any detected incompatibilities.
+   */
+  private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = {
+    if (operator.requiresChildPartitioningsToBeCompatible) {
--- End diff ---
Hmm. What about `Union`?
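
For context on why `Union` is the interesting case: it simply concatenates its children's partitions without ever combining rows across children, so its children's output partitionings do not need to agree and no shuffle should be inserted on its behalf. Here is a minimal, self-contained sketch of that concern, using toy stand-in types (`Operator`, `Scan`, and the partitioning classes below are hypothetical simplifications, not the Catalyst classes):

```scala
// Toy model of the child-partitioning compatibility question above.
// These types are simplified stand-ins, not Spark's actual classes.
object UnionCompatibilitySketch {
  sealed trait Partitioning
  case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  sealed trait Operator {
    def children: Seq[Operator]
    def outputPartitioning: Partitioning
  }
  case class Scan(name: String, outputPartitioning: Partitioning) extends Operator {
    def children: Seq[Operator] = Nil
  }
  // Union just concatenates its children's partitions, so it places no
  // compatibility requirement on them and reports no partitioning itself.
  case class Union(children: Seq[Operator]) extends Operator {
    def outputPartitioning: Partitioning = UnknownPartitioning
  }

  // A naive "do all children share one partitioning?" check.
  def childPartitioningsCompatible(op: Operator): Boolean =
    op.children.map(_.outputPartitioning).distinct.size <= 1

  def main(args: Array[String]): Unit = {
    val union = Union(Seq(
      Scan("a", HashPartitioning(Seq("k"), numPartitions = 8)),
      Scan("b", HashPartitioning(Seq("j"), numPartitions = 4))))
    // Prints "false", even though Union needs no Exchange here.
    println(childPartitioningsCompatible(union))
  }
}
```

The sketch's point: a blanket same-partitioning check reports `false` for this plan even though `Union` needs no Exchange, which is presumably why the check in the diff is gated on `requiresChildPartitioningsToBeCompatible` and why that flag would need to be `false` for `Union`.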