ulysses-you commented on a change in pull request #33541:
URL: https://github.com/apache/spark/pull/33541#discussion_r679054239



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
##########
@@ -69,6 +70,21 @@ case class AQEShuffleReadExec private(
         case _ =>
           throw new IllegalStateException("operating on canonicalization plan")
       }
+    } else if (isCoalescedRead) {

Review comment:
      We need to ensure there is no skewed partition here.
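To illustrate the concern, here is a self-contained sketch of such a check. The class names are simplified, hypothetical stand-ins for Spark's partition spec hierarchy, not the real types: a read only counts as coalesced when every spec is a `CoalescedPartitionSpec` (ruling out skew-split specs) and the specs cover non-overlapping reducer ranges.

```scala
// Hypothetical, simplified stand-ins for Spark's partition spec classes.
sealed trait ShufflePartitionSpec
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)
  extends ShufflePartitionSpec
case class PartialReducerPartitionSpec(reducerIndex: Int) extends ShufflePartitionSpec

object CoalescedReadCheck {
  // A coalesced read requires: at least one spec, no skew-split specs at all,
  // and adjacent coalesced ranges that do not overlap.
  def isCoalescedRead(specs: Seq[ShufflePartitionSpec]): Boolean =
    specs.nonEmpty &&
      specs.forall(_.isInstanceOf[CoalescedPartitionSpec]) &&
      specs.sliding(2).forall {
        case Seq(_: CoalescedPartitionSpec) => true // a lone spec is trivially contiguous
        case Seq(l: CoalescedPartitionSpec, r: CoalescedPartitionSpec) =>
          l.endReducerIndex <= r.startReducerIndex
      }
}
```

The up-front `forall(_.isInstanceOf[CoalescedPartitionSpec])` is what encodes the "no skewed partition" requirement: a single `PartialReducerPartitionSpec` makes the whole check return false.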

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -249,13 +255,9 @@ object EnsureRequirements extends Rule[SparkPlan] {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
-    // SPARK-35989: AQE will change the partition number so we should retain the REPARTITION_BY_NUM
-    // shuffle which is specified by user. And also we can not remove REBALANCE_PARTITIONS_BY_COL,
-    // it is a special shuffle used to rebalance partitions.
-    // So, here we only remove REPARTITION_BY_COL in AQE.
     case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, shuffleOrigin)
-        if shuffleOrigin == REPARTITION_BY_COL || !conf.adaptiveExecutionEnabled =>
+        if optimizeOutRepartition &&
+          (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == REPARTITION_BY_NUM) =>

Review comment:
      The previous check is there to avoid AQE changing the output partitioning 
during execution (i.e., in a non-final stage). So I suggested earlier that we 
should not remove shuffles of all origins.
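As a self-contained sketch of the origin check the diff adds: `REPARTITION_BY_COL` and `REPARTITION_BY_NUM` are the names used in the diff, while the sealed trait and the extra `ENSURE_REQUIREMENTS` member here are simplified stand-ins for Spark's actual shuffle-origin hierarchy, for illustration only.

```scala
// Simplified stand-in hierarchy; not Spark's real ShuffleOrigin definitions.
sealed trait ShuffleOrigin
case object REPARTITION_BY_COL extends ShuffleOrigin
case object REPARTITION_BY_NUM extends ShuffleOrigin
case object ENSURE_REQUIREMENTS extends ShuffleOrigin

object RepartitionOptimization {
  // Per the diff: only user-specified repartition shuffles may be optimized
  // out, and only when the caller opts in via optimizeOutRepartition.
  def canOptimizeOut(optimizeOutRepartition: Boolean, origin: ShuffleOrigin): Boolean =
    optimizeOutRepartition &&
      (origin == REPARTITION_BY_COL || origin == REPARTITION_BY_NUM)
}
```

Any other origin (e.g. a shuffle inserted to satisfy requirements) falls through and is kept, which is the behavior the reviewer wants to preserve.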

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
##########
@@ -112,6 +128,16 @@ case class AQEShuffleReadExec private(
     partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) ||
       partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec])
 
+  def isCoalescedRead: Boolean = {
+    partitionSpecs.sliding(2).forall {
+      // A single partition spec which is `CoalescedPartitionSpec` also means coalesced read.
+      case Seq(_: CoalescedPartitionSpec) => true
+      case Seq(l: CoalescedPartitionSpec, r: CoalescedPartitionSpec) =>
+        l.endReducerIndex <= r.startReducerIndex

Review comment:
      Shall we add a check for the case where the coalesced partition appears 
only at the first/last position?
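To make the `sliding(2)` idiom in the diff concrete, here is a minimal, self-contained demo over plain `(start, end)` ranges (the object and method names are invented for illustration). It shows why the single-element case is needed: `sliding(2)` over a one-element sequence yields a window of size 1, not an empty iterator.

```scala
object ContiguityDemo {
  // True when each (start, end) range begins at or after the previous
  // range's end. sliding(2) walks adjacent pairs; a single-element input
  // produces one window of size 1, handled by the Seq(_) case.
  def contiguous(ranges: Seq[(Int, Int)]): Boolean =
    ranges.sliding(2).forall {
      case Seq(_) => true
      case Seq((_, lEnd), (rStart, _)) => lEnd <= rStart
    }
}
```

Note that this pairwise check only validates ordering between neighbors; it does not by itself rule out other spec types mixed in at the first or last position, which is what the comment above is asking about.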

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -249,13 +255,9 @@ object EnsureRequirements extends Rule[SparkPlan] {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    // TODO: remove this after we create a physical operator for 
`RepartitionByExpression`.

Review comment:
      This TODO is still needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


