turboFei opened a new pull request #28525:
URL: https://github.com/apache/spark/pull/28525


   ## What changes were proposed in this pull request?
   
   We've seen some shuffle data corruption during the shuffle read phase. 
   
   As described in SPARK-26089, before PR #23453 (proposed by ankuriitg), Spark only checked small shuffle blocks.
   
   PR #23453 made two changes/improvements:
   
   1. Large blocks are checked up to maxBytesInFlight/3 in size, in a similar way as smaller blocks, so if a large block is corrupt at the beginning, that block will be re-fetched, and if that also fails, a FetchFailedException will be thrown.
   2. If a large block is corrupt beyond the first maxBytesInFlight/3 bytes, then any IOException thrown while reading the stream will be converted to a FetchFailedException. This is slightly more aggressive than originally intended, but since the consumer of the stream may have already read and processed some records, we can't just re-fetch the block; we need to fail the whole task. Additionally, a new type of TaskEndReason that would retry the task a couple of times before failing the previous stage was also considered, but given the complexity involved in that solution we decided not to proceed in that direction.
   
   However, I think some problems still exist with the current verification mechanism for shuffle transmitted data:
   
   - For a large block, only the first maxBytesInFlight/3 bytes are checked when fetching shuffle data, so corruption beyond that size cannot be detected in the data fetch phase. This has been described in the previous section.
   - Only the compressed or wrapped blocks are checked; I think we should also check the blocks which are not wrapped.
   
   This PR completes the verification mechanism for shuffle transmitted data:
   
   Firstly, CRC32 is chosen for the checksum verification of shuffle data.
   
   CRC is also used for checksum verification in Hadoop; it is simple and fast.
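
   For reference, a minimal sketch (not the patch itself) of computing a CRC32 digest over a stream with the JDK's `java.util.zip.CRC32`; the 2048-byte buffer size mirrors the one mentioned below but is otherwise arbitrary:

```scala
import java.io.InputStream
import java.util.zip.CRC32

// Stream the data through a CRC32 accumulator using a small reusable buffer,
// so no extra copy of the block is needed.
def computeCrc32(in: InputStream, bufferSize: Int = 2048): Long = {
  val crc = new CRC32()
  val buffer = new Array[Byte](bufferSize)
  var read = in.read(buffer)
  while (read != -1) {
    crc.update(buffer, 0, read)
    read = in.read(buffer)
  }
  crc.getValue
}
```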
   
   During the shuffle write phase, after the partitioned file is completed, we compute the CRC32 value for each partition and then write these digests along with the offsets into the shuffle index file.
   
   For the SortShuffleWriter and the unsafe shuffle writer, there is only one partitioned file per shuffle map task, so the computation of digests (the digest for each partition is derived from the offsets of this partitioned file) is cheap.
   
   For the bypass merge sort shuffle writer, the number of reduce partitions is smaller than bypassMergeThreshold, so the cost of computing the digests is acceptable.
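
   As a rough illustration of the write side (names and index-file layout here are hypothetical, not the actual change), the per-partition digests can be derived in a single pass over the partitioned data file, driven by the same partition lengths that already produce the index file:

```scala
import java.io.{DataOutputStream, EOFException, File, FileInputStream, FileOutputStream}
import java.util.zip.CRC32

// Hypothetical sketch: walk the finished partitioned data file once, compute a
// CRC32 per partition segment, and write cumulative offsets plus digests into
// the index file. The layout (offset followed by digest) is illustrative only.
def writeIndexWithDigests(
    dataFile: File,
    partitionLengths: Array[Long],
    indexFile: File): Unit = {
  val in = new FileInputStream(dataFile)
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  val buffer = new Array[Byte](2048)
  try {
    var offset = 0L
    out.writeLong(offset)
    for (length <- partitionLengths) {
      val crc = new CRC32()
      var remaining = length
      while (remaining > 0) {
        val read = in.read(buffer, 0, math.min(buffer.length.toLong, remaining).toInt)
        if (read < 0) throw new EOFException("Unexpected end of shuffle data file")
        crc.update(buffer, 0, read)
        remaining -= read
      }
      offset += length
      out.writeLong(offset)       // cumulative offset, as in the existing index file
      out.writeLong(crc.getValue) // per-partition digest
    }
  } finally {
    in.close()
    out.close()
  }
}
```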
   
   During the shuffle read phase, the digest value will be passed along with the block data.
   
   We then recompute the digest of the fetched data and compare it with the original digest value.
   Recomputing the digest only needs an additional buffer (2048 bytes) for computing the CRC32 value.
   After recomputing, we reset the fetched data's input stream: if it supports mark/reset we only need to reset it; otherwise it comes from a FileSegmentManagedBuffer and we need to recreate it.
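
   On the read side, the check described above might look roughly like this (a sketch only; `verifyDigest` and `blockSizeHint` are made-up names), assuming the fetched block is available as a Spark `ManagedBuffer`:

```scala
import java.io.{IOException, InputStream}
import java.util.zip.CRC32
import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical sketch: recompute the CRC32 of a fetched block with a 2048-byte
// buffer and compare it with the digest shipped alongside the block. If the
// stream supports mark/reset it is simply rewound; otherwise a fresh stream is
// recreated from the managed buffer (e.g. a FileSegmentManagedBuffer).
def verifyDigest(buf: ManagedBuffer, expectedDigest: Long, blockSizeHint: Int): InputStream = {
  val in = buf.createInputStream()
  if (in.markSupported()) in.mark(blockSizeHint)
  val crc = new CRC32()
  val buffer = new Array[Byte](2048)
  var read = in.read(buffer)
  while (read != -1) {
    crc.update(buffer, 0, read)
    read = in.read(buffer)
  }
  if (crc.getValue != expectedDigest) {
    in.close()
    throw new IOException(s"Corrupt block: expected digest $expectedDigest, got ${crc.getValue}")
  }
  if (in.markSupported()) {
    in.reset() // rewind so the block can be consumed normally
    in
  } else {
    in.close()
    buf.createInputStream() // recreate the stream for non-resettable sources
  }
}
```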
   
   So I think the verification mechanism proposed here for shuffle transmitted data is efficient and complete.
   
   ## How was this patch tested?
   
   Unit test.
   

