dongjoon-hyun commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r529776436



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1011,4 +1333,47 @@ private[spark] object MapOutputTracker extends Logging {
 
     splitsByAddress.mapValues(_.toSeq).iterator
   }
+
+  /**
+   * Given a shuffle ID, a partition ID, an array of map statuses, and bitmap 
corresponding
+   * to either a merged shuffle partition or a merged shuffle partition chunk, 
identify
+   * the metadata about the shuffle partition blocks that are merged into the 
merged shuffle
+   * partition or partition chunk represented by the bitmap.
+   *
+   * @param shuffleId Identifier for the shuffle
+   * @param partitionId The partition ID of the MergeStatus for which we look 
for the metadata
+   *                    of the merged shuffle partition blocks
+   * @param mapStatuses List of map statuses, indexed by map ID
+   * @param tracker     bitmap containing mapIndexes that belong to the merged 
block or merged
+   *                    block chunk.
+   * @return A sequence of 2-item tuples, where the first item in the tuple is 
a BlockManagerId,
+   *         and the second item is a sequence of (shuffle block ID, shuffle 
block size) tuples
+   *         describing the shuffle blocks that are stored at that block 
manager.
+   */
+  def getMapStatusesForMergeStatus(
+      shuffleId: Int,
+      partitionId: Int,
+      mapStatuses: Array[MapStatus],
+      tracker: RoaringBitmap): Seq[(BlockManagerId, Seq[(BlockId, Long, 
Int)])] = {
+    assert (mapStatuses != null && tracker != null)
+    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, 
Long, Int)]]
+    for ((status, mapIndex) <- mapStatuses.zipWithIndex) {
+      // Only add blocks that are merged
+      if (tracker.contains(mapIndex)) {
+        MapOutputTracker.validateStatus(status, shuffleId, partitionId)
+        splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+          ((ShuffleBlockId(shuffleId, status.mapId, partitionId),
+            status.getSizeForBlock(partitionId), mapIndex))
+      }
+    }
+    splitsByAddress.toSeq

Review comment:
       @Victsm . First of all, could you check this place? This seems to break 
Scala 2.13 compilation.
   ```
   [error] 
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:1369:21:
 type mismatch;
   [error]  found   : Seq[(org.apache.spark.storage.BlockManagerId, 
scala.collection.mutable.ListBuffer[(org.apache.spark.storage.BlockId, Long, 
Int)])]
   [error]  required: Seq[(org.apache.spark.storage.BlockManagerId, 
Seq[(org.apache.spark.storage.BlockId, Long, Int)])]
   [error]     splitsByAddress.toSeq
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to