otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r660771829
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -767,6 +878,83 @@ final class ShuffleBlockFetcherIterator(
deferredFetchRequests.getOrElseUpdate(address, new
Queue[FetchRequest]())
defReqQueue.enqueue(request)
result = null
+
+ case FallbackOnPushMergedFailureResult(blockId, address, size,
isNetworkReqDone) =>
+ // We get this result in 3 cases:
+ // 1. Failure to fetch the data of a remote shuffle chunk. In this
case, the
+ // blockId is a ShuffleBlockChunkId.
+ // 2. Failure to read the local push-merged meta. In this case, the
blockId is
+ // ShuffleBlockId.
+ // 3. Failure to get the local push-merged directories from the ESS.
In this case, the
+ // blockId is ShuffleBlockId.
+ if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
+ numBlocksInFlightPerAddress(address) =
numBlocksInFlightPerAddress(address) - 1
+ bytesInFlight -= size
+ }
+ if (isNetworkReqDone) {
+ reqsInFlight -= 1
+ logDebug("Number of requests in flight " + reqsInFlight)
+ }
+
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+ // Set result to null to trigger another iteration of the while loop
to get either
+ // a SuccessFetchResult or a FailureFetchResult.
+ result = null
+
+ case PushMergedLocalMetaFetchResult(shuffleId, reduceId, bitmaps,
localDirs, _) =>
+ // Fetch local push-merged shuffle block data as multiple shuffle
chunks
+ val shuffleBlockId = ShuffleBlockId(shuffleId,
SHUFFLE_PUSH_MAP_ID, reduceId)
+ try {
+ val bufs: Seq[ManagedBuffer] =
blockManager.getLocalMergedBlockData(shuffleBlockId,
+ localDirs)
+ // Since the request for local block meta completed
successfully, numBlocksToFetch
+ // is decremented.
+ numBlocksToFetch -= 1
+ // Update total number of blocks to fetch, reflecting the
multiple local shuffle
+ // chunks.
+ numBlocksToFetch += bufs.size
+ bufs.zipWithIndex.foreach { case (buf, chunkId) =>
+ buf.retain()
+ val shuffleChunkId = ShuffleBlockChunkId(shuffleId, reduceId,
chunkId)
+ pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
+ results.put(SuccessFetchResult(shuffleChunkId,
SHUFFLE_PUSH_MAP_ID,
+ pushBasedFetchHelper.localShuffleMergerBlockMgrId,
buf.size(), buf,
+ isNetworkReqDone = false))
+ }
+ } catch {
+ case e: Exception =>
+ // If we see an exception with reading local push-merged data,
we fallback to
Review comment:
Right, it doesn't read the data file; it just creates ManagedBuffers. I will
change the comment accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]