Victsm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r598314382
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -987,18 +1277,51 @@ private[spark] object MapOutputTracker extends Logging {
shuffleId: Int,
startPartition: Int,
endPartition: Int,
- statuses: Array[MapStatus],
+ mapStatuses: Array[MapStatus],
startMapIndex : Int,
- endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
- assert (statuses != null)
+ endMapIndex: Int,
+ mergeStatuses: Option[Array[MergeStatus]] = None):
+ Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+ assert (mapStatuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
- val iter = statuses.iterator.zipWithIndex
- for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
- if (status == null) {
- val errorMessage = s"Missing an output location for shuffle $shuffleId"
- logError(errorMessage)
- throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
- } else {
+ // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
+ // partition consists of blocks merged in random order, we are unable to serve map index
+ // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
+ // map outputs, it usually indicates skewed partitions which push-based shuffle delegates
+ // to AQE to handle.
Review comment:
Just to clarify a bit further.
Push-based shuffle cannot support the current map-ID-based subdivision of a shuffle partition into multiple smaller parts, because blocks from different mappers are merged out of order.
We could, however, support chunk-ID-based subdivision of a merged shuffle partition.
When blocks are merged, the current approach already divides the merged shuffle file into multiple fixed-size chunks.
To leverage this, potentially skewed shuffle partitions would also need to be merged.
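A minimal Scala sketch of the two ideas in this comment, under a hypothetical model. `MergedChunk`, `MergedPartition`, `canServeFromMerged`, `mapRangeCrossesChunks`, and `splitByChunks` are illustrative names only, not Spark's `MergeStatus` API or its actual chunk metadata. The sketch assumes each chunk records which map indices it holds, that a mapper's block lands in exactly one chunk, and that a chunk's position in the sequence is its chunk ID.

```scala
object MergedPartitionSketch {
  // Hypothetical model: one chunk of a merged shuffle partition, recording its
  // size and which map indices had blocks appended into it (in arrival order).
  final case class MergedChunk(sizeBytes: Long, mapIndices: Set[Int])

  // Hypothetical model: a merged partition; a chunk's position is its chunk ID.
  final case class MergedPartition(chunks: IndexedSeq[MergedChunk])

  // Simplified form of the rule in the diff comment: merged output only replaces
  // a request that covers every map output, i.e. the full range [0, numMaps).
  def canServeFromMerged(startMapIndex: Int, endMapIndex: Int, numMaps: Int): Boolean =
    startMapIndex == 0 && endMapIndex == numMaps

  // True if some chunk mixes blocks from inside [start, end) with blocks from
  // outside it, so that map range cannot be read without splitting a chunk.
  def mapRangeCrossesChunks(p: MergedPartition, start: Int, end: Int): Boolean =
    p.chunks.exists { c =>
      val inside = c.mapIndices.exists(m => m >= start && m < end)
      val outside = c.mapIndices.exists(m => m < start || m >= end)
      inside && outside
    }

  // Chunk-ID-based subdivision: greedily groups contiguous chunks until each
  // group reaches targetBytes. Returns (startChunkId, endChunkIdExclusive) pairs.
  def splitByChunks(p: MergedPartition, targetBytes: Long): List[(Int, Int)] = {
    require(targetBytes > 0, "targetBytes must be positive")
    val ranges = scala.collection.mutable.ListBuffer.empty[(Int, Int)]
    var start = 0
    var acc = 0L
    for (i <- p.chunks.indices) {
      acc += p.chunks(i).sizeBytes
      if (acc >= targetBytes) {
        ranges += ((start, i + 1))
        start = i + 1
        acc = 0L
      }
    }
    // Any trailing chunks that did not reach targetBytes form a final group.
    if (start < p.chunks.length) ranges += ((start, p.chunks.length))
    ranges.toList
  }

  def main(args: Array[String]): Unit = {
    // Six mappers whose blocks arrived in the order 2, 0, 3, 5, 1, 4.
    val p = MergedPartition(Vector(
      MergedChunk(40L, Set(2, 0)),
      MergedChunk(35L, Set(3)),
      MergedChunk(50L, Set(5, 1)),
      MergedChunk(20L, Set(4))))
    println(canServeFromMerged(0, 6, numMaps = 6))
    println(canServeFromMerged(0, 2, numMaps = 6))
    println(mapRangeCrossesChunks(p, 0, 2))
    println(splitByChunks(p, targetBytes = 60L))
  }
}
```

In this model the map range [0, 2) cannot be isolated because chunk 0 also holds mapper 2's block, while grouping whole chunks always works. The split never goes finer than one chunk, so an oversized chunk still becomes its own group; how the real implementation sizes chunks is not modeled here.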