GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/20179
[SPARK-22982] Remove unsafe asynchronous close() call from
FileDownloadChannel
## What changes were proposed in this pull request?
This patch fixes a severe asynchronous IO bug in Spark's Netty-based file
transfer code. At a high level, the problem is that an unsafe asynchronous
`close()` of a pipe's source channel creates a race condition where file
transfer code closes a file descriptor then attempts to read from it. If the
closed file descriptor's number has been reused by an `open()` call then this
invalid read may cause unrelated file operations to return incorrect results.
**One manifestation of this problem is incorrect query results.**
For a high-level overview of how file download works, take a look at the
control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer
results, then submits an asynchronous stream request to a lower-level
TransportClient. The callback passes received data to the sink end of the pipe.
The source end of the pipe is passed back to the caller of `openChannel()`.
Thus `openChannel()` returns immediately and callers interact with the returned
pipe source channel.
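The hand-off described above can be sketched with a plain `java.nio.channels.Pipe` (illustrative code with hypothetical names, not Spark's actual classes): a background thread plays the role of the stream callback writing received bytes into the sink end, while the caller interacts only with the source end.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

public class PipeHandoffSketch {
    // Runs one transfer through the pipe and returns what the reader saw.
    static String transfer() throws Exception {
        Pipe pipe = Pipe.open();

        // Plays the role of the async stream callback: writes received
        // bytes to the sink end, then closes it to signal end-of-stream.
        Thread callback = new Thread(() -> {
            try (Pipe.SinkChannel sink = pipe.sink()) {
                sink.write(ByteBuffer.wrap(
                        "chunk-1".getBytes(StandardCharsets.UTF_8)));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        callback.start();

        // The caller of openChannel() only ever sees the source end.
        StringBuilder received = new StringBuilder();
        ByteBuffer buf = ByteBuffer.allocate(64);
        try (Pipe.SourceChannel source = pipe.source()) {
            while (source.read(buf) != -1) {
                buf.flip();
                received.append(StandardCharsets.UTF_8.decode(buf));
                buf.clear();
            }
        }
        callback.join();
        return received.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transfer()); // prints "chunk-1"
    }
}
```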
Because the underlying stream request is asynchronous, errors may occur
after `openChannel()` has returned and after that method's caller has started
to `read()` from the returned channel. For example, if a client requests an
invalid stream from a remote server, then the resulting "stream does not
exist" error may not arrive until after `openChannel()` has
returned. In order to be able to propagate the "stream does not exist" error to
the file-fetching application thread, this code wraps the pipe's source channel
in a special `FileDownloadChannel` which adds a `setError(t: Throwable)`
method, then calls this `setError()` method in `FileDownloadCallback`'s
`onFailure` method.
It is possible for `FileDownloadChannel`'s `read()` and `setError()`
methods to be called concurrently from different threads: the `setError()`
method is called from within the Netty RPC system's stream callback handlers,
while the `read()` methods are called from higher-level application code
performing remote stream reads.
The problem lies in `setError()`: the existing code closed the wrapped pipe
source channel. Because `read()` and `setError()` occur in different threads,
this means it is possible for one thread to be calling `source.read()` while
another asynchronously calls `source.close()`. Java's IO libraries do not
guarantee that this will be safe and, in fact, it's possible for these
operations to interleave in such a way that a lower-level `read()` system call
occurs right after a `close()` call. In the best case, this fails as a read of
a closed file descriptor; in the worst case, the file descriptor number has
been re-used by an intervening `open()` operation and the read corrupts the
result of an unrelated file IO operation being performed by a different thread.
The solution here is to remove the `stream.close()` call in `onError()`:
the thread that is performing the `read()` calls is responsible for closing the
stream in a `finally` block, so there's no need to close it here. If that
thread is blocked in a `read()` then it will become unblocked when the sink end
of the pipe is closed in `FileDownloadCallback.onFailure()`.
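The unblocking mechanism the fix relies on can be sketched with a plain NIO pipe (illustrative code, not Spark's actual classes): a reader blocked in `read()` on the source wakes up with a normal end-of-stream result when the sink end is closed, so no asynchronous `close()` of the source is ever needed.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class SinkCloseUnblocksReader {
    // Returns the result of a read() that was blocked when the sink closed.
    static int blockedRead() throws Exception {
        Pipe pipe = Pipe.open();

        // Simulates FileDownloadCallback.onFailure(): close only the sink.
        Thread failureCallback = new Thread(() -> {
            try {
                Thread.sleep(100);   // give the reader time to block in read()
                pipe.sink().close(); // reader observes EOF, not a closed-fd error
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        failureCallback.start();

        int n;
        try (Pipe.SourceChannel source = pipe.source()) {
            n = source.read(ByteBuffer.allocate(16)); // blocks until sink closes
        }
        failureCallback.join();
        return n; // -1: end-of-stream
    }

    public static void main(String[] args) throws Exception {
        System.out.println(blockedRead()); // prints -1
    }
}
```

Note that the source channel is closed only by the reading thread itself (here via try-with-resources), mirroring the ownership discipline the fix establishes.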
After making this change, we also need to refine the `read()` method to
always check for a `setError()` result, even if the underlying channel `read()`
call has succeeded.
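A minimal sketch of that discipline (hypothetical names, not Spark's actual `FileDownloadChannel`): the wrapper records the error in a `volatile` field, and `read()` re-checks that field even when the delegate read succeeded, so an error raced in by the callback thread is never silently swallowed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Illustrative stand-in for FileDownloadChannel: wraps the pipe source
// and propagates an asynchronously-set error to the reading thread.
public class ErrorCheckingChannel {
    private final ReadableByteChannel source;
    private volatile Throwable error; // written by the RPC callback thread

    public ErrorCheckingChannel(ReadableByteChannel source) {
        this.source = source;
    }

    // Called from the callback thread on failure. Crucially, it does
    // NOT close the source channel; the reading thread owns that.
    public void setError(Throwable t) {
        this.error = t;
    }

    public int read(ByteBuffer dst) throws IOException {
        int n = source.read(dst);
        // Check for an error even if the read itself succeeded, since
        // setError() may have raced in after these bytes were buffered.
        if (error != null) {
            throw new IOException("stream failed", error);
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        ErrorCheckingChannel ch = new ErrorCheckingChannel(
                Channels.newChannel(new ByteArrayInputStream(new byte[]{1, 2, 3})));
        ch.setError(new RuntimeException("stream does not exist"));
        try {
            ch.read(ByteBuffer.allocate(8));
        } catch (IOException e) {
            System.out.println(e.getCause().getMessage()); // prints "stream does not exist"
        }
    }
}
```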
This patch also cleans up a dodgy-looking `catch e: Exception` block,
replacing it with a safer `try-finally` error-handling idiom.
This bug was introduced in SPARK-11956 / #9941 and is present in Spark
1.6.0+.
## How was this patch tested?
This fix was tested manually against a workload which non-deterministically
hit this bug.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark
SPARK-22982-fix-unsafe-async-io-in-file-download-channel
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20179.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20179
----
commit 2138aa6d8225fb7c343c4a61dadc8c44e902e35b
Author: Josh Rosen <joshrosen@...>
Date: 2018-01-07T23:21:33Z
Add position checks to IndexShuffleBlockResolver.
commit 8e5ffa451f66dc64203dede68b9c0ac5fdc952cf
Author: Josh Rosen <joshrosen@...>
Date: 2018-01-07T23:22:44Z
Remove unsafe asynchronous close() call from FileDownloadChannel
----