mridulm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r601996217



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -987,18 +1277,51 @@ private[spark] object MapOutputTracker extends Logging {
       shuffleId: Int,
       startPartition: Int,
       endPartition: Int,
-      statuses: Array[MapStatus],
+      mapStatuses: Array[MapStatus],
       startMapIndex : Int,
-      endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    assert (statuses != null)
+      endMapIndex: Int,
+      mergeStatuses: Option[Array[MergeStatus]] = None):
+      Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    assert (mapStatuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
-    val iter = statuses.iterator.zipWithIndex
-    for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
-      if (status == null) {
-        val errorMessage = s"Missing an output location for shuffle $shuffleId"
-        logError(errorMessage)
-        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
-      } else {
+    // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
+    // partition consists of blocks merged in random order, we are unable to serve map index
+    // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
+    // map outputs, it usually indicates skewed partitions which push-based shuffle delegates
+    // to AQE to handle.
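
The rule in the comment above amounts to a guard condition; a minimal sketch of the idea (the helper name and signature are hypothetical, not taken from this PR):

```scala
// Hypothetical illustration of the rule in the comment above: merged shuffle
// partitions interleave blocks in random order, so MergeStatus is only usable
// when the reduce task fetches the full range of map outputs. A subrange
// request (typically AQE splitting a skewed partition) must fall back to the
// original per-mapper MapStatus entries.
def canUseMergeStatus(startMapIndex: Int, endMapIndex: Int, numMaps: Int): Boolean =
  startMapIndex == 0 && endMapIndex == numMaps
```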

Review comment:
       There are a couple of things here:
   
   a) Can we leverage the existing skew algorithm? My understanding is we can, though it might not necessarily be as optimal as the current reads for push-based shuffle.
   What I mean is, if reducer r1.1 is processing m1-m100 and r1.2 is processing m101-m200 for reducer partition 1, we can ensure that m1-m100 is satisfied with bin packing to get a better read than reading from 100 mappers/ESS, right? It is not as optimal as reading m1-m200, but it will be better than the alternative.
   
   b) Alternative ways to split mapper input for a reducer: this is what you described, and it can become an option as Spark evolves.
   
   Given both of these, we would want to make the comment a TODO with a JIRA for it, which can be addressed in subsequent work.
   
   Or are there concerns with (a) or (b) that I am missing?
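
Point (a) above, grouping a mapper subrange by ESS location so the reads bin-pack into per-host fetches, could be sketched roughly as follows. All names here are simplified illustrative stand-ins, not the actual `MapOutputTracker` types:

```scala
import scala.collection.mutable

// Simplified stand-ins for the real Spark types; illustration only.
case class BlockManagerId(host: String, port: Int)
case class MapStatus(location: BlockManagerId, sizeForReduce: Long)

// Group the map outputs in [startMapIndex, endMapIndex) by the ESS / block
// manager that serves them, so a reducer reading m1-m100 opens one fetch
// per host rather than one per mapper.
def groupSubrangeByLocation(
    statuses: Array[MapStatus],
    startMapIndex: Int,
    endMapIndex: Int): Map[BlockManagerId, List[(Int, Long)]] = {
  val byLocation = mutable.HashMap.empty[BlockManagerId, mutable.ListBuffer[(Int, Long)]]
  for (mapIndex <- startMapIndex until endMapIndex) {
    val status = statuses(mapIndex)
    byLocation.getOrElseUpdate(status.location, mutable.ListBuffer.empty) +=
      ((mapIndex, status.sizeForReduce))
  }
  byLocation.map { case (loc, blocks) => (loc, blocks.toList) }.toMap
}
```

With 100 mappers spread across, say, 10 hosts, this turns 100 per-mapper reads into 10 per-host reads for the subrange: still not as good as one merged read of m1-m200, but better than the alternative, which is the point of (a).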




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
