otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r657129548
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -712,38 +799,63 @@ final class ShuffleBlockFetcherIterator(
case e: IOException => logError("Failed to create input stream
from local block", e)
}
buf.release()
- throwFetchFailedException(blockId, mapIndex, address, e)
- }
- try {
- input = streamWrapper(blockId, in)
- // If the stream is compressed or wrapped, then we optionally
decompress/unwrap the
- // first maxBytesInFlight/3 bytes into memory, to check for
corruption in that portion
- // of the data. But even if 'detectCorruptUseExtraMemory'
configuration is off, or if
- // the corruption is later, we'll still detect the corruption
later in the stream.
- streamCompressedOrEncrypted = !input.eq(in)
- if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
- // TODO: manage the memory used here, and spill it into disk in
case of OOM.
- input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
- }
- } catch {
- case e: IOException =>
- buf.release()
- if (buf.isInstanceOf[FileSegmentManagedBuffer]
- || corruptedBlocks.contains(blockId)) {
- throwFetchFailedException(blockId, mapIndex, address, e)
- } else {
- logWarning(s"got an corrupted block $blockId from $address,
fetch again", e)
- corruptedBlocks += blockId
- fetchRequests += FetchRequest(
- address, Array(FetchBlockInfo(blockId, size, mapIndex)))
+ if (blockId.isShuffleChunk) {
+
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+ // Set result to null to trigger another iteration of the
while loop to get either.
result = null
+ null
+ } else {
+ throwFetchFailedException(blockId, mapIndex, address, e)
+ }
+ }
+ if (in != null) {
+ try {
+ input = streamWrapper(blockId, in)
+ // If the stream is compressed or wrapped, then we optionally
decompress/unwrap the
+ // first maxBytesInFlight/3 bytes into memory, to check for
corruption in that portion
+ // of the data. But even if 'detectCorruptUseExtraMemory'
configuration is off, or if
+ // the corruption is later, we'll still detect the corruption
later in the stream.
+ streamCompressedOrEncrypted = !input.eq(in)
+ if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
+ // TODO: manage the memory used here, and spill it into disk
in case of OOM.
+ input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
Review comment:
> What I am trying to understand is, we will be initiating a fallback and discarding merge even though there is nothing really wrong here - other than the fact that chunk was too small to decompress - right ? (in case chunk was split at a boundary which causes decompression to fail).

A shuffle merged chunk contains shuffle blocks in their entirety; it will never contain a partial shuffle block. This is documented for the configuration `minChunkSizeInMergedShuffleFile`:
```java
  /**
   * The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
   * push-based shuffle.
   * A merged shuffle file consists of multiple small shuffle blocks. Fetching the
   * complete merged shuffle file in a single response increases the memory requirements for the
   * clients. Instead of serving the entire merged file, the shuffle service serves the
   * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this
   * configuration controls how big a chunk can get. A corresponding index file for each merged
   * shuffle file will be generated indicating chunk boundaries.
   */
  public int minChunkSizeInMergedShuffleFile() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
  }
```
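
To make that invariant concrete, here is a minimal sketch (hypothetical code under my own naming, not the shuffle service implementation) of how chunk end offsets can be derived from the appended block sizes so that every chunk ends exactly on a shuffle block boundary:

```scala
// Minimal sketch, assuming hypothetical names: derive chunk end offsets from
// the sizes of the shuffle blocks appended to a merged file. A chunk is only
// closed once it has reached minChunkSize, and always on a block boundary,
// so no shuffle block is ever split across chunks.
object ChunkBoundarySketch {
  def chunkEndOffsets(blockSizes: Seq[Long], minChunkSize: Long): Seq[Long] = {
    val ends = scala.collection.mutable.ArrayBuffer.empty[Long]
    var offset = 0L      // bytes written to the merged file so far
    var chunkBytes = 0L  // bytes accumulated in the current chunk
    for (size <- blockSizes) {
      offset += size
      chunkBytes += size
      if (chunkBytes >= minChunkSize) {
        ends += offset   // boundary lands exactly after a whole block
        chunkBytes = 0L
      }
    }
    if (chunkBytes > 0) ends += offset  // final, possibly smaller, chunk
    ends.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Blocks of 1 MiB, 1.5 MiB and 800 KiB with a 2 MiB minimum chunk size:
    // the first chunk holds the first two blocks in their entirety.
    println(chunkEndOffsets(Seq(1L << 20, 3L << 19, 800L * 1024), 2L << 20))
  }
}
```

The real boundaries are recorded in the per-file index the javadoc mentions; the point is only that a chunk is closed after whole blocks, never inside one.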
So if decompression fails for a shuffle chunk, it would be because the shuffle chunk was corrupt, not because a block was split across a chunk boundary.
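
And for the control flow in the diff itself, a hedged, self-contained sketch (hypothetical helper names, not the Spark API; the real code goes through `streamWrapper` and `Utils.copyStreamUpTo(input, maxBytesInFlight / 3)`): eagerly pulling the first bytes through the decompressor makes a corrupt chunk fail while a fallback is still possible, and the handler then branches on whether the failed block is a shuffle chunk:

```scala
// Hedged sketch with hypothetical names (probePrefix, onFetchedBlock); gzip
// stands in for whatever compression codec wraps the fetched stream.
import java.io.{ByteArrayInputStream, IOException, InputStream}
import java.util.zip.GZIPInputStream

object ChunkFallbackSketch {
  // Eagerly read up to `limit` decompressed bytes so corruption in that
  // prefix surfaces as an IOException now rather than deep in the reader.
  def probePrefix(compressed: InputStream, limit: Int): Unit = {
    val in = new GZIPInputStream(compressed)  // stand-in for streamWrapper
    val buf = new Array[Byte](8192)
    var remaining = limit
    var eof = false
    while (remaining > 0 && !eof) {
      val n = in.read(buf, 0, math.min(buf.length, remaining))
      if (n == -1) eof = true else remaining -= n
    }
  }

  def onFetchedBlock(isShuffleChunk: Boolean, compressed: InputStream): Unit = {
    try {
      probePrefix(compressed, limit = 1 << 20)  // e.g. maxBytesInFlight / 3
    } catch {
      case e: IOException if isShuffleChunk =>
        // A chunk holds shuffle blocks in their entirety, so a decompression
        // failure can only mean corruption: fall back to the original blocks
        // (the diff calls initiateFallbackFetchForPushMergedBlock here).
        println(s"corrupt shuffle chunk, falling back to original blocks: $e")
      case e: IOException =>
        // Regular shuffle block: the diff re-queues it once via fetchRequests
        // and otherwise throws a FetchFailedException; re-throw in this sketch.
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    // Four garbage bytes are not a valid gzip stream, so the probe fails
    // and the chunk branch takes the fallback path.
    val corrupt = new ByteArrayInputStream(Array[Byte](1, 2, 3, 4))
    onFetchedBlock(isShuffleChunk = true, corrupt)
  }
}
```

Retrying a corrupt chunk from the same merged file would likely fail the same way, which is why the chunk branch falls back to the original blocks instead of re-queuing the chunk.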