Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20179#discussion_r160489460

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
       // find out the consolidated file, then the offset within that from our index
       val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
-      val in = new DataInputStream(new FileInputStream(indexFile))
+      // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
+      // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
+      // (which may cause a reducer to be sent a different reducer's data). The explicit position
+      // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
+      // class of issue from re-occurring in the future which is why they are left here even though
+      // SPARK-22982 is fixed.
+      val channel = Files.newByteChannel(indexFile.toPath)
+      channel.position(blockId.reduceId * 8)
--- End diff --

For some more background: the asynchronous `close()` bug can cause reads from a closed-and-subsequently-reassigned file descriptor number, and in principle this can affect almost any IO operation _anywhere_ in the application. For example, if the closed file descriptor number is immediately recycled by opening a socket, then the invalid read can cause that socket read to miss data (since the data would have been consumed by the invalid reader and won't be delivered to the legitimate new user of the file descriptor). Given this, I can see how it might be puzzling that this patch adds a check only here. There are two reasons for this:

1. Many other IO operations have implicit checksumming, such that dropping data due to an invalid read will be detected and cause an exception.
For example, many compression codecs have block-level checksumming (and magic numbers at the beginning of the stream), so dropping data (especially at the start of a read) will be detected. This particular shuffle index file, however, does _not_ have any mechanism for detecting corruption: skipping forward in the read by a multiple of 8 bytes will still read structurally valid data, but it will be the _wrong_ data, causing the wrong output to be read from the shuffle data file.

2. In the investigation which uncovered this bug, the invalid reads predominantly impacted shuffle index lookups for reading local blocks.

In a nutshell, there is a subtle race condition in which Janino codegen compilation triggers attempted remote classloading of classes which don't exist, exercising the error-handling / error-propagation paths in `FileDownloadChannel` and causing the invalid asynchronous `close()` call to be performed. At the same time that this `close()` call is being performed, another task from the same stage attempts to read the shuffle index files of local blocks and experiences an invalid read due to the falsely-shared file descriptor.

This is a very hard-to-trigger bug: we were only able to reproduce it on large clusters with very fast machines and shuffles containing large numbers of map and reduce tasks (more shuffle blocks mean more index file reads and more chances for the race to occur; faster machines increase the likelihood of the race occurring; larger clusters give more chances for the error to occur). In our reproduction, the race occurred on a microsecond timescale (measured via kernel syscall tracing) and occurred relatively rarely, requiring many iterations before we could trigger it.

While investigating, I added these checks so that the index read fails fast when this issue occurs, which made it significantly easier to reproduce and diagnose the root cause (fixed by the other changes in this patch).
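To make the position-check pattern concrete, here is a minimal, self-contained sketch of the idea (not Spark's actual code; `IndexReadSketch` and `readOffsets` are hypothetical names introduced for illustration). The index file is a sequence of 8-byte longs, so the entry for `reduceId` is the pair of longs starting at byte offset `reduceId * 8`; after reading two longs the channel position should be exactly 16 bytes past where we seeked, and if it isn't, some other code has moved our descriptor's position underneath us and we fail fast instead of returning wrong offsets:

```scala
import java.io.{DataInputStream, File}
import java.nio.channels.Channels
import java.nio.file.Files

object IndexReadSketch {
  // Hypothetical helper mirroring the pattern in the patch: seek to the
  // entry for `reduceId`, read its (offset, nextOffset) pair, and verify
  // the channel position afterwards so a foreign seek on a falsely-shared
  // file descriptor is detected immediately rather than silently serving
  // the wrong reducer's data.
  def readOffsets(indexFile: File, reduceId: Int): (Long, Long) = {
    val channel = Files.newByteChannel(indexFile.toPath)
    try {
      val start = reduceId * 8L
      channel.position(start)
      val in = new DataInputStream(Channels.newInputStream(channel))
      val offset = in.readLong()     // start of this reducer's data
      val nextOffset = in.readLong() // start of the next reducer's data
      val expected = start + 16      // two longs consumed
      val actual = channel.position()
      if (actual != expected) {
        // Fail fast: someone else moved our position (SPARK-22982-style bug).
        throw new IllegalStateException(
          s"unexpected channel position: expected $expected, actual $actual")
      }
      (offset, nextOffset)
    } finally {
      channel.close()
    }
  }
}
```

For example, given an index file containing the longs `0, 100, 250, 400`, `readOffsets(file, 1)` would return `(100, 250)`, the byte range of reducer 1's output in the data file.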
There are a number of interesting details in the story of how we worked from the original high-level data corruption symptom to this low-level IO bug. I'll see about writing up the complete story in a blog post at some point.