Ngone51 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r615503198



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1000,18 +1403,55 @@ private[spark] object MapOutputTracker extends Logging {
       shuffleId: Int,
       startPartition: Int,
       endPartition: Int,
-      statuses: Array[MapStatus],
+      mapStatuses: Array[MapStatus],
       startMapIndex : Int,
-      endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    assert (statuses != null)
+      endMapIndex: Int,
+      mergeStatuses: Option[Array[MergeStatus]] = None):
+      Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
-    val iter = statuses.iterator.zipWithIndex
-    for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
-      if (status == null) {
-        val errorMessage = s"Missing an output location for shuffle $shuffleId"
-        logError(errorMessage)
-        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
-      } else {
+    // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
+    // partition consists of blocks merged in random order, we are unable to serve map index
+    // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
+    // map outputs, it usually indicates skewed partitions which push-based shuffle delegates
+    // to AQE to handle.
+    // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
+    // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
+    // TODO: map indexes
+    if (mergeStatuses.isDefined && startMapIndex == 0 && endMapIndex == mapStatuses.length) {
+      // We have MergeStatus and full range of mapIds are requested so return a merged block.
+      val numMaps = mapStatuses.length
+      mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
+        case (mergeStatus, partId) =>
+          val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) {

Review comment:
       OK, I'll review it in detail when it gets in.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to