GitHub user tigerquoll opened a pull request:
https://github.com/apache/spark/pull/5368
[CORE] [SPARK-6593] Provide a HadoopRDD variant that wraps all reads in a Try
@rxin @sryza
As per the attached JIRA ticket, I was proposing some way of making Hadoop IO
more robust in the face of malformed files that cause an exception in the
Hadoop input libraries. The previous PR (#5250) simply added an option to
ignore exceptions and continue processing. This raised some concerns about not
giving any indication to the user that an error had occurred (other than a
warning message in the logs).
As an alternative to the first PR, and to keep the conversation moving, I'm
putting forward this PR, which contains HadoopReliableRDD. It wraps all
Hadoop IO in a Try and passes that back to the user, so that the user is
forced to act upon any non-fatal errors that occur when reading from a
Hadoop file system via this API.
An example of using this API is given below:
```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import scala.util.{Failure, Success, Try}
// HDFS directory contains test[1-4].txt.gz - test4.txt.gz is corrupted
val path = "hdfs:///user/cloudera/*.gz"
val testRdd = sc.hadoopReliableFile(path, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], 2)
…
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test4.txt.gz:0+42544
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test3.txt.gz:0+15043
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test1.txt.gz:0+15043
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test2.txt.gz:0+15043
…
15/04/05 21:24:57 WARN rdd.HadoopReliableRDD: Exception on read attempt IOException - not a gzip file - stopping any further read attempts from this split
…
testRdd.count
…
res5: Long = 4384
testRdd.filter(x=>x.isSuccess).count
...
res6: Long = 4383
testRdd.map(_ match {
  case Failure(ex) => s"${ex.getClass.getSimpleName}: ${ex.getMessage}"
  case Success(_) => "success"
}).countByValue()
...
res7: scala.collection.Map[String,Long] = Map(IOException: not a gzip file -> 1, success -> 4383)
testRdd.filter(_.isSuccess).map(_.get.toString).flatMap(_.split(" ")).countByValue
...
… and on we go…
```
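To make the mechanism concrete, here is a minimal sketch of how the read-side wrapping could work. This is not the PR's actual implementation; `wrapReads` and its structure are assumptions for illustration only:

```
import scala.util.{Failure, Success, Try}

// Hypothetical sketch: wrap an iterator so every read is surfaced as a Try.
// After the first Failure is emitted, no further reads are attempted, which
// mirrors the "stopping any further read attempts from this split" warning
// in the log output above.
def wrapReads[T](underlying: Iterator[T]): Iterator[Try[T]] =
  new Iterator[Try[T]] {
    private var failed = false                  // set once a Failure is emitted
    private var pending: Option[Try[T]] = None  // one-element read-ahead buffer

    private def advance(): Boolean = Try(underlying.hasNext) match {
      case Success(true) =>
        val elem = Try(underlying.next())
        if (elem.isFailure) failed = true
        pending = Some(elem)
        true
      case Success(false) => false              // cleanly exhausted
      case Failure(ex) =>
        failed = true
        pending = Some(Failure(ex))             // hand the error back as data
        true
    }

    override def hasNext: Boolean = pending.isDefined || (!failed && advance())

    override def next(): Try[T] = {
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      val result = pending.get
      pending = None
      result
    }
  }
```

Something along these lines could be applied to each split's record iterator, with the Try elements returned to the user unmodified.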
Just to emphasise what we are doing here: we are trapping exceptions that
previously would have stopped Spark from executing, and continuing to process
data in other tasks rather than terminating the entire application, which is
what previously happened. If the exceptions were being raised in a
non-deterministic way (e.g. a temporary network failure), then the produced
data could differ if the code were run a second time. If we didn't handle
these errors, the entire application would instead fail in a non-deterministic
way. Often the latter behaviour is the desirable response, but sometimes it
is not.
If the exceptions were being raised in a deterministic way (e.g. the file was
corrupt before it was copied into HDFS), then the data will be produced in a
deterministic way that can be repeated with the same results.
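Either way, because every record comes back as a Try, the user has to pick a policy explicitly rather than having the job die mid-read. A minimal sketch of two such policies, assuming the testRdd built in the example above:

```
import scala.util.{Failure, Success}

// `testRdd` is the RDD of Try elements from the example above. Sample a few
// failures up front so the errors are visible before deciding what to do.
val sampleFailures = testRdd.collect { case Failure(ex) => ex.toString }.take(5)

if (sampleFailures.nonEmpty) {
  // Policy A: abort loudly (the old behaviour, but with a useful message):
  //   sys.error(s"Read failures detected, e.g. ${sampleFailures.head}")
  // Policy B: warn and carry on with whatever was read successfully.
  sampleFailures.foreach(f => println(s"WARN: dropped split after: $f"))
}
val clean = testRdd.collect { case Success(record) => record }
println(s"usable records: ${clean.count}")
```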
I believe giving users some way to process data that Spark currently
crashes on is worth considering, whatever form it finally takes.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tigerquoll/spark master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/5368.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5368
----
commit bffc68e64098fe24464dfe0eca0eba15afef85c8
Author: Dale <[email protected]>
Date: 2015-04-04T11:43:03Z
[SPARK-6593] Refactored HadoopRDD so that the iterator could be cleanly
wrapped
commit fd75bc1dae5b707bcdd914d4bca852b2594daa60
Author: Dale <[email protected]>
Date: 2015-04-04T11:44:09Z
[SPARK-6593] Added HadoopReliableRDD, which returns all read actions wrapped in a Try
commit d2ab7044516ebda33ceeca70bb6aa9c217af42ca
Author: Dale <[email protected]>
Date: 2015-04-05T12:14:41Z
[SPARK-6593] Fine-tuned the iterator wrapper logic
commit cb88164bce0552617b5dbdfdff0f02fe018d84d5
Author: Dale <[email protected]>
Date: 2015-04-05T12:15:48Z
[SPARK-6593] Added convenience method hadoopReliableFile() to Scala Context
----