cloud-fan commented on a change in pull request #27742: [SPARK-30991] Refactor 
AQE readers and RDDs
URL: https://github.com/apache/spark/pull/27742#discussion_r386207002
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##########
 @@ -67,28 +63,32 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) 
extends Rule[SparkPlan] {
     }
   }
 
-  private def createLocalReader(plan: SparkPlan): LocalShuffleReaderExec = {
+  private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = {
     plan match {
-      case c @ CoalescedShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
-        LocalShuffleReaderExec(
-          s, getPartitionStartIndices(s, Some(c.partitionStartIndices.length)))
+      case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) =>
+        CustomShuffleReaderExec(
+          s, getPartitionSpecs(s, Some(c.partitionSpecs.length)), "local")
       case s: ShuffleQueryStageExec =>
-        LocalShuffleReaderExec(s, getPartitionStartIndices(s, None))
+        CustomShuffleReaderExec(s, getPartitionSpecs(s, None), "local")
     }
   }
 
   // TODO: this method assumes all shuffle blocks are the same data size. We 
should calculate the
   //       partition start indices based on block size to avoid data skew.
-  private def getPartitionStartIndices(
+  private def getPartitionSpecs(
       shuffleStage: ShuffleQueryStageExec,
-      advisoryParallelism: Option[Int]): Array[Array[Int]] = {
+      advisoryParallelism: Option[Int]): Array[ShufflePartitionSpec] = {
     val shuffleDep = shuffleStage.shuffle.shuffleDependency
     val numReducers = shuffleDep.partitioner.numPartitions
     val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
     val numMappers = shuffleDep.rdd.getNumPartitions
-    Array.fill(numMappers) {
+    lazy val splitPoints =
 
 Review comment:
   So this is to avoid an error if `numMappers` is 0? Maybe it would be better 
to do it more explicitly:
   ```
   if (numMappers == 0) {
     Array.empty
   } else {
     val splitPoints = ...
     ...
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to