Ngone51 commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r648285195
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -347,20 +361,48 @@ final class ShuffleBlockFetcherIterator(
}
}
- private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+ /**
+ * This is called from initialize and also from the fallback which is
triggered from
+ * [[PushBasedFetchHelper]].
+ */
+ private[this] def partitionBlocksByFetchMode(
+ blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
+ localBlocks: scala.collection.mutable.LinkedHashSet[(BlockId, Int)],
+ hostLocalBlocksByExecutor: mutable.LinkedHashMap[BlockManagerId,
Seq[(BlockId, Long, Int)]],
+ mergedLocalBlocks: mutable.LinkedHashSet[BlockId]):
ArrayBuffer[FetchRequest] = {
logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
+ s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress:
$maxBlocksInFlightPerAddress")
- // Partition to local, host-local and remote blocks. Remote blocks are
further split into
- // FetchRequests of size at most maxBytesInFlight in order to limit the
amount of data in flight
+ // Partition to local, host-local, merged-local, remote (includes
merged-remote) blocks.
+ // Remote blocks are further split into FetchRequests of size at most
maxBytesInFlight in order
+ // to limit the amount of data in flight
val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
+ val hostLocalBlocksCurrentIteration = mutable.LinkedHashSet[(BlockId,
Int)]()
var localBlockBytes = 0L
var hostLocalBlockBytes = 0L
+ var mergedLocalBlockBytes = 0L
var remoteBlockBytes = 0L
+ val prevNumBlocksToFetch = numBlocksToFetch
val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
for ((address, blockInfos) <- blocksByAddress) {
- if (Seq(blockManager.blockManagerId.executorId,
fallback).contains(address.executorId)) {
+ if (pushBasedFetchHelper.isMergedShuffleBlockAddress(address)) {
+ // These are push-based merged blocks or chunks of these merged blocks.
+ if (address.host == blockManager.blockManagerId.host) {
+ checkBlockSizes(blockInfos)
+ val pushMergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
+ blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)),
doBatchFetch = false)
+ numBlocksToFetch += pushMergedBlockInfos.size
+ mergedLocalBlocks ++= pushMergedBlockInfos.map(info => info.blockId)
+ mergedLocalBlockBytes += pushMergedBlockInfos.map(_.size).sum
+ logInfo(s"Got ${pushMergedBlockInfos.size} local merged blocks " +
+ s"of size $mergedLocalBlockBytes")
Review comment:
I guess you actually want `pushMergedBlockInfos.map(_.size).sum` here
instead of the accumulated total `mergedLocalBlockBytes`?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -347,20 +361,48 @@ final class ShuffleBlockFetcherIterator(
}
}
- private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+ /**
+ * This is called from initialize and also from the fallback which is
triggered from
+ * [[PushBasedFetchHelper]].
+ */
+ private[this] def partitionBlocksByFetchMode(
+ blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
+ localBlocks: scala.collection.mutable.LinkedHashSet[(BlockId, Int)],
Review comment:
nit: `mutable.LinkedHashSet` ?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -376,48 +418,62 @@ final class ShuffleBlockFetcherIterator(
val blocksForAddress =
mergedBlockInfos.map(info => (info.blockId, info.size,
info.mapIndex))
hostLocalBlocksByExecutor += address -> blocksForAddress
- hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3))
+ hostLocalBlocksCurrentIteration ++= blocksForAddress.map(info =>
(info._1, info._3))
hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
} else {
remoteBlockBytes += blockInfos.map(_._2).sum
collectFetchRequests(address, blockInfos, collectedRemoteRequests)
}
}
val numRemoteBlocks = collectedRemoteRequests.map(_.blocks.size).sum
- val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
- assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size +
numRemoteBlocks,
- s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the
number of local " +
- s"blocks ${localBlocks.size} + the number of host-local blocks
${hostLocalBlocks.size} " +
- s"+ the number of remote blocks ${numRemoteBlocks}.")
- logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)})
non-empty blocks " +
- s"including ${localBlocks.size}
(${Utils.bytesToString(localBlockBytes)}) local and " +
- s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)})
" +
- s"host-local and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
+ val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes +
+ mergedLocalBlockBytes
+ val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
+ assert(blocksToFetchCurrentIteration == localBlocks.size +
+ hostLocalBlocksCurrentIteration.size + numRemoteBlocks +
mergedLocalBlocks.size,
+ s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't
equal to " +
+ s"the number of local blocks ${localBlocks.size} + " +
+ s"the number of host-local blocks
${hostLocalBlocksCurrentIteration.size} " +
+ s"the number of merged-local blocks ${mergedLocalBlocks.size} " +
+ s"+ the number of remote blocks ${numRemoteBlocks} ")
+ logInfo(s"[${context.taskAttemptId()}] Getting
$blocksToFetchCurrentIteration " +
Review comment:
If you want to log the task info here, could you follow the standard task
name format?
https://github.com/apache/spark/blob/7d8181b62f17a202ba584c7bba65b61ec4724db2/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L558
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+ private val iterator: ShuffleBlockFetcherIterator,
+ private val shuffleClient: BlockStoreClient,
+ private val blockManager: BlockManager,
+ private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+ private[this] val startTimeNs = System.nanoTime()
+
+ private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+ SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+ blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+ /** A map for storing merged block shuffle chunk bitmap */
+ private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId,
RoaringBitmap]()
+
+ /**
+ * Returns true if the address is for a push-merged block.
+ */
+ def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+ SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+ }
+
+ /**
+ * Returns true if the address is not of executor local or merged local
block. false otherwise.
+ */
+ def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+ (isMergedShuffleBlockAddress(address) && address.host !=
blockManager.blockManagerId.host) ||
+ (!isMergedShuffleBlockAddress(address) && address !=
blockManager.blockManagerId)
+ }
+
+ /**
+ * Returns true if the address if of merged local block. false otherwise.
+ */
+ def isMergedLocal(address: BlockManagerId): Boolean = {
+ isMergedShuffleBlockAddress(address) && address.host ==
blockManager.blockManagerId.host
+ }
+
+ def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+ chunksMetaMap(blockId).getCardinality
+ }
+
+ def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+ chunksMetaMap.remove(blockId)
+ }
+
+ def createChunkBlockInfosFromMetaResponse(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+ val approxChunkSize = blockSize / numChunks
+ val blocksToRequest: ArrayBuffer[(BlockId, Long, Int)] =
+ new ArrayBuffer[(BlockId, Long, Int)]()
+ for (i <- 0 until numChunks) {
+ val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+ chunksMetaMap.put(blockChunkId, bitmaps(i))
+ logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+ blocksToRequest += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+ }
+ blocksToRequest
+ }
+
+ def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+ val sizeMap = req.blocks.map {
+ case FetchBlockInfo(blockId, size, _) =>
+ val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+ ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+ val address = req.address
+ val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+ override def onSuccess(shuffleId: Int, reduceId: Int, meta:
MergedBlockMeta): Unit = {
+ logInfo(s"Received the meta of merged block for ($shuffleId,
$reduceId) " +
+ s"from ${req.address.host}:${req.address.port}")
+ try {
+ iterator.addToResultsQueue(MergedBlocksMetaFetchResult(shuffleId,
reduceId,
+ sizeMap(shuffleId, reduceId), meta.getNumChunks,
meta.readChunkBitmaps(), address))
+ } catch {
+ case _: Throwable =>
Review comment:
log error?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+ private val iterator: ShuffleBlockFetcherIterator,
+ private val shuffleClient: BlockStoreClient,
+ private val blockManager: BlockManager,
+ private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+ private[this] val startTimeNs = System.nanoTime()
+
+ private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+ SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+ blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+ /** A map for storing merged block shuffle chunk bitmap */
+ private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId,
RoaringBitmap]()
+
+ /**
+ * Returns true if the address is for a push-merged block.
+ */
+ def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+ SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+ }
+
+ /**
+ * Returns true if the address is not of executor local or merged local
block. false otherwise.
+ */
+ def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+ (isMergedShuffleBlockAddress(address) && address.host !=
blockManager.blockManagerId.host) ||
+ (!isMergedShuffleBlockAddress(address) && address !=
blockManager.blockManagerId)
+ }
+
+ /**
+ * Returns true if the address if of merged local block. false otherwise.
+ */
+ def isMergedLocal(address: BlockManagerId): Boolean = {
+ isMergedShuffleBlockAddress(address) && address.host ==
blockManager.blockManagerId.host
+ }
+
+ def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+ chunksMetaMap(blockId).getCardinality
+ }
+
+ def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+ chunksMetaMap.remove(blockId)
+ }
+
+ def createChunkBlockInfosFromMetaResponse(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+ val approxChunkSize = blockSize / numChunks
+ val blocksToRequest: ArrayBuffer[(BlockId, Long, Int)] =
+ new ArrayBuffer[(BlockId, Long, Int)]()
Review comment:
nit: `blocksToRequest` -> `blocksToFetch`?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -347,20 +361,48 @@ final class ShuffleBlockFetcherIterator(
}
}
- private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+ /**
+ * This is called from initialize and also from the fallback which is
triggered from
+ * [[PushBasedFetchHelper]].
+ */
+ private[this] def partitionBlocksByFetchMode(
+ blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
+ localBlocks: scala.collection.mutable.LinkedHashSet[(BlockId, Int)],
+ hostLocalBlocksByExecutor: mutable.LinkedHashMap[BlockManagerId,
Seq[(BlockId, Long, Int)]],
+ mergedLocalBlocks: mutable.LinkedHashSet[BlockId]):
ArrayBuffer[FetchRequest] = {
logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
+ s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress:
$maxBlocksInFlightPerAddress")
- // Partition to local, host-local and remote blocks. Remote blocks are
further split into
- // FetchRequests of size at most maxBytesInFlight in order to limit the
amount of data in flight
+ // Partition to local, host-local, merged-local, remote (includes
merged-remote) blocks.
+ // Remote blocks are further split into FetchRequests of size at most
maxBytesInFlight in order
+ // to limit the amount of data in flight
val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
+ val hostLocalBlocksCurrentIteration = mutable.LinkedHashSet[(BlockId,
Int)]()
var localBlockBytes = 0L
var hostLocalBlockBytes = 0L
+ var mergedLocalBlockBytes = 0L
var remoteBlockBytes = 0L
+ val prevNumBlocksToFetch = numBlocksToFetch
val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
for ((address, blockInfos) <- blocksByAddress) {
- if (Seq(blockManager.blockManagerId.executorId,
fallback).contains(address.executorId)) {
+ if (pushBasedFetchHelper.isMergedShuffleBlockAddress(address)) {
+ // These are push-based merged blocks or chunks of these merged blocks.
+ if (address.host == blockManager.blockManagerId.host) {
+ checkBlockSizes(blockInfos)
+ val pushMergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
+ blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)),
doBatchFetch = false)
Review comment:
In the case of `doBatchFetch = false`,
`mergeContinuousShuffleBlockIdsIfNeeded` simply returns the blockInfos unchanged.
Why do we need to call it?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1074,8 +1337,13 @@ object ShuffleBlockFetcherIterator {
* A request to fetch blocks from a remote BlockManager.
* @param address remote BlockManager to fetch from.
* @param blocks Sequence of the information for blocks to fetch from the
same address.
+ * @param hasMergedBlocks true if this request contains merged blocks; false
if it contains
Review comment:
Does "contains" mean the request may merely include merged blocks, or must all of its blocks be merged blocks?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -767,6 +908,43 @@ final class ShuffleBlockFetcherIterator(
deferredFetchRequests.getOrElseUpdate(address, new
Queue[FetchRequest]())
defReqQueue.enqueue(request)
result = null
+
+ case IgnoreFetchResult(blockId, address, size, isNetworkReqDone) =>
+ if (pushBasedFetchHelper.isNotExecutorOrMergedLocal(address)) {
+ numBlocksInFlightPerAddress(address) =
numBlocksInFlightPerAddress(address) - 1
+ bytesInFlight -= size
+ }
+ if (isNetworkReqDone) {
+ reqsInFlight -= 1
+ logDebug("Number of requests in flight " + reqsInFlight)
+ }
+ numBlocksProcessed +=
pushBasedFetchHelper.initiateFallbackBlockFetchForMergedBlock(
+ blockId, address)
+ // Set result to null to trigger another iteration of the while loop
to get either
+ // a SuccessFetchResult or a FailureFetchResult.
+ result = null
+
+ case MergedBlocksMetaFetchResult(shuffleId, reduceId, blockSize,
numChunks, bitmaps,
+ address, _) =>
Review comment:
nit: 2 indents
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -835,8 +1013,11 @@ final class ShuffleBlockFetcherIterator(
def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
Review comment:
How about:
```scala
logDebug("Sending request for %d blocks (%s) from %s".format(
request.blocks.size, Utils.bytesToString(request.size),
request.address.hostPort))
if (hasMergedBlocks) {
pushBasedFetchHelper.sendFetchMergedStatusRequest(request)
} else {
sendRequest(request)
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) +
request.blocks.size
}
```
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+ private val iterator: ShuffleBlockFetcherIterator,
+ private val shuffleClient: BlockStoreClient,
+ private val blockManager: BlockManager,
+ private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+ private[this] val startTimeNs = System.nanoTime()
+
+ private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+ SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+ blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+ /** A map for storing merged block shuffle chunk bitmap */
+ private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId,
RoaringBitmap]()
+
+ /**
+ * Returns true if the address is for a push-merged block.
+ */
+ def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+ SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+ }
+
+ /**
+ * Returns true if the address is not of executor local or merged local
block. false otherwise.
+ */
+ def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+ (isMergedShuffleBlockAddress(address) && address.host !=
blockManager.blockManagerId.host) ||
+ (!isMergedShuffleBlockAddress(address) && address !=
blockManager.blockManagerId)
+ }
+
+ /**
+ * Returns true if the address if of merged local block. false otherwise.
+ */
+ def isMergedLocal(address: BlockManagerId): Boolean = {
+ isMergedShuffleBlockAddress(address) && address.host ==
blockManager.blockManagerId.host
+ }
+
+ def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+ chunksMetaMap(blockId).getCardinality
+ }
+
+ def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+ chunksMetaMap.remove(blockId)
+ }
+
+ def createChunkBlockInfosFromMetaResponse(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+ val approxChunkSize = blockSize / numChunks
+ val blocksToRequest: ArrayBuffer[(BlockId, Long, Int)] =
+ new ArrayBuffer[(BlockId, Long, Int)]()
+ for (i <- 0 until numChunks) {
+ val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+ chunksMetaMap.put(blockChunkId, bitmaps(i))
+ logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+ blocksToRequest += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+ }
+ blocksToRequest
+ }
+
+ def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+ val sizeMap = req.blocks.map {
+ case FetchBlockInfo(blockId, size, _) =>
+ val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+ ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+ val address = req.address
+ val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+ override def onSuccess(shuffleId: Int, reduceId: Int, meta:
MergedBlockMeta): Unit = {
+ logInfo(s"Received the meta of merged block for ($shuffleId,
$reduceId) " +
+ s"from ${req.address.host}:${req.address.port}")
+ try {
+ iterator.addToResultsQueue(MergedBlocksMetaFetchResult(shuffleId,
reduceId,
+ sizeMap(shuffleId, reduceId), meta.getNumChunks,
meta.readChunkBitmaps(), address))
+ } catch {
+ case _: Throwable =>
+ iterator.addToResultsQueue(
+ MergedBlocksMetaFailedFetchResult(shuffleId, reduceId, address))
+ }
+ }
+
+ override def onFailure(shuffleId: Int, reduceId: Int, exception:
Throwable): Unit = {
+ logError(s"Failed to get the meta of merged blocks for ($shuffleId,
$reduceId) " +
+ s"from ${req.address.host}:${req.address.port}", exception)
+
iterator.addToResultsQueue(MergedBlocksMetaFailedFetchResult(shuffleId,
reduceId, address))
+ }
+ }
+ req.blocks.foreach(block => {
Review comment:
nit:
```scala
req.blocks.foreach { block =>
...
}
```
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -347,20 +361,48 @@ final class ShuffleBlockFetcherIterator(
}
}
- private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+ /**
+ * This is called from initialize and also from the fallback which is
triggered from
+ * [[PushBasedFetchHelper]].
+ */
+ private[this] def partitionBlocksByFetchMode(
+ blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
+ localBlocks: scala.collection.mutable.LinkedHashSet[(BlockId, Int)],
+ hostLocalBlocksByExecutor: mutable.LinkedHashMap[BlockManagerId,
Seq[(BlockId, Long, Int)]],
+ mergedLocalBlocks: mutable.LinkedHashSet[BlockId]):
ArrayBuffer[FetchRequest] = {
logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
+ s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress:
$maxBlocksInFlightPerAddress")
- // Partition to local, host-local and remote blocks. Remote blocks are
further split into
- // FetchRequests of size at most maxBytesInFlight in order to limit the
amount of data in flight
+ // Partition to local, host-local, merged-local, remote (includes
merged-remote) blocks.
+ // Remote blocks are further split into FetchRequests of size at most
maxBytesInFlight in order
+ // to limit the amount of data in flight
val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
+ val hostLocalBlocksCurrentIteration = mutable.LinkedHashSet[(BlockId,
Int)]()
var localBlockBytes = 0L
var hostLocalBlockBytes = 0L
+ var mergedLocalBlockBytes = 0L
var remoteBlockBytes = 0L
+ val prevNumBlocksToFetch = numBlocksToFetch
val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
for ((address, blockInfos) <- blocksByAddress) {
- if (Seq(blockManager.blockManagerId.executorId,
fallback).contains(address.executorId)) {
+ if (pushBasedFetchHelper.isMergedShuffleBlockAddress(address)) {
+ // These are push-based merged blocks or chunks of these merged blocks.
+ if (address.host == blockManager.blockManagerId.host) {
+ checkBlockSizes(blockInfos)
Review comment:
I'm wondering why we don't check the block sizes in the first place, since
we'd check them for any type of block anyway?
e.g.,
```scala
for ((address, blockInfos) <- blocksByAddress) {
checkBlockSizes(blockInfos)
....
}
```
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+ private val iterator: ShuffleBlockFetcherIterator,
+ private val shuffleClient: BlockStoreClient,
+ private val blockManager: BlockManager,
+ private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+ private[this] val startTimeNs = System.nanoTime()
+
+ private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+ SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+ blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+ /** A map for storing merged block shuffle chunk bitmap */
+ private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId,
RoaringBitmap]()
+
+ /**
+ * Returns true if the address is for a push-merged block.
+ */
+ def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+ SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+ }
+
+ /**
+ * Returns true if the address is not of executor local or merged local
block. false otherwise.
+ */
+ def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+ (isMergedShuffleBlockAddress(address) && address.host !=
blockManager.blockManagerId.host) ||
+ (!isMergedShuffleBlockAddress(address) && address !=
blockManager.blockManagerId)
+ }
+
+ /**
+ * Returns true if the address if of merged local block. false otherwise.
+ */
+ def isMergedLocal(address: BlockManagerId): Boolean = {
+ isMergedShuffleBlockAddress(address) && address.host ==
blockManager.blockManagerId.host
+ }
+
+ def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+ chunksMetaMap(blockId).getCardinality
+ }
+
+ def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+ chunksMetaMap.remove(blockId)
+ }
+
+ def createChunkBlockInfosFromMetaResponse(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+ val approxChunkSize = blockSize / numChunks
+ val blocksToRequest: ArrayBuffer[(BlockId, Long, Int)] =
+ new ArrayBuffer[(BlockId, Long, Int)]()
Review comment:
```suggestion
val blocksToRequest = new ArrayBuffer[(BlockId, Long, Int)]()
```
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
Review comment:
How about making it a trait? And I think we can put it into a separate
file since it's not small.
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -767,6 +908,43 @@ final class ShuffleBlockFetcherIterator(
deferredFetchRequests.getOrElseUpdate(address, new
Queue[FetchRequest]())
defReqQueue.enqueue(request)
result = null
+
+ case IgnoreFetchResult(blockId, address, size, isNetworkReqDone) =>
+ if (pushBasedFetchHelper.isNotExecutorOrMergedLocal(address)) {
+ numBlocksInFlightPerAddress(address) =
numBlocksInFlightPerAddress(address) - 1
+ bytesInFlight -= size
+ }
+ if (isNetworkReqDone) {
+ reqsInFlight -= 1
+ logDebug("Number of requests in flight " + reqsInFlight)
+ }
+ numBlocksProcessed +=
pushBasedFetchHelper.initiateFallbackBlockFetchForMergedBlock(
+ blockId, address)
+ // Set result to null to trigger another iteration of the while loop
to get either
+ // a SuccessFetchResult or a FailureFetchResult.
+ result = null
+
+ case MergedBlocksMetaFetchResult(shuffleId, reduceId, blockSize,
numChunks, bitmaps,
+ address, _) =>
+ // The original meta request is processed so we decrease
numBlocksToFetch by 1. We will
+ // collect new chunks request and the count of this is added to
numBlocksToFetch in
+ // collectFetchReqsFromMergedBlocks.
+ numBlocksToFetch -= 1
+ val blocksToRequest =
pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
+ shuffleId, reduceId, blockSize, numChunks, bitmaps)
+ val additionalRemoteReqs = new ArrayBuffer[FetchRequest]
+ collectFetchRequests(address, blocksToRequest.toSeq,
additionalRemoteReqs)
+ fetchRequests ++= additionalRemoteReqs
+ // Set result to null to force another iteration.
+ result = null
Review comment:
Hm..is it possible there's only `FetchRequest(hasMergedBlocks)` at the
beginning? In that case, it seems to cause the fetching process to hang.
We probably need to call `fetchUpToMaxBytes()` here if `reqsInFlight=0`.
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+ private val iterator: ShuffleBlockFetcherIterator,
+ private val shuffleClient: BlockStoreClient,
+ private val blockManager: BlockManager,
+ private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+ private[this] val startTimeNs = System.nanoTime()
+
+ private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+ SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+ blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+ /** A map for storing merged block shuffle chunk bitmap */
+ private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId,
RoaringBitmap]()
+
+ /**
+ * Returns true if the address is for a push-merged block.
+ */
+ def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+ SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+ }
+
+ /**
+ * Returns true if the address is not of executor local or merged local
block. false otherwise.
+ */
+ def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+ (isMergedShuffleBlockAddress(address) && address.host !=
blockManager.blockManagerId.host) ||
+ (!isMergedShuffleBlockAddress(address) && address !=
blockManager.blockManagerId)
+ }
+
+ /**
+ * Returns true if the address if of merged local block. false otherwise.
+ */
+ def isMergedLocal(address: BlockManagerId): Boolean = {
+ isMergedShuffleBlockAddress(address) && address.host ==
blockManager.blockManagerId.host
+ }
+
+ def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+ chunksMetaMap(blockId).getCardinality
+ }
+
+ def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+ chunksMetaMap.remove(blockId)
+ }
+
+ def createChunkBlockInfosFromMetaResponse(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+ val approxChunkSize = blockSize / numChunks
+ val blocksToRequest: ArrayBuffer[(BlockId, Long, Int)] =
+ new ArrayBuffer[(BlockId, Long, Int)]()
+ for (i <- 0 until numChunks) {
+ val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+ chunksMetaMap.put(blockChunkId, bitmaps(i))
+ logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+ blocksToRequest += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+ }
+ blocksToRequest
+ }
+
+ def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+ val sizeMap = req.blocks.map {
+ case FetchBlockInfo(blockId, size, _) =>
+ val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+ ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+ val address = req.address
+ val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+ override def onSuccess(shuffleId: Int, reduceId: Int, meta:
MergedBlockMeta): Unit = {
+ logInfo(s"Received the meta of merged block for ($shuffleId,
$reduceId) " +
+ s"from ${req.address.host}:${req.address.port}")
+ try {
+ iterator.addToResultsQueue(MergedBlocksMetaFetchResult(shuffleId,
reduceId,
+ sizeMap(shuffleId, reduceId), meta.getNumChunks,
meta.readChunkBitmaps(), address))
+ } catch {
+ case _: Throwable =>
+ iterator.addToResultsQueue(
+ MergedBlocksMetaFailedFetchResult(shuffleId, reduceId, address))
+ }
+ }
+
+ override def onFailure(shuffleId: Int, reduceId: Int, exception:
Throwable): Unit = {
+ logError(s"Failed to get the meta of merged blocks for ($shuffleId,
$reduceId) " +
+ s"from ${req.address.host}:${req.address.port}", exception)
+
iterator.addToResultsQueue(MergedBlocksMetaFailedFetchResult(shuffleId,
reduceId, address))
+ }
+ }
+ req.blocks.foreach(block => {
+ val shuffleBlockId = block.blockId.asInstanceOf[ShuffleBlockId]
+ shuffleClient.getMergedBlockMeta(address.host, address.port,
shuffleBlockId.shuffleId,
+ shuffleBlockId.reduceId, mergedBlocksMetaListener)
+ })
+ }
+
+ // Fetch all outstanding merged local blocks
+ def fetchAllMergedLocalBlocks(
+ mergedLocalBlocks: mutable.LinkedHashSet[BlockId]): Unit = {
+ if (mergedLocalBlocks.nonEmpty) {
+ blockManager.hostLocalDirManager.foreach(fetchMergedLocalBlocks(_,
mergedLocalBlocks))
+ }
+ }
+
+ /**
+ * Fetch the merged blocks dirs if they are not in the cache and eventually
fetch merged local
+ * blocks.
+ */
+ private def fetchMergedLocalBlocks(
+ hostLocalDirManager: HostLocalDirManager,
+ mergedLocalBlocks: mutable.LinkedHashSet[BlockId]): Unit = {
+ val cachedMergerDirs = hostLocalDirManager.getCachedHostLocalDirs.get(
+ SHUFFLE_MERGER_IDENTIFIER)
+ if (cachedMergerDirs.isDefined) {
+ logDebug(s"Fetching local merged blocks with cached executors dir: " +
+ s"${cachedMergerDirs.get.mkString(", ")}")
+ mergedLocalBlocks.foreach(blockId =>
+ fetchMergedLocalBlock(blockId, cachedMergerDirs.get,
localShuffleMergerBlockMgrId))
+ } else {
+ logDebug(s"Asynchronous fetching local merged blocks without cached
executors dir")
+ hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host,
+ localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) {
+ case Success(dirs) =>
+ mergedLocalBlocks.takeWhile {
+ blockId =>
+ logDebug(s"Successfully fetched local dirs: " +
+ s"${dirs.get(SHUFFLE_MERGER_IDENTIFIER).mkString(", ")}")
+ fetchMergedLocalBlock(blockId, dirs(SHUFFLE_MERGER_IDENTIFIER),
+ localShuffleMergerBlockMgrId)
+ }
+ logDebug(s"Got local merged blocks (without cached executors' dir)
in " +
+ s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTimeNs)} ms")
+ case Failure(throwable) =>
+ // If we see an exception with getting the local dirs for local
merged blocks,
+ // we fallback to fetch the original unmerged blocks. We do not
report block fetch
+ // failure.
+ logWarning(s"Error occurred while getting the local dirs for local
merged " +
+ s"blocks: ${mergedLocalBlocks.mkString(", ")}. Fetch the original
blocks instead",
+ throwable)
+ mergedLocalBlocks.foreach(
+ blockId => iterator.addToResultsQueue(
+ IgnoreFetchResult(blockId, localShuffleMergerBlockMgrId, 0,
isNetworkReqDone = false))
+ )
+ }
+ }
+ }
+
+ /**
+ * Fetch a single local merged block generated.
+ * @param blockId ShuffleBlockId to be fetched
+ * @param localDirs Local directories where the merged shuffle files are
stored
+ * @param blockManagerId BlockManagerId
+ * @return Boolean represents successful or failed fetch
+ */
+ private[this] def fetchMergedLocalBlock(
+ blockId: BlockId,
+ localDirs: Array[String],
+ blockManagerId: BlockManagerId): Boolean = {
+ try {
+ val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+ val chunksMeta = blockManager.getMergedBlockMeta(shuffleBlockId,
localDirs)
+ .readChunkBitmaps()
+ // Fetch local merged shuffle block data as multiple chunks
+ val bufs: Seq[ManagedBuffer] =
blockManager.getMergedBlockData(shuffleBlockId, localDirs)
+ // Update total number of blocks to fetch, reflecting the multiple local
chunks
+ iterator.foundMoreBlocksToFetch(bufs.size - 1)
+ for (chunkId <- bufs.indices) {
+ val buf = bufs(chunkId)
+ buf.retain()
+ val shuffleChunkId = ShuffleBlockChunkId(shuffleBlockId.shuffleId,
+ shuffleBlockId.reduceId, chunkId)
+ iterator.addToResultsQueue(
+ SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
blockManagerId, buf.size(), buf,
+ isNetworkReqDone = false))
+ chunksMetaMap.put(shuffleChunkId, chunksMeta(chunkId))
+ }
+ true
+ } catch {
+ case e: Exception =>
+ // If we see an exception with reading a local merged block, we
fallback to
+ // fetch the original unmerged blocks. We do not report block fetch
failure
+ // and will continue with the remaining local block read.
+ logWarning(s"Error occurred while fetching local merged block, " +
+ s"prepare to fetch the original blocks", e)
+ iterator.addToResultsQueue(
+ IgnoreFetchResult(blockId, blockManagerId, 0, isNetworkReqDone =
false))
+ false
+ }
+ }
+
+ /**
+ * Initiate fetching fallback blocks for a merged block (or a merged block
chunk) that's failed
+ * to fetch.
+ * It calls out to map output tracker to get the list of original blocks for
the
+ * given merged blocks, split them into remote and local blocks, and process
them
+ * accordingly.
+ * The fallback happens when:
+ * 1. There is an exception while creating shuffle block chunk from local
merged shuffle block.
+ * See fetchLocalBlock.
+ * 2. There is a failure when fetching remote shuffle block chunks.
+ * 3. There is a failure when processing SuccessFetchResult which is for a
shuffle chunk
+ * (local or remote).
+ *
+ * @return number of blocks processed
+ */
+ def initiateFallbackBlockFetchForMergedBlock(
+ blockId: BlockId,
+ address: BlockManagerId): Int = {
+ logWarning(s"Falling back to fetch the original unmerged blocks for merged
block $blockId")
+ // Increase the blocks processed since we will process another block in
the next iteration of
+ // the while loop in ShuffleBlockFetcherIterator.next().
+ var blocksProcessed = 1
+ val fallbackBlocksByAddr: Iterator[(BlockManagerId, Seq[(BlockId, Long,
Int)])] =
+ if (blockId.isShuffle) {
+ val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+ mapOutputTracker.getMapSizesForMergeResult(
+ shuffleBlockId.shuffleId, shuffleBlockId.reduceId)
+ } else {
+ val shuffleChunkId = blockId.asInstanceOf[ShuffleBlockChunkId]
+ val chunkBitmap: RoaringBitmap =
chunksMetaMap.remove(shuffleChunkId).orNull
+ if (isNotExecutorOrMergedLocal(address)) {
Review comment:
Could you add some comments to help to understand the logic here better?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of a fetch from a remote merged block unsuccessfully.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fallback to fetch the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be
fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched
from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch
merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+ private val iterator: ShuffleBlockFetcherIterator,
+ private val shuffleClient: BlockStoreClient,
+ private val blockManager: BlockManager,
+ private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+ private[this] val startTimeNs = System.nanoTime()
+
+ private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+ SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+ blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+ /** A map for storing merged block shuffle chunk bitmap */
+ private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId,
RoaringBitmap]()
+
+ /**
+ * Returns true if the address is for a push-merged block.
+ */
+ def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+ SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+ }
+
+ /**
+ * Returns true if the address is not of executor local or merged local
block. false otherwise.
+ */
+ def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+ (isMergedShuffleBlockAddress(address) && address.host !=
blockManager.blockManagerId.host) ||
+ (!isMergedShuffleBlockAddress(address) && address !=
blockManager.blockManagerId)
+ }
+
+ /**
+ * Returns true if the address if of merged local block. false otherwise.
+ */
+ def isMergedLocal(address: BlockManagerId): Boolean = {
+ isMergedShuffleBlockAddress(address) && address.host ==
blockManager.blockManagerId.host
+ }
+
+ def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+ chunksMetaMap(blockId).getCardinality
+ }
+
+ def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+ chunksMetaMap.remove(blockId)
+ }
+
+ def createChunkBlockInfosFromMetaResponse(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+ val approxChunkSize = blockSize / numChunks
+ val blocksToRequest: ArrayBuffer[(BlockId, Long, Int)] =
+ new ArrayBuffer[(BlockId, Long, Int)]()
+ for (i <- 0 until numChunks) {
+ val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+ chunksMetaMap.put(blockChunkId, bitmaps(i))
+ logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+ blocksToRequest += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+ }
+ blocksToRequest
+ }
+
+ def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+ val sizeMap = req.blocks.map {
+ case FetchBlockInfo(blockId, size, _) =>
+ val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+ ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+ val address = req.address
+ val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+ override def onSuccess(shuffleId: Int, reduceId: Int, meta:
MergedBlockMeta): Unit = {
+ logInfo(s"Received the meta of merged block for ($shuffleId,
$reduceId) " +
+ s"from ${req.address.host}:${req.address.port}")
+ try {
+ iterator.addToResultsQueue(MergedBlocksMetaFetchResult(shuffleId,
reduceId,
+ sizeMap(shuffleId, reduceId), meta.getNumChunks,
meta.readChunkBitmaps(), address))
Review comment:
nit: `sizeMap(shuffleId, reduceId)` -> `sizeMap((shuffleId, reduceId))`
(I know the Scala compiler doesn't support the former syntax for the
tuple.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]