Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20179#discussion_r160489460
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
         // find out the consolidated file, then the offset within that from our index
         val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
     
    -    val in = new DataInputStream(new FileInputStream(indexFile))
    +    // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
    +    // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
    +    // (which may cause a reducer to be sent a different reducer's data). The explicit position
    +    // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
    +    // class of issue from re-occurring in the future which is why they are left here even though
    +    // SPARK-22982 is fixed.
    +    val channel = Files.newByteChannel(indexFile.toPath)
    +    channel.position(blockId.reduceId * 8)
    --- End diff --
    
    For some more background: the asynchronous `close()` bug can cause reads from a
    closed-and-subsequently-reassigned file descriptor number, and in principle this can affect
    almost any IO operation _anywhere_ in the application. For example, if the closed file
    descriptor number is immediately recycled by opening a socket, then the invalid read can cause
    that socket read to miss data (since the data will have been consumed by the invalid reader and
    will never be delivered to the legitimate new user of the file descriptor).
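
    To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only,
    not Spark code, and it stages the interfering seek sequentially rather than reproducing the
    descriptor-reuse race itself) of what happens when some other code moves the position of a
    channel that a reader believes it owns:

    ```scala
    import java.io.{DataOutputStream, FileOutputStream}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import java.nio.file.Files

    object SharedPositionSketch {
      def main(args: Array[String]): Unit = {
        // Write a toy "index file": offsets 0, 100, 250, 600 as big-endian longs.
        val path = Files.createTempFile("shuffle-index-sketch", ".tmp")
        val out = new DataOutputStream(new FileOutputStream(path.toFile))
        Seq(0L, 100L, 250L, 600L).foreach(v => out.writeLong(v))
        out.close()

        val channel = FileChannel.open(path)
        // The legitimate reader wants the offset for reduceId = 1, i.e. bytes [8, 16)...
        channel.position(1 * 8L)
        // ...but some other code holding the same channel/descriptor seeks it forward first.
        channel.position(2 * 8L)
        val buf = ByteBuffer.allocate(8)
        channel.read(buf)
        buf.flip()
        // Prints 250 instead of the expected 100: structurally valid, silently wrong.
        println(buf.getLong())
        channel.close()
        Files.delete(path)
      }
    }
    ```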
    
    Given this, I see how it might be puzzling that this patch is adding a 
check only here. There are two reasons for this:
    
    1. Many other IO operations have implicit checksumming, such that dropping data due to an
    invalid read will be detected and cause an exception. For example, many compression codecs have
    block-level checksumming (and magic numbers at the beginning of the stream), so dropping data
    (especially at the start of a read) will be detected; see the sketch after this list. This
    particular shuffle index file, however, does _not_ have mechanisms to detect corruption:
    skipping forward in the read by a multiple of 8 bytes will still read structurally-valid data
    (but it will be the _wrong_ data, causing the wrong output to be read from the shuffle data
    file).
    2. In the investigation which uncovered this bug, the invalid reads were predominantly
    impacting shuffle index lookups for reading local blocks. In a nutshell, there's a subtle race
    condition where Janino codegen compilation triggers attempted remote classloading of classes
    which don't exist, triggering the error-handling / error-propagation paths in
    `FileDownloadChannel` and causing the invalid asynchronous `close()` call to be performed.
    While that `close()` call is in flight, another task from the same stage attempts to read the
    shuffle index files of local blocks and experiences an invalid read due to the falsely-shared
    file descriptor.
    
       This is a very hard-to-trigger bug: we were only able to reproduce it on 
large clusters with very fast machines and shuffles that contain large numbers 
of map and reduce tasks (more shuffle blocks means more index file reads and 
more chances for the race to occur; faster machines increase the likelihood of 
the race occurring; larger clusters give us more chances for the error to 
occur). In our reproduction, this race occurred on a microsecond timescale (measured via kernel 
syscall tracing) and only rarely, so many iterations were required to trigger it.
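
    To illustrate the contrast in point 1, here is a small sketch using GZIP from the JDK purely as
    a stand-in for a format with a magic number and checksums (Spark's actual shuffle compression
    codecs are configurable and not shown here):

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream, ZipException}

    object CorruptionDetectionSketch {
      def main(args: Array[String]): Unit = {
        val baos = new ByteArrayOutputStream()
        val gz = new GZIPOutputStream(baos)
        gz.write("some payload".getBytes("UTF-8"))
        gz.close()
        val compressed = baos.toByteArray

        // Simulate an invalid read that consumes the first 4 bytes of the stream:
        // the GZIP magic-number check fails immediately, so the problem is detected.
        try {
          new GZIPInputStream(new ByteArrayInputStream(compressed, 4, compressed.length - 4))
          println("corruption went undetected")
        } catch {
          case e: ZipException => println(s"corruption detected: ${e.getMessage}")
        }
        // A raw index file of longs has no header or checksum, so skipping 8 bytes
        // simply yields the next (wrong) offset with no error at all.
      }
    }
    ```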
    
    While investigating, I added these checks so that the index read fails fast when this issue 
    occurs, which made it significantly easier to reproduce and diagnose the root cause (fixed by 
    the other changes in this patch).
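
    For reference, the check has roughly the following shape, written here as a standalone helper
    so it compiles on its own (a sketch: `readIndexOffsets` and its exact error message are
    illustrative, not necessarily the code in this patch):

    ```scala
    import java.io.{DataInputStream, File}
    import java.nio.channels.Channels
    import java.nio.file.Files

    /** Reads the [start, end) data-file offsets for `reduceId` from a shuffle index
     *  file of big-endian longs, failing fast if the channel position afterwards is
     *  not where it must be when nothing else has touched the descriptor. */
    def readIndexOffsets(indexFile: File, reduceId: Int): (Long, Long) = {
      val channel = Files.newByteChannel(indexFile.toPath)
      channel.position(reduceId * 8L)
      val in = new DataInputStream(Channels.newInputStream(channel))
      try {
        val offset = in.readLong()
        val nextOffset = in.readLong()
        val expectedPosition = reduceId * 8L + 16
        val actualPosition = channel.position()
        if (actualPosition != expectedPosition) {
          throw new IllegalStateException(
            s"Incorrect channel position after index reads: expected $expectedPosition " +
            s"but actual position was $actualPosition")
        }
        (offset, nextOffset)
      } finally {
        in.close()
      }
    }
    ```

    Failing loudly here turns a silent wrong-answer bug into an immediate exception at the point of
    the misread, which is exactly what made the race observable.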
    
    There are a number of interesting details in the story of how we worked 
from the original high-level data corruption symptom to this low-level IO bug. 
I'll see about writing up the complete story in a blog post at some point.

