Victsm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r598314382



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -987,18 +1277,51 @@ private[spark] object MapOutputTracker extends Logging {
       shuffleId: Int,
       startPartition: Int,
       endPartition: Int,
-      statuses: Array[MapStatus],
+      mapStatuses: Array[MapStatus],
       startMapIndex : Int,
-      endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    assert (statuses != null)
+      endMapIndex: Int,
+      mergeStatuses: Option[Array[MergeStatus]] = None):
+      Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
-    val iter = statuses.iterator.zipWithIndex
-    for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
-      if (status == null) {
-        val errorMessage = s"Missing an output location for shuffle $shuffleId"
-        logError(errorMessage)
-        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
-      } else {
+    // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
+    // partition consists of blocks merged in random order, we are unable to serve map index
+    // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
+    // map outputs, it usually indicates skewed partitions which push-based shuffle delegates
+    // to AQE to handle.

Review comment:
   Just to clarify a bit further.
   Push-based shuffle cannot support the current map ID based subdivision of a shuffle partition into multiple smaller parts, because blocks from different mappers are merged out of order.
   We could, however, support chunk ID based subdivision of a merged shuffle partition.
   When blocks get merged, the current approach already divides the merged shuffle file into multiple fixed-size chunks.
   To leverage this, push-based shuffle would also need to merge potentially skewed shuffle partitions.
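
   To make the chunk ID based idea a bit more concrete, here is a rough sketch of how a merged shuffle partition might be subdivided along chunk boundaries. This is not code from this PR: `ChunkRange`, `ChunkSplitSketch` and `splitByChunks` are hypothetical names, and the sketch assumes the per-chunk sizes of the merged shuffle file are already known to whoever plans the split.

```scala
// Hypothetical sketch, not part of Spark: group consecutive chunks of one merged
// shuffle partition into sub-ranges of at most roughly targetSize bytes each.

// A half-open range of chunk IDs [startChunkId, endChunkId) and its total size.
final case class ChunkRange(startChunkId: Int, endChunkId: Int, bytes: Long)

object ChunkSplitSketch {
  def splitByChunks(chunkSizes: Seq[Long], targetSize: Long): Seq[ChunkRange] = {
    require(targetSize > 0, "targetSize must be positive")
    val ranges = Seq.newBuilder[ChunkRange]
    var start = 0   // first chunk ID of the range being built
    var acc = 0L    // bytes accumulated in the range being built
    for ((size, chunkId) <- chunkSizes.zipWithIndex) {
      // Close the current range before adding a chunk that would exceed the target.
      if (acc > 0 && acc + size > targetSize) {
        ranges += ChunkRange(start, chunkId, acc)
        start = chunkId
        acc = 0L
      }
      acc += size
    }
    // Emit whatever is left as the final range.
    if (start < chunkSizes.length) {
      ranges += ChunkRange(start, chunkSizes.length, acc)
    }
    ranges.result()
  }
}
```

   Unlike a map index subrange, each range here covers contiguous bytes of the merged file, so it does not depend on the order in which mapper blocks were merged. A single chunk larger than `targetSize` simply becomes its own range in this sketch.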




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
