otterc commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r614972228
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1000,18 +1403,55 @@ private[spark] object MapOutputTracker extends Logging {
shuffleId: Int,
startPartition: Int,
endPartition: Int,
- statuses: Array[MapStatus],
+ mapStatuses: Array[MapStatus],
startMapIndex : Int,
- endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
- assert (statuses != null)
+ endMapIndex: Int,
+ mergeStatuses: Option[Array[MergeStatus]] = None):
+ Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+ assert (mapStatuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
- val iter = statuses.iterator.zipWithIndex
- for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) {
- if (status == null) {
- val errorMessage = s"Missing an output location for shuffle $shuffleId"
- logError(errorMessage)
- throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
- } else {
+ // Only use MergeStatus for reduce tasks that fetch all map outputs. Since a merged shuffle
+ // partition consists of blocks merged in random order, we are unable to serve map index
+ // subrange requests. However, when a reduce task needs to fetch blocks from a subrange of
+ // map outputs, it usually indicates skewed partitions which push-based shuffle delegates
+ // to AQE to handle.
+ // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
+ // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
+ // TODO: map indexes
+ if (mergeStatuses.isDefined && startMapIndex == 0 && endMapIndex == mapStatuses.length) {
+ // We have MergeStatus and full range of mapIds are requested so return a merged block.
+ val numMaps = mapStatuses.length
+ mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
+ case (mergeStatus, partId) =>
+ val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) {
Review comment:
For fallback support, we have added the `getMapSizesForMergeResult`
methods. When fetching a merged shuffle block/chunk fails, the iterator
uses these methods to query the MapOutputTracker (MOT) for the map statuses
of all the blocks that are part of that merged shuffle block/chunk. For
reference, this is the PR:
https://github.com/apache/spark/pull/32140/
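
To make the fallback idea concrete, here is a minimal, self-contained sketch. It is not Spark's implementation and does not reproduce `getMapSizesForMergeResult`; the types `MapOutput` and `MergedPartition` and the helper `blocksForFailedMerge` are hypothetical names invented for illustration. It assumes the merge metadata records which map indexes were merged into a partition, so that after a failed merged fetch the original per-map blocks can be looked up instead.

```scala
// Hypothetical, simplified stand-ins; these are NOT Spark's real classes.
// sizes(p) is the size of this map output's block for reduce partition p.
final case class MapOutput(mapIndex: Int, sizes: Array[Long])
// Assumption: the merge metadata knows which map indexes were merged.
final case class MergedPartition(mergedMapIndexes: Set[Int], totalSize: Long)

object FallbackSketch {
  /**
   * After a fetch failure on a merged block, return the original
   * (mapIndex, size) blocks that were part of that merged block,
   * skipping empty blocks.
   */
  def blocksForFailedMerge(
      mapOutputs: Array[MapOutput],
      merged: MergedPartition,
      partitionId: Int): Seq[(Int, Long)] = {
    mapOutputs.toSeq
      .filter(m => merged.mergedMapIndexes.contains(m.mapIndex))
      .map(m => (m.mapIndex, m.sizes(partitionId)))
      .filter { case (_, size) => size > 0 }
  }
}
```

In this sketch the caller would invoke `FallbackSketch.blocksForFailedMerge` only after the merged fetch fails, then fetch the returned blocks individually from their original locations.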