Victsm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r530549266
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1011,4 +1333,47 @@ private[spark] object MapOutputTracker extends Logging {
splitsByAddress.mapValues(_.toSeq).iterator
}
+
+ /**
+ * Given a shuffle ID, a partition ID, an array of map statuses, and a bitmap corresponding
+ * to either a merged shuffle partition or a merged shuffle partition chunk, identify
+ * the metadata about the shuffle partition blocks that are merged into the merged shuffle
+ * partition or partition chunk represented by the bitmap.
+ *
+ * @param shuffleId Identifier for the shuffle
+ * @param partitionId The partition ID of the MergeStatus for which we look for the metadata
+ * of the merged shuffle partition blocks
+ * @param mapStatuses List of map statuses, indexed by map index
+ * @param tracker bitmap containing mapIndexes that belong to the merged block or merged
+ * block chunk.
+ * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
+ * and the second item is a sequence of (shuffle block ID, shuffle block size, map index)
+ * tuples describing the shuffle blocks that are stored at that block manager.
+ */
+ def getMapStatusesForMergeStatus(
+ shuffleId: Int,
+ partitionId: Int,
+ mapStatuses: Array[MapStatus],
+ tracker: RoaringBitmap): Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+ assert(mapStatuses != null && tracker != null)
+ val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
+ for ((status, mapIndex) <- mapStatuses.zipWithIndex) {
+ // Only add blocks that are merged
+ if (tracker.contains(mapIndex)) {
+ MapOutputTracker.validateStatus(status, shuffleId, partitionId)
+ splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+ ((ShuffleBlockId(shuffleId, status.mapId, partitionId),
+ status.getSizeForBlock(partitionId), mapIndex))
+ }
+ }
+ splitsByAddress.toSeq
Review comment:
@dongjoon-hyun the Scala 2.13 compatibility issue should be fixed now.
The build is still failing with javadoc generation though.
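For readers following the 2.13 discussion, here is a minimal, Spark-free sketch of the grouping the method above performs. It is an illustration under stated assumptions, not the code in this PR: `Location`, `ShuffleBlock`, `SimpleMapStatus`, and `MergedBlockGrouping` are hypothetical stand-ins for `BlockManagerId`, `ShuffleBlockId`, and `MapStatus`, and `scala.collection.immutable.BitSet` is swapped in for `RoaringBitmap`. It also shows one way to return immutable inner sequences. In the quoted hunk, `splitsByAddress.toSeq` still carries `ListBuffer` values, and under Scala 2.13 (where `Seq` aliases `scala.collection.immutable.Seq`) a mutable buffer no longer conforms to the declared `Seq[(BlockId, Long, Int)]`, which is likely the kind of mismatch the 2.13 build rejects.

```scala
import scala.collection.immutable.BitSet
import scala.collection.mutable.{HashMap, ListBuffer}

// Hypothetical, simplified stand-ins; these are NOT Spark's real classes.
final case class Location(host: String, port: Int)
final case class ShuffleBlock(shuffleId: Int, mapId: Long, reduceId: Int)
final case class SimpleMapStatus(location: Location, mapId: Long, sizes: Array[Long])

object MergedBlockGrouping {
  // Collects the blocks of `partitionId` whose map index is set in `tracker`
  // (a BitSet standing in for RoaringBitmap), grouped by the location that
  // stores them. Each entry is (block, block size, map index).
  def groupMergedBlocks(
      shuffleId: Int,
      partitionId: Int,
      mapStatuses: Array[SimpleMapStatus],
      tracker: BitSet): Seq[(Location, Seq[(ShuffleBlock, Long, Int)])] = {
    require(mapStatuses != null && tracker != null)
    val byLocation = new HashMap[Location, ListBuffer[(ShuffleBlock, Long, Int)]]
    // Only map outputs whose index is in the bitmap were merged.
    for ((status, mapIndex) <- mapStatuses.zipWithIndex if tracker.contains(mapIndex)) {
      byLocation.getOrElseUpdate(status.location, ListBuffer()) +=
        ((ShuffleBlock(shuffleId, status.mapId, partitionId),
          status.sizes(partitionId), mapIndex))
    }
    // Convert every mutable ListBuffer to an immutable List before returning,
    // so the result conforms to `Seq` on both Scala 2.12 and 2.13.
    byLocation.iterator.map { case (loc, blocks) => (loc, blocks.toList) }.toList
  }
}
```

Mapping over the iterator and calling `.toList` avoids relying on `Map.mapValues`, which Scala 2.13 deprecates in favor of `.view.mapValues`.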