Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720615835



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         tuples describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId

Review comment:
   Yeah, but as you may know, it's very common for Spark private APIs to be abused as public APIs. We have broken downstream use cases before by changing private APIs, e.g., the SQL UDF code. And I know some users implement their own shuffle module by extending `ShuffleManager`, `MapOutputTracker`, etc. Given that we're very close to the 3.2 release, I don't think this kind of change is appropriate at this moment.
   
   If you still want to go this way, I'd suggest completely separating the push-based shuffle path from the normal shuffle path in `SortShuffleManager.getReader`, e.g.,
   
   ```scala
     override def getReader[K, C](
         handle: ShuffleHandle,
         startMapIndex: Int,
         endMapIndex: Int,
         startPartition: Int,
         endPartition: Int,
         context: TaskContext,
         metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
       // Push-based path: the tracker decides whether batch fetch is allowed.
       // Normal path: the existing API and canUseBatchFetch check stay unchanged.
       val (blocksByAddress, enableBatchFetch) = if (fetchMergeResult) {
         val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(...)
         (res.iter, res.enableBatchFetch)
       } else {
         val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(...)
         (address, canUseBatchFetch(startPartition, endPartition, context))
       }
       new BlockStoreShuffleReader(
         handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
         shouldBatchFetch = enableBatchFetch)
     }
   ```
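
To make the compatibility argument concrete, here is a minimal, self-contained sketch of the same idea. It uses simplified stand-in types, not Spark's real classes: `Tracker`, `LegacyTracker`, `PushBasedResult` and `resolve` are hypothetical names, and the default fallback in the new method (batch fetch disabled) is an assumption made only for illustration. It shows how a tracker written against the old signature would keep compiling when the push-based path gets its own method.

```scala
// Stand-in types for illustration only; these are NOT Spark's real classes.
object SeparatePathsSketch {
  type BlockInfo = (String, Long, Int) // (block id, block size, map index)
  type BlocksByAddress = Iterator[(String, Seq[BlockInfo])]

  // Hypothetical result type used only by the push-based path.
  final case class PushBasedResult(iter: BlocksByAddress, enableBatchFetch: Boolean)

  abstract class Tracker {
    // Existing signature, left untouched so downstream subclasses keep compiling.
    def getMapSizesByExecutorId(shuffleId: Int): BlocksByAddress

    // New method for the push-based path. The default falls back to the
    // existing method with batch fetch disabled (an assumption of this sketch),
    // so subclasses that predate it need no changes.
    def getPushBasedShuffleMapSizesByExecutorId(shuffleId: Int): PushBasedResult =
      PushBasedResult(getMapSizesByExecutorId(shuffleId), enableBatchFetch = false)
  }

  // A "downstream" tracker written against the old API only.
  class LegacyTracker extends Tracker {
    override def getMapSizesByExecutorId(shuffleId: Int): BlocksByAddress =
      Iterator(("exec-1", Seq(("shuffle_0_0_0", 10L, 0))))
  }

  // Reader-side branching, mirroring the getReader suggestion above.
  def resolve(
      tracker: Tracker,
      shuffleId: Int,
      fetchMergeResult: Boolean,
      canUseBatchFetch: Boolean): (BlocksByAddress, Boolean) =
    if (fetchMergeResult) {
      val res = tracker.getPushBasedShuffleMapSizesByExecutorId(shuffleId)
      (res.iter, res.enableBatchFetch)
    } else {
      (tracker.getMapSizesByExecutorId(shuffleId), canUseBatchFetch)
    }
}
```

With this shape, only callers that opt into the push-based path ever see the new result type; the return type of the existing method is never changed.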
     




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


