otterc commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r614968783
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -812,61 +1115,151 @@ private[spark] class MapOutputTrackerWorker(conf:
SparkConf) extends MapOutputTr
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long,
Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
- val statuses = getStatuses(shuffleId, conf)
+ val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId,
conf)
try {
- val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length
else endMapIndex
+ val actualEndMapIndex =
+ if (endMapIndex == Int.MaxValue) mapOutputStatuses.length else
endMapIndex
logDebug(s"Convert map statuses for shuffle $shuffleId, " +
s"mappers $startMapIndex-$actualEndMapIndex, partitions
$startPartition-$endPartition")
MapOutputTracker.convertMapStatuses(
- shuffleId, startPartition, endPartition, statuses, startMapIndex,
actualEndMapIndex)
+ shuffleId, startPartition, endPartition, mapOutputStatuses,
startMapIndex,
+ actualEndMapIndex, Option(mergedOutputStatuses))
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is
outdated; clear it:
mapStatuses.clear()
+ mergeStatuses.clear()
+ throw e
+ }
+ }
+
+ override def getMapSizesForMergeResult(
+ shuffleId: Int,
+ partitionId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
= {
+ logDebug(s"Fetching backup outputs for shuffle $shuffleId, partition
$partitionId")
+ // Fetch the map statuses and merge statuses again since they might have
already been
+ // cleared by another task running in the same executor.
+ val (mapOutputStatuses, mergeResultStatuses) = getStatuses(shuffleId, conf)
+ try {
+ val mergeStatus = mergeResultStatuses(partitionId)
+ // If the original MergeStatus is no longer available, we cannot
identify the list of
+ // unmerged blocks to fetch in this case. Throw
MetadataFetchFailedException in this case.
+ MapOutputTracker.validateStatus(mergeStatus, shuffleId, partitionId)
+ // Use the MergeStatus's partition level bitmap since we are doing
partition level fallback
+ MapOutputTracker.getMapStatusesForMergeStatus(shuffleId, partitionId,
+ mapOutputStatuses, mergeStatus.tracker)
+ } catch {
+ // We experienced a fetch failure so our mapStatuses cache is outdated;
clear it
+ case e: MetadataFetchFailedException =>
+ mapStatuses.clear()
+ mergeStatuses.clear()
+ throw e
+ }
+ }
+
+ override def getMapSizesForMergeResult(
 Review comment:
       Same as above: this is used by the iterator during partition-level fallback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]