Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20179#discussion_r160489460
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
-    val in = new DataInputStream(new FileInputStream(indexFile))
+    // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
+    // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
+    // (which may cause a reducer to be sent a different reducer's data). The explicit position
+    // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
+    // class of issue from re-occurring in the future which is why they are left here even though
+    // SPARK-22982 is fixed.
+    val channel = Files.newByteChannel(indexFile.toPath)
+    channel.position(blockId.reduceId * 8)
--- End diff --
For some more background: the asynchronous `close()` bug can cause reads
from a closed-and-subsequently-reassigned file descriptor number, and in
principle this can affect almost any IO operation _anywhere_ in the
application. For example, if the closed file descriptor number is immediately
recycled by opening a socket, then the invalid read can cause that socket read
to miss data (since the data is consumed by the invalid reader and never
delivered to the legitimate new user of the file descriptor).
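As a hypothetical illustration of the recycling behavior (not code from this
patch): on Linux, `open()` always returns the lowest unused descriptor number,
so a freshly closed number is handed straight to the next open. The sketch
below peeks at the private `FileDescriptor.fd` field via reflection, which is
a JDK implementation detail rather than a public API:

```scala
import java.io.{FileDescriptor, FileInputStream}

object FdRecycleDemo extends App {
  // Reflectively read the private int holding the OS-level descriptor number
  // (a JDK implementation detail; works on JDK 8, may need --add-opens later).
  private val fdField = classOf[FileDescriptor].getDeclaredField("fd")
  fdField.setAccessible(true)
  private def fdNumber(s: FileInputStream): Int = fdField.getInt(s.getFD)

  val first = new FileInputStream("/etc/hosts")
  val recycled = fdNumber(first)
  first.close() // the descriptor number is now free for reuse

  val second = new FileInputStream("/etc/passwd")
  // On Linux both lines typically print the same number: stale code still
  // reading from descriptor `recycled` would now consume bytes belonging
  // to `second`.
  println(s"first stream used fd $recycled")
  println(s"second stream uses fd ${fdNumber(second)}")
  second.close()
}
```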
Given this, I see how it might be puzzling that this patch is adding a
check only here. There are two reasons for this:
1. Many other IO operations have implicit checksumming, so data dropped by
an invalid read will be detected and cause an exception. For example, many
compression codecs have block-level checksumming (and magic numbers at the
beginning of the stream), so dropping data (especially at the start of a read)
will be detected. This particular shuffle index file, however, does _not_ have
mechanisms to detect corruption: skipping forward in the read by a multiple of
8 bytes will still read structurally-valid data, but it will be the _wrong_
data, causing the wrong output to be read from the shuffle data file (a
concrete sketch follows this list).
2. In the investigation which uncovered this bug, the invalid reads were
predominantly impacting shuffle index lookups for reading local blocks. In a
nutshell, there's a subtle race condition: Janino codegen compilation triggers
attempted remote classloading of classes which don't exist, which exercises
the error-handling / error-propagation paths in `FileDownloadChannel` and
causes the invalid asynchronous `close()` call to be performed. While that
`close()` is in flight, another task from the same stage attempts to read the
shuffle index files of local blocks and experiences an invalid read due to the
falsely-shared file descriptor.
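To make point 1 concrete, here is a hypothetical sketch (none of these names
or values come from the patch) showing why a skewed index read is undetectable:
every 8-byte-aligned position decodes to a structurally valid
`(offset, nextOffset)` pair, just the wrong one.

```scala
import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

object SkewedIndexRead extends App {
  // A toy index file for 3 map-output partitions: 4 cumulative offsets,
  // no magic number, no checksum.
  val indexFile = File.createTempFile("shuffle", ".index")
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  Seq(0L, 100L, 250L, 400L).foreach(out.writeLong)
  out.close()

  def readPair(startByte: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try { in.skipBytes(startByte); (in.readLong(), in.readLong()) }
    finally in.close()
  }

  val reduceId = 1
  println(readPair(reduceId * 8))       // (100,250): reducer 1's real segment
  println(readPair((reduceId + 1) * 8)) // (250,400): valid-looking but wrong,
                                        // silently serving reducer 2's data
}
```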
This is a very hard-to-trigger bug: we were only able to reproduce it on
large clusters with very fast machines and shuffles containing large numbers
of map and reduce tasks (more shuffle blocks mean more index file reads and
thus more chances for the race to occur; faster machines make the race window
easier to hit; larger clusters multiply the opportunities for it to surface).
In our reproduction, this race occurred on a microsecond timescale (measured
via kernel syscall tracing) and occurred relatively rarely, requiring many
iterations before we could trigger a reproduction.
While investigating, I added these checks so that the index read fails fast
when this issue occurs, which made it significantly easier to reproduce and
diagnose the root cause (fixed by the other changes in this patch).
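In sketch form, the check amounts to something like the following (simplified,
not the verbatim patch: full error handling and the `ManagedBuffer`
construction are omitted, and `indexFile` / `blockId` come from the
surrounding method):

```scala
import java.io.DataInputStream
import java.nio.channels.Channels
import java.nio.file.Files

val channel = Files.newByteChannel(indexFile.toPath)
channel.position(blockId.reduceId * 8L)
val in = new DataInputStream(Channels.newInputStream(channel))
try {
  val offset = in.readLong()
  val nextOffset = in.readLong()
  // After reading two 8-byte Longs the channel must sit exactly 16 bytes
  // past where we seeked it; if another thread has moved our descriptor's
  // position, fail fast instead of silently serving the wrong reducer's data.
  val expectedPosition = blockId.reduceId * 8L + 16
  val actualPosition = channel.position()
  if (actualPosition != expectedPosition) {
    throw new IllegalStateException(
      s"SPARK-22982: incorrect channel position after index file reads: " +
      s"expected $expectedPosition but got $actualPosition")
  }
  // the block is the segment [offset, nextOffset) of the shuffle data file
} finally {
  in.close()
}
```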
There are a number of interesting details in the story of how we worked
from the original high-level data corruption symptom to this low-level IO bug.
I'll see about writing up the complete story in a blog post at some point.