GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/9382

    [SPARK-11424] Guard against double-close() of RecordReaders

    **TL;DR**: We can rule out one rare but possible cause of input stream corruption via defensive programming.
    
    ## Background
    
    [MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a 
bug where an instance of a decompressor ends up getting placed into a pool 
multiple times. Since the pool is backed by a list instead of a set, this can 
lead to the same decompressor being used in different places at the same time, 
which is not safe because those decompressors will overwrite each other's 
buffers. Sometimes this buffer sharing will lead to exceptions, but other times it might silently result in invalid / garbled input.
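
    To make the pooling hazard concrete, here is a minimal sketch (the pool types and names are illustrative; this is not Hadoop's actual `CodecPool` code):

```scala
import scala.collection.mutable

class Decompressor

object PoolSketch {
  def main(args: Array[String]): Unit = {
    // A list-backed pool happily accepts the same instance twice...
    val listPool = mutable.ListBuffer.empty[Decompressor]
    val d = new Decompressor
    listPool += d // first (legitimate) return to the pool
    listPool += d // a double-close() returns it again: no error is raised
    assert(listPool(0) eq listPool(1)) // two borrowers now share one instance

    // ...whereas a set-backed pool would deduplicate the second return.
    val setPool = mutable.Set.empty[Decompressor]
    setPool += d
    setPool += d
    assert(setPool.size == 1)
  }
}
```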
    
    That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop 
versions that we wish to support. As a result, I think that we should try to 
work around this issue in Spark via defensive programming to prevent 
RecordReaders from being closed multiple times.
    
    So far, I've had a hard time explaining exactly how double-`close()`s occur in practice, but I do have a couple of scenarios that work on paper.
    
    For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up (see the sketch after this list):
    
    * [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
    * [While handling the close call and trying to close the reader, reader.close() throws an exception](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190).
    * We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
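
    Condensed into code, that bad path looks roughly like the following hypothetical sketch (the class and names are simplified, not the literal NewHadoopRDD source):

```scala
import org.apache.hadoop.mapreduce.RecordReader

// Hypothetical sketch of the pre-fix close path, with simplified names.
class ReaderIterator(private var reader: RecordReader[_, _]) {
  def close(): Unit = {
    if (reader != null) {
      try {
        reader.close() // step 2: may throw, e.g. if the FileSystem is closed
        reader = null  // never reached when reader.close() throws
      } catch {
        case e: Exception =>
          // The exception is logged and swallowed, but reader is still
          // non-null, so the TaskCompletionListener's later close() call
          // (step 3) closes the same RecordReader a second time, returning
          // its decompressor to the pool twice.
      }
    }
  }
}
```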
    
    In this hypothetical situation, `LineRecordReader.close()` could [fail with 
an exception if its InputStream failed to 
close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
    I googled for "Exception in RecordReader.close()" and it looks like it's 
possible for a closed Hadoop FileSystem to trigger an error there: 
[SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), 
[SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491)
    
    Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), 
it seems like it's possible to get spurious exceptions there when there is an 
error reading from Hadoop. If the Hadoop FileSystem were to get into an error 
state _right_ after reading the last record then it looks like we could hit the 
bug here in 1.5.
    
    ## The fix
    
    This patch guards against these issues by modifying `HadoopRDD.close()` and 
`NewHadoopRDD.close()` so that they set `reader = null` even if an exception 
occurs in the `reader.close()` call. In addition, I modified `NextIterator.closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
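
    In code, the two guards look roughly like this (a condensed sketch with simplified names, not the exact patch):

```scala
import org.apache.hadoop.mapreduce.RecordReader

// Sketch of the fixed close path: the reader is nulled out even on failure.
class ReaderIterator(private var reader: RecordReader[_, _]) {
  def close(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception => // log and swallow, as before
      } finally {
        reader = null // a second close() call is now a harmless no-op
      }
    }
  }
}

// Sketch of the closeIfNeeded() guard in NextIterator.
abstract class NextIterator[U] extends Iterator[U] {
  private var closed = false
  protected def close(): Unit

  def closeIfNeeded(): Unit = {
    if (!closed) {
      // Set the flag *before* calling close(): if close() throws and the
      // flag were only set afterwards, a retry could call close() again.
      closed = true
      close()
    }
  }
}
```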
    
    I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and appear to rule out the on-paper reproductions that I was able to brainstorm up.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark hadoop-decompressor-pooling-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9382
    
----
commit 087aa63f623c5fd23cdfa7c897eaaadd8b163aac
Author: Josh Rosen <[email protected]>
Date:   2015-10-30T18:25:54Z

    Guard against double-close() of RecordReaders.

----

