GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/20179

    [SPARK-22982] Remove unsafe asynchronous close() call from 
FileDownloadChannel

    ## What changes were proposed in this pull request?
    
    This patch fixes a severe asynchronous IO bug in Spark's Netty-based file transfer code. At a high level, the problem is that an unsafe asynchronous `close()` of a pipe's source channel creates a race condition in which the file transfer code closes a file descriptor and then attempts to read from it. If the closed file descriptor's number has been reused by an intervening `open()` call, this invalid read may cause unrelated file operations to return incorrect results. **One manifestation of this problem is incorrect query results.**
    
    For a high-level overview of how file download works, take a look at the 
control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer 
results, then submits an asynchronous stream request to a lower-level 
TransportClient. The callback passes received data to the sink end of the pipe. 
The source end of the pipe is passed back to the caller of `openChannel()`. 
Thus `openChannel()` returns immediately and callers interact with the returned 
pipe source channel.
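    To make that flow concrete, here is a minimal sketch of the pipe-based control flow, assuming a hypothetical `fetchStreamAsync` helper in place of the real lower-level `TransportClient` stream machinery:

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}

// Hypothetical stand-in for the asynchronous Netty stream request; the real
// code goes through a lower-level TransportClient with a stream callback.
def fetchStreamAsync(
    uri: String,
    onData: ByteBuffer => Unit,
    onComplete: () => Unit,
    onFailure: Throwable => Unit): Unit = {
  new Thread(() => {
    try {
      onData(ByteBuffer.wrap(uri.getBytes("UTF-8")))  // fake payload
      onComplete()
    } catch {
      case e: Throwable => onFailure(e)
    }
  }).start()
}

// Simplified shape of NettyRpcEnv.openChannel(): create a pipe, start the
// asynchronous download into the sink end, and immediately hand the source
// end back to the caller.
def openChannelSketch(uri: String): ReadableByteChannel = {
  val pipe = Pipe.open()
  val sink = pipe.sink()
  fetchStreamAsync(
    uri,
    onData = (chunk: ByteBuffer) => { sink.write(chunk); () },
    onComplete = () => sink.close(),
    onFailure = (_: Throwable) => sink.close())
  pipe.source()
}
```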
    
    Because the underlying stream request is asynchronous, errors may occur after `openChannel()` has returned and after that method's caller has started to `read()` from the returned channel. For example, if a client requests an invalid stream from a remote server, the "stream does not exist" error may not arrive from the remote server until after `openChannel()` has returned. In order to propagate the "stream does not exist" error to the file-fetching application thread, this code wraps the pipe's source channel in a special `FileDownloadChannel` which adds a `setError(t: Throwable)` method, and then calls that `setError()` method from the FileDownloadCallback's `onFailure` method.
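    A minimal sketch of the pre-fix wrapper's shape (the field names and details here are assumptions for illustration, not the exact Spark source):

```scala
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}

// Sketch of the wrapper as it looked before this patch (simplified).
private class FileDownloadChannelSketch(source: Pipe.SourceChannel)
    extends ReadableByteChannel {

  @volatile private var error: Throwable = null

  // Called from a Netty callback thread when the stream fails.
  def setError(t: Throwable): Unit = {
    error = t
    source.close()  // the unsafe asynchronous close() removed by this patch
  }

  override def read(dst: ByteBuffer): Int = {
    if (error != null) throw new IOException(error)
    source.read(dst)  // may race with the close() in setError() above
  }

  override def isOpen: Boolean = source.isOpen
  override def close(): Unit = source.close()
}
```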
    
    It is possible for `FileDownloadChannel`'s `read()` and `setError()` methods to be called concurrently from different threads: `setError()` is called from within the Netty RPC system's stream callback handlers, while `read()` is called from higher-level application code performing remote stream reads.
    
    The problem lies in `setError()`: the existing code closed the wrapped pipe source channel. Because `read()` and `setError()` run on different threads, it is possible for one thread to be calling `source.read()` while another asynchronously calls `source.close()`. Java's IO libraries do not guarantee that this is safe and, in fact, the two operations can interleave such that a lower-level `read()` system call occurs right after a `close()` call. In the best case, this fails as a read of a closed file descriptor; in the worst case, the file descriptor number has been reused by an intervening `open()` operation and the read corrupts the result of an unrelated file IO operation being performed by a different thread.
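    The hazardous pattern can be reproduced in isolation. In this tiny demo (not Spark code) one thread blocks in `read()` while another closes the same channel; on the JVM this typically surfaces as an `AsynchronousCloseException`, but nothing guarantees the interleaving is safe at the file-descriptor level:

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe

// Demo of the hazardous pattern: a reader blocked in read() while another
// thread asynchronously closes the same channel out from under it.
object CloseWhileReading {
  def main(args: Array[String]): Unit = {
    val pipe = Pipe.open()
    val reader = new Thread(() => {
      try pipe.source().read(ByteBuffer.allocate(16))
      catch { case e: Throwable => println(s"reader saw: $e") }
    })
    reader.start()
    Thread.sleep(200)      // give the reader time to block in read()
    pipe.source().close()  // asynchronous close from another thread
    reader.join()
  }
}
```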
    
    The solution here is to remove the `source.close()` call from `setError()`: the thread performing the `read()` calls is responsible for closing the stream in a `finally` block, so there is no need to close it here. If that thread is blocked in a `read()`, it will become unblocked when the sink end of the pipe is closed in `FileDownloadCallback.onFailure()`.
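    On the reading side, the ownership convention the fix relies on looks roughly like this (the consumer shape is illustrative, not the literal caller code):

```scala
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Illustrative reader: the thread that reads is the only one that closes,
// and it does so in a finally block, so no asynchronous close is needed.
def consume(channel: ReadableByteChannel): Unit = {
  val buf = ByteBuffer.allocate(64 * 1024)
  try {
    while (channel.read(buf) != -1) {
      buf.flip()
      // ... hand the bytes off to the caller ...
      buf.clear()
    }
  } finally {
    channel.close()  // sole close() site for the source channel
  }
}
```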
    
    After making this change, we also need to refine the `read()` method to 
always check for a `setError()` result, even if the underlying channel `read()` 
call has succeeded.
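    Concretely, the refined `read()` re-checks the error after the underlying read as well, since a read against already-buffered pipe data can succeed even though the stream as a whole has failed. A sketch, continuing the wrapper class above rather than quoting the literal patch:

```scala
// Replaces read() in the FileDownloadChannelSketch above: the error set by
// setError() takes precedence even when the channel read itself succeeded.
override def read(dst: ByteBuffer): Int = {
  val result = scala.util.Try(source.read(dst))
  if (error != null) throw new IOException(error)
  result.get  // rethrows a read failure, or returns the byte count
}
```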
    
    This patch also slightly cleans up a dodgy-looking `catch e: Exception` block by switching to the safer `try-finally` error-handling idiom.
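    For reference, the general shape of that idiom (the resource here is a hypothetical example, not the code touched by the patch):

```scala
import java.nio.file.{Files, Paths}

// Rather than catching Exception to close the resource and rethrow, let the
// exception propagate and release the resource in finally, which runs on
// every exit path.
val out = Files.newOutputStream(Paths.get("/tmp/example.bin"))
try {
  out.write("payload".getBytes("UTF-8"))  // may throw; propagates unchanged
} finally {
  out.close()
}
```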
    
    This bug was introduced in SPARK-11956 / #9941 and is present in Spark 
1.6.0+.
    
    ## How was this patch tested?
    
    This fix was tested manually against a workload which non-deterministically 
hit this bug.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark 
SPARK-22982-fix-unsafe-async-io-in-file-download-channel

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20179
    
----
commit 2138aa6d8225fb7c343c4a61dadc8c44e902e35b
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-07T23:21:33Z

    Add position checks to IndexShuffleBlockResolver.

commit 8e5ffa451f66dc64203dede68b9c0ac5fdc952cf
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-07T23:22:44Z

    Remove unsafe asynchronous close() call from FileDownloadChannel

----

