ulysses-you commented on code in PR #37537:
URL: https://github.com/apache/spark/pull/37537#discussion_r947485988


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala:
##########
@@ -41,19 +41,25 @@ object ReplaceCTERefWithRepartition extends 
Rule[LogicalPlan] {
       replaceWithRepartition(plan, mutable.HashMap.empty[Long, LogicalPlan])
   }
 
+  private def canSkipExtraRepartition(p: LogicalPlan): Boolean = p match {
+    case _: RepartitionOperation => true
+    case _: RebalancePartitions => true
+    case _ => false
+  }
+
   private def replaceWithRepartition(
       plan: LogicalPlan,
       cteMap: mutable.HashMap[Long, LogicalPlan]): LogicalPlan = plan match {
     case WithCTE(child, cteDefs) =>
       cteDefs.foreach { cteDef =>
         val inlined = replaceWithRepartition(cteDef.child, cteMap)
         val withRepartition =
-          if (inlined.isInstanceOf[RepartitionOperation] || 
cteDef.underSubquery) {
+          if (canSkipExtraRepartition(inlined) || cteDef.underSubquery) {
             // If the CTE definition plan itself is a repartition operation or 
if it hosts a merged
             // scalar subquery, we do not need to add an extra repartition 
shuffle.
             inlined
           } else {
-            Repartition(conf.numShufflePartitions, shuffle = true, inlined)
+            RepartitionByExpression(Seq.empty, inlined, None)

Review Comment:
   It uses RoundRobinPartitioning to do the exchange, which is the same as Repartition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to