Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/16933#discussion_r104543895
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -562,27 +562,43 @@ object CollapseProject extends Rule[LogicalPlan] {
}
/**
- * Combines adjacent [[Repartition]] and [[RepartitionByExpression]]
operator combinations
- * by keeping only the one.
- * 1. For adjacent [[Repartition]]s, collapse into the last
[[Repartition]].
- * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last
[[RepartitionByExpression]].
- * 3. For a combination of [[Repartition]] and
[[RepartitionByExpression]], collapse as a single
- * [[RepartitionByExpression]] with the expression and last number of
partition.
+ * Combines adjacent [[RepartitionOperation]] operators
*/
object CollapseRepartition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- // Case 1
- case Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
- Repartition(numPartitions, shuffle, child)
- // Case 2
- case RepartitionByExpression(exprs, RepartitionByExpression(_, child,
_), numPartitions) =>
- RepartitionByExpression(exprs, child, numPartitions)
- // Case 3
- case Repartition(numPartitions, _, r: RepartitionByExpression) =>
- r.copy(numPartitions = numPartitions)
- // Case 3
- case RepartitionByExpression(exprs, Repartition(_, _, child),
numPartitions) =>
- RepartitionByExpression(exprs, child, numPartitions)
+ // Case 1: When a Repartition has a child of Repartition or
RepartitionByExpression,
+ // we can collapse it with the child based on the type of shuffle and
the specified number
+ // of partitions.
+ case r @ Repartition(_, _, child: Repartition) =>
+ collapseRepartition(r, child)
+ case r @ Repartition(_, _, child: RepartitionByExpression) =>
+ collapseRepartition(r, child)
+ // Case 2: When a RepartitionByExpression has a child of Repartition
or RepartitionByExpression
+ // we can remove the child.
+ case r @ RepartitionByExpression(_, child: RepartitionByExpression, _)
=>
+ r.copy(child = child.child)
+ case r @ RepartitionByExpression(_, child: Repartition, _) =>
+ r.copy(child = child.child)
+ }
+
+ /**
+ * Collapses the [[Repartition]] with its child
[[RepartitionOperation]], if possible.
+ * - Case 1 the top [[Repartition]] does not enable shuffle (i.e.,
coalesce API):
+ * If the last numPartitions is bigger, returns the child node;
otherwise, keep unchanged.
+ * - Case 2 the top [[Repartition]] enables shuffle (i.e., repartition
API):
+ * returns the child node with the last numPartitions.
+ */
+ private def collapseRepartition(r: Repartition, child:
RepartitionOperation): LogicalPlan = {
+ (r.shuffle, child.shuffle) match {
+ case (false, true) => child match {
--- End diff --
Yes!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]