otterc commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r614968427
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -812,61 +1115,151 @@ private[spark] class MapOutputTrackerWorker(conf:
SparkConf) extends MapOutputTr
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long,
Int)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId")
- val statuses = getStatuses(shuffleId, conf)
+ val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId,
conf)
try {
- val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length
else endMapIndex
+ val actualEndMapIndex =
+ if (endMapIndex == Int.MaxValue) mapOutputStatuses.length else
endMapIndex
logDebug(s"Convert map statuses for shuffle $shuffleId, " +
s"mappers $startMapIndex-$actualEndMapIndex, partitions
$startPartition-$endPartition")
MapOutputTracker.convertMapStatuses(
- shuffleId, startPartition, endPartition, statuses, startMapIndex,
actualEndMapIndex)
+ shuffleId, startPartition, endPartition, mapOutputStatuses,
startMapIndex,
+ actualEndMapIndex, Option(mergedOutputStatuses))
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is
outdated; clear it:
mapStatuses.clear()
+ mergeStatuses.clear()
+ throw e
+ }
+ }
+
+ override def getMapSizesForMergeResult(
Review comment:
This is not just for tests. It is used by the iterator when a fetch failure
happens for a merged block. This is the WIP PR that depends on this change:
https://github.com/apache/spark/pull/32140/
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]