otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r660704248



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -767,6 +878,83 @@ final class ShuffleBlockFetcherIterator(
             deferredFetchRequests.getOrElseUpdate(address, new Queue[FetchRequest]())
           defReqQueue.enqueue(request)
           result = null
+
+        case FallbackOnPushMergedFailureResult(blockId, address, size, isNetworkReqDone) =>
+          // We get this result in 3 cases:
+          // 1. Failure to fetch the data of a remote shuffle chunk. In this case, the
+          //    blockId is a ShuffleBlockChunkId.
+          // 2. Failure to read the local push-merged meta. In this case, the blockId is
+          //    ShuffleBlockId.
+          // 3. Failure to get the local push-merged directories from the ESS. In this case, the
+          //    blockId is ShuffleBlockId.
+          if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
+            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+            bytesInFlight -= size
+          }
+          if (isNetworkReqDone) {
+            reqsInFlight -= 1
+            logDebug("Number of requests in flight " + reqsInFlight)
+          }
+          pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+          // Set result to null to trigger another iteration of the while loop to get either
+          // a SuccessFetchResult or a FailureFetchResult.
+          result = null
+
+          case PushMergedLocalMetaFetchResult(shuffleId, reduceId, bitmaps, localDirs, _) =>
+            // Fetch local push-merged shuffle block data as multiple shuffle chunks
+            val shuffleBlockId = ShuffleBlockId(shuffleId, SHUFFLE_PUSH_MAP_ID, reduceId)
+            try {
+              val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
+                localDirs)
+              // Since the request for local block meta completed successfully, numBlocksToFetch
+              // is decremented.
+              numBlocksToFetch -= 1
+              // Update total number of blocks to fetch, reflecting the multiple local shuffle
+              // chunks.
+              numBlocksToFetch += bufs.size
+              bufs.zipWithIndex.foreach { case (buf, chunkId) =>
+                buf.retain()
+                val shuffleChunkId = ShuffleBlockChunkId(shuffleId, reduceId, chunkId)
+                pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
+                results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
+                  pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
+                  isNetworkReqDone = false))
+              }
+            } catch {
+              case e: Exception =>
+                // If we see an exception with reading local push-merged data, we fallback to

Review comment:
       There could be an IOException while reading either the index file or the data file, which would be caught here. I can mention `push-merged data/index file` explicitly in the comment.
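The catch block discussed in the reply can be illustrated with a minimal, self-contained sketch. It assumes hypothetical stand-ins (`ChunkId`, `ChunkSuccess`, `Fallback`, `LocalMergedReader`, and a `readChunkSizes` function in place of `blockManager.getLocalMergedBlockData`); none of these are Spark APIs. It models the accounting from the diff: one completed meta request is traded for one result per chunk, and a failure while reading the index or data file is routed to a fallback result instead of propagating.

```scala
import java.io.IOException
import scala.collection.mutable

// Hypothetical, simplified stand-ins for ShuffleBlockChunkId and the fetch results.
final case class ChunkId(shuffleId: Int, reduceId: Int, chunkId: Int)

sealed trait Result
final case class ChunkSuccess(id: ChunkId, size: Long) extends Result
final case class Fallback(shuffleId: Int, reduceId: Int, reason: String) extends Result

// `readChunkSizes` (hypothetical) stands in for reading the push-merged index and
// data files; it may throw, e.g. an IOException if either file is missing or corrupt.
final class LocalMergedReader(readChunkSizes: (Int, Int) => Seq[Long]) {
  var numBlocksToFetch: Int = 0
  val results: mutable.Queue[Result] = mutable.Queue.empty[Result]

  def onLocalMetaFetched(shuffleId: Int, reduceId: Int): Unit = {
    try {
      val sizes = readChunkSizes(shuffleId, reduceId)
      // The meta request completed, so it no longer counts as a pending block...
      numBlocksToFetch -= 1
      // ...but every chunk it describes now does.
      numBlocksToFetch += sizes.size
      sizes.zipWithIndex.foreach { case (size, chunkId) =>
        results.enqueue(ChunkSuccess(ChunkId(shuffleId, reduceId, chunkId), size))
      }
    } catch {
      // As in the diff, any Exception is caught here; an IOException from the
      // index or data file is the typical case. If readChunkSizes throws,
      // the counters have not been touched yet.
      case e: Exception =>
        results.enqueue(Fallback(shuffleId, reduceId, e.getMessage))
    }
  }
}
```

Because the counters are only adjusted after the read returns, a read that throws leaves `numBlocksToFetch` unchanged in this sketch.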

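The in-flight bookkeeping in the `FallbackOnPushMergedFailureResult` branch can also be sketched in isolation. This is a minimal model under stated assumptions: `Address` and `InFlightTracker` are hypothetical, and the `isRemote` flag stands in for `pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)`. As in the diff, the per-address and byte counters are released only for a remote push-merged address, and `reqsInFlight` is decremented only when `isNetworkReqDone` is true.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a BlockManagerId; `isRemote` plays the role of
// pushBasedFetchHelper.isRemotePushMergedBlockAddress(address).
final case class Address(host: String, isRemote: Boolean)

final class InFlightTracker {
  val numBlocksInFlightPerAddress: mutable.Map[Address, Int] =
    mutable.HashMap.empty[Address, Int].withDefaultValue(0)
  var bytesInFlight: Long = 0L
  var reqsInFlight: Int = 0

  def onFallback(address: Address, size: Long, isNetworkReqDone: Boolean): Unit = {
    // Mirrors the diff: per-address and byte counters are only decremented
    // for a remote push-merged address.
    if (address.isRemote) {
      numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
      bytesInFlight -= size
    }
    // Mirrors the diff: the request count drops only when the network request is done.
    if (isNetworkReqDone) {
      reqsInFlight -= 1
    }
  }
}
```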



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.