Ngone51 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r615501309



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -812,61 +1115,151 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
       startPartition: Int,
       endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
-    val statuses = getStatuses(shuffleId, conf)
+    val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf)
     try {
-      val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
+      val actualEndMapIndex =
+        if (endMapIndex == Int.MaxValue) mapOutputStatuses.length else endMapIndex
       logDebug(s"Convert map statuses for shuffle $shuffleId, " +
         s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
       MapOutputTracker.convertMapStatuses(
-        shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
+        shuffleId, startPartition, endPartition, mapOutputStatuses, startMapIndex,
+          actualEndMapIndex, Option(mergedOutputStatuses))
     } catch {
       case e: MetadataFetchFailedException =>
         // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
         mapStatuses.clear()
+        mergeStatuses.clear()
+        throw e
+    }
+  }
+
+  override def getMapSizesForMergeResult(
+      shuffleId: Int,
+      partitionId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    logDebug(s"Fetching backup outputs for shuffle $shuffleId, partition $partitionId")
+    // Fetch the map statuses and merge statuses again since they might have already been
+    // cleared by another task running in the same executor.
+    val (mapOutputStatuses, mergeResultStatuses) = getStatuses(shuffleId, conf)
+    try {
+      val mergeStatus = mergeResultStatuses(partitionId)
+      // If the original MergeStatus is no longer available, we cannot identify the list of
+      // unmerged blocks to fetch in this case. Throw MetadataFetchFailedException in this case.
+      MapOutputTracker.validateStatus(mergeStatus, shuffleId, partitionId)
+      // Use the MergeStatus's partition level bitmap since we are doing partition level fallback
+      MapOutputTracker.getMapStatusesForMergeStatus(shuffleId, partitionId,
+        mapOutputStatuses, mergeStatus.tracker)
+    } catch {
+      // We experienced a fetch failure so our mapStatuses cache is outdated; clear it
+      case e: MetadataFetchFailedException =>
+        mapStatuses.clear()
+        mergeStatuses.clear()
+        throw e
+    }
+  }
+
+  override def getMapSizesForMergeResult(
+      shuffleId: Int,
+      partitionId: Int,
+      chunkTracker: RoaringBitmap): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    logDebug(s"Fetching backup outputs for shuffle $shuffleId, partition $partitionId")
+    // Fetch the map statuses and merge statuses again since they might have already been
+    // cleared by another task running in the same executor.
+    val (mapOutputStatuses, _) = getStatuses(shuffleId, conf)
+    try {
+      MapOutputTracker.getMapStatusesForMergeStatus(shuffleId, partitionId, mapOutputStatuses,
+        chunkTracker)
+    } catch {
+      // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
+      case e: MetadataFetchFailedException =>
+        mapStatuses.clear()
+        mergeStatuses.clear()
         throw e
     }
   }
 
   /**
-   * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
+   * Get or fetch the array of MapStatuses and MergeStatuses if push based shuffle enabled
+   * for a given shuffle ID. NOTE: clients MUST synchronize
    * on this array when reading it, because on the driver, we may be changing it in place.
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = {
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
-      val startTimeNs = System.nanoTime()
-      fetchingLock.withLock(shuffleId) {
-        var fetchedStatuses = mapStatuses.get(shuffleId).orNull
-        if (fetchedStatuses == null) {
-          logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
-          val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
-          try {
-            fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
-          } catch {
-            case e: SparkException =>
-              throw new MetadataFetchFailedException(shuffleId, -1,
-                s"Unable to deserialize broadcasted map statuses for shuffle $shuffleId: " +
-                  e.getCause)
+  private def getStatuses(
+      shuffleId: Int,
+      conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) = {
+    if (fetchMergeResult) {
+      val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+      val mergeOutputStatuses = mergeStatuses.get(shuffleId).orNull
+
+      if (mapOutputStatuses == null || mergeOutputStatuses == null) {
+        logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them")
+        val startTimeNs = System.nanoTime()
+        fetchingLock.withLock(shuffleId) {
+          var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
+          var fetchedMergeStatuses = mergeStatuses.get(shuffleId).orNull
+          if (fetchedMapStatuses == null || fetchedMergeStatuses == null) {
+            logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+            val fetchedBytes =
+              askTracker[(Array[Byte], Array[Byte])](GetMapAndMergeResultStatuses(shuffleId))

Review comment:
       Yes. (cc @mridulm) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to