Ngone51 commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r660322354
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1403,67 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of an unsuccessful fetch of either of these:
+ * 1) Remote shuffle chunk.
+ * 2) Local push-merged block.
+ *
+ * Instead of treating this as a [[FailureFetchResult]], we fall back to
fetching the original blocks.
+ *
+ * @param blockId block id
+ * @param address BlockManager that the push-merged block was attempted to
be fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class FallbackOnPushMergedFailureResult(blockId:
BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a remote push-merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each push-merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the meta was fetched from.
+ */
+ private[storage] case class PushMergedRemoteMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a remote
push-merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the meta was fetched from.
+ */
+ private[storage] case class PushMergedRemoteMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a local push-merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param bitmaps bitmaps for every chunk.
+ * @param localDirs local directories where the push-merged shuffle files
are stored.
+ */
+ private[storage] case class PushMergedLocalMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ bitmaps: Array[RoaringBitmap],
+ localDirs: Array[String],
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
Review comment:
`blockId` is never used?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -57,6 +59,8 @@ import org.apache.spark.util.{CompletionIterator,
TaskCompletionListener, Utils}
* block, which indicate the index in the map stage.
* Note that zero-sized blocks are already excluded,
which happened in
*
[[org.apache.spark.MapOutputTracker.convertMapStatuses]].
+ * @param mapOutputTracker [[MapOutputTracker]] for falling back to fetching
the original blocks if
+ * we fail to fetch shuffle chunks when push based
shuffle is enabled.
Review comment:
nit: indents
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -386,40 +415,53 @@ final class ShuffleBlockFetcherIterator(
}
val (remoteBlockBytes, numRemoteBlocks) =
collectedRemoteRequests.foldLeft((0L, 0))((x, y) => (x._1 + y.size, x._2
+ y.blocks.size))
- val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
- assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size +
numRemoteBlocks,
- s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the
number of local " +
- s"blocks ${localBlocks.size} + the number of host-local blocks
${hostLocalBlocks.size} " +
- s"+ the number of remote blocks ${numRemoteBlocks}.")
- logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)})
non-empty blocks " +
- s"including ${localBlocks.size}
(${Utils.bytesToString(localBlockBytes)}) local and " +
- s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)})
" +
- s"host-local and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
+ val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes +
+ pushMergedLocalBlockBytes
+ val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
+ assert(blocksToFetchCurrentIteration == localBlocks.size +
+ hostLocalBlocksCurrentIteration.size + numRemoteBlocks +
pushMergedLocalBlocks.size,
+ s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't
equal to " +
+ s"the number of local blocks ${localBlocks.size} + " +
+ s"the number of host-local blocks
${hostLocalBlocksCurrentIteration.size} " +
+ s"the number of push-merged-local blocks ${pushMergedLocalBlocks.size}
" +
+ s"+ the number of remote blocks ${numRemoteBlocks} ")
+ logInfo(s"Getting $blocksToFetchCurrentIteration " +
+ s"(${Utils.bytesToString(totalBytes)}) non-empty blocks including " +
+ s"${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local
and " +
+ s"${hostLocalBlocksCurrentIteration.size}
(${Utils.bytesToString(hostLocalBlockBytes)}) " +
+ s"host-local and ${pushMergedLocalBlocks.size} " +
+ s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " +
+ s"local push-merged and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) " +
Review comment:
Maybe use `push-merged-local` here, to be consistent with the naming in all places?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -767,6 +878,83 @@ final class ShuffleBlockFetcherIterator(
deferredFetchRequests.getOrElseUpdate(address, new
Queue[FetchRequest]())
defReqQueue.enqueue(request)
result = null
+
+ case FallbackOnPushMergedFailureResult(blockId, address, size,
isNetworkReqDone) =>
+ // We get this result in 3 cases:
+ // 1. Failure to fetch the data of a remote shuffle chunk. In this
case, the
+ // blockId is a ShuffleBlockChunkId.
+ // 2. Failure to read the local push-merged meta. In this case, the
blockId is
+ // ShuffleBlockId.
+ // 3. Failure to get the local push-merged directories from the ESS.
In this case, the
+ // blockId is ShuffleBlockId.
+ if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
+ numBlocksInFlightPerAddress(address) =
numBlocksInFlightPerAddress(address) - 1
+ bytesInFlight -= size
+ }
+ if (isNetworkReqDone) {
+ reqsInFlight -= 1
+ logDebug("Number of requests in flight " + reqsInFlight)
+ }
+
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+ // Set result to null to trigger another iteration of the while loop
to get either
+ // a SuccessFetchResult or a FailureFetchResult.
+ result = null
+
+ case PushMergedLocalMetaFetchResult(shuffleId, reduceId, bitmaps,
localDirs, _) =>
+ // Fetch local push-merged shuffle block data as multiple shuffle
chunks
+ val shuffleBlockId = ShuffleBlockId(shuffleId,
SHUFFLE_PUSH_MAP_ID, reduceId)
+ try {
+ val bufs: Seq[ManagedBuffer] =
blockManager.getLocalMergedBlockData(shuffleBlockId,
+ localDirs)
+ // Since the request for local block meta completed
successfully, numBlocksToFetch
+ // is decremented.
+ numBlocksToFetch -= 1
+ // Update total number of blocks to fetch, reflecting the
multiple local shuffle
+ // chunks.
+ numBlocksToFetch += bufs.size
+ bufs.zipWithIndex.foreach { case (buf, chunkId) =>
+ buf.retain()
+ val shuffleChunkId = ShuffleBlockChunkId(shuffleId, reduceId,
chunkId)
+ pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
+ results.put(SuccessFetchResult(shuffleChunkId,
SHUFFLE_PUSH_MAP_ID,
+ pushBasedFetchHelper.localShuffleMergerBlockMgrId,
buf.size(), buf,
+ isNetworkReqDone = false))
+ }
+ } catch {
+ case e: Exception =>
+ // If we see an exception with reading local push-merged data,
we fallback to
Review comment:
I think we can only see the exception when reading the push-merged index
file rather than the data file, right?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -386,40 +415,53 @@ final class ShuffleBlockFetcherIterator(
}
val (remoteBlockBytes, numRemoteBlocks) =
collectedRemoteRequests.foldLeft((0L, 0))((x, y) => (x._1 + y.size, x._2
+ y.blocks.size))
- val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
- assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size +
numRemoteBlocks,
- s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the
number of local " +
- s"blocks ${localBlocks.size} + the number of host-local blocks
${hostLocalBlocks.size} " +
- s"+ the number of remote blocks ${numRemoteBlocks}.")
- logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)})
non-empty blocks " +
- s"including ${localBlocks.size}
(${Utils.bytesToString(localBlockBytes)}) local and " +
- s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)})
" +
- s"host-local and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
+ val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes +
+ pushMergedLocalBlockBytes
+ val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
+ assert(blocksToFetchCurrentIteration == localBlocks.size +
+ hostLocalBlocksCurrentIteration.size + numRemoteBlocks +
pushMergedLocalBlocks.size,
+ s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't
equal to " +
+ s"the number of local blocks ${localBlocks.size} + " +
+ s"the number of host-local blocks
${hostLocalBlocksCurrentIteration.size} " +
+ s"the number of push-merged-local blocks ${pushMergedLocalBlocks.size}
" +
+ s"+ the number of remote blocks ${numRemoteBlocks} ")
+ logInfo(s"Getting $blocksToFetchCurrentIteration " +
+ s"(${Utils.bytesToString(totalBytes)}) non-empty blocks including " +
+ s"${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local
and " +
+ s"${hostLocalBlocksCurrentIteration.size}
(${Utils.bytesToString(hostLocalBlockBytes)}) " +
+ s"host-local and ${pushMergedLocalBlocks.size} " +
+ s"(${Utils.bytesToString(pushMergedLocalBlockBytes)}) " +
+ s"local push-merged and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) " +
+ s"remote blocks")
+ this.hostLocalBlocks ++= hostLocalBlocksCurrentIteration
Review comment:
Shall we reuse `hostLocalBlocksByExecutor` here? e.g.,
```scala
this.hostLocalBlocks ++= hostLocalBlocksByExecutor.values
.flatMap { infos => infos.map(info => (info._1, info._3)) }
```
so we can get rid of `hostLocalBlocksCurrentIteration`.
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -871,6 +1063,82 @@ final class ShuffleBlockFetcherIterator(
"Failed to get block " + blockId + ", which is not a shuffle block",
e)
}
}
+
+ /**
+ * All the below methods are used by [[PushBasedFetchHelper]] to communicate
with the iterator
+ */
+ private[storage] def addToResultsQueue(result: FetchResult): Unit = {
+ results.put(result)
+ }
+
+ private[storage] def incrementNumBlocksToFetch(moreBlocksToFetch: Int): Unit
= {
Review comment:
This looks like it only decreases `numBlocksToFetch`, indeed. Shall we rename
it to `decreaseNumBlocksToFetch`?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -386,40 +415,53 @@ final class ShuffleBlockFetcherIterator(
}
val (remoteBlockBytes, numRemoteBlocks) =
collectedRemoteRequests.foldLeft((0L, 0))((x, y) => (x._1 + y.size, x._2
+ y.blocks.size))
- val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
- assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size +
numRemoteBlocks,
- s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the
number of local " +
- s"blocks ${localBlocks.size} + the number of host-local blocks
${hostLocalBlocks.size} " +
- s"+ the number of remote blocks ${numRemoteBlocks}.")
- logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)})
non-empty blocks " +
- s"including ${localBlocks.size}
(${Utils.bytesToString(localBlockBytes)}) local and " +
- s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)})
" +
- s"host-local and $numRemoteBlocks
(${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
+ val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes +
+ pushMergedLocalBlockBytes
+ val blocksToFetchCurrentIteration = numBlocksToFetch - prevNumBlocksToFetch
+ assert(blocksToFetchCurrentIteration == localBlocks.size +
+ hostLocalBlocksCurrentIteration.size + numRemoteBlocks +
pushMergedLocalBlocks.size,
+ s"The number of non-empty blocks $blocksToFetchCurrentIteration doesn't
equal to " +
Review comment:
`... doesn't equal to the sum of ...` ?
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -871,6 +1063,82 @@ final class ShuffleBlockFetcherIterator(
"Failed to get block " + blockId + ", which is not a shuffle block",
e)
}
}
+
+ /**
+ * All the below methods are used by [[PushBasedFetchHelper]] to communicate
with the iterator
+ */
+ private[storage] def addToResultsQueue(result: FetchResult): Unit = {
+ results.put(result)
+ }
+
+ private[storage] def incrementNumBlocksToFetch(moreBlocksToFetch: Int): Unit
= {
+ numBlocksToFetch += moreBlocksToFetch
+ }
+
+ /**
+ * Currently used by [[PushBasedFetchHelper]] to fetch fallback blocks when
there is a fetch
+ * failure related to a push-merged block or shuffle chunk.
+ * This is executed by the task thread when the `iterator.next()` is invoked
and if that initiates
+ * fallback.
+ */
+ private[storage] def fallbackFetch(
+ originalBlocksByAddr: Iterator[(BlockManagerId, Seq[(BlockId, Long,
Int)])]): Unit = {
+ val originalLocalBlocks = mutable.LinkedHashSet[(BlockId, Int)]()
+ val originalHostLocalBlocksByExecutor =
+ mutable.LinkedHashMap[BlockManagerId, Seq[(BlockId, Long, Int)]]()
+ val originalMergedLocalBlocks = mutable.LinkedHashSet[BlockId]()
+ val originalRemoteReqs = partitionBlocksByFetchMode(originalBlocksByAddr,
+ originalLocalBlocks, originalHostLocalBlocksByExecutor,
originalMergedLocalBlocks)
+ // Add the remote requests into our queue in a random order
+ fetchRequests ++= Utils.randomize(originalRemoteReqs)
+ logInfo(s"Started ${originalRemoteReqs.size} fallback remote requests for
push-merged")
Review comment:
"Started" -> "Created"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]