turboFei opened a new pull request #28525: URL: https://github.com/apache/spark/pull/28525
## What changes were proposed in this pull request?

We have seen shuffle data corruption during the shuffle read phase. As described in SPARK-26089, Spark only checked small shuffle blocks before PR #23453, which was proposed by ankuriitg. PR #23453 made two changes/improvements:

1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way to smaller blocks, so if a large block is corrupt at the beginning, that block will be re-fetched, and if that also fails, a FetchFailureException will be thrown.
2. If a large block is corrupt after maxBytesInFlight/3 bytes, any IOException thrown while reading the stream will be converted to a FetchFailureException. This is slightly more aggressive than originally intended, but since the consumer of the stream may have already read and processed some records, we can't simply re-fetch the block; we need to fail the whole task. Additionally, we also considered adding a new type of TaskEndReason that would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.

However, I think the current verification mechanism for shuffle transmitted data still has some problems:

- A large block is only checked up to maxBytesInFlight/3 bytes when fetching shuffle data, so corruption beyond maxBytesInFlight/3 bytes cannot be detected in the fetch phase, as described above.
- Only the compressed or wrapped blocks are checked; I think we should also check the blocks that are not wrapped.

This PR completes the verification mechanism for shuffle transmitted data.

First, CRC32 is chosen for the checksum verification of shuffle data. CRC is also used for checksum verification in Hadoop; it is simple and fast.

During the shuffle write phase, after the partitioned data file is completed, we compute the CRC32 value for each partition and then write these digests together with the offsets into the shuffle index file. For SortShuffleWriter and the unsafe shuffle writer there is only one partitioned data file per ShuffleMapTask, so computing the digests (one per partition, based on the offsets of that file) is cheap (see the write-side sketch at the end of this description). For the bypass shuffle writer, the number of reduce partitions is less than the bypass merge threshold, so the cost of digest computation is acceptable.

During the shuffle read phase, the digest value is passed along with the block data. We recompute the digest of the fetched data and compare it with the original digest value. Recomputing the digest only needs an additional 2048-byte buffer for the CRC32 computation (see the read-side sketch at the end of this description). After recomputing, we reset the fetched data input stream: if it supports mark/reset we only need to reset it; otherwise it is a FileSegmentManagedBuffer and we need to recreate it.

So I think this verification mechanism proposed for shuffle transmitted data is efficient and complete.

## How was this patch tested?

Unit tests.
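
As a rough illustration of the write-side idea, the sketch below is a hypothetical helper (not the actual code in this PR): it computes one CRC32 per partition by streaming the map task's single partitioned data file according to the per-partition lengths that the writer already records for the index file. The names `computePartitionDigests`, `dataFile`, and `partitionLengths` are illustrative.

```scala
import java.io.{EOFException, File, FileInputStream}
import java.util.zip.CRC32

// Hypothetical write-side helper: partitions are laid out back-to-back in the
// map task's single data file, so one sequential pass yields all digests.
def computePartitionDigests(dataFile: File, partitionLengths: Array[Long]): Array[Long] = {
  val in = new FileInputStream(dataFile)
  try {
    val buffer = new Array[Byte](2048)
    partitionLengths.map { length =>
      val crc = new CRC32()
      var remaining = length
      while (remaining > 0) {
        val toRead = math.min(remaining, buffer.length.toLong).toInt
        val read = in.read(buffer, 0, toRead)
        if (read < 0) throw new EOFException(s"Unexpected end of file: $dataFile")
        crc.update(buffer, 0, read)
        remaining -= read
      }
      crc.getValue
    }
  } finally {
    in.close()
  }
}
```

These digests would then be stored next to the partition offsets in the shuffle index file, so the reader can look up the expected digest for each reduce partition.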
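And a sketch of the read-side check, again with illustrative names and under the assumptions above: it recomputes the CRC32 of a fetched block with a 2048-byte buffer and, when the stream supports mark/reset, rewinds it so the records can still be deserialized afterwards.

```scala
import java.io.InputStream
import java.util.zip.CRC32

// Hypothetical read-side check: recompute the digest of a fetched block and
// compare it with the digest shipped from the map side.
def verifyAndRewind(in: InputStream, expectedDigest: Long, blockSize: Int): Boolean = {
  val markable = in.markSupported()
  if (markable) {
    in.mark(blockSize) // remember the start so we can rewind after verification
  }
  val crc = new CRC32()
  val buffer = new Array[Byte](2048) // the only extra memory needed
  var read = in.read(buffer)
  while (read != -1) {
    crc.update(buffer, 0, read)
    read = in.read(buffer)
  }
  val matches = crc.getValue == expectedDigest
  if (matches && markable) {
    in.reset() // hand the stream back to the deserializer from the beginning
  }
  matches
}
```

If the stream does not support mark/reset (for example when the block is backed by a file segment on disk), it cannot be rewound, so the stream would be recreated from the file segment instead; on a mismatch the block would be treated as corrupt and handled by the existing corruption logic (re-fetch or task failure).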
