mridulm commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r656858416
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -712,38 +799,63 @@ final class ShuffleBlockFetcherIterator(
case e: IOException => logError("Failed to create input stream from local block", e)
}
buf.release()
- throwFetchFailedException(blockId, mapIndex, address, e)
- }
- try {
- input = streamWrapper(blockId, in)
- // If the stream is compressed or wrapped, then we optionally decompress/unwrap the
- // first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion
- // of the data. But even if 'detectCorruptUseExtraMemory' configuration is off, or if
- // the corruption is later, we'll still detect the corruption later in the stream.
- streamCompressedOrEncrypted = !input.eq(in)
- if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
- // TODO: manage the memory used here, and spill it into disk in case of OOM.
- input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
- }
- } catch {
- case e: IOException =>
- buf.release()
- if (buf.isInstanceOf[FileSegmentManagedBuffer]
- || corruptedBlocks.contains(blockId)) {
- throwFetchFailedException(blockId, mapIndex, address, e)
- } else {
- logWarning(s"got an corrupted block $blockId from $address, fetch again", e)
- corruptedBlocks += blockId
- fetchRequests += FetchRequest(
- address, Array(FetchBlockInfo(blockId, size, mapIndex)))
+ if (blockId.isShuffleChunk) {
+ pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+ // Set result to null to trigger another iteration of the while loop to get either.
result = null
+ null
+ } else {
+ throwFetchFailedException(blockId, mapIndex, address, e)
+ }
+ }
+ if (in != null) {
+ try {
+ input = streamWrapper(blockId, in)
+ // If the stream is compressed or wrapped, then we optionally decompress/unwrap the
+ // first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion
+ // of the data. But even if 'detectCorruptUseExtraMemory' configuration is off, or if
+ // the corruption is later, we'll still detect the corruption later in the stream.
+ streamCompressedOrEncrypted = !input.eq(in)
+ if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
+ // TODO: manage the memory used here, and spill it into disk in case of OOM.
+ input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
Review comment:
What I am trying to understand is this: we would initiate a fallback and
discard the merged data even though nothing is really wrong here, other than
the chunk being too small to decompress, right? (That is, the case where the
chunk was split at a boundary that causes decompression to fail.)
I want to make sure I am not missing something here.
The 2 MB is configurable, so `maxBytesInFlight` and the chunk size can be set
to different values. Do we enforce any constraint between them to prevent this
sort of issue?
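
To make the concern concrete, here is a minimal sketch, not Spark code. It
assumes GZIP as a stand-in for the shuffle compression codec, and `readPrefix`
is a hypothetical helper that only imitates the idea of eagerly reading a
bounded prefix (it is not `Utils.copyStreamUpTo`). It shows that a valid
compressed stream cut short raises an `IOException` during the eager read, which
a caller cannot tell apart from real corruption:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object PrefixCheckSketch {
  // Hypothetical helper: eagerly read at most `limit` decompressed bytes.
  def readPrefix(in: InputStream, limit: Int): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](8192)
    var remaining = limit
    var n = 0
    while (remaining > 0 && {
      n = in.read(buf, 0, math.min(buf.length, remaining)); n != -1
    }) {
      out.write(buf, 0, n)
      remaining -= n
    }
    out.toByteArray
  }

  // Compress a byte array with GZIP (stand-in codec, an assumption).
  def gzip(data: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(data)
    gz.close()
    bos.toByteArray
  }

  // True when the eager prefix read fails with an IOException.
  def prefixCheckFails(compressed: Array[Byte], limit: Int): Boolean =
    try {
      readPrefix(new GZIPInputStream(new ByteArrayInputStream(compressed)), limit)
      false
    } catch {
      case _: IOException => true
    }

  // Fixed-seed random payload so the compressed form stays large.
  val payload: Array[Byte] = {
    val bytes = new Array[Byte](100000)
    new scala.util.Random(42).nextBytes(bytes)
    bytes
  }
  val intact: Array[Byte] = gzip(payload)
  // A valid stream cut in half: truncated, not corrupted.
  val truncated: Array[Byte] = intact.take(intact.length / 2)
}
```

With a prefix limit larger than the data, `prefixCheckFails(truncated, ...)`
returns true while `prefixCheckFails(intact, ...)` returns false. With a small
limit the truncated stream passes the eager check and the failure would only
surface later in the stream.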