mridulm commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r656858416



##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -712,38 +799,63 @@ final class ShuffleBlockFetcherIterator(
                 case e: IOException => logError("Failed to create input stream 
from local block", e)
               }
               buf.release()
-              throwFetchFailedException(blockId, mapIndex, address, e)
-          }
-          try {
-            input = streamWrapper(blockId, in)
-            // If the stream is compressed or wrapped, then we optionally 
decompress/unwrap the
-            // first maxBytesInFlight/3 bytes into memory, to check for 
corruption in that portion
-            // of the data. But even if 'detectCorruptUseExtraMemory' 
configuration is off, or if
-            // the corruption is later, we'll still detect the corruption 
later in the stream.
-            streamCompressedOrEncrypted = !input.eq(in)
-            if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
-              // TODO: manage the memory used here, and spill it into disk in 
case of OOM.
-              input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
-            }
-          } catch {
-            case e: IOException =>
-              buf.release()
-              if (buf.isInstanceOf[FileSegmentManagedBuffer]
-                  || corruptedBlocks.contains(blockId)) {
-                throwFetchFailedException(blockId, mapIndex, address, e)
-              } else {
-                logWarning(s"got an corrupted block $blockId from $address, 
fetch again", e)
-                corruptedBlocks += blockId
-                fetchRequests += FetchRequest(
-                  address, Array(FetchBlockInfo(blockId, size, mapIndex)))
+              if (blockId.isShuffleChunk) {
+                
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+                // Set result to null to trigger another iteration of the 
while loop to get either.
                 result = null
+                null
+              } else {
+                throwFetchFailedException(blockId, mapIndex, address, e)
+              }
+          }
+          if (in != null) {
+            try {
+              input = streamWrapper(blockId, in)
+              // If the stream is compressed or wrapped, then we optionally 
decompress/unwrap the
+              // first maxBytesInFlight/3 bytes into memory, to check for 
corruption in that portion
+              // of the data. But even if 'detectCorruptUseExtraMemory' 
configuration is off, or if
+              // the corruption is later, we'll still detect the corruption 
later in the stream.
+              streamCompressedOrEncrypted = !input.eq(in)
+              if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
+                // TODO: manage the memory used here, and spill it into disk 
in case of OOM.
+                input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)

Review comment:
       What I am trying to understand is, we will be initiating a fallback and 
discarding merge even though there is nothing really wrong here - other than 
the fact that chunk was too small to decompress - right ? (in case chunk was 
split at a boundary which causes decompression to fail).
   Want to make sure I am not missing something here.
   
   The 2mb is configurable, so we can have `maxBytesInFlight` and chunk sizes 
of different sizes - do we enforce any constraint on these to prevent this sort 
of issue ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to