Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720642425



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
       Maybe an equivalent way to do what `MapSizesByExecutorId` does is:
   
   ```scala
   override def getReader[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     // Materialize first: getMapSizesByExecutorId returns an Iterator, and calling
     // `exists` on it directly would consume entries before the reader sees them.
     val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
       handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition).toSeq
     // Merged (push-based shuffle) blocks are reported under the shuffle merger host.
     val hasMergedBlock = blocksByAddress.exists(_._1.host == SHUFFLE_MERGER_IDENTIFIER)
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress.iterator, context, metrics,
       shouldBatchFetch = !hasMergedBlock && canUseBatchFetch(startPartition, endPartition, context))
   }
   ```
   which looks simpler.
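
For illustration only, here is a self-contained sketch of the merged-block check. It uses hypothetical stand-in types (`BlockManagerIdStub`, `MergedBlockCheck`, `withMergedFlag`) instead of Spark's own classes, and the `ShuffleMergerIdentifier` value is an assumed placeholder, not taken from this PR. Because `exists` advances a Scala `Iterator`, the sketch materializes the entries before inspecting them and returns them alongside the flag.

```scala
// Hypothetical stand-in for Spark's BlockManagerId; not the real class.
final case class BlockManagerIdStub(executorId: String, host: String)

object MergedBlockCheck {
  // Assumed placeholder; Spark defines its own SHUFFLE_MERGER_IDENTIFIER constant.
  val ShuffleMergerIdentifier = "shuffle-push-merger"

  // (block id, block size, map index), with a String standing in for BlockId.
  type Blocks = Seq[(String, Long, Int)]

  /** Materializes the iterator once and reports whether any entry is a merged block. */
  def withMergedFlag(
      blocksByAddress: Iterator[(BlockManagerIdStub, Blocks)])
    : (Seq[(BlockManagerIdStub, Blocks)], Boolean) = {
    // toList forces the iterator, so the later scan cannot drop entries.
    val materialized = blocksByAddress.toList
    val hasMergedBlock = materialized.exists(_._1.host == ShuffleMergerIdentifier)
    (materialized, hasMergedBlock)
  }
}
```

A caller would then hand `materialized.iterator` to the reader and combine `!hasMergedBlock` with whatever other batch-fetch conditions apply.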




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


