GitHub user tigerquoll opened a pull request:

    https://github.com/apache/spark/pull/5368

    [CORE] [SPARK-6593] Provide a HadoopRDD variant that wraps all reads in a Try

    @rxin @sryza  
    As per the attached JIRA ticket, I am proposing a way of making Hadoop IO more robust in the face of malformed files that cause an exception in the Hadoop input libraries.  The previous PR (#5250) simply added an option to ignore exceptions and continue processing.  This raised concerns about not giving the user any indication that an error had occurred (other than a warning message in the logs).
    
    As an alternative to the first PR, and to keep the conversation moving, I'm putting forward this PR, which contains HadoopReliableRDD. It wraps all Hadoop IO in a Try and passes the result back to the user, so that the user is forced to act on any non-fatal errors that occur when reading from a Hadoop file system through this API.
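    
    To make the mechanism concrete, here is a rough sketch of the kind of iterator wrapping involved (illustrative names only; this is not the code in the patch):
    
    ```
    import scala.util.control.NonFatal
    import scala.util.{Failure, Try}
    
    // Sketch: present an iterator that may throw while reading as an iterator of
    // Try, and stop reading from the split after the first failure.
    class TryIterator[T](underlying: Iterator[T]) extends Iterator[Try[T]] {
      private var failed = false
      private var pending: Option[Throwable] = None   // failure raised inside hasNext
    
      override def hasNext: Boolean = !failed && {
        pending.isDefined || {
          try underlying.hasNext
          catch { case NonFatal(e) => pending = Some(e); true }
        }
      }
    
      override def next(): Try[T] = {
        val result = pending match {
          case Some(e) => Failure(e)
          case None    => Try(underlying.next())
        }
        if (result.isFailure) failed = true   // no further reads from this split
        result
      }
    }
    ```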
    
    An example of using this API is given below:
    
    ```
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import scala.util.{Failure, Success, Try}
    
    // hdfs directory contains test[1.4].txt.gz  - test4.txt.gz is corrupted
    val path = "hdfs:///user/cloudera/*.gz"
    
    val testRdd = sc.hadoopReliableFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
    …
    15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test4.txt.gz:0+42544
    15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test3.txt.gz:0+15043
    15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test1.txt.gz:0+15043
    15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test2.txt.gz:0+15043
    …
    15/04/05 21:24:57 WARN rdd.HadoopReliableRDD: Exception on read attempt IOException - not a gzip file - stopping any further read attempts from this split
    …
    
    testRdd.count 
    …
    res5: Long = 4384
    
    testRdd.filter(x=>x.isSuccess).count
    ...
    res6: Long = 4383
    
    testRdd.map( _ match {
      case Failure(ex) => s"${ex.getClass.getSimpleName}: ${ex.getMessage}"
      case Success(_) => "success"
    }).countByValue()
    ..
    res7: scala.collection.Map[String,Long] = Map(IOException: not a gzip file -> 1, success -> 4383)
    
    
    testRdd.filter(_.isSuccess).map(_.get.toString).flatMap(_.split(" ")).countByValue
    ...
    … and on we go…
    
    ```
    
    Just to emphasise what we are doing here: we are trapping exceptions that previously would have stopped Spark from executing, and continuing to process data in other tasks rather than terminating the entire application, which is what previously happened.  If the exceptions are raised in a non-deterministic way (e.g. a temporary network failure), then the data produced could differ if the code is run a second time.  If we didn't handle these errors, the entire application would currently fail in a non-deterministic way.  Often the latter behaviour is the desirable response, but sometimes it is not.
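    
    For completeness, here is one way a caller could choose either behaviour with the Try-based results (continuing from the `testRdd` example above):
    
    ```
    // Keep processing, but gather the errors for reporting.
    val errors = testRdd.collect { case Failure(e) => s"${e.getClass.getSimpleName}: ${e.getMessage}" }
    errors.distinct.collect().foreach(println)
    
    // Or restore fail-fast behaviour explicitly: .get rethrows the wrapped
    // exception, so the first Failure fails the task (and, after retries, the job).
    testRdd.map(_.get).count
    ```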
    
    If the exceptions are raised in a deterministic way (e.g. the file was corrupt before it was copied into HDFS), then the data will be produced deterministically and the run can be repeated with the same results.
    
    I believe giving users some way to process data that Spark currently crashes on is worth considering, whatever form it finally takes.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tigerquoll/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5368
    
----
commit bffc68e64098fe24464dfe0eca0eba15afef85c8
Author: Dale <[email protected]>
Date:   2015-04-04T11:43:03Z

    [SPARK-6593] Refactored HadoopRDD so that the iterator could be cleanly wrapped

commit fd75bc1dae5b707bcdd914d4bca852b2594daa60
Author: Dale <[email protected]>
Date:   2015-04-04T11:44:09Z

    [SPARK-6593] Added HadoopReliableRDD, which returns all read actions wrapped in a Try

commit d2ab7044516ebda33ceeca70bb6aa9c217af42ca
Author: Dale <[email protected]>
Date:   2015-04-05T12:14:41Z

    [SPARK-6593] Fine-tuned the iterator wrapper logic

commit cb88164bce0552617b5dbdfdff0f02fe018d84d5
Author: Dale <[email protected]>
Date:   2015-04-05T12:15:48Z

    [SPARK-6593] Added convenience method hadoopReliableFile() to Scala Context

----

